|
- import os
- import sys
- import argparse
- import yaml
-
- # openi needed
- import moxing as mox
- import json
- import time
- from upload_for_c2net import UploadOutput
-
- import mindspore
- from mindspore import nn, ops, Model, FixedLossScaleManager, context, DynamicLossScaleManager
- from mindspore.communication import get_rank, init
- from mindspore.context import ParallelMode
- from mindspore.train.callback import CheckpointConfig, ModelCheckpoint, LossMonitor, TimeMonitor
- # from mindspore.profiler import Profiler
-
- from model.delg import delg, ArcFace
- # from data.get_dataset import create_dataset
- from data.get_dataset_from_origin import create_dataset
-
-
- class LossFunc(nn.Cell):
- def __init__(self, state=None, batch_size=None):
- super(LossFunc, self).__init__()
- self.state = state
- # global
- self.arcface = ArcFace(96264, batch_size)
- # local
- self.avg_pool = ops.ReduceMean(keep_dims=True)
- self.local_fc = nn.Dense(512, 96264)
- # loss
- self.loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
- # self.auto_encoder_loss_fn = nn.MSELoss()
-
- def construct(self, base, label):
- label = ops.clip_by_value(label, 0, 96264)
- if self.state == "global":
- global_logits = self.arcface(base, label)
- global_loss = self.loss_fn(global_logits, label)
- total_loss = global_loss
- else:
- (global_feature, local_feature, block3, attn_scores) = base
- # global
- global_logits = self.arcface(global_feature, label)
- # local
- local_logits = self.avg_pool(local_feature, (2, 3))
- local_logits = local_logits.view(local_logits.shape[0], -1)
- local_logits = self.local_fc(local_logits)
- # loss
- global_loss = self.loss_fn(global_logits, label.view(-1))
- local_loss = self.loss_fn(local_logits, label.view(-1))
- # auto_encoder_loss = self.auto_encoder_loss_fn(dim_expanded_features, block3) * 10.0
-
- total_loss = global_loss + local_loss
-
- return total_loss
-
-
- class WithLossCell(nn.Cell):
- def __init__(self, backbone, loss_fn):
- super(WithLossCell, self).__init__()
- self._backbone = backbone
- self._loss_fn = loss_fn
-
- def construct(self, data, label):
- base = self._backbone(data)
-
- return self._loss_fn(base, label)
-
- ### Copy multiple datasets from obs to training image ###
- def MultiObsToEnv(multi_data_url, data_dir):
- #--multi_data_url is json data, need to do json parsing for multi_data_url
- multi_data_json = json.loads(multi_data_url)
- for i in range(len(multi_data_json)):
- path = data_dir + "/" + multi_data_json[i]["dataset_name"]
- if not os.path.exists(path):
- os.makedirs(path)
- try:
- mox.file.copy_parallel(multi_data_json[i]["dataset_url"], path)
- print("Successfully Download {} to {}".format(multi_data_json[i]["dataset_url"],path))
- except Exception as e:
- print('moxing download {} to {} failed: '.format(
- multi_data_json[i]["dataset_url"], path) + str(e))
- #Set a cache file to determine whether the data has been copied to obs.
- #If this file exists during multi-card training, there is no need to copy the dataset multiple times.
- f = open("/cache/download_input.txt", 'w')
- f.close()
- try:
- if os.path.exists("/cache/download_input.txt"):
- print("download_input succeed")
- except Exception as e:
- print("download_input failed")
- return
- ### Copy the output model to obs ###
- def EnvToObs(train_dir, obs_train_url):
- try:
- mox.file.copy_parallel(train_dir, obs_train_url)
- print("Successfully Upload {} to {}".format(train_dir,
- obs_train_url))
- except Exception as e:
- print('moxing upload {} to {} failed: '.format(train_dir,
- obs_train_url) + str(e))
- return
- def DownloadFromQizhi(multi_data_url, data_dir):
- device_num = int(os.getenv('RANK_SIZE'))
- if device_num == 1:
- MultiObsToEnv(multi_data_url,data_dir)
- context.set_context(mode=context.GRAPH_MODE,device_target=args.device_target)
- if device_num > 1:
- # set device_id and init for multi-card training
- context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target, device_id=int(os.getenv('ASCEND_DEVICE_ID')))
- context.reset_auto_parallel_context()
- context.set_auto_parallel_context(device_num = device_num, parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True, parameter_broadcast=True)
- init()
- #Copying obs data does not need to be executed multiple times, just let the 0th card copy the data
- local_rank=int(os.getenv('RANK_ID'))
- if local_rank%8==0:
- MultiObsToEnv(multi_data_url,data_dir)
- # mox.file.copy_parallel("obs://sysu-dhuang/linpeijia/gldv2/mindrecord", data_dir)
- # print("Done!!")
- #If the cache file does not exist, it means that the copy data has not been completed,
- #and Wait for 0th card to finish copying data
- while not os.path.exists("/cache/download_input.txt"):
- time.sleep(1)
- return
- def UploadToQizhi(train_dir, obs_train_url):
- device_num = int(os.getenv('RANK_SIZE'))
- local_rank=int(os.getenv('RANK_ID'))
- if device_num == 1:
- EnvToObs(train_dir, obs_train_url)
- if device_num > 1:
- if local_rank%8==0:
- EnvToObs(train_dir, obs_train_url)
- return
-
- def run_train(cfg):
- # profiler = Profiler(output_path=cfg['profiler_data_url'])
- # load data
- # train_dataset = create_dataset(cfg=cfg, data_path=cfg['mindrecord_url'])
- train_dataset = create_dataset(cfg=cfg, data_path=cfg['origin_data_root_url'])
-
- # initiall forward net
- if cfg['use_openi']:
- pretrain_ckpt_path = sys.path[0] + "/ckpt/resnet50_ascend_v130_imagenet2012_official_cv_bs32_acc77.06.ckpt"
- delg_net = delg(pretrain_ckpt=pretrain_ckpt_path, state=cfg["state"])
- else:
- delg_net = delg(pretrain_ckpt=cfg['pretraind_resnet50_url'], state=cfg["state"])
-
- # loss func
- loss_func = LossFunc(cfg["state"], cfg["batch_size"])
- # loss_func = LossFunc()
-
- delg_net_with_loss = WithLossCell(delg_net, loss_func)
-
- # optimizer & dynamic lr
- init_lr = cfg['lr']
- lr_schedule = nn.PolynomialDecayLR(learning_rate=init_lr, end_learning_rate=0.00001, decay_steps=500000, power=1.0)
- opt = nn.SGD(params=delg_net_with_loss.trainable_params(), learning_rate=lr_schedule)
-
- # callback list
- cb_config = CheckpointConfig(save_checkpoint_steps=cfg['save_checkpoint_steps'],
- keep_checkpoint_max=cfg['keep_checkpoint_max'])
- device_num = int(os.getenv('RANK_SIZE'))
- if device_num > 1:
- model_ck_cb = ModelCheckpoint(prefix="delg_", directory=cfg['saved_ckpt_url'] + str(get_rank()) + "/",
- config=cb_config)
- else:
- model_ck_cb = ModelCheckpoint(prefix="delg_", directory=cfg['saved_ckpt_url'], config=cb_config)
- loss_cb = LossMonitor()
- time_cb = TimeMonitor()
- callback_list = [time_cb, loss_cb, model_ck_cb]
-
- # train
- # loss_scale_manager = FixedLossScaleManager()
- loss_scale_manager = DynamicLossScaleManager(init_loss_scale=65536, scale_factor=2, scale_window=2000)
- model = Model(network=delg_net_with_loss, loss_scale_manager=loss_scale_manager, optimizer=opt, amp_level="O3")
- # model.train(epoch=cfg['epoch_size'], train_dataset=train_dataset, callbacks=callback_list, dataset_sink_mode=True, sink_size=10)
- model.train(epoch=cfg['epoch_size'], train_dataset=train_dataset, callbacks=callback_list, dataset_sink_mode=True)
- # profiler.analyse()
-
- if cfg['use_openi']:
- UploadToQizhi(train_dir,args.train_url)
-
-
- parser = argparse.ArgumentParser(description="DELG Mindspore Version")
- parser.add_argument('--yaml_path',
- help='Path to training config',
- default='/cache/user-job-dir/code/delg_config.yaml')# /data2/DELG_leimingxuan/DELG_Final/delg_config.yaml
- parser.add_argument(
- '--device_target',
- type=str,
- default="Ascend",
- choices=['Ascend', 'CPU'],
- help='device where the code will be implemented (default: Ascend),if to use the CPU on the Qizhi platform:device_target=CPU')
-
- parser.add_argument('--data_url',
- help='path to training/inference dataset folder',
- default= '/cache/data1/')
-
- parser.add_argument('--multi_data_url',
- help='path to multi dataset',
- default= '/cache/data/')
-
- parser.add_argument('--train_url',
- help='model folder to save/load',
- default= '/cache/output/')
-
- if __name__ == '__main__':
- args = parser.parse_args()
- # ymal_path = args.yaml_path
- ymal_path = sys.path[0] + "/delg_config.yaml"
- print("ymal_path: ", ymal_path)
- with open(ymal_path, 'r', encoding='utf-8') as f:
- cfg = yaml.load(f.read(), Loader=yaml.FullLoader)
-
- if cfg['use_openi']:
- data_dir = '/cache/data'
- train_dir = '/cache/output'
-
- if not os.path.exists(data_dir):
- os.makedirs(data_dir)
- if not os.path.exists(train_dir):
- os.makedirs(train_dir)
-
- ###Initialize and copy data to training image
- DownloadFromQizhi(args.multi_data_url, data_dir)
- # local_rank=int(os.getenv('RANK_ID'))
- # if local_rank%8==0:
- # mox.file.copy_parallel("obs://sysu-dhuang/linpeijia/gldv2/mindrecord", data_dir)
- # print("Done!!")
- # for filename in os.listdir(data_dir):
- # print(filename)
- # else:
- # time.sleep(10)
- else:
- context.set_context(mode=context.GRAPH_MODE, device_target=cfg['device_target'])
- # context.set_context(mode=context.PYNATIVE_MODE, device_target=cfg['device_target'])
- device_id = int(os.getenv("DEVICE_ID"))
- context.set_context(device_id=device_id)
- if cfg['device_num'] > 1:
- context.reset_auto_parallel_context()
- context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True)
- init()
-
- run_train(cfg)
|