|
- # Copyright 2020 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # ============================================================================
- '''cifar_resnet50
- The sample can be run on Ascend 910 AI processor.
- '''
- import os
- import random
- import argparse
- import json
- import mindspore.common.dtype as mstype
- import mindspore.dataset as ds
- import mindspore.dataset.vision.c_transforms as C
- import mindspore.dataset.transforms.c_transforms as C2
- from mindspore.nn.loss import SoftmaxCrossEntropyWithLogits
- from mindspore.communication.management import init
- from mindspore.nn.optim.momentum import Momentum
- from mindspore.train.model import Model
- from mindspore.context import ParallelMode
- from mindspore import context
- from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor
- from mindspore.train.serialization import load_checkpoint, load_param_into_net
- from mindspore.parallel._auto_parallel_context import auto_parallel_context
- from resnet import resnet50
- import moxing as mox
-
workroot = '/home/work/user-job-dir'

random.seed(1)


def _str2bool(value):
    """Convert a command-line token into a real boolean.

    ``argparse`` with ``type=bool`` calls ``bool()`` on the raw string, so any
    non-empty text (including ``"False"``) becomes ``True``. This converter
    interprets the usual spellings instead.

    Args:
        value: The raw option value (a string), or an actual ``bool``.

    Returns:
        The parsed boolean.

    Raises:
        argparse.ArgumentTypeError: If the text is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = value.strip().lower()
    if text in ('true', 't', 'yes', 'y', '1'):
        return True
    # The empty string stays falsy, as it was under ``type=bool``.
    if text in ('false', 'f', 'no', 'n', '0', ''):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected, got {!r}.'.format(value))


parser = argparse.ArgumentParser(description='Image classification')
parser.add_argument('--run_distribute', type=_str2bool, default=False, help='Run distribute.')
parser.add_argument('--device_num', type=int, default=1, help='Device num.')
parser.add_argument('--do_train', type=_str2bool, default=True, help='Do train or not.')
parser.add_argument('--do_eval', type=_str2bool, default=False, help='Do eval or not.')
parser.add_argument('--epoch_size', type=int, default=1, help='Epoch size.')
parser.add_argument('--batch_size', type=int, default=32, help='Batch size.')
parser.add_argument('--num_classes', type=int, default=10, help='Num classes.')
parser.add_argument('--checkpoint_path', type=str, default=None, help='CheckPoint file path.')
parser.add_argument('--dataset_path', type=str, default=None, help='Dataset path.')
parser.add_argument('--data_url',
                    help='path to training/inference dataset folder',
                    default=workroot + '/data/')
# Parsed with json.loads in the __main__ section, so the value passed on the
# command line must be a JSON document rather than a plain path.
parser.add_argument('--multi_data_url',
                    help='path to training/inference dataset folder',
                    default=workroot + '/data/')
parser.add_argument('--train_url',
                    help='model folder to save/load',
                    default=workroot + '/model/')
parser.add_argument(
    '--device_target',
    type=str,
    default="Ascend",
    choices=['Ascend', 'CPU'],
    help='device where the code will be implemented (default: Ascend),若要在启智平台上使用NPU,需要在启智平台训练界面上加上运行参数device_target=Ascend')

# ModelArts already uses data_url and train_url by default.


args_opt = parser.parse_args()
-
# Run in graph mode on the device type chosen on the command line.
context.set_context(mode=context.GRAPH_MODE, device_target=args_opt.device_target)

if args_opt.device_target == "Ascend":
    # DEVICE_ID selects the Ascend card; fall back to card 0 when the
    # variable is not exported instead of failing on int(None).
    device_id = int(os.getenv('DEVICE_ID', '0'))
    context.set_context(device_id=device_id)
-
def create_dataset(data_dir, repeat_num=1, training=True):
    """Build the CIFAR-10 input pipeline used for training or inference.

    Labels are cast to int32. Images are resized to 224x224, rescaled by
    1/255, normalized and converted from HWC to CHW layout. When ``training``
    is true a random crop and a random horizontal flip are applied first.
    The result is shuffled, batched with ``args_opt.batch_size`` (incomplete
    batches are dropped) and repeated ``repeat_num`` times.

    Args:
        data_dir: Directory handed to ``ds.Cifar10Dataset``.
        repeat_num: Number of times the batched dataset is repeated.
        training: Whether to prepend the random augmentation operations.

    Returns:
        The configured dataset object.
    """
    pipeline = ds.Cifar10Dataset(data_dir)

    target_size = (224, 224)

    # Augmentations are only part of the image pipeline during training.
    image_ops = []
    if training:
        image_ops.extend([
            C.RandomCrop((32, 32), (4, 4, 4, 4)),  # padding_mode default CONSTANT
            C.RandomHorizontalFlip(),
        ])
    image_ops.extend([
        C.Resize(target_size),  # interpolation default BILINEAR
        C.Rescale(1.0 / 255.0, 0.0),
        C.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        C.HWC2CHW(),
    ])
    label_cast = C2.TypeCast(mstype.int32)

    # Column-wise transforms: labels first, then images.
    pipeline = pipeline.map(operations=label_cast, input_columns="label")
    pipeline = pipeline.map(operations=image_ops, input_columns="image")

    # Shuffle, batch and repeat, in that order.
    pipeline = pipeline.shuffle(buffer_size=10)
    pipeline = pipeline.batch(batch_size=args_opt.batch_size, drop_remainder=True)
    return pipeline.repeat(repeat_num)
-
def ObsToEnv(obs_data_url, data_dir):
    """Copy a dataset from OBS to the local environment and write a marker file.

    The copy is best effort: any exception raised by the moxing transfer is
    printed and swallowed. Afterwards the marker file
    ``/cache/download_input.txt`` is created so that other processes polling
    for it know the copy step has finished.

    Args:
        obs_data_url: Source location passed to ``mox.file.copy_parallel``.
        data_dir: Local destination directory.

    Returns:
        None.
    """
    try:
        mox.file.copy_parallel(obs_data_url, data_dir)
        print("Successfully Download {} to {}".format(obs_data_url, data_dir))
    except Exception as e:
        print('moxing download {} to {} failed: '.format(obs_data_url, data_dir) + str(e))
    # Set a cache file to signal that the copy step is over. During multi-card
    # training the other cards wait for this file instead of copying again.
    # NOTE: the marker is written even when the copy above failed, so waiting
    # processes are released either way.
    marker_path = "/cache/download_input.txt"
    with open(marker_path, 'w'):
        pass
    # Report whether the marker is really in place.
    if os.path.exists(marker_path):
        print("download_input succeed")
    else:
        print("download_input failed")
    return
-
- ### Copy the output to obs###
def EnvToObs(train_dir, obs_train_url):
    """Upload the local output directory to OBS on a best-effort basis.

    Any exception raised by the moxing transfer is printed and swallowed,
    so the caller is never interrupted by an upload failure.

    Args:
        train_dir: Local directory holding the training output.
        obs_train_url: Destination location passed to ``mox.file.copy_parallel``.

    Returns:
        None.
    """
    try:
        mox.file.copy_parallel(train_dir, obs_train_url)
        print("Successfully Upload {} to {}".format(train_dir,obs_train_url))
    except Exception as err:
        failure_prefix = 'moxing upload {} to {} failed: '.format(train_dir,obs_train_url)
        print(failure_prefix + str(err))
-
def DownloadFromQizhi(obs_data_url, data_dir):
    """Configure the MindSpore context and fetch the dataset.

    With a single device the dataset is copied directly. With several
    devices the data-parallel context is initialised, only ranks whose
    ``RANK_ID`` is a multiple of 8 copy the data, and every rank then blocks
    until the marker file ``/cache/download_input.txt`` exists.

    Args:
        obs_data_url: Source location of the dataset.
        data_dir: Local directory that receives the dataset.

    Returns:
        None.
    """
    # Imported locally: the wait loop below needs ``time`` and the module's
    # import section does not provide it.
    import time

    # Treat a missing RANK_SIZE as a single-device run instead of failing
    # on int(None).
    device_num = int(os.getenv('RANK_SIZE', '1'))
    if device_num == 1:
        ObsToEnv(obs_data_url, data_dir)
        context.set_context(mode=context.GRAPH_MODE, device_target=args_opt.device_target)
    if device_num > 1:
        # Set device_id and init for multi-card training.
        context.set_context(mode=context.GRAPH_MODE, device_target=args_opt.device_target,
                            device_id=int(os.getenv('ASCEND_DEVICE_ID')))
        context.reset_auto_parallel_context()
        context.set_auto_parallel_context(device_num=device_num, parallel_mode=ParallelMode.DATA_PARALLEL,
                                          gradients_mean=True, parameter_broadcast=True)
        init()
        # The data only needs to be copied once, so only ranks that are a
        # multiple of 8 do it (presumably the first card of each 8-card
        # node -- confirm against the cluster layout).
        local_rank = int(os.getenv('RANK_ID', '0'))
        if local_rank % 8 == 0:
            ObsToEnv(obs_data_url, data_dir)
        # A missing marker file means the copy has not finished yet; poll
        # once per second until the copying rank has written it.
        while not os.path.exists("/cache/download_input.txt"):
            time.sleep(1)
    return
-
def UploadToQizhi(train_dir, obs_train_url):
    """Upload the training output, once per group of eight ranks.

    A single-device run always uploads. In a multi-device run only ranks
    whose ``RANK_ID`` is a multiple of 8 upload, so the same directory is
    not pushed by every card.

    Args:
        train_dir: Local directory holding the training output.
        obs_train_url: Destination location for the upload.

    Returns:
        None.
    """
    # Missing variables fall back to a single-device, rank-0 run; otherwise
    # int(None) would raise before the results are uploaded.
    device_num = int(os.getenv('RANK_SIZE', '1'))
    local_rank = int(os.getenv('RANK_ID', '0'))
    if device_num == 1:
        EnvToObs(train_dir, obs_train_url)
    if device_num > 1:
        if local_rank % 8 == 0:
            EnvToObs(train_dir, obs_train_url)
    return
-
-
if __name__ == '__main__':
    # The command-line flags decide which steps are executed.
    if not args_opt.do_eval and args_opt.run_distribute:
        context.set_auto_parallel_context(device_num=args_opt.device_num, parallel_mode=ParallelMode.DATA_PARALLEL)
        auto_parallel_context().set_all_reduce_fusion_split_indices([140])
        init()

    data_dir = '/cache/data'
    train_dir = '/cache/output'
    # exist_ok avoids a FileExistsError when several training processes
    # reach this point at the same time.
    os.makedirs(data_dir, exist_ok=True)
    os.makedirs(train_dir, exist_ok=True)

    # --multi_data_url must hold a JSON list whose first element has a
    # "dataset_url" entry; fail with an explicit message otherwise.
    try:
        multi_data_json = json.loads(args_opt.multi_data_url)
    except json.JSONDecodeError as err:
        raise ValueError(
            '--multi_data_url must be a JSON list of objects with a "dataset_url" key, '
            'got: {!r}'.format(args_opt.multi_data_url)) from err
    # Initialize the context and copy the data into the training image.
    DownloadFromQizhi(multi_data_json[0]["dataset_url"], data_dir)

    epoch_size = args_opt.epoch_size
    net = resnet50(args_opt.batch_size, args_opt.num_classes)
    ls = SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
    opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), 0.01, 0.9)

    model = Model(net, loss_fn=ls, optimizer=opt, metrics={'acc'})

    # Training goes through model.train.
    if args_opt.do_train:
        dataset = create_dataset(data_dir)
        batch_num = dataset.get_dataset_size()
        # One checkpoint per epoch (every batch_num steps), keeping at most 35.
        config_ck = CheckpointConfig(save_checkpoint_steps=batch_num, keep_checkpoint_max=35)
        ckpoint_cb = ModelCheckpoint(prefix="train_resnet_cifar10", directory=train_dir, config=config_ck)
        loss_cb = LossMonitor()
        model.train(epoch_size, dataset, callbacks=[ckpoint_cb, loss_cb])

    # Copy the output directory back to OBS.
    UploadToQizhi(train_dir, args_opt.train_url)
-
|