|
- # coding=utf-8
- # Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- """Pretrain GPT2"""
-
- import torch
- import os
-
- from megatron import get_args
- from megatron import print_rank_0
- from megatron import get_timers
- from megatron import get_tokenizer
- from megatron import mpu
- from megatron.data.gpt2_dataset import build_train_valid_test_datasets
- from megatron.model import GPT2Model
- from megatron.training import pretrain
- from megatron.utils import get_ltor_masks_and_position_ids
- from megatron.utils import reduce_losses
- from poc.param_server import ParamHunter
- from megatron.training import get_model,evaluate_and_print_results, \
- build_train_valid_test_data_iterators
- from megatron.utils import make_data_loader
-
- from tools.merge_mp_partitions import merge_mp_partition
- from tools.split_full_model_into_mp_model import split_full_model, build_model_for_rank
-
- import datetime
- import time
- from poc.thgy_client import THGYApiClient
-
- _PARAM_HUNTER:ParamHunter = None
- _EVAL_DATA_LOADER = None
- _ELAPSED_PULL_TIME=0
- _MODEL_TRAIN = None
-
- _MERGED_MODEL_POC = None
- _PARTITION_MODELS_POC = []
- #elapsed_pull_time =0
-
- def model_provider():
- """Build the model."""
- global _PARAM_HUNTER, _ELAPSED_PULL_TIME, _MODEL_TRAIN
- print_rank_0('building GPT2 model ...')
- args = get_args()
-
- model = GPT2Model(num_tokentypes=0, parallel_output=True)
- _MODEL_TRAIN = model
-
- start_pull_time = datetime.datetime.now()
-
- if args.using_poc_flag and not mpu.using_model_parallel():
- if mpu.is_rank_0():
- _PARAM_HUNTER = ParamHunter(model, is_agent=False, debug=False)
- _PARAM_HUNTER.init_params(args.initial == 'True', args.globalStep)
-
- mpu.broadcast_params_in_data_group(model)
-
- if args.using_poc_flag and mpu.using_model_parallel():
- global _MERGED_MODEL_POC, _PARTITION_MODELS_POC
- if mpu.is_rank_0():
-
- # build cpu models; incloud full model and some rank model
- for rank in range(args.model_parallel_size):
- _PARTITION_MODELS_POC.append(build_model_for_rank(rank, args.model_parallel_size))
- _MERGED_MODEL_POC = build_model_for_rank(0, 1)
-
- _PARAM_HUNTER = ParamHunter(_MERGED_MODEL_POC, is_agent=False, debug=False)
- _PARAM_HUNTER.init_params(args.initial == 'True', args.globalStep)
-
- split_full_model(_MERGED_MODEL_POC, _PARTITION_MODELS_POC)
-
- mpu.scatter_params_in_model_group(_PARTITION_MODELS_POC, model)
- mpu.broadcast_params_in_data_group(model)
-
-
- end_pull_time = datetime.datetime.now()
- _ELAPSED_PULL_TIME = (end_pull_time - start_pull_time).seconds
-
- return model
-
-
- def get_batch(data_iterator):
- """Generate a batch"""
- args = get_args()
- tokenizer = get_tokenizer()
-
- # Items and their type.
- keys = ['text']
- datatype = torch.int64
-
- # Broadcast data.
- if data_iterator is not None:
- data = next(data_iterator)
- else:
- data = None
- data_b = mpu.broadcast_data(keys, data, datatype)
-
- # Unpack.
- tokens_ = data_b['text'].long()
- labels = tokens_[:, 1:].contiguous()
- tokens = tokens_[:, :-1].contiguous()
-
- # Get the masks and postition ids.
- attention_mask, loss_mask, position_ids = get_ltor_masks_and_position_ids(
- tokens,
- tokenizer.eod,
- args.reset_position_ids,
- args.reset_attention_mask,
- args.eod_mask_loss)
-
- return tokens, labels, loss_mask, attention_mask, position_ids
-
-
- def forward_step(data_iterator, model):
- """Forward step."""
- args = get_args()
- timers = get_timers()
-
- # Get the batch.
- timers('batch generator').start()
- tokens, labels, loss_mask, attention_mask, position_ids = get_batch(data_iterator)
- timers('batch generator').stop()
- # Forward model.
- losses = model(tokens, position_ids, attention_mask, labels=labels)
- loss_mask = loss_mask.view(-1)
- loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
-
- # Reduce loss for logging.
- reduced_loss = reduce_losses([loss])
-
- return loss, {'lm loss': reduced_loss[0]}
-
-
- def train_valid_test_datasets_provider(train_val_test_num_samples):
- """Build train, valid, and test datasets."""
- global _EVAL_DATA_LOADER
- args = get_args()
-
- print_rank_0('> building train, validation, and test datasets for GPT2 ...')
-
- if args.is_train_one_epoch:
- train_val_test_num_samples[0] = 10
-
- train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
- data_prefix=args.data_path,
- data_impl=args.data_impl,
- splits_string=args.split,
- train_valid_test_num_samples=train_val_test_num_samples,
- seq_length=args.seq_length,
- seed=args.seed,
- skip_warmup=(not args.mmap_warmup))
-
- print_rank_0(' loading eval dataset for poc ...')
- tmp_dataset, _, _ = build_train_valid_test_datasets(
- data_prefix=args.data_path_eval,
- data_impl=args.data_impl,
- splits_string="1000,0,0",
- train_valid_test_num_samples=[10,0,0],
- seq_length=args.seq_length,
- seed=args.seed,
- skip_warmup=(not args.mmap_warmup))
-
- _EVAL_DATA_LOADER = iter(make_data_loader(tmp_dataset))
- print_rank_0("> finished creating GPT2 datasets ...")
-
- return train_ds, valid_ds, test_ds
-
-
-
- def usr_args_provider(parser):
- group = parser.add_argument_group(title='poc')
-
- group.add_argument('--globalStep', type=int, default=0,
- help='please set to 0, if using_poc_flag is disable')
- group.add_argument('--num_round_perEpoch', type=int, default=4,
- help='')
- group.add_argument('--num_epoch', type=int, default=5,
- help='')
- group.add_argument('--initial', type=str, default='True', choices=['False', 'True'],
- help='initial flag')
- group.add_argument('--uuid', type=str, default='0',
- help='')
- group.add_argument('--data-path-eval', type=str, default=None,
- help='eval data path')
- group.add_argument('--is_train_one_epoch', type=int, default=1, choices=[0, 1],
- help='train one epoch, if set to be 1, args:train-iters will be ignore'
- ' and set --train-iters to be None')
- group.add_argument('--using_poc_flag', type=int, default=1, choices=[0, 1],
- help='weather to use poc')
-
- return parser
-
-
- def upload_param_ui(loss, trainStart_time):
- global _PARAM_HUNTER
- args = get_args()
- uuid = args.uuid
- if '_' in uuid:
- group_id, task_id = map(int, uuid.split('_'))
- else:
- group_id = 0
- task_id = int(uuid)
-
- step_per_round = int(os.environ["CLIENT_STEP"]) \
- if 'CLIENT_STEP' in os.environ else 1
-
- trainEnd_time = datetime.datetime.now()
-
- api_client.add_task_training_data(group_id, task_id, args.globalStep+1,
- recall=0, precision=loss,
- startTime=trainStart_time.strftime(
- "%Y-%m-%d %H:%M:%S.%f"),
- endTime=trainEnd_time.strftime("%Y-%m-%d %H:%M:%S.%f"))
- upload_param_nums = 350000
- api_client.add_training_parameters(1, task_id, upload_param_nums)
- _PARAM_HUNTER.upload_params(uuid, step_per_round)
-
-
- if __name__ == "__main__":
- start_time = datetime.datetime.now()
- api_client = THGYApiClient()
-
- pretrain(train_valid_test_datasets_provider, model_provider, forward_step,
- args_defaults={'tokenizer_type': 'GPT2BPETokenizer'},
- extra_args_provider=usr_args_provider)
-
- args = get_args()
- uuid = args.uuid
- if '_' in uuid:
- group_id, task_id = map(int, uuid.split('_'))
- else:
- group_id = 0
- task_id = int(uuid)
- timers = get_timers()
- if args.using_poc_flag :
-
- prefix = ''
- print("evaluate_and_print_results..........")
- loss,ppl = evaluate_and_print_results(prefix, forward_step,
- _EVAL_DATA_LOADER, _MODEL_TRAIN,
- iteration=0, verbose=False)
- print(loss, ppl)
-
- _MODEL_TRAIN.cpu()
- if mpu.using_model_parallel():
- partial_models_tensors_bucket = mpu.gather_params_in_model_group(_MODEL_TRAIN)
- if mpu.is_rank_0():
- merge_mp_partition(_MERGED_MODEL_POC, partial_models_tensors_bucket,
- _PARTITION_MODELS_POC)
-
- if mpu.is_rank_0():
- upload_start_time = datetime.datetime.now()
- upload_param_ui(loss, start_time)
- upload_finish_time = datetime.datetime.now()
-
- upload_time = (upload_finish_time - upload_start_time).seconds
-
- #timers('whole process').stop()
- end_time = datetime.datetime.now()
- whole_time = (end_time - start_time).seconds
- elapsed_train_time = whole_time-_ELAPSED_PULL_TIME-upload_time
-
-
- print('elapsed_train_time')
- print(elapsed_train_time)
- api_client.add_training_time(group_id, task_id, args.globalStep+1 , _ELAPSED_PULL_TIME, elapsed_train_time, upload_time)
-
-
-
|