|
- import os
- os.sys.path.insert(1, os.path.split(__file__)[0] + '/AISynergy-core/src')
- import random
- import numpy as np
- import os
-
- import datetime
- from AISyncore.client.thgy_client import THGYApiClient
- from AISyncore.common import _ntp_time_recorder
-
-
-
-
- TRAIN_VERSION = 'MS_GPU' # PYTORCH_GPU PYTORCH_MLU MS_GPU MS_NPU
- if TRAIN_VERSION == 'PYTORCH_GPU' or TRAIN_VERSION == 'PYTORCH_MLU':
- from tqdm import tqdm
- import torch
- from torchvision.models import resnet50
- import dataset_pytorch
-
-
- elif TRAIN_VERSION == 'MS_GPU':
- import dataset_ms
- import huawei
- from mindspore.train.serialization import load_param_into_net
- from mindspore import Parameter
- import mindspore as ms
- ms.context.set_context(mode=ms.PYNATIVE_MODE, device_target="GPU")
-
- elif TRAIN_VERSION == 'MS_NPU':
- import dataset_ms
- import huawei
- from mindspore.train.serialization import load_param_into_net
- from mindspore import Parameter
- import mindspore as ms
-
-
class Config():
    """Runtime configuration resolved from TRAIN_VERSION and the environment.

    Hyperparameters depend on the selected training backend; the AISynergy
    backend / task endpoints are read from environment variables and must be
    set before this module is imported (a missing variable raises KeyError).
    """

    def __init__(self):
        if TRAIN_VERSION in ('PYTORCH_GPU', 'PYTORCH_MLU'):
            # Both PyTorch targets share every hyperparameter; only the
            # device handle differs (CUDA GPU vs Cambricon MLU).
            self.DEVICE = torch.device(
                "cuda:0" if TRAIN_VERSION == 'PYTORCH_GPU' else "mlu"
            )
            self.EPOCHS = 100
            self.LOCAL_EPOCHS = 1
            self.BATCH_SIZE = 64
            self.LEARNING_RATE = 0.01
            self.DECAY_RATIO = 0.6
        elif TRAIN_VERSION in ('MS_GPU', 'MS_NPU'):
            # MindSpore places tensors via its global context, so no
            # per-config device handle is kept.
            self.DEVICE = None
            self.EPOCHS = 18
            self.LOCAL_EPOCHS = 2
            self.BATCH_SIZE = 16
            self.LEARNING_RATE = 0.001
            self.DECAY_RATIO = 0.8

        # Dataset layout shared by all backends.
        self.DATA_ROOT = "./data/cifar10_split_to_3/non_iid_train"
        self.TOTAL_CLIENTS = 3
        self.VAL_STEPS = -1

        # Federated-learning backend and task endpoints. A KeyError here
        # means the deployment environment is not fully configured.
        self.AISYNERGY_BACKEND_IP = os.environ['AISYNERGY_BACKEND_IP']
        self.AISYNERGY_BACKEND_PORT = os.environ['AISYNERGY_BACKEND_PORT']
        self.TASK_GROUP_ID = int(os.environ['TASK_GROUP_ID'])
        self.TASK_ID = int(os.environ['TASK_ID'])
        self.TASK_SERVER_IP = os.environ['TASK_SERVER_IP']
        self.TASK_SERVER_PORT = os.environ['TASK_SERVER_PORT']
-
-
# Module-level singleton: constructing it reads the required environment
# variables, so importing this module fails fast on a misconfigured host.
_config = Config()
-
-
def fit_server_config(rnd, client):
    """Return the training configuration dict for round *rnd*.

    The learning rate is decayed by a factor of 10 once the round index
    reaches ``EPOCHS * DECAY_RATIO``; the local-epoch count is constant.
    (The previous docstring described a batch-size-32 schedule that never
    matched this body.)

    Args:
        rnd: Current global round index. Ignored — and re-read from the
            shared NTP time recorder — when called from the server side.
        client: ``None`` for a client dry run, ``'server'`` when called
            from server.py, otherwise a client identifier whose server-side
            time record (if any) is merged into the returned config.

    Returns:
        Dict with ``local_epochs``, ``learning_rate`` and
        ``current_global_epoch``, plus any server time-record fields.
    """
    if client == 'server':
        # The server drives the round counter from the shared recorder.
        rnd = _ntp_time_recorder.current_epoch

    print('----------current epoch:{}--------------'.format(rnd))

    # Single construction site instead of two near-identical dicts.
    decayed = rnd >= int(_config.EPOCHS * _config.DECAY_RATIO)
    config = {
        "local_epochs": _config.LOCAL_EPOCHS,
        "learning_rate": _config.LEARNING_RATE * 0.1 if decayed else _config.LEARNING_RATE,
        "current_global_epoch": rnd,
    }

    if client is not None and client != 'server':  # None used in client dry , 'server' used in server.py
        server_time_record = _ntp_time_recorder.get_server_time_record(client)
        if server_time_record is not None:
            config = {**config, **server_time_record}
    return config
-
-
-
# REST client used to push metrics and timing data to the AISynergy backend.
_plot_client = THGYApiClient("http://" + _config.AISYNERGY_BACKEND_IP + ":" + _config.AISYNERGY_BACKEND_PORT)

# Non-server tasks (TASK_ID != 0) report the model's parameter count once at
# startup. 23528522 presumably is the ResNet-50 parameter count for this
# setup — TODO(review): confirm against load_model's architecture.
if _config.TASK_ID != 0:
    init_params_num = 23528522
    _plot_client.async_executor(_plot_client.add_training_parameters, 0, _config.TASK_ID, init_params_num)
-
-
-
-
def load_partition_dataloader(idx: int, batch_size=_config.BATCH_SIZE):
    """Load train/test dataloaders for one data partition.

    Args:
        idx: Client partition index; any value < 0 selects the combined
            ``all`` partition instead of a per-client split.
        batch_size: Mini-batch size, defaulting to the configured value.

    Returns:
        ``(trainloader, testloader, num_examples)`` as produced by the
        backend-specific dataset module.

    Raises:
        ValueError: For an unsupported TRAIN_VERSION (previously this fell
            through and raised NameError on the return statement).
    """
    if idx > -1:
        data_root = _config.DATA_ROOT + '/client_' + str(idx)
    else:
        data_root = _config.DATA_ROOT + '/all'

    if TRAIN_VERSION in ('PYTORCH_GPU', 'PYTORCH_MLU'):
        return dataset_pytorch.load_dataloader(data_root, batch_size)
    if TRAIN_VERSION in ('MS_GPU', 'MS_NPU'):
        return dataset_ms.load_dataloader(data_root, batch_size)
    raise ValueError('unsupported TRAIN_VERSION: {}'.format(TRAIN_VERSION))
-
-
-
def train(net, trainloader, testloader, device, server_config):
    """Run the backend-specific local training loop, then evaluate.

    Trains for ``server_config['local_epochs']`` epochs at
    ``server_config['learning_rate']`` and returns the evaluation metrics
    as a dict with ``test_loss`` and ``test_accuracy``.
    """
    print("Starting training...")

    local_epochs = server_config["local_epochs"]
    learning_rate = server_config['learning_rate']

    if TRAIN_VERSION in ('PYTORCH_GPU', 'PYTORCH_MLU'):
        net.to(device)  # move model to the accelerator if available
        criterion = torch.nn.CrossEntropyLoss().to(device)
        optimizer = torch.optim.SGD(
            net.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-3
        )
        net.train()
        for _ in range(local_epochs):
            for images, labels in tqdm(trainloader):
                images = images.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()
                batch_loss = criterion(net(images), labels)
                batch_loss.backward()
                optimizer.step()
    elif TRAIN_VERSION in ('MS_GPU', 'MS_NPU'):
        # MindSpore path: the helper owns the whole training loop.
        huawei.train_net(net, trainloader, lr=learning_rate, epoch=local_epochs)

    test_loss, test_acc = test(net, testloader, device, server_config)

    results = {"test_loss": test_loss, "test_accuracy": test_acc}
    print(results)
    return results
-
def test(net, testloader, device, server_config):
    """Validate the network on the entire test set.

    Returns ``(loss, accuracy)``. As a side effect it pushes the accuracy
    point for this round to the AISynergy plotting backend and, on client
    tasks, uploads the per-phase timing breakdown of the previous round.
    """
    print("Starting evalutation...")
    current_global_epoch = server_config['current_global_epoch']


    if TRAIN_VERSION == 'PYTORCH_GPU' or TRAIN_VERSION == 'PYTORCH_MLU':
        net.to(device)  # move model to GPU if available
        criterion = torch.nn.CrossEntropyLoss().to(device)
        correct, total, loss = 0, 0, 0.0
        net.eval()
        with torch.no_grad():
            for batch_idx, (images, labels) in tqdm(enumerate(testloader)):
                images, labels = images.to(device), labels.to(device)
                outputs = net(images)
                loss += criterion(outputs, labels).item()
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Average the summed batch losses over the batch count.
        # NOTE(review): raises if testloader is empty (batch_idx unbound).
        loss /= batch_idx + 1
        accuracy = correct / total



    elif TRAIN_VERSION == 'MS_GPU' or TRAIN_VERSION == 'MS_NPU':
        loss, accuracy = huawei.test_epoch(net, testloader)

    print("test_loss", loss, "accuracy", accuracy)


    # Report this round's accuracy to the backend plot (fire-and-forget).
    _plot_client.async_executor(_plot_client.add_task_training_data, _config.TASK_GROUP_ID, _config.TASK_ID, current_global_epoch, 0.0,
            accuracy, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))

    # Timing bookkeeping applies to client tasks only (task 0 is the server).
    if _config.TASK_ID != 0:

        if _ntp_time_recorder.current_epoch is not None:
            # check
            if 'last_client_epoch' not in server_config:  # TODO drop 2 time record now
                print('------------------because of the new connection, time record dropped---------------------')
            else:
                assert _ntp_time_recorder.current_epoch == server_config['last_client_epoch']

                # complete time record from the previous round: split the
                # round into server->client message, client compute,
                # client->server message, server sync wait, server compute.
                client_last_epoch_time_recorder = _ntp_time_recorder.time_records[_ntp_time_recorder.current_epoch]
                server_msg_s = client_last_epoch_time_recorder['client_recv_msg'] - server_config['server_send_msg']
                client_computation_s = client_last_epoch_time_recorder['client_send_msg'] - client_last_epoch_time_recorder['client_recv_msg']
                client_msg_s = server_config['server_recv_msg'] - client_last_epoch_time_recorder['client_send_msg']
                server_syn = server_config['server_recv_all_msg'] - server_config['server_recv_msg']
                server_computation_s = server_config['server_epoch_end'] - server_config['server_recv_all_msg']
                print(_ntp_time_recorder.current_epoch, server_msg_s, client_computation_s, client_msg_s, server_syn, server_computation_s)

                _plot_client.async_executor(_plot_client.add_training_time, _config.TASK_GROUP_ID, _config.TASK_ID, _ntp_time_recorder.current_epoch,
                        int(round(server_msg_s)), int(round(client_computation_s)), int(round(client_msg_s)), int(round(server_syn)), int(round(server_computation_s)))

        # Advance the recorder so the next round completes this one's record.
        _ntp_time_recorder.update_client_epoch(current_global_epoch)

    return loss, accuracy
-
def load_model(pretrained=False, num_classes=10):
    """Build a ResNet-50 for the active backend.

    Args:
        pretrained: Load pretrained weights (PyTorch backends only; the
            MindSpore builder does not take this flag).
        num_classes: Size of the classification head.

    Raises:
        ValueError: For an unsupported TRAIN_VERSION (previously this fell
            through and raised NameError on the return statement).
    """
    if TRAIN_VERSION == 'PYTORCH_GPU' or TRAIN_VERSION == 'PYTORCH_MLU':
        return resnet50(pretrained=pretrained, num_classes=num_classes)
    if TRAIN_VERSION == 'MS_GPU' or TRAIN_VERSION == 'MS_NPU':
        return huawei.resnet50(num_classes=num_classes)
    raise ValueError('unsupported TRAIN_VERSION: {}'.format(TRAIN_VERSION))
-
-
def get_model_params(model):
    """Return every model parameter as a numpy array, in state-dict order."""
    print("get_model_params")
    if TRAIN_VERSION in ('PYTORCH_GPU', 'PYTORCH_MLU'):
        weights = [tensor.cpu().numpy() for tensor in model.state_dict().values()]
    elif TRAIN_VERSION in ('MS_GPU', 'MS_NPU'):
        weights = [param.asnumpy() for param in model.parameters_dict().values()]
    return weights
-
def set_model_params(model, weights):
    """Load a flat list of numpy arrays into the model, in parameter order.

    Returns the same model object for convenience.
    """
    print("set_model_params")
    if TRAIN_VERSION in ('PYTORCH_GPU', 'PYTORCH_MLU'):
        # Pair each state-dict key with the weight at the same position.
        new_state = {
            key: torch.tensor(weights[idx])
            for idx, key in enumerate(model.state_dict().keys())
        }
        model.load_state_dict(new_state, strict=True)
    elif TRAIN_VERSION in ('MS_GPU', 'MS_NPU'):
        new_state = {
            key: Parameter(weights[idx])
            for idx, key in enumerate(model.parameters_dict().keys())
        }
        load_param_into_net(model, new_state, strict_load=True)
    return model
-
def setup_seed(seed):
    """Seed every random number generator in use for reproducible runs."""
    np.random.seed(seed)
    random.seed(seed)
    if TRAIN_VERSION in ('PYTORCH_GPU', 'PYTORCH_MLU'):
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        # Force deterministic cuDNN kernel selection.
        torch.backends.cudnn.deterministic = True
|