7 Commits

Author SHA1 Message Date
  liupengfei 1d84a8b541 Revert "112" 1 month ago
  liupengfei 863a7badb8 Revert "122" 1 month ago
  liupengfei 2373cd5d4c Revert "去除多余代码" 1 month ago
  liupengfei d84ff4194d Revert "修改部分代码" 1 month ago
  liupengfei 4b6ff0d121 Revert "训练?" 1 month ago
  liupengfei 5a3887dad7 Revert "修改一下代码" 1 month ago
  liupengfei 9cdd2076ae Revert "加个点?" 1 month ago
4 changed files with 60 additions and 314 deletions
Split View
  1. +1
    -1
      default_config.yaml
  2. +1
    -1
      src/model_utils/moxing_adapter.py
  3. +58
    -46
      train.py
  4. +0
    -266
      train_copy.py

+ 1
- 1
default_config.yaml View File

@@ -48,7 +48,7 @@ mindir_file_name: "Flownet2" # Save file path
file_format: "MINDIR" # Save file format

# Modelarts Setup
enable_modelarts: 1 # Is training on modelarts
enable_modelarts: 0 # Is training on modelarts


# data_url,train_url是固定用于在modelarts上训练的参数,表示数据集的路径和输出模型的路径


+ 1
- 1
src/model_utils/moxing_adapter.py View File

@@ -18,7 +18,7 @@
import os
import functools
from mindspore import context
from .config import config
from config import config

_global_sync_count = 0



+ 58
- 46
train.py View File

@@ -47,10 +47,23 @@ import argparse
def set_save_ckpt_dir():
"""set save ckpt dir"""
ckpt_save_dir = config.save_checkpoint_path
if config.run_distribute:
ckpt_save_dir = ckpt_save_dir + "/ckpt_" + str(get_rank()) + "/"
if config.enable_modelarts and config.run_distribute:
ckpt_save_dir = ckpt_save_dir + "/ckpt_" + str(get_rank_id()) + "/"
else:
if config.run_distribute:
ckpt_save_dir = ckpt_save_dir + "/ckpt_" + str(get_rank()) + "/"
return ckpt_save_dir


# def init_lr(step_size):
# """init lr"""
# if config.warmup_epochs <= 0:
# return config.lr
# else:
# lr = warmup_cosine_annealing_lr(config.lr, step_size, config.warmup_epochs, config.epoch_size,
# config.pretrain_epoch_size * step_size)
# return lr

# 使用eval
def apply_eval(eval_param):
eval_model = eval_param["model"]
@@ -120,43 +133,43 @@ def add_eval_callback(model, ckpt_save_dir, cbs):


def run_train():
set_seed(config.seed)
device_id = int(os.getenv('DEVICE_ID', '3'))
print('DEVICE_ID ccc =',device_id)
context.set_context(mode=context.GRAPH_MODE, enable_graph_kernel=False, device_target=config.device_target,device_id=device_id)
context.set_context(save_graphs=True)
set_seed(config.seed)
init()
device_id = int(os.getenv('DEVICE_ID', '0'))
ascend_device_id=int(os.getenv('ASCEND_DEVICE_ID', '0'))
print('DEVICE_ID ccc =',device_id)
print('ASCEND_DEVICE_ID ccc =',ascend_device_id)

# rank_size = int(os.environ.get("RANK_SIZE", 1))
device_num = get_group_size()
rank_id = get_rank()
print('device_num ccc =',device_num)
print('rank_id ccc =',rank_id)

ms.context.set_context(mode=ms.context.GRAPH_MODE, enable_graph_kernel=False, device_target=config.device_target,device_id=device_id)
ms.context.set_context(save_graphs=True)
# ms.set_context(save_graphs=True)
# profiles = Profiler()
ds.config.set_enable_shared_mem(False)
device_num = get_device_num()
if config.device_target == "Ascend":
config.device_id = get_device_id()
if config.run_distribute == 1:
config.device_id = get_device_id()
# TODO lpf rank和 group_size 的设置是否有问题
# rank = config.device_id # rank = config.device_id
# group_size = get_group_size() # group_size = get_group_size()
# ms.set_context(device_id=config.device_id)
if device_num > 1:
ms.context.reset_auto_parallel_context()
ms.context.set_auto_parallel_context(device_num=device_num, parallel_mode=ParallelMode.DATA_PARALLEL,
gradients_mean=True)
init()
device_num = get_group_size()
print('device_num ccc =',device_num)
parallel_mode = ParallelMode.DATA_PARALLEL
rank = get_rank()
group_size = get_group_size()
print('rank ccc =',rank)
print('group_size ccc =',group_size)

# context.reset_auto_parallel_context()
# context.set_auto_parallel_context(device_num=device_num, parallel_mode=ParallelMode.DATA_PARALLEL,
# gradients_mean=True)
else:
print('log this ')
parallel_mode = ParallelMode.STAND_ALONE
rank = 0
group_size = 1
context.set_auto_parallel_context(parallel_mode=parallel_mode, gradients_mean=True, device_num=group_size)

# load dataset by config param
config.training_dataset_class = tools.module_to_dict(datasets)[config.train_data]
print('config.train_data_path =',config.train_data_path )
flownet_train_gen = config.training_dataset_class(config.crop_type, config.crop_size, config.eval_size,
config.train_data_path)
sampler = datasets.DistributedSampler(flownet_train_gen, rank=rank, group_size=group_size, shuffle=True)
sampler = datasets.DistributedSampler(flownet_train_gen, rank=0, group_size=1, shuffle=True)
print('sampler')
train_dataset = ds.GeneratorDataset(flownet_train_gen, ["images", "flow"],
sampler=sampler, num_parallel_workers=config.num_parallel_workers)
@@ -173,7 +186,8 @@ def run_train():
loss_scale_manager = DynamicLossScaleManager(init_loss_scale=65536, scale_factor=2, scale_window=2000)
else:
loss_scale_manager = FixedLossScaleManager(config.scale, drop_overflow_update=False)

# lr = ms.Tensor(init_lr(step_size=step_size))
optim = Adam(params=net.trainable_params(), learning_rate=config.lr)

load_pre_trained_checkpoint(net, config.pre_trained, config.pre_trained_ckpt_path)
@@ -190,7 +204,9 @@ def run_train():
# profiles.analyse()


def copy_obs_to_process():

if __name__ == '__main__':

parser = argparse.ArgumentParser(description='MindSpore Lenet Example')

# define 2 parameters for running on modelArts
@@ -199,7 +215,10 @@ def copy_obs_to_process():
help='path to training/inference dataset folder' )

parser.add_argument('--train_url',
help='model folder to save/load' )
help='model folder to save/load' )

parser.add_argument('--ASCEND_DEVICE_ID',
help='model folder to save/load' )

parser.add_argument(
'--device_target',
@@ -223,11 +242,13 @@ def copy_obs_to_process():

print('args.data_url=',args.data_url)

print('args.ASCEND_DEVICE_ID=',args.ASCEND_DEVICE_ID)
run_train()

def copy_ckpt_to_train():
######################## 将输出的模型拷贝到obs(固定写法) ########################
parser = argparse.ArgumentParser(description='MindSpore Lenet Example')
args = parser.parse_args()
######################## 将数据集从obs拷贝到训练镜像中 ########################
#将dataset_path指向data_url,save_checkpoint_path指向train_url
obs_train_url ="/home/work/user-job-dir/model/"
try:
@@ -238,13 +259,4 @@ def copy_ckpt_to_train():
print('moxing upload {} to {} failed: '.format(obs_train_url,
args.train_url) + str(e))
print('args.train_url=',args.train_url)


if __name__ == '__main__':
copy_obs_to_process()

run_train()

copy_ckpt_to_train()


+ 0
- 266
train_copy.py View File

@@ -1,266 +0,0 @@
# Copyright 2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0 dadasdasd
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# hhhhhh
# ============================================================================
import moxing as mox
import os
import datetime
import glob
import mindspore as ms
import mindspore.dataset as ds
import mindspore.log as logger
import mindspore.nn as nn
from mindspore.context import ParallelMode
from mindspore.nn.optim.adam import Adam
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.train.loss_scale_manager import DynamicLossScaleManager, FixedLossScaleManager
from mindspore.train.model import Model
from mindspore.train.serialization import load_checkpoint, load_param_into_net
from mindspore.communication.management import init, get_rank, get_group_size
from mindspore.common import set_seed
from mindspore import context
from mindspore.profiler import Profiler
from src.eval_callback import EvalCallBack
import src.dataset as datasets
import src.models as models
from src.metric import FlowNetEPE
import src.model_utils.tools as tools
from src.model_utils.config import config
from src.model_utils.device_adapter import get_device_id, get_device_num, get_rank_id
from pprint import pprint, pformat
import argparse



# 设置保存ckpt目录
def set_save_ckpt_dir():
"""set save ckpt dir"""
ckpt_save_dir = config.save_checkpoint_path
if config.enable_modelarts and config.run_distribute:
ckpt_save_dir = ckpt_save_dir + "/ckpt_" + str(get_rank_id()) + "/"
else:
if config.run_distribute:
ckpt_save_dir = ckpt_save_dir + "/ckpt_" + str(get_rank()) + "/"
return ckpt_save_dir


# def init_lr(step_size):
# """init lr"""
# if config.warmup_epochs <= 0:
# return config.lr
# else:
# lr = warmup_cosine_annealing_lr(config.lr, step_size, config.warmup_epochs, config.epoch_size,
# config.pretrain_epoch_size * step_size)
# return lr

# 使用eval
def apply_eval(eval_param):
eval_model = eval_param["model"]
eval_ds = eval_param["dataset"]
metrics_name = eval_param["metrics_name"]
res = eval_model.eval(eval_ds, dataset_sink_mode=False)
return res[metrics_name]


# 负载预训练检查点
def load_pre_trained_checkpoint(net, pre_trained, checkpoint_path):
param_dict = None
if pre_trained:
if os.path.isdir(checkpoint_path):
ckpt_save_dir = os.path.join(checkpoint_path, "ckpt_0")
ckpt_pattern = os.path.join(ckpt_save_dir, "*.ckpt")
ckpt_files = glob.glob(ckpt_pattern)
if not ckpt_files:
logger.warning(f"There is no ckpt file in {ckpt_save_dir}, "
f"pre_trained is unsupported.")
else:
ckpt_files.sort(key=os.path.getmtime, reverse=True)
time_stamp = datetime.datetime.now()
print(f"time stamp {time_stamp.strftime('%Y.%m.%d-%H:%M:%S')}"
f" pre trained ckpt model {ckpt_files[0]} loading",
flush=True)
param_dict = load_checkpoint(ckpt_files[0])
elif os.path.isfile(checkpoint_path):
param_dict = load_checkpoint(checkpoint_path)
else:
print(f"Invalid pre_trained {checkpoint_path} parameter.")
return
load_param_into_net(net, param_dict)
print(f"loaded param from {checkpoint_path} into net")


# 添加ckpt回调
def add_ckpt_callback(step_size, ckpt_save_dir, cbs):
if config.save_checkpoint:
config_ck = CheckpointConfig(save_checkpoint_steps=step_size * config.save_ckpt_interval,
keep_checkpoint_max=config.keep_checkpoint_max)
ckpoint_cb = ModelCheckpoint(prefix="flownet2_", directory=ckpt_save_dir, config=config_ck)
cbs += [ckpoint_cb]


# 添加eval回调
def add_eval_callback(model, ckpt_save_dir, cbs):
if config.run_evalCallback:
if config.eval_data_path is None or (not os.path.isdir(config.eval_data_path)):
raise ValueError("{} is not a existing path.".format(config.eval_data_path))

config.eval_dataset_class = tools.module_to_dict(datasets)[config.eval_data]
flownet_eval_gen = config.eval_dataset_class(config.crop_type, config.crop_size, config.eval_size,
config.eval_data_path)
# sampler = datasets.DistributedSampler(flownet_eval_gen, rank, group_size) sampler=sampler,
eval_dataset = ds.GeneratorDataset(flownet_eval_gen, ["images", "flow"],
num_parallel_workers=config.num_parallel_workers,
max_rowsize=config.max_rowsize)
eval_dataset = eval_dataset.batch(config.batch_size)

eval_param_dict = {"model": model, "dataset": eval_dataset, "metrics_name": "mean error"}
eval_cb = EvalCallBack(apply_eval, eval_param_dict, interval=config.eval_interval,
eval_start_epoch=config.eval_start_epoch, save_best_ckpt=config.save_best_ckpt,
ckpt_directory=ckpt_save_dir, besk_ckpt_name="best_acc.ckpt",
metrics_name="FlownetAcc")
cbs += [eval_cb]


def run_train():
set_seed(config.seed)
init()
device_id = int(os.getenv('DEVICE_ID', '0'))
print('DEVICE_ID ccc =',device_id)
device_num = get_group_size()
print('device_num ccc =',device_num)

context.set_context(mode=context.GRAPH_MODE, enable_graph_kernel=False, device_target=config.device_target,device_id=device_id)
context.set_context(save_graphs=True)
# profiles = Profiler()
ds.config.set_enable_shared_mem(False)
if config.device_target == "Ascend":
config.device_id = get_device_id()

if device_num > 1:
init()
parallel_mode = ParallelMode.DATA_PARALLEL
rank = get_rank()
group_size = get_group_size()
print('rank ccc =',rank)
print('group_size ccc =',group_size)

context.reset_auto_parallel_context()
context.set_auto_parallel_context(device_num=device_num, parallel_mode=ParallelMode.DATA_PARALLEL,
gradients_mean=True)
else:
parallel_mode = ParallelMode.STAND_ALONE
rank = 0
group_size = 1

context.set_auto_parallel_context(parallel_mode=parallel_mode, gradients_mean=True, device_num=group_size)
# load dataset by config param
config.training_dataset_class = tools.module_to_dict(datasets)[config.train_data]
print('config.train_data_path =',config.train_data_path )
flownet_train_gen = config.training_dataset_class(config.crop_type, config.crop_size, config.eval_size,
config.train_data_path)
sampler = datasets.DistributedSampler(flownet_train_gen, rank=rank, group_size=group_size, shuffle=True)
print('sampler')
train_dataset = ds.GeneratorDataset(flownet_train_gen, ["images", "flow"],
sampler=sampler, num_parallel_workers=config.num_parallel_workers)
train_dataset = train_dataset.batch(config.batch_size, num_parallel_workers=config.num_parallel_workers)
step_size = train_dataset.get_dataset_size()
print("Step size: ", step_size,flush=True)

# load model by config param
config.model_class = tools.module_to_dict(models)[config.model]
net = config.model_class(config.rgb_max, config.batchNorm)

loss = nn.L1Loss()
if config.is_dynamicLoss_scale == 1:
loss_scale_manager = DynamicLossScaleManager(init_loss_scale=65536, scale_factor=2, scale_window=2000)
else:
loss_scale_manager = FixedLossScaleManager(config.scale, drop_overflow_update=False)

# lr = ms.Tensor(init_lr(step_size=step_size))
optim = Adam(params=net.trainable_params(), learning_rate=config.lr)

load_pre_trained_checkpoint(net, config.pre_trained, config.pre_trained_ckpt_path)
model = Model(network=net, loss_fn=loss, optimizer=optim, metrics={'FlowNetEPE': FlowNetEPE()},
amp_level="O0", keep_batchnorm_fp32=True, loss_scale_manager=loss_scale_manager)

time_cb = TimeMonitor(data_size=step_size)
loss_cb = LossMonitor()
cbs = [time_cb, loss_cb]
ckpt_save_dir = set_save_ckpt_dir()
add_ckpt_callback(step_size, ckpt_save_dir, cbs)
add_eval_callback(model, ckpt_save_dir, cbs)
model.train(config.epoch_size, train_dataset, callbacks=cbs, dataset_sink_mode=True)
# profiles.analyse()


def copy_obs_to_process():
parser = argparse.ArgumentParser(description='MindSpore Lenet Example')

# define 2 parameters for running on modelArts
# data_url,train_url是固定用于在modelarts上训练的参数,表示数据集的路径和输出模型的路径
parser.add_argument('--data_url',
help='path to training/inference dataset folder' )

parser.add_argument('--train_url',
help='model folder to save/load' )

parser.add_argument(
'--device_target',
type=str,
default="Ascend",
choices=['Ascend', 'GPU', 'CPU'],
help='device where the code will be implemented (default: Ascend)')

args = parser.parse_args()
######################## 将数据集从obs拷贝到训练镜像中 (固定写法)########################
# 在训练环境中定义data_url和train_url,并把数据从obs拷贝到相应的固定路径
obs_data_url = '/home/work/user-job-dir/data/'
try:
mox.file.copy_parallel(args.data_url, obs_data_url)
print("Successfully Download {} to {}".format(args.data_url,
obs_data_url))
except Exception as e:
print('moxing download {} to {} failed: '.format(
args.data_url, obs_data_url) + str(e))

print('args.data_url=',args.data_url)


def copy_ckpt_to_train():
######################## 将输出的模型拷贝到obs(固定写法) ########################
parser = argparse.ArgumentParser(description='MindSpore Lenet Example')
args = parser.parse_args()
#将dataset_path指向data_url,save_checkpoint_path指向train_url
obs_train_url ="/home/work/user-job-dir/model/"
try:
mox.file.copy_parallel(obs_train_url, args.train_url)
print("Successfully Upload {} to {}".format(obs_train_url,
args.train_url))
except Exception as e:
print('moxing upload {} to {} failed: '.format(obs_train_url,
args.train_url) + str(e))
print('args.train_url=',args.train_url)


if __name__ == '__main__':
copy_obs_to_process()

run_train()
copy_ckpt_to_train()


Loading…
Cancel
Save