#!/usr/bin/env python
# -*- coding: utf-8 -*-

# @Date: 2021/3/26
# @Author: qing
import os
import time
import argparse
import numpy as np
import moxing as mox
import mindspore.common.dtype as mstype
import mindspore.communication.management as D

from mindspore.context import ParallelMode
from mindspore import context
from mindspore.common.tensor import Tensor
from mindspore.train.model import Model
from mindspore.parallel._cost_model_context import _set_multi_subgraphs
from mindspore.parallel import set_algo_parameters
from mindspore.parallel._auto_parallel_context import auto_parallel_context

# from pangu_dropout_recompute_eos import EvalNet, PANGU, PANGUWithLoss, CrossEntropyLoss, EvalNet_p
from pangu_dropout_recompute_eos_fp16 import EvalNet, PANGU, PANGUWithLoss, CrossEntropyLoss, EvalNet_p
from pangu_wrapcell_gradient_scale_eos import PANGUTrainOneStepWithLossScaleCell, VirtualDatasetOneInputCell
from utils_fix import PANGUConfig, LearningRate
from tokenization_jieba import JIEBATokenizerNew
from model_path import models_path
from eval_webqa import do_eval_webqa
from eval_wsc import do_eval_wsc

# Allow up to 30 minutes (1800 s) for HCCL to establish links between devices.
os.environ['HCCL_CONNECT_TIMEOUT'] = '1800'
project_root = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + os.path.sep + "..")
print('project_root:', project_root)
def ckpt_copy_tar(obs_path, target_path="/cache/ckpt"):
    """
    Copy the four checkpoint shards `<obs_path>_0.tar` ... `<obs_path>_3.tar`
    from OBS to `target_path` and extract them there. `obs_path` must be the
    complete object-name prefix, without the `_N.tar` suffix.
    """
    sub_name_list = ['_0.tar', '_1.tar', '_2.tar', '_3.tar']
    for item in sub_name_list:
        sub_name = obs_path + item
        tmp_name = 'model.tar'

        mox.file.copy(sub_name, os.path.join(target_path, tmp_name))
        os.system('cd {}; tar -xvf {}'.format(target_path, tmp_name))
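
# Shell-free variant of the extraction step (a sketch, not called by default):
# Python's built-in tarfile module replaces the `os.system('tar -xvf ...')` call,
# so a corrupt or truncated shard raises an exception instead of failing silently.
# The function name is illustrative; the shard layout is assumed to match
# ckpt_copy_tar above.
def ckpt_copy_tar_tarfile(obs_path, target_path="/cache/ckpt"):
    import tarfile
    for item in ('_0.tar', '_1.tar', '_2.tar', '_3.tar'):
        tmp_name = os.path.join(target_path, 'model.tar')
        mox.file.copy(obs_path + item, tmp_name)
        # Each shard reuses the same temp file name and is extracted in place.
        with tarfile.open(tmp_name) as tar:
            tar.extractall(path=target_path)
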
def get_ckpt_file_list(ckpt_path):
    # path = os.listdir(ckpt_path)
    # print('Get path list:', path, flush=True)

    returned_list = []
    for i in range(0, 512):
        # 'filerted' (sic): kept as-is, assumed to match the shard file names produced upstream.
        returned_list.append('filerted_{}.ckpt'.format(i))
    # filtered = [item for item in path if 'embedding' not in item]
    # filtered.sort(key=lambda x: int(x.split('.')[0].split('_')[1]))
    returned_list = [os.path.join(ckpt_path, item) for item in returned_list if 'embedding' not in item]
    print("Sorted list", returned_list)
    for item in returned_list:
        fsize = os.path.getsize(item)
        f_gb = fsize / float(1024) / 1024 / 1024
        print(item, ": {:.2f} GB".format(f_gb))

    return returned_list
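
# Directory-driven alternative (sketch, unused by default): derive the shard list
# from what was actually extracted instead of hard-coding 512 names. This mirrors
# the commented-out listing logic above; the sort key assumes '<prefix>_<index>.ckpt'.
def get_ckpt_file_list_scan(ckpt_path):
    names = [n for n in os.listdir(ckpt_path)
             if n.endswith('.ckpt') and 'embedding' not in n]
    # Sort numerically by the shard index embedded in the file name.
    names.sort(key=lambda x: int(x.split('.')[0].split('_')[-1]))
    return [os.path.join(ckpt_path, n) for n in names]
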
def get_args():
    parser = argparse.ArgumentParser(description="PANGU inferencing")

    parser.add_argument('--device_id',
                        type=int,
                        default=0,
                        help="Device id, default is 0.")
    parser.add_argument("--device_num",
                        type=int,
                        default=128,
                        help="Number of devices to use, default is 128.")
    parser.add_argument("--distribute",
                        type=str,
                        default="true",
                        choices=["true", "false"],
                        help="Run distributed, default is true.")
    parser.add_argument("--optimizer",
                        type=str,
                        default="adam",
                        choices=["adam", "lamb"],
                        help="Select which optimizer to use, default is adam.")
    parser.add_argument("--epoch_size",
                        type=int,
                        default=1,
                        help="Epoch size, default is 1.")
    parser.add_argument("--warmup_step",
                        type=int,
                        default=2000,
                        help="Warmup steps, default is 2000.")
    parser.add_argument("--start_lr",
                        type=float,
                        default=1e-4,
                        help="Start learning rate, default is 1e-4.")
    parser.add_argument("--end_lr",
                        type=float,
                        default=1e-10,
                        help="End learning rate, default is 1e-10.")
    parser.add_argument("--sink_size",
                        type=int,
                        default=2,
                        help="Sink size for every iteration, default is 2.")
    parser.add_argument('--data_url',
                        required=True,
                        help='Location of data.')
    parser.add_argument('--train_url',
                        required=True,
                        help='Location of training outputs.')
    parser.add_argument('--whl_pkg',
                        type=str,
                        default='',
                        help='Location of the mindspore whl package.')
    parser.add_argument('--bucket_dir',
                        type=str,
                        default='s3://mindspore-file/ckpt/pangu_1024_13b_exp46',
                        help='Obs ckpt dir.')
    parser.add_argument('--soma_bucket_dir',
                        type=str,
                        default='s3://pcl-verify/soma/pangu_1024_13b_exp46',
                        help='Obs soma dir.')
    parser.add_argument("--sample_count",
                        type=int,
                        default=1000000,
                        help="sample_count, default is 1000000.")
    parser.add_argument("--eod_id",
                        type=int,
                        default=9,
                        help="Token id of the end-of-document marker, default is 9.")
    parser.add_argument("--eod_reset",
                        type=int,
                        default=0,
                        help="eod_reset 0/1.")
    parser.add_argument("--full_batch",
                        type=int,
                        default=1,
                        help="full_batch 0/1.")
    parser.add_argument("--save_step",
                        type=int,
                        default=3000,
                        help="Checkpoint-saving interval, in steps.")

    # Candidate checkpoint name prefixes; INDEX selects which one the defaults below use.
    TEMP = ['exp61_PANGU3_1-3000_2', 'exp61_PANGU3_4-5500_2', 'exp61_PANGU3_4-15500_2', 'exp61_PANGU3_4-25500_2', 'Newexp65_PANGU3-16000_2']
    INDEX = 4

    parser.add_argument("--ckpt_path", type=str,
                        default='s3://mindspore-file/huangxinjing/filtered_ckpt/{}/{}part'.format(TEMP[INDEX], TEMP[INDEX]),
                        help="Path of the saved checkpoint.")
    parser.add_argument("--word_embedding_path", type=str,
                        default='s3://mindspore-file/huangxinjing/filtered_ckpt/{}/{}_word_embedding.npy'.format(TEMP[INDEX], TEMP[INDEX]),
                        help="Path of the word embedding.")
    parser.add_argument("--position_embedding_path", type=str,
                        default='s3://mindspore-file/huangxinjing/filtered_ckpt/{}/{}_position_embedding.npy'.format(TEMP[INDEX], TEMP[INDEX]),
                        help="Path of the position embedding.")
    parser.add_argument("--top_query_embedding_path", type=str,
                        default='s3://mindspore-file/huangxinjing/filtered_ckpt/{}/{}_top_query_embedding.npy'.format(TEMP[INDEX], TEMP[INDEX]),
                        help="Path of the top_query embedding.")

    parser.add_argument("--tokenizer_path", type=str, default='s3://pcl-verify/jiangfq/bpe_4w_pcl/', help="Path of the vocab_file and model_file.")
    parser.add_argument("--save_path", type=str, default='s3://pcl-verify/jiangfq/data/', help="Obs path for evaluation outputs.")
    parser.add_argument("--data_path", type=str, default='s3://pcl-verify/jiangfq/data/', help="Obs path of the input text data.")

    args = parser.parse_args()

    return args
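
# get_model builds the generation network: it stages the parallel-strategy checkpoint
# and the three embedding tables into the local cache, sets up the Ascend
# semi-auto-parallel context, wraps PANGU in EvalNet_p, and then loads the 512
# sharded checkpoints extracted by ckpt_copy_tar.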
def get_model(args_opt):
    EXEC_PATH = os.path.join(project_root, 'inference-2b')
    device_id = int(os.getenv("DEVICE_ID"))
    rank_id_str = os.getenv('RANK_ID', '0')
    # Keep the digits after the last '-', e.g. 'RANK_ID': 'job24535502-job-facereidtome-hn-0/1'
    rank_id = int(rank_id_str[rank_id_str.rfind('-') + 1:])
    print('rank_id:{}'.format(rank_id), "rank_id str:{}".format(rank_id_str))
    local_rank = rank_id
    print('local_rank:{}, device id:{}'.format(local_rank, device_id))

    strategy_ckpt_path = "/cache/strategy/ckpt_strategy_{}.ckpt".format(local_rank)
    mox.file.copy(src_url="obs://mindspore-file/strategy_ckpt/pangu_1024_13b_exp66cktp_strategy_512.ckpt",
                  dst_url=strategy_ckpt_path)

    # mox.file.copy(src_url="obs://pcl-verify/filtered_ckpt/ckpt_strategy_pangu.ckpt", dst_url=strategy_ckpt_path)

    # Copy the embedding tables that were filtered out of the sharded checkpoints.
    mox.file.copy(args_opt.word_embedding_path, '/cache/word_embedding.npy')
    mox.file.copy(args_opt.position_embedding_path, '/cache/position_embedding.npy')
    mox.file.copy(args_opt.top_query_embedding_path, '/cache/top_query_embedding.npy')

    # reload the serialization helpers (imported late so a runtime-refreshed wheel is picked up)
    from mindspore.train.serialization import load_checkpoint, load_param_into_net, \
        load_distributed_checkpoint  # _load_single_param,

    if local_rank % 8 == 0:
        # NOTE: os.system runs ulimit in a child shell, so this does not change the
        # stack limit of the current process; kept from the original code.
        os.system('ulimit -s 102400')
        ckpt_copy_tar(args_opt.ckpt_path, target_path="/cache/ckpt_file")
        # mox.file.copy(src_url=args_opt.tokenizer_path + '/vocab10.model', dst_url="/cache/tokenizer/vocab10.model")
        # mox.file.copy(src_url=args_opt.tokenizer_path + '/vocab10.vocab', dst_url="/cache/tokenizer/vocab10.vocab")
        print("setting env success.")
        # After the package refresh / dataset download finishes, write a flag file to signal success.
        open("%s/install.txt" % EXEC_PATH, 'w').close()
    # Block the other processes here until the package refresh and dataset download are done.
    while not os.path.exists("%s/install.txt" % EXEC_PATH):
        time.sleep(1)
    print('local_rank:{}, device id:{} start to run...'.format(local_rank, device_id), flush=True)
    save_graphs_path = "/var/log/npu/slog/device" + str(local_rank) + "/"
    context.set_context(save_graphs=False,
                        save_graphs_path=save_graphs_path,
                        mode=context.GRAPH_MODE,
                        device_target="Ascend",
                        device_id=device_id)
    context.set_context(variable_memory_max_size="30GB")
    full_batch = bool(args_opt.full_batch)
    if args_opt.distribute == "true":
        D.init()
        device_num = D.get_group_size()
        rank = D.get_rank()
        print("device_id is {}, rank_id is {}, device_num is {}".format(
            device_id, rank, device_num))

        context.reset_auto_parallel_context()
        context.set_auto_parallel_context(
            parallel_mode=ParallelMode.SEMI_AUTO_PARALLEL,
            gradients_mean=False,
            device_num=device_num,
            full_batch=full_batch,
            strategy_ckpt_load_file=strategy_ckpt_path,
            enable_parallel_optimizer=False)
        auto_parallel_context().set_loss_repeated_mean(True)
        set_algo_parameters(elementwise_op_strategy_follow=True)
        _set_multi_subgraphs()
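        # In summary: SEMI_AUTO_PARALLEL shards operators according to the strategy
        # checkpoint staged above; full_batch feeds each rank the complete batch; the
        # parallel optimizer stays disabled since no training happens here.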
    else:
        rank = 0
        device_num = 1

    # model_parallel_num = 8
    model_parallel_num = 1
    data_parallel_num = device_num // model_parallel_num
    per_batch_size = 1
    batch_size = per_batch_size * data_parallel_num
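    # ~13B-parameter configuration (matching the 'pangu_1024_13b' checkpoint names):
    # 40 transformer layers, 40 attention heads, hidden size 5120, sequence length
    # 1024, 40k-token vocabulary, fp16 compute.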
    config = PANGUConfig(
        data_parallel_num=data_parallel_num,
        model_parallel_num=model_parallel_num,
        batch_size=batch_size,
        seq_length=1024,
        vocab_size=40000,
        embedding_size=5120,
        num_layers=40,
        num_heads=40,
        expand_ratio=4,
        post_layernorm_residual=False,
        dropout_rate=0.1,
        compute_dtype=mstype.float16,
        use_past=False,
        self_layernorm=True,
        forward_reduce_scatter=True,
        word_emb_dp=True,
        eod_reset=bool(args_opt.eod_reset))
    print("===config is: ", config, flush=True)
    pangu = PANGU(config)
    pangu_ = VirtualDatasetOneInputCell(pangu)
    eval_pangu = EvalNet_p(pangu_, generate=True)
    eval_pangu.set_train(False)

    model = Model(eval_pangu)
    fake_input = Tensor(np.ones(shape=(1, config.seq_length)), mstype.int32)
    predict_layout = model.infer_predict_layout(fake_input)
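    # infer_predict_layout compiles the network against a fake input so the framework
    # knows how every parameter is sharded before the distributed checkpoint is loaded.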

    ckpt_path = "/cache/ckpt_file"
    ckpt_file_list = get_ckpt_file_list(ckpt_path)
    print("Start to load distributed checkpoint", flush=True)
    # res2 = load_distributed_checkpoint(eval_pangu, ['/cache/ckpt_file/embedding.ckpt']*512, predict_layout)
    # res = load_distributed_checkpoint(eval_pangu, ckpt_file_list, predict_layout)
    res = load_distributed_checkpoint(eval_pangu, ckpt_file_list)

    # The trained-step counter is stored in every shard; read it from the first one.
    ckpt = load_checkpoint(ckpt_file_list[0])
    TRAINING_STEP = ckpt['has_trained_step'].asnumpy()
    print('@@@@@@@ Saved model training step is: {} @@@@@@@@@@@@'.format(TRAINING_STEP))

    print('#### Load ckpt success!!! ####')

    time.sleep(10)
    return model, config, rank
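
# get_model_loss mirrors get_model, but wraps the network in PANGUWithLoss so the
# forward pass returns a cross-entropy loss for perplexity-style scoring; its fake
# input is seq_length + 1 tokens because inputs and shifted labels share one tensor.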
def get_model_loss(args_opt):
    EXEC_PATH = os.path.join(project_root, 'inference-2b')
    device_id = int(os.getenv("DEVICE_ID"))
    rank_id_str = os.getenv('RANK_ID', '0')
    # Keep the digits after the last '-', e.g. 'RANK_ID': 'job24535502-job-facereidtome-hn-0/1'
    rank_id = int(rank_id_str[rank_id_str.rfind('-') + 1:])
    print('rank_id:{}'.format(rank_id), "rank_id str:{}".format(rank_id_str))
    local_rank = rank_id
    print('local_rank:{}, device id:{}'.format(local_rank, device_id))

    strategy_ckpt_path = "/cache/strategy/ckpt_strategy_{}.ckpt".format(local_rank)
    mox.file.copy(src_url="obs://mindspore-file/strategy_ckpt/pangu_1024_13b_exp66cktp_strategy_512.ckpt",
                  dst_url=strategy_ckpt_path)

    # mox.file.copy(src_url="obs://pcl-verify/filtered_ckpt/ckpt_strategy_pangu.ckpt", dst_url=strategy_ckpt_path)

    # Copy the embedding tables that were filtered out of the sharded checkpoints.
    mox.file.copy(args_opt.word_embedding_path, '/cache/word_embedding.npy')
    mox.file.copy(args_opt.position_embedding_path, '/cache/position_embedding.npy')
    mox.file.copy(args_opt.top_query_embedding_path, '/cache/top_query_embedding.npy')

    # reload the serialization helpers (imported late so a runtime-refreshed wheel is picked up)
    from mindspore.train.serialization import load_checkpoint, load_param_into_net, \
        load_distributed_checkpoint  # _load_single_param,

    if local_rank % 8 == 0:
        os.system('ulimit -s 102400')  # see the note in get_model: no effect on this process
        ckpt_copy_tar(args_opt.ckpt_path, target_path="/cache/ckpt_file")
        # mox.file.copy(src_url=args_opt.tokenizer_path + '/vocab10.model', dst_url="/cache/tokenizer/vocab10.model")
        # mox.file.copy(src_url=args_opt.tokenizer_path + '/vocab10.vocab', dst_url="/cache/tokenizer/vocab10.vocab")
        print("setting env success.")
        # After the package refresh / dataset download finishes, write a flag file to signal success.
        open("%s/install.txt" % EXEC_PATH, 'w').close()
    # Block the other processes here until the package refresh and dataset download are done.
    while not os.path.exists("%s/install.txt" % EXEC_PATH):
        time.sleep(1)
    print('local_rank:{}, device id:{} start to run...'.format(local_rank, device_id), flush=True)
    save_graphs_path = "/var/log/npu/slog/device" + str(local_rank) + "/"
    context.set_context(save_graphs=False,
                        save_graphs_path=save_graphs_path,
                        mode=context.GRAPH_MODE,
                        device_target="Ascend",
                        device_id=device_id)
    context.set_context(variable_memory_max_size="30GB")
    full_batch = bool(args_opt.full_batch)
    if args_opt.distribute == "true":
        D.init()
        device_num = D.get_group_size()
        rank = D.get_rank()
        print("device_id is {}, rank_id is {}, device_num is {}".format(
            device_id, rank, device_num))

        context.reset_auto_parallel_context()
        context.set_auto_parallel_context(
            parallel_mode=ParallelMode.SEMI_AUTO_PARALLEL,
            gradients_mean=False,
            device_num=device_num,
            full_batch=full_batch,
            strategy_ckpt_load_file=strategy_ckpt_path,
            enable_parallel_optimizer=False)
        auto_parallel_context().set_loss_repeated_mean(True)
        set_algo_parameters(elementwise_op_strategy_follow=True)
        _set_multi_subgraphs()
    else:
        rank = 0
        device_num = 1

    model_parallel_num = 1
    data_parallel_num = device_num // model_parallel_num
    per_batch_size = 1
    batch_size = per_batch_size * data_parallel_num
    config = PANGUConfig(
        data_parallel_num=data_parallel_num,
        model_parallel_num=model_parallel_num,
        batch_size=batch_size,
        seq_length=1024,
        vocab_size=40000,
        embedding_size=5120,
        num_layers=40,
        num_heads=40,
        expand_ratio=4,
        post_layernorm_residual=False,
        dropout_rate=0.1,
        compute_dtype=mstype.float16,
        use_past=False,
        self_layernorm=True,
        forward_reduce_scatter=True,
        word_emb_dp=True,
        eod_reset=bool(args_opt.eod_reset))
    print("===config is: ", config, flush=True)

    pangu = PANGU(config)
    loss = CrossEntropyLoss(config)
    eval_pangu = PANGUWithLoss(config, pangu, loss, eos_token=9)
    eval_pangu = VirtualDatasetOneInputCell(eval_pangu)
    eval_pangu.set_train(False)
    model = Model(eval_pangu)

    # seq_length + 1: inputs and shifted labels are packed into one tensor.
    fake_input = Tensor(np.ones(shape=(1, config.seq_length + 1)), mstype.int32)
    predict_layout = model.infer_predict_layout(fake_input)

    ckpt_path = "/cache/ckpt_file"
    ckpt_file_list = get_ckpt_file_list(ckpt_path)
    print("Start to load distributed checkpoint", flush=True)
    # res2 = load_distributed_checkpoint(eval_pangu, ['/cache/ckpt_file/embedding.ckpt']*512, predict_layout)
    res = load_distributed_checkpoint(eval_pangu, ckpt_file_list)

    ckpt = load_checkpoint(ckpt_file_list[0])
    TRAINING_STEP = ckpt['has_trained_step'].asnumpy()
    print('@@@@@@@ Saved model training step is: {} @@@@@@@@@@@@'.format(TRAINING_STEP))

    print('#### Load ckpt success!!! ####')

    time.sleep(10)
    return model, config, rank

def get_tokenizer(args_opt):
    rank_id_str = os.getenv('RANK_ID', '0')
    rank_id = int(rank_id_str[rank_id_str.rfind('-') + 1:])
    src_url_model = args_opt.tokenizer_path + 'vocab.model'
    src_url_vocab = args_opt.tokenizer_path + 'vocab.vocab'
    vocab_path = "/cache/vocab.vocab"
    model_path = "/cache/vocab.model"
    copy_flag_file = os.path.join(os.path.dirname(__file__), "tokenizer.txt")
    print("get_tokenizer, rank_id:", rank_id)

    if rank_id % 8 == 0 and not os.path.exists(copy_flag_file):
        mox.file.copy(src_url=src_url_model, dst_url=model_path)
        mox.file.copy(src_url=src_url_vocab, dst_url=vocab_path)
        open(copy_flag_file, 'w').close()
    # Block the other processes until rank 0 has downloaded the vocab files.
    while not os.path.exists(copy_flag_file):
        print("vocab files not ready, waiting for rank 0 to download, copy_flag_file is:", copy_flag_file)
        time.sleep(1)

    tokenizer = JIEBATokenizerNew(vocab_path, model_path)

    return tokenizer

def do_eval(task_mode, model_name):
    args_opt = get_args()

    args_opt.ckpt_path = models_path[model_name] + "part"
    args_opt.word_embedding_path = models_path[model_name] + "_word_embedding.npy"
    args_opt.position_embedding_path = models_path[model_name] + "_position_embedding.npy"
    args_opt.top_query_embedding_path = models_path[model_name] + "_top_query_embedding.npy"

    if "4w4_step" == model_name:
        args_opt.ckpt_path = models_path[model_name] + ".tar"

    tokenizer = get_tokenizer(args_opt)

    if "generate" == task_mode:
        model, config, rank = get_model(args_opt)

        # WebQA test
        name_prefix = model_name + "_close_book_qa"
        data_path_obs = os.path.join(args_opt.data_path, "webqa/input/")
        save_path_obs = os.path.join(args_opt.save_path, "webqa/output/")
        do_eval_webqa(tokenizer, model, config, rank, data_path_obs, save_path_obs, name_prefix)

        del model

    elif "ppl" == task_mode:
        model, config, rank = get_model_loss(args_opt)

        # WSC test
        name_prefix = model_name + "_wsc"
        data_path_obs = os.path.join(args_opt.data_path, "wsc/input/")
        save_path_obs = os.path.join(args_opt.save_path, "wsc/output/")
        do_eval_wsc(tokenizer, model, config, rank, data_path_obs, save_path_obs, name_prefix)

        del model

    else:
        print("Error: unknown task_mode '{}'.".format(task_mode))
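
# Evaluate every checkpoint listed in models_path on both tracks: free-form
# generation (WebQA closed-book QA) and perplexity scoring (WSC).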

if __name__ == "__main__":
    for model_name in models_path:
        do_eval(task_mode="generate", model_name=model_name)
        do_eval(task_mode="ppl", model_name=model_name)