|
- # Copyright 2022 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # ============================================================================
- """Train RegNet."""
- import os
- import time
- import argparse
- import mindspore as ms
- import mindspore.dataset as ds
- import mindspore.dataset.transforms.c_transforms as C
- import mindspore.dataset.vision.c_transforms as vision
- from mindspore import dtype as mstype
- from mindspore import nn
- from mindspore.communication.management import init, get_rank, get_group_size
- from mindspore.context import ParallelMode
- from mindspore.train import Model
- from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
- from mindspore import context
- import moxing as mox
-
- from src.regnet import regnet20, RegNetWithLossCell, TrainingWrapper
-
-
- def create_dataset(data_path, batch_size=32, repeat_num=1, num_parallel_workers=1):
- rescale = 1.0 / 255.0
- shift = 0.0
-
- data_set = ds.Cifar10Dataset(data_path, usage='train')
-
- random_crop_op = vision.RandomCrop((32, 32), (4, 4, 4, 4))
- random_horizontal_op = vision.RandomHorizontalFlip()
- rescale_op = vision.Rescale(rescale, shift)
- normalize_op = vision.Normalize((0.4465, 0.4822, 0.4914), (0.2010, 0.1994, 0.2023))
- changeswap_op = vision.HWC2CHW()
- type_cast_op = C.TypeCast(mstype.int32)
-
- c_trans = [random_crop_op, random_horizontal_op]
- c_trans += [rescale_op, normalize_op, changeswap_op]
- data_set = data_set.map(operations=type_cast_op, input_columns="label", num_parallel_workers=num_parallel_workers)
- data_set = data_set.map(operations=c_trans, input_columns="image", num_parallel_workers=num_parallel_workers)
- data_set = data_set.shuffle(buffer_size=10)
- data_set = data_set.batch(batch_size=batch_size, drop_remainder=True)
- data_set = data_set.repeat(repeat_num)
-
- return data_set
-
-
- def create_dataset_parallel(data_path, batch_size=32, repeat_num=1, num_parallel_workers=1):
- rescale = 1.0 / 255.0
- shift = 0.0
- shard_id = get_rank()
- num_shards = get_group_size()
-
- data_set = ds.Cifar10Dataset(data_path, num_shards=num_shards, shard_id=shard_id, usage='train')
-
- random_crop_op = vision.RandomCrop((32, 32), (4, 4, 4, 4))
- random_horizontal_op = vision.RandomHorizontalFlip()
- rescale_op = vision.Rescale(rescale, shift)
- normalize_op = vision.Normalize((0.4465, 0.4822, 0.4914), (0.2010, 0.1994, 0.2023))
- changeswap_op = vision.HWC2CHW()
- type_cast_op = C.TypeCast(mstype.int32)
-
- c_trans = [random_crop_op, random_horizontal_op]
- c_trans += [rescale_op, normalize_op, changeswap_op]
- data_set = data_set.map(operations=type_cast_op, input_columns="label", num_parallel_workers=num_parallel_workers)
- data_set = data_set.map(operations=c_trans, input_columns="image", num_parallel_workers=num_parallel_workers)
- data_set = data_set.shuffle(buffer_size=10)
- data_set = data_set.batch(batch_size=batch_size, drop_remainder=True)
- data_set = data_set.repeat(repeat_num)
-
- return data_set
-
-
- def ObsToEnv(obs_data_url, data_dir):
- try:
- mox.file.copy_parallel(obs_data_url, data_dir)
- print("Successfully Download {} to {}".format(obs_data_url, data_dir))
- except Exception as e:
- print('moxing download {} to {} failed: '.format(obs_data_url, data_dir) + str(e))
- f = open("/cache/download_input.txt", 'w')
- f.close()
- try:
- if os.path.exists("/cache/download_input.txt"):
- print("download_input succeed")
- except Exception as e:
- print("download_input failed")
- return
-
-
- def EnvToObs(train_dir, obs_train_url):
- try:
- mox.file.copy_parallel(train_dir, obs_train_url)
- print("Successfully Upload {} to {}".format(train_dir, obs_train_url))
- except Exception as e:
- print('moxing upload {} to {} failed: '.format(train_dir, obs_train_url) + str(e))
- return
-
-
- def DownloadFromQizhi(obs_data_url, data_dir, strategy_save_file=None):
- device_num = int(os.getenv('RANK_SIZE'))
- if device_num == 1:
- ObsToEnv(obs_data_url, data_dir)
- context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target)
- if device_num > 1:
- # set device_id and init for multi-card training
- context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target,
- device_id=int(os.getenv('ASCEND_DEVICE_ID')))
- context.reset_auto_parallel_context()
- context.set_auto_parallel_context(device_num=device_num, parallel_mode=ParallelMode.DATA_PARALLEL,
- gradients_mean=True, parameter_broadcast=True,
- strategy_ckpt_save_file=strategy_save_file)
- init()
- # Copying obs data does not need to be executed multiple times, just let the 0th card copy the data
- local_rank = int(os.getenv('RANK_ID'))
- if local_rank % 8 == 0:
- ObsToEnv(obs_data_url, data_dir)
- # If the cache file does not exist, it means that the copy data has not been completed,
- # and Wait for 0th card to finish copying data
- while not os.path.exists("/cache/download_input.txt"):
- time.sleep(1)
- return
-
-
- def UploadToQizhi(train_dir, obs_train_url):
- device_num = int(os.getenv('RANK_SIZE'))
- local_rank = int(os.getenv('RANK_ID'))
- if device_num == 1:
- EnvToObs(train_dir, obs_train_url)
- if device_num > 1:
- if local_rank % 8 == 0:
- EnvToObs(train_dir, obs_train_url)
- return
-
-
- parser = argparse.ArgumentParser(description='MindSpore RegNet')
- parser.add_argument('--data_url', help='path to training/inference dataset folder', default='/cache/data/')
- parser.add_argument('--train_url', help='output folder to save/load', default='/cache/output/')
- parser.add_argument('--device_target', type=str, default="Ascend", choices=['Ascend', 'CPU'])
- parser.add_argument("--batch_size", type=int, default=64)
- parser.add_argument("--image_size", type=int, default=32)
- parser.add_argument("--initial_lr", type=float, default=0.05)
- parser.add_argument("--class_num", type=int, default=10)
- parser.add_argument("--save_checkpoint_steps", type=int, default=970)
- parser.add_argument("--keep_checkpoint_max", type=int, default=50)
- parser.add_argument("--checkpoint_prefix", type=str, default="RegNet")
- parser.add_argument("--max_epoch", type=int, default=2000)
-
- if __name__ == '__main__':
- args = parser.parse_args()
- data_dir = '/cache/data'
- train_dir = '/cache/output'
- device_num = int(os.getenv('RANK_SIZE'))
- DownloadFromQizhi(args.data_url, data_dir, strategy_save_file=train_dir + '/strategy_ckpt/')
- if device_num == 1:
- outputDirectory = train_dir + "/"
- if device_num > 1:
- outputDirectory = train_dir + "/" + str(get_rank()) + "/"
- if not os.path.exists(data_dir):
- os.makedirs(data_dir)
- if not os.path.exists(train_dir):
- os.makedirs(train_dir)
- if device_num == 1:
- ds_train = create_dataset(data_dir, args.batch_size)
- if device_num > 1:
- ds_train = create_dataset_parallel(data_dir, args.batch_size)
- if ds_train.get_dataset_size() == 0:
- raise ValueError("Please check dataset size > 0 and batch_size <= dataset size")
- regnet = regnet20(batch_size=args.batch_size, im_size=args.image_size, class_num=args.class_num)
- ce_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True)
- regnet.set_train(True)
- net = RegNetWithLossCell(regnet, ce_loss)
- # opt = ms.nn.Adam(net.trainable_params(), args.initial_lr)
- opt = ms.nn.SGD(net.trainable_params(), learning_rate=0.01, weight_decay=1e-4, momentum=0.9)
- net = TrainingWrapper(net, opt)
- model = Model(net)
- time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
- config_ck = CheckpointConfig(save_checkpoint_steps=args.save_checkpoint_steps,
- keep_checkpoint_max=args.keep_checkpoint_max)
- ckpoint_cb = ModelCheckpoint(prefix=args.checkpoint_prefix,
- directory=outputDirectory,
- config=config_ck)
- print("============== Starting Training ==============")
- model.train(args.max_epoch, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()],
- dataset_sink_mode=True)
- UploadToQizhi(train_dir, args.train_url)
|