|
- #coding=utf-8
-
- from megatron import get_args
-
- from tools.ms_pt_map_paramName import getParamNameMappingDict
- import torch
- import sys
- import os
- sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
- os.path.pardir)))
- from poc import client_agent
- from megatron import mpu
- import numpy as np
- import json
-
-
def Convert2(tup, di):
    """Group (key, value) pairs from `tup` into lists inside `di`.

    `di` is mutated in place and also returned; values sharing a key are
    appended in encounter order.
    """
    for key, value in tup:
        bucket = di.setdefault(key, [])
        bucket.append(value)
    return di
-
class ParamHunter:
    """Synchronizes a Megatron model's parameters with a parameter server
    through ``client_agent``."""

    def __init__(self, model, is_agent, debug=False):
        args = get_args()
        self.model: torch.nn.Module = model
        # Forward (pytorch -> ms) and reverse (ms -> pytorch) parameter-name
        # lookup tables, sized by the configured number of layers.
        self.pt_ms_name_dic = getParamNameMappingDict(args.num_layers)
        self.ms_pt_name_dic = {ms: pt for pt, ms in self.pt_ms_name_dic.items()}
        self.param_keys = self._get_param_list()
        self.debug = debug
        client_agent.initialize_gc(is_agent=is_agent)

        if debug:
            print('using debug mode!')
            print(self.param_keys[:3])
-
- #获取模型参数名
- def _get_param_list(self):
- param_keys=[]
- if 1: #isinstance(keras_model, KM.Model):
- for name, _ in self.model.named_parameters():
- param_keys.append(self.pt_ms_name_dic[name])
- param_keys.sort()
- return param_keys
-
-
- def _parameterNameValue(self, rank):
- var_value = []
- params_num = 0
-
- for name, param in self.model.named_parameters():
- # if layer_weights:
- params_num += 1
- if 'dense_h_to_4h.weight' in name or 'dense_4h_to_h.weight' in name:
- param = param.T
- var_value.append((self.pt_ms_name_dic[name]+str(rank), param.detach().numpy()))
- var_value.sort(key=lambda x: x[0])
- return var_value, params_num
-
-
- def _setModelPara(self, items, rank):
- params_num = 0
- model_nameParam = dict(self.model.named_parameters())
- print(f"mpu.get_model_parallel_src_rank():{mpu.get_model_parallel_src_rank()}")
- print(f"mpu.get_model_parallel_group():{mpu.get_model_parallel_group()}")
- with torch.no_grad():
- for name, weights in items:
-
- if name[-1]==str(rank):
- name_ = self.ms_pt_name_dic[name[:-1]]
- param = model_nameParam[name_]
- shape_before = param.shape
- if 'dense_h_to_4h.weight' in name_ or 'dense_4h_to_h.weight' in name_:
- weights = weights.T
- param.set_(torch.Tensor(weights).type_as(param).contiguous())
-
- shape_after = param.shape
- if shape_after != shape_before:
- print(f"parameter ''{name_}''; original shape is {shape_before} "
- f"shape after setting is {shape_after}")
- params_num += 1
- return params_num
-
-
- def init_params(self, initial, gloabelStep, rank):
- '''
- 从服务端获取参数
- initial==True: 获取同样的初始化参数
- initial==False: 获取各个节点平均后的参数
- '''
- #获取参数名称列表
-
- is_moved_to_cpu = False
- if next(self.model.parameters()).device != 'cpu':
- self.model.cpu()
- is_moved_to_cpu = True
-
- params_num = 0
- print(f"gloabelStep:{gloabelStep}")
- try:
- if ((initial == True) and (gloabelStep == 0)):
- print(f"initialize_params_from_server")
- initial_params = client_agent.initialize_params_from_server(self.param_keys, self.debug)
- else:
- print(f"get_avg_params_from_server")
- initial_params = client_agent.get_avg_params_from_server(self.param_keys, self.debug)
-
- params_num = self._setModelPara(initial_params, rank)
- except Exception as e:
- print('init params error!!')
- print(e)
-
- if is_moved_to_cpu:
- self.model.cuda(torch.cuda.current_device())
-
- return self.model, params_num
-
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that understands numpy scalar and array types."""

    def default(self, obj):
        # Use the abstract numpy scalar bases instead of enumerating concrete
        # dtypes: the original listed np.int_/np.float_ etc., and np.float_
        # was removed in NumPy 2.0 (AttributeError at import of this check).
        # np.integer / np.floating cover exactly the same concrete dtypes.
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        # Defer to the base class, which raises TypeError for unknown types.
        return super().default(obj)
-
def Convert(tup, di):
    """Return a dict built from the (key, value) pairs in `tup`.

    `di` is ignored (kept for interface compatibility): the original rebound
    it immediately. The original also always crashed here — dicts are not
    sliceable, so `di[:2]` raised TypeError; the debug print now shows the
    first two items instead.
    """
    di = dict(tup)
    print(f"di[:2]:{list(di.items())[:2]}")
    return di
-
-
-
def Convert3(lst):
    """Build a dict from a flat [k1, v1, k2, v2, ...] list.

    An odd-length list raises IndexError (same as the original comprehension).
    """
    pairs = ((lst[idx], lst[idx + 1]) for idx in range(0, len(lst), 2))
    res_dct = dict(pairs)
    print(f"eeeeeeeee:{res_dct}")
    return res_dct
-
- def upload_params(self, uuid, step_per_round, rank):
- '''
- :@keras_model: Keras的模型
- '''
- is_moved_to_cpu = False
- if next(self.model.parameters()).device != 'cpu':
- self.model.cpu()
- is_moved_to_cpu = True
-
- params_num = 0
- #file_path="save"+str(rank)+".txt"
- try:
- var_value, params_num = self._parameterNameValue(rank)
- print(f"len(var_value):{len(var_value)}")
- print(f'upload_params....................')
- if True:
- print(var_value[:3])
- print(f"size var_value:{sys.getsizeof(var_value)}")
- print(f"size of var_value:{var_value.__sizeof__()}")
- client_agent.upload_final_models_to_server(uuid, step_per_round, var_value)
-
- except Exception as e:
- pass
-
- if is_moved_to_cpu:
- self.model.cuda(torch.cuda.current_device())
- return params_num
-
-
- #从agent的50054端口获取模型参数
- def fill_params(self, step, _round, uuid, rank):
- params_num = 0
- try :
- #50054端口加载模型参数时需要传递的变量
- uuids = [f'{step}-{_round}-{uuid}']
- #print(f"self.param_keys:{self.param_keys}")
- initial_params = client_agent.get_avg_params_from_server_localfile(self.param_keys, uuids, self.debug)
-
- if initial_params is None:
- print('!!!!!!!!!!!!! Fill params false !!!!!!!!!!!!!!')
- else :
- if True:
- print(f"local_rank:{rank}")
- #print(f"fill param:{initial_params[:3]}")
- params_num = self._setModelPara(initial_params, rank)
- except Exception as e:
- print('fill params error!!')
- print(e)
-
- return self.model, params_num
-
- def first_init(self):
- '''
- 向参数服务器上传随机参数,当各个训练节点第一次训练时起始参数相同
- '''
- # 加载数据集,然后将数据集划分为train/val两部分
- try:
- var_value, params_num = self._parameterNameValue()
- #print(var_value)
- client_agent.first_init(var_value)
- except Exception as e:
- print(e)
-
-
-
- if __name__ == "__main__":
- pass
|