|
- import argparse
- import logging
- import pprint
- import random
- import os
- import ast
- import sys
- import multiprocessing as mp
- import mindspore as ms
- from mindspore import context, nn
- from mindspore.train import FixedLossScaleManager, Model
- from mindspore.communication import init, get_group_size, get_rank
- from mindspore.train.callback import ModelCheckpoint, CheckpointConfig
-
- from moco.loader import create_dataset_moco
- from moco.resnet import resnet50
- from moco.builder import MoCo, WithLossCell, MoCoTrainStep
- from moco.lr_scheduler import multi_step_lr, cosine_lr
- from moco.logger import setup_logger, TrainMonitor
-
- from openi import openi_multidataset_to_env as DatasetToEnv
- from openi import pretrain_to_env
- from openi import env_to_openi
-
# Supported backbone architectures (only resnet50 is wired up in moco.resnet).
model_names = ["resnet50"]

parser = argparse.ArgumentParser(description='MindSpore Unsupervised Training')

# Arguments injected by the OpenI platform job launcher.
group = parser.add_argument_group('OpenI')
group.add_argument('--device_target', type=str, default='Ascend')
group.add_argument('--multi_data_url', type=str, default='/cache/data', help='obs path to dataset')
group.add_argument('--data_url', type=str, default='/cache/data', help='obs path to dataset')
group.add_argument('--train_url', type=str, default='/cache/output', help='obs path to dumped ckpt')

# NOTE: booleans use ast.literal_eval rather than type=bool, because
# bool("False") is True (any non-empty string is truthy); literal_eval
# parses the strings "True"/"False" correctly.
parser.add_argument('--isModelArts', type=ast.literal_eval, default=True)
# NOTE(review): both --distribute and --distributed exist; only
# args.distributed is read by this script -- confirm --distribute is needed.
parser.add_argument('--distribute', type=ast.literal_eval, default=True)
parser.add_argument('--data', default='/path/to/imagenet', type=str, metavar='DIR',
                    help='path to dataset')
parser.add_argument('-o', '--output-dir', default='/path/to/output', type=str, metavar='DIR',
                    help='path to output')
parser.add_argument('-a', '--arch', default='resnet50', choices=model_names, metavar='ARCH',
                    help='model architecture: ' + ' | '.join(model_names) + ' (default: resnet50)')
parser.add_argument('-j', '--workers', default=32, type=int, metavar='N',
                    help='number of data loading workers (default: 32)')
parser.add_argument('--epochs', default=200, type=int, metavar='N',
                    help='number of total epochs to run')
parser.add_argument('-b', '--batch-size', default=256, type=int, metavar='N',
                    help='mini-batch size (default: 256), this is the total batch size of all GPUs on the current node '
                         'when using Data Parallel or Distributed Data Parallel')
parser.add_argument('--lr', '--learning-rate', default=0.03, type=float, metavar='LR',
                    help='initial learning rate', dest='learning_rate')
parser.add_argument('--milestones', default=[120, 160], nargs='*', type=int,
                    help='learning rate schedule (when to drop lr by 10x)')
parser.add_argument('--momentum', default=0.9, type=float, metavar='M',
                    help='momentum of SGD solver')
parser.add_argument('--wd', '--weight-decay', default=1e-4, type=float, metavar='W',
                    help='weight decay (default: 1e-4)', dest="weight_decay")
parser.add_argument('-p', '--print-freq', default=100, type=int, metavar='N',
                    help='print frequency (default: 100)')
parser.add_argument('--seed', default=None, type=int,
                    help='seed for initializing training')
parser.add_argument('--distributed', default=True, type=ast.literal_eval,
                    help='if distributed training')
parser.add_argument('--mode', default=0, type=int,
                    help='running in GRAPH_MODE(0) or PYNATIVE_MODE(1).')
parser.add_argument('--amp-level', default='O0', type=str,
                    help='level for auto mixed precision training')
parser.add_argument('--loss-scale', default=128, type=int,
                    help='magnification factor of gradients')
parser.add_argument('--dataset-sink-mode', default=True, type=ast.literal_eval,
                    help='whether to sink data')

# moco specific configs:
parser.add_argument('--moco-dim', default=128, type=int,
                    help='feature dimension (default: 128)')
parser.add_argument('--moco-k', default=65536, type=int,
                    help='queue size; number of negative keys (default: 65536)')
parser.add_argument('--moco-m', default=0.999, type=float,
                    help='moco momentum of updating key encoder (default: 0.999)')
parser.add_argument('--moco-t', default=0.2, type=float,
                    help='softmax temperature (default: 0.2)')

# options for moco v2
parser.add_argument('--mlp', default=True, type=ast.literal_eval,
                    help='use mlp head')
parser.add_argument('--aug-plus', default=True, type=ast.literal_eval,
                    help='use moco v2 data augmentation')
parser.add_argument('--cos', default=True, type=ast.literal_eval,
                    help='use cosine lr schedule')
-
def main():
    """Run MoCo v2 unsupervised pre-training on MindSpore.

    Builds the ImageNet two-crop dataset, the MoCo network with its custom
    train step, and drives ``Model.train``; on ModelArts/OpenI the output
    directory is copied back to OBS afterwards. Intended to be invoked once
    per worker process (see the ``__main__`` guard).
    """
    args = parser.parse_args()

    # Fixed cache layout used on the OpenI/ModelArts platform.
    data_dir = '/cache/data'
    train_dir = '/cache/output'
    pretrain_dir = '/cache/pretrain'
    # exist_ok avoids the check-then-create race and creates parents as needed.
    for cache_dir in (data_dir, train_dir, pretrain_dir):
        os.makedirs(cache_dir, exist_ok=True)

    if args.seed is not None:
        random.seed(args.seed)
        ms.set_seed(args.seed)
    context.set_context(reserve_class_name_in_scope=False)
    context.set_context(mode=args.mode)  # GRAPH_MODE(0) or PYNATIVE_MODE(1)

    if args.distributed:
        # Collective-communication init; rank and group size come from the
        # launch environment.
        init()
        rank_id = get_rank()
        device_num = get_group_size()
        context.set_auto_parallel_context(device_num=device_num, parallel_mode='data_parallel',
                                          gradients_mean=True, parameter_broadcast=True)
    else:
        rank_id = 0
        device_num = 1
    if args.isModelArts:
        # Expose the logical rank so the per-device output paths below resolve.
        # NOTE(review): this overwrites any DEVICE_ID set by the platform
        # launcher -- confirm that is intended.
        os.environ["RANK_ID"] = f"{rank_id}"  # logical id
        os.environ["DEVICE_ID"] = f"{rank_id}"

    if args.isModelArts:
        setup_logger(output_dir='/cache/outputs', rank=rank_id)
    else:
        setup_logger(output_dir=args.output_dir, rank=rank_id)
    _logger = logging.getLogger('moco')
    _logger.info(f"Experiment Configuration:\n{pprint.pformat(args.__dict__)}")

    _logger.info(f"Building dataset from {args.data}...")
    # --batch-size is the GLOBAL batch size; shard it evenly across devices.
    assert args.batch_size % device_num == 0, "Global batch size must be divisible by the number of devices!"
    local_bs = args.batch_size // device_num
    global_bs = args.batch_size
    if args.isModelArts:
        # Kept although currently unused: the OBS download path was disabled
        # in favour of reading ImageNet from the mounted SFS share.
        import moxing as mox  # noqa: F401
        train_dataset_path = os.path.join('/cache/sfs/data/imagenet/', "train")
    else:
        train_dataset_path = os.path.join(args.data, 'train')
    train_dataset = create_dataset_moco(dataset_path=train_dataset_path, aug_plus=args.aug_plus,
                                        batch_size=local_bs, workers=args.workers,
                                        rank_id=rank_id, device_num=device_num)
    n_batches = train_dataset.get_dataset_size()
    _logger.info(f"Local batch size: {local_bs}, global batch size: {global_bs}, number of devices: {device_num}, "
                 f"got {n_batches} batches per epoch.")

    _logger.info(f"Building model {args.arch}...")
    network = MoCo(resnet50, args.moco_dim, args.moco_k, args.moco_m, args.moco_t, args.mlp, rank_id, device_num)

    # Loss, learning-rate schedule and optimizer.
    # NOTE(review): this loss-scale cell is passed to MoCoTrainStep even
    # though amp_level O0 is enforced below -- confirm it is a no-op there.
    manager = nn.DynamicLossScaleUpdateCell(loss_scale_value=2**24, scale_factor=2, scale_window=1000)
    criterion = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    net_with_criterion = WithLossCell(network, criterion)
    if args.cos:  # cosine lr schedule
        lrs = cosine_lr(args.learning_rate, n_batches, args.epochs)
    else:  # stepwise lr schedule
        lrs = multi_step_lr(args.learning_rate, args.milestones, n_batches, args.epochs)

    if args.amp_level != "O0":
        _logger.warning("amp_level has to be O0, for MoCoTrainStep doesn't support loss scale!")
    optimizer = nn.Momentum(network.trainable_params(), learning_rate=lrs,
                            momentum=args.momentum, weight_decay=args.weight_decay)
    train_one_step = MoCoTrainStep(net_with_criterion, optimizer, manager)
    model = Model(train_one_step)

    # Callbacks: periodic console logging on every rank; checkpointing on
    # rank 0 only, once per epoch (save_checkpoint_steps=n_batches).
    callbacks = [TrainMonitor(per_print_steps=args.print_freq)]
    if rank_id == 0:
        if args.isModelArts:
            save_checkpoint_path = '/cache/outputs/device_' + os.getenv('DEVICE_ID') + '/'
            # makedirs(exist_ok=True): os.mkdir would fail if /cache/outputs
            # does not exist yet, and the pre-check was racy.
            os.makedirs(save_checkpoint_path, exist_ok=True)
        else:
            save_checkpoint_path = args.output_dir
        callbacks.append(ModelCheckpoint(prefix=args.arch, directory=save_checkpoint_path,
                                         config=CheckpointConfig(save_checkpoint_steps=n_batches)))

    _logger.info("Start training...")
    model.train(args.epochs, train_dataset, callbacks=callbacks, dataset_sink_mode=args.dataset_sink_mode)
    if args.isModelArts:
        # NOTE(review): checkpoints are written under /cache/outputs but this
        # uploads train_dir (/cache/output) -- confirm which directory the
        # platform expects; as written the checkpoints may never reach OBS.
        env_to_openi(train_dir, args.train_url)
-
-
if __name__ == '__main__':
    # Parse once here to decide the launch mode; main() re-parses the same
    # argv inside the worker process.
    cli_args = parser.parse_args()
    # Route MindSpore diagnostic dumps into the job output directory.
    os.environ['MS_DIAGNOSTIC_DATA_PATH'] = '/cache/outputs/'
    if cli_args.distributed:
        mp.set_start_method("spawn")

        RANK_SIZE = 8  # NOTE(review): unused -- only one worker is spawned
        # below; confirm whether the platform launcher starts the other ranks.

        workers = [mp.Process(target=main)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
    else:
        main()
|