|
- # Copyright 2020 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # ============================================================================
- """train resnet."""
- import os
- import argparse
- import ast
- from mindspore import context
- from mindspore import Tensor
- from mindspore.nn.optim.momentum import Momentum
- from mindspore.nn.optim.sgd import SGD
- from mindspore.train.model import Model
- from mindspore.context import ParallelMode
- from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
- from mindspore.nn.loss import SoftmaxCrossEntropyWithLogits
- from mindspore.train.loss_scale_manager import FixedLossScaleManager
- from mindspore.train.serialization import load_checkpoint, load_param_into_net
- from mindspore.communication.management import init, get_rank, get_group_size
- from mindspore.common import set_seed
- import mindspore.nn as nn
- import mindspore.common.initializer as weight_init
- from src.lr_generator import get_lr, warmup_cosine_annealing_lr
- from src.CrossEntropySmooth import CrossEntropySmooth
-
- #from param_server import ParamHunter
- import flwr as fl
- from flwr.client.thgy_client import THGYApiClient
- from flwr.common import EvaluateIns, EvaluateRes, FitIns, FitRes, ParametersRes, Weights
- import timeit
- import datetime
- from . import DEFAULT_SERVER_ADDRESS
- import time
- import json
- import yaml
- import numpy as np
- import grpc
- import logging
- from mindspore.train.callback import Callback
-
- parser = argparse.ArgumentParser(description='Image classification')
- parser.add_argument('--net', type=str, default=None, help='Resnet Model, either resnet50 or resnet101')
- parser.add_argument('--dataset', type=str, default=None, help='Dataset, either cifar10 or imagenet2012')
- parser.add_argument('--run_distribute', type=ast.literal_eval, default=False, help='Run distribute')
- parser.add_argument('--device_num', type=int, default=1, help='Device num.')
-
- parser.add_argument('--dataset_path', type=str, default=None, help='Dataset path')
- parser.add_argument('--device_target', type=str, default='Ascend', help='Device target')
- parser.add_argument('--pre_trained', type=str, default=None, help='Pretrained checkpoint path')
- parser.add_argument('--parameter_server', type=ast.literal_eval, default=False, help='Run parameter server train')
- #parser.add_argument('--initial', type=str, default='False', choices=['False', 'True'], help='initial flag')
- #parser.add_argument('--globalStep', type=int, default=1, help='global step')
- parser.add_argument('--uuid', type=str, default='test', help='Whether to fetch average parameters from server.')
- #parser.add_argument('--num_epoch', type=int, default=80, help='global step')
- parser.add_argument(
- "--server_address",
- type=str,
- default=DEFAULT_SERVER_ADDRESS,
- help=f"gRPC server address (default: {DEFAULT_SERVER_ADDRESS})",
- )
- parser.add_argument(
- "--cid", type=str, required=True, help="Client CID (no default)"
- )
- parser.add_argument(
- "--log_host",
- type=str,
- help="Logserver address (no default)",
- )
-
- args_opt = parser.parse_args()
-
- print('==============='*10)
-
- set_seed(1)
-
- if args_opt.net == "resnet50":
- from src.resnet import resnet50 as resnet
- if args_opt.dataset == "cifar10":
- from src.config import config1 as config
- from src.dataset import create_dataset1 as create_dataset
- else:
- from src.config import config2 as config
- from src.dataset import create_dataset2 as create_dataset
- elif args_opt.net == "resnet101":
- from src.resnet import resnet101 as resnet
- from src.config import config3 as config
- from src.dataset import create_dataset3 as create_dataset
- else:
- from src.resnet import se_resnet50 as resnet
- from src.config import config4 as config
- from src.dataset import create_dataset4 as create_dataset
-
- config.epoch_size = args_opt.num_epoch
-
-
- class EvalCallBack(Callback):
- def __init__(self, model, eval_dataset, acc_record, eval_per_epoch=1):
- self.model = model
- self.eval_dataset = eval_dataset
- self.eval_per_epoch = eval_per_epoch
- self.acc_record = acc_record
-
- def end(self, run_context):
- cb_param = run_context.original_args()
- cur_epoch = cb_param.cur_epoch_num
- acc = self.model.eval(self.eval_dataset, dataset_sink_mode=False)
- self.acc_record.append(acc['top_1_accuracy'])
- print(f'epoch : {cur_epoch}, acc : {acc}')
-
- step_per_round = int(os.environ["CLIENT_STEP"]
- ) if 'CLIENT_STEP' in os.environ else 1
- print(f"step_per_round:{step_per_round}")
-
-
- class CifarClient(fl.client.Client):
- """Flower client implementing CIFAR-10 image classification using PyTorch."""
-
- def __init__(
- self,
- cid: str,
- model: resnet,
- trainset: "cifar10",
- testset: "cifar10",
- cb:[TimeMonitor(),LossMonitor()],
-
- ) -> None:
- self.cid = cid
- self.model = model
- self.trainset= trainset
- self.testset = testset
- self.cb = cb
-
- def get_parameters(self) -> ParametersRes:
- print(f"Client {self.cid}: get_parameters")
-
- weights: Weights = self.model.get_weights()
- parameters = fl.common.weights_to_parameters(weights)
- return ParametersRes(parameters=parameters)
-
-
- def fit(self, ins: FitIns) -> FitRes:
- print(f"Client {self.cid}: fit")
-
- weights: Weights = fl.common.parameters_to_weights(ins.parameters)
- config = ins.config
- fit_begin = timeit.default_timer()
-
- # Get training config
- epochs = int(config["epochs"])
-
- self.model.set_weights(weights)
- # define callbacks
-
- self.model.train(epochs, dataset, callbacks=cb, sink_size=dataset.get_dataset_size(),
- dataset_sink_mode=(not args_opt.parameter_server))
- #cifar.train(self.model, trainloader, epochs=epochs, device=DEVICE)
-
- # Return the refined weights and the number of examples used for training
- weights_prime: Weights = self.model.get_weights()
- params_prime = fl.common.weights_to_parameters(weights_prime)
- num_examples_train = len(self.trainset)
- fit_duration = timeit.default_timer() - fit_begin
- return FitRes(
- parameters=params_prime,
- num_examples=num_examples_train,
- num_examples_ceil=num_examples_train,
- fit_duration=fit_duration,
- )
-
-
- def evaluate(self, ins: EvaluateIns) -> EvaluateRes:
- print(f"Client {self.cid}: evaluate")
-
- weights = fl.common.parameters_to_weights(ins.parameters)
- # Use provided weights to update the local model
- self.model.set_weights(weights)
-
- time_cb = TimeMonitor(data_size=step_size)
- # 打印loss信息
- loss_cb = LossMonitor()
- cb = [time_cb, loss_cb]
- if config.save_checkpoint:
- config_ck = CheckpointConfig(save_checkpoint_steps=config.save_checkpoint_epochs * step_size,
- keep_checkpoint_max=config.keep_checkpoint_max)
- ckpt_cb = ModelCheckpoint(prefix="resnet", directory=ckpt_save_dir, config=config_ck)
- cb += [ckpt_cb]
-
- acc_record = []
- eval_cb = EvalCallBack(self.model, self.testset, acc_record)
- cb += [eval_cb]
-
- # Return the number of evaluation examples and the evaluation result (loss)
- return EvaluateRes(
- loss=float(loss), num_examples=len(self.testset), accuracy=float(acc_record[-1])
- )
-
- if __name__ == '__main__':
- target = args_opt.device_target
- ckpt_save_dir = config.save_checkpoint_path
- batch_size = int(config["batch_size"])
-
- # init context
- context.set_context(mode=context.GRAPH_MODE, device_target=target, save_graphs=False)
- if args_opt.parameter_server:
- context.set_ps_context(enable_ps=True)
- if args_opt.run_distribute:
- if target == "Ascend":
- device_id = int(os.getenv('DEVICE_ID'))
- context.set_context(device_id=device_id, enable_auto_mixed_precision=True)
- context.set_auto_parallel_context(device_num=args_opt.device_num, parallel_mode=ParallelMode.DATA_PARALLEL,
- gradients_mean=True)
- if args_opt.net == "resnet50" or args_opt.net == "se-resnet50":
- context.set_auto_parallel_context(all_reduce_fusion_config=[85, 160])
- else:
- context.set_auto_parallel_context(all_reduce_fusion_config=[180, 313])
- init()
- # GPU target
- else:
- init()
- context.set_auto_parallel_context(device_num=get_group_size(), parallel_mode=ParallelMode.DATA_PARALLEL,
- gradients_mean=True)
- if args_opt.net == "resnet50":
- context.set_auto_parallel_context(all_reduce_fusion_config=[85, 160])
- ckpt_save_dir = config.save_checkpoint_path + "ckpt_" + str(get_rank()) + "/"
-
- dataset = create_dataset(dataset_path=args_opt.dataset_path, do_train=True, repeat_num=1,
- batch_size=batch_size, target=target)
- eval_dataset = create_dataset(dataset_path='/root/jointcloud/data/cifar10/eval', do_train=False, repeat_num=1,
- batch_size=batch_size, target=target)
- step_size = dataset.get_dataset_size()
- # define net
- net = resnet(class_num=config.class_num)
-
- uuid = args_opt.cid
- if '_' in uuid:
- group_id, task_id = map(int, uuid.split('_'))
- else:
- group_id = 0
- task_id = int(uuid)
- #initial = args_opt.initial
- # init weight
- if args_opt.pre_trained:
- param_dict = load_checkpoint(args_opt.pre_trained)
- load_param_into_net(net, param_dict)
- else:
- for _, cell in net.cells_and_names():
- if isinstance(cell, nn.Conv2d):
- cell.weight.set_data(weight_init.initializer(weight_init.XavierUniform(),
- cell.weight.shape,
- cell.weight.dtype))
- if isinstance(cell, nn.Dense):
- cell.weight.set_data(weight_init.initializer(weight_init.TruncatedNormal(),
- cell.weight.shape,
- cell.weight.dtype))
- # define opt
- decayed_params = []
- no_decayed_params = []
- for param in net.trainable_params():
- if 'beta' not in param.name and 'gamma' not in param.name and 'bias' not in param.name:
- decayed_params.append(param)
- else:
- no_decayed_params.append(param)
-
-
- # group_params = [{'params': decayed_params, 'weight_decay': config.weight_decay},
- # {'params': no_decayed_params},
- # {'order_params': net.trainable_params()}]
-
- lr = get_lr(lr_init=config.lr_init, lr_end=config.lr_end, lr_max=config.lr_max,
- warmup_epochs=config.warmup_epochs, total_epochs=config.epoch_size, steps_per_epoch=step_size,
- lr_decay_mode=config.lr_decay_mode)
- lr = Tensor(lr)
- print(f'rearning rate is : {lr}')
- #opt = Momentum(group_params, lr, config.momentum, loss_scale=config.loss_scale)
-
- loss = SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
-
- opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), lr, config.momentum, config.weight_decay,
- config.loss_scale)
- loss_scale = FixedLossScaleManager(config.loss_scale, drop_overflow_update=False)
- time_cb = TimeMonitor(data_size=step_size)
- # 打印loss信息
- loss_cb = LossMonitor()
- cb = [time_cb, loss_cb]
- if config.save_checkpoint:
- config_ck = CheckpointConfig(save_checkpoint_steps=config.save_checkpoint_epochs * step_size,
- keep_checkpoint_max=config.keep_checkpoint_max)
- ckpt_cb = ModelCheckpoint(prefix="resnet", directory=ckpt_save_dir, config=config_ck)
- cb += [ckpt_cb]
-
- # define callbacks
- # time_cb = TimeMonitor(data_size=step_size)
- # #打印loss信息
- # loss_cb = LossMonitor()
- # cb = [time_cb, loss_cb]
- # if config.save_checkpoint:
- # config_ck = CheckpointConfig(save_checkpoint_steps=config.save_checkpoint_epochs * step_size,
- # keep_checkpoint_max=config.keep_checkpoint_max)
- # ckpt_cb = ModelCheckpoint(prefix="resnet", directory=ckpt_save_dir, config=config_ck)
- # cb += [ckpt_cb]
-
- # train model
- # 参数管理器
- #param_hunter = ParamHunter(net, debug=False)
- # api客户端
- api_client = THGYApiClient()
- # 训练之前,需要从JCCE.agent初始化model的参数
- #print(f"initial:{initial}")
-
- start_time = datetime.datetime.now()
- start_pull_time = datetime.datetime.now()
- end_pull_time = datetime.datetime.now()
- pull_time = (end_pull_time - start_pull_time).seconds
-
- model = Model(net, loss_fn=loss, optimizer=opt, loss_scale_manager=loss_scale, metrics={'top_1_accuracy', 'top_5_accuracy'},
- amp_level="O0", keep_batchnorm_fp32=True)
-
-
-
- # 界面展示用
- init_params_num = 25600
- api_client.add_training_parameters(0, task_id, init_params_num)
- # 当前轮次开始时间
- if args_opt.net == "se-resnet50":
- config.epoch_size = config.train_epoch_size
- trainStart_time = datetime.datetime.now()
- trainEnd_time = datetime.datetime.now()
- client = CifarClient(args_opt.cid, model, dataset, eval_dataset, cb)
- print(f"server address :{args_opt.server_address}")
- fl.client.start_client(args_opt.server_address, client)
-
-
|