# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Pretrain GPT2"""

import os
import time
import datetime

import torch

from megatron import get_args
from megatron import print_rank_0
from megatron import get_timers
from megatron import get_tokenizer
from megatron import mpu
from megatron.data.gpt2_dataset import build_train_valid_test_datasets
from megatron.model import GPT2Model
from megatron.initialize import initialize_megatron
from megatron.training import (get_model, evaluate_and_print_results,
                               build_train_valid_test_data_iterators)
from megatron.utils import get_ltor_masks_and_position_ids
from megatron.utils import reduce_losses
from megatron.utils import make_data_loader

from poc.param_server_mp import ParamHunter
from poc.thgy_client import THGYApiClient
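
# Module-level state shared by the callbacks below: _PARAM_HUNTER wraps the
# model for parameter exchange with the POC parameter server, and
# _EVAL_DATA_LOADER is the evaluation data iterator built in
# train_valid_test_datasets_provider().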
_PARAM_HUNTER: ParamHunter = None
_EVAL_DATA_LOADER = None


def model_provider():
    """Build the model."""
    global _PARAM_HUNTER
    print_rank_0('building GPT2 model ...')

    model = GPT2Model(num_tokentypes=0, parallel_output=True)

    # Wrap the model so its parameters can be exchanged with the POC
    # parameter server (this process acts as an agent).
    _PARAM_HUNTER = ParamHunter(model, is_agent=True, debug=False)

    return model


def get_batch(data_iterator):
    """Generate a batch"""
    args = get_args()
    tokenizer = get_tokenizer()

    # Items and their type.
    keys = ['text']
    datatype = torch.int64

    # Broadcast data.
    if data_iterator is not None:
        data = next(data_iterator)
    else:
        data = None
    data_b = mpu.broadcast_data(keys, data, datatype)

    # Unpack.
    tokens_ = data_b['text'].long()
    labels = tokens_[:, 1:].contiguous()
    tokens = tokens_[:, :-1].contiguous()

    # Get the masks and position ids.
    attention_mask, loss_mask, position_ids = get_ltor_masks_and_position_ids(
        tokens,
        tokenizer.eod,
        args.reset_position_ids,
        args.reset_attention_mask,
        args.eod_mask_loss)

    return tokens, labels, loss_mask, attention_mask, position_ids


def forward_step(data_iterator, model):
    """Forward step."""
    args = get_args()
    timers = get_timers()

    # Get the batch.
    timers('batch generator').start()
    tokens, labels, loss_mask, attention_mask, position_ids = get_batch(
        data_iterator)
    timers('batch generator').stop()

    # Forward model.
    losses = model(tokens, position_ids, attention_mask, labels=labels)
    loss_mask = loss_mask.view(-1)
    loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()

    # Reduce loss for logging.
    reduced_loss = reduce_losses([loss])

    return loss, {'lm loss': reduced_loss[0]}


def train_valid_test_datasets_provider(train_val_test_num_samples):
    """Build train, valid, and test datasets."""
    args = get_args()
    global _EVAL_DATA_LOADER

    print_rank_0('> building train, validation, and test datasets '
                 'for GPT2 ...')
    # train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
    #     data_prefix=args.data_path,
    #     data_impl=args.data_impl,
    #     splits_string=args.split,
    #     train_valid_test_num_samples=train_val_test_num_samples,
    #     seq_length=args.seq_length,
    #     seed=args.seed,
    #     skip_warmup=(not args.mmap_warmup))
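
    # Only the evaluation split is needed here: the standard train/valid/test
    # build above is left commented out, and a loader is built from
    # --data-path-eval instead. Since all three returned datasets are None,
    # build_train_valid_test_data_iterators() yields no training iterators.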
    tmp_dataset, _, _ = build_train_valid_test_datasets(
        data_prefix=args.data_path_eval,
        data_impl=args.data_impl,
        splits_string=args.split,
        train_valid_test_num_samples=train_val_test_num_samples,
        seq_length=args.seq_length,
        seed=args.seed,
        skip_warmup=(not args.mmap_warmup))

    _EVAL_DATA_LOADER = iter(make_data_loader(tmp_dataset))

    print_rank_0("> finished creating GPT2 datasets ...")

    return None, None, None


def usr_args_provider(parser):
    group = parser.add_argument_group(title='poc')

    group.add_argument('--globalStep', type=int, default=0,
                       help='global step')
    group.add_argument('--num_round_perEpoch', type=int, default=1,
                       help='number of rounds per epoch')
    group.add_argument('--num_epoch', type=int, default=1,
                       help='number of epochs')
    group.add_argument('--initial', type=str, default=None,
                       choices=['False', 'True'],
                       help='initial flag')
    group.add_argument('--uuid', type=str, default='test',
                       help='client uuid')
    group.add_argument('--data-path-eval', type=str, default=None,
                       help='eval data path')
    group.add_argument('--using_poc_flag', type=int, default=1,
                       choices=[0, 1],
                       help='whether to use poc')

    return parser


def _parse_param(model_path):
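    """Parse (step, round, uuid) from a checkpoint file name of the form
    <timestamp>-<step>-<round>-<uuid>[-...]. Names with fewer fields are
    treated as the averaged ('avg') model. The task group id and task id
    are derived from the sibling client directories for UI reporting.
    """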
    split_items = os.path.basename(model_path).split('-')
    if len(split_items) == 4:
        timestamp, step, _round, uuid = split_items
    elif len(split_items) > 4:
        timestamp = split_items[0]
        step = split_items[1]
        _round = split_items[2]
        uuid = split_items[3]
    else:
        uuid = 'avg'
        timestamp = ''
        _round = 1
        step = 1
    # For UI display: read the task group id from the sibling client
    # directories (named '<group_id>_<task_id>'; the 'avg' directory is
    # skipped).
    group_id = 0
    _task_id = 0
    try:
        client_uuids = [
            d.name for d in
            os.scandir(os.path.dirname(os.path.dirname(model_path)))
            if d.is_dir() and 'avg' not in d.name]
        _group_id, _task_id = client_uuids[0].split('_')
        group_id = int(_group_id)
    except Exception as e:
        print(e)
    return step, _round, uuid, group_id, _task_id


# @app.route('/')
# def index():
#     # Before training, the model parameters need to be initialized from
#     # the JCCE agent.
#     model_path = request.args.get("model_path")
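
# Invoked by _watchdog() whenever a new checkpoint file appears in the
# watched directory; relies on the module-level `api_client` created in
# __main__ below.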
def _watchdog_callback(model_path):
    global _PARAM_HUNTER
    args = get_args()
    print('======' * 20)
    print(model_path)
    step, _round, uuid, group_id, task_id = _parse_param(model_path)

    # step, _round, uuid, group_id = 1, 1, 'avg', 0
    print(step, _round, uuid, group_id, task_id)

    # Pull the averaged parameters into the model on CPU, then move it back
    # to the GPU for evaluation.
    _PARAM_HUNTER.model.cpu()
    print(f"args.local_rank: {args.local_rank}")
    print(f"mpu.get_model_parallel_rank(): {mpu.get_model_parallel_rank()}")
    model = _PARAM_HUNTER.model
    if mpu.get_model_parallel_rank() == args.local_rank:
        model, fill_params_num = _PARAM_HUNTER.fill_params(
            step, _round, uuid, mpu.get_model_parallel_rank())
        print("_watchdog_callback ...")
        print(mpu.get_model_parallel_rank())
        print(f'fill_params_num: {fill_params_num}')
    _PARAM_HUNTER.model.cuda(torch.cuda.current_device())

    # accu = test(model, test_data)

    prefix = 'poc'
    loss, ppl = evaluate_and_print_results(prefix, forward_step,
                                           _EVAL_DATA_LOADER, model,
                                           iteration=0, verbose=False)

    # Report the result back to the platform; the loss is sent in the
    # `precision` field of the API.
    start_time = datetime.datetime.now()
    end_time = datetime.datetime.now()
    result = api_client.add_task_training_data(
        group_id, 0, _round,
        recall=0, precision=loss,
        startTime=start_time.strftime("%Y-%m-%d %H:%M:%S.%f"),
        endTime=end_time.strftime("%Y-%m-%d %H:%M:%S.%f"))


# Synchronously watch the avg-model directory for new checkpoint files.
def _watchdog(model_path):

    if not os.path.exists(model_path):
        os.makedirs(model_path)

    try:
        path_to_watch = model_path
        before = {f.path: None for f in os.scandir(path_to_watch)}
        time_log = time.time()
        while True:
            time.sleep(2)
            after = {f.path: None for f in os.scandir(path_to_watch)}
            added = [f for f in after if f not in before]
            removed = [f for f in before if f not in after]
            if added:
                print(f'----------- Took {time.time() - time_log} '
                      f'seconds per epoch -----------')
                time_log = time.time()
                # Only the first new file is evaluated per polling interval.
                _watchdog_callback(added[0])
                print("Added: ", ", ".join(added))
            before = after
    except KeyboardInterrupt:
        print(f'stop watching folder: {model_path}')
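

# Entry point: initialize Megatron with the extra POC arguments, build the
# evaluation data loader via the datasets provider, then build the model.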
if __name__ == "__main__":
    # Initialize and get arguments, timers, and Tensorboard writer.
    initialize_megatron(extra_args_provider=usr_args_provider,
                        args_defaults={'tokenizer_type': 'GPT2BPETokenizer'})

    _ = build_train_valid_test_data_iterators(
        train_valid_test_datasets_provider)

    model = get_model(model_provider)

    print('------ build model success ------')
    args = get_args()
    if args.using_poc_flag:
        # API client for reporting evaluation results.
        api_client = THGYApiClient()
        _watchdog(args.save)
    else:
        prefix = ''
        # _MODEL.cuda(torch.cuda.current_device())
        loss, ppl = evaluate_and_print_results(prefix, forward_step,
                                               _EVAL_DATA_LOADER, model,
                                               iteration=0, verbose=False)
        print(loss, ppl)
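
# A minimal launch sketch (hypothetical script name and paths; the Megatron
# model-size and tokenizer flags are elided and depend on your Megatron
# version):
#
#   python pretrain_gpt2_poc.py \
#       --vocab-file /path/to/gpt2-vocab.json \
#       --merge-file /path/to/gpt2-merges.txt \
#       --data-path-eval /path/to/eval_text_document \
#       --save /path/to/avg_model_dir \
#       --uuid <group_id>_<task_id> \
#       --using_poc_flag 1 \
#       ...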