|
- from collections import OrderedDict
- import warnings
- import AISyncore
-
-
- import paddle
- import paddle.nn.functional as F
- from paddle.vision.transforms import ToTensor
- import numpy as np
- import matplotlib.pyplot as plt
# Log the installed PaddlePaddle version when the module is loaded.
print(paddle.__version__)


# Suppress every warning of category UserWarning for the rest of the process.
warnings.filterwarnings("ignore", category=UserWarning)
# Prefer the first GPU, but fall back to the CPU when this Paddle build has no
# CUDA support, so that importing the script does not fail on CPU-only installs.
paddle.device.set_device(
    'gpu:0' if paddle.device.is_compiled_with_cuda() else 'cpu'
)
-
- # Model (simple CNN adapted from PaddlePaddle)
class Net(paddle.nn.Layer):
    """Small convolutional classifier: three conv layers and two linear layers.

    ``num_classes`` sets the output width of the last linear layer, i.e. the
    number of scores produced for each input sample.
    """

    def __init__(self, num_classes=1):
        super().__init__()

        # Convolutional stages; the first two are each paired with a 2x2,
        # stride-2 max-pooling layer.
        self.conv1 = paddle.nn.Conv2D(in_channels=3, out_channels=32, kernel_size=(3, 3))
        self.pool1 = paddle.nn.MaxPool2D(kernel_size=2, stride=2)

        self.conv2 = paddle.nn.Conv2D(in_channels=32, out_channels=64, kernel_size=(3, 3))
        self.pool2 = paddle.nn.MaxPool2D(kernel_size=2, stride=2)

        self.conv3 = paddle.nn.Conv2D(in_channels=64, out_channels=64, kernel_size=(3, 3))

        self.flatten = paddle.nn.Flatten()

        # Classifier head. NOTE(review): the 1024 input features presumably
        # correspond to 64 channels x 4 x 4 for 32x32 inputs -- confirm.
        self.linear1 = paddle.nn.Linear(in_features=1024, out_features=64)
        self.linear2 = paddle.nn.Linear(in_features=64, out_features=num_classes)

    def forward(self, x):
        """Run ``x`` through the network and return the final linear output."""
        # conv -> ReLU -> pool, twice, then a third conv -> ReLU without pooling.
        features = self.pool1(F.relu(self.conv1(x)))
        features = self.pool2(F.relu(self.conv2(features)))
        features = F.relu(self.conv3(features))

        # Flatten and apply the two-layer head; no activation on the output.
        hidden = F.relu(self.linear1(self.flatten(features)))
        return self.linear2(hidden)
-
-
def train(model, trainloader, epoch_num=1):
    """Train ``model`` on the batches of ``trainloader`` for ``epoch_num`` epochs.

    Optimises with Adam (learning rate 0.001) against a cross-entropy loss and
    prints the loss for every batch whose index is a multiple of 1000.
    Nothing is returned; ``model`` is updated in place.
    """
    print('start training ... ')
    # Put the model into training mode before optimising.
    model.train()
    optimizer = paddle.optimizer.Adam(learning_rate=0.001, parameters=model.parameters())

    for epoch in range(epoch_num):
        for batch_id, batch in enumerate(trainloader()):
            inputs = batch[0]
            # Insert a size-1 axis at position 1 of the label tensor before
            # handing it to the loss.
            labels = paddle.unsqueeze(paddle.to_tensor(batch[1]), 1)

            loss = F.cross_entropy(model(inputs), labels)

            if batch_id % 1000 == 0:
                print("epoch: {}, batch_id: {}, loss is: {}".format(epoch, batch_id, loss.numpy()))

            # Backpropagate, apply the update, then reset the gradients.
            loss.backward()
            optimizer.step()
            optimizer.clear_grad()
        # NOTE(review): re-asserts training mode at the end of each epoch;
        # looks redundant with the call above -- confirm before removing.
        model.train()
-
-
def test(model, testloader):
    """Evaluate ``model`` on every batch produced by ``testloader``.

    Args:
        model: The network to evaluate; it is switched to evaluation mode.
        testloader: Callable returning an iterable of batches whose first
            element is the input tensor and whose second is the labels.

    Returns:
        A ``(avg_loss, avg_acc)`` tuple of floats holding the cross-entropy
        loss and the accuracy averaged over all evaluated samples. Both are
        ``nan`` when the loader yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    acc_sum = 0.0
    sample_count = 0

    # Evaluation never calls backward(), so skip gradient tracking entirely.
    with paddle.no_grad():
        for data in testloader():
            x_data = data[0]
            # Insert a size-1 axis at position 1 of the label tensor.
            y_data = paddle.unsqueeze(paddle.to_tensor(data[1]), 1)

            logits = model(x_data)
            loss = F.cross_entropy(logits, y_data)
            acc = paddle.metric.accuracy(logits, y_data)

            # Weight each batch by its size so that a smaller final batch
            # does not count as much as a full one in the averages.
            batch_size = x_data.shape[0]
            loss_sum += float(loss) * batch_size
            acc_sum += float(acc) * batch_size
            sample_count += batch_size

    if sample_count == 0:
        # Nothing was evaluated; report "no value" instead of dividing by zero.
        return float("nan"), float("nan")
    return loss_sum / sample_count, acc_sum / sample_count
-
-
def load_data():
    """Build the CIFAR-10 train and test loaders.

    Returns a ``(trainloader, testloader, num_examples)`` tuple, where
    ``num_examples`` maps ``"trainset"`` and ``"testset"`` to the length of
    the corresponding dataset. Both loaders use a batch size of 32; only the
    training loader shuffles.
    """
    to_tensor = ToTensor()
    datasets = paddle.vision.datasets

    train_set = datasets.Cifar10(mode='train', transform=to_tensor)
    test_set = datasets.Cifar10(mode='test', transform=to_tensor)

    trainloader = paddle.io.DataLoader(train_set, shuffle=True, batch_size=32)
    testloader = paddle.io.DataLoader(test_set, batch_size=32)

    # Dataset sizes are returned alongside the loaders for the caller's use.
    num_examples = {"trainset": len(train_set), "testset": len(test_set)}
    return trainloader, testloader, num_examples
-
-
-
- # #############################################################################
- # Federation of the pipeline with Flower
- # #############################################################################
def main():
    """Create the model, load the data, define the client and start it."""
    # CIFAR-10 has ten classes, so the classifier must emit ten logits; the
    # default of a single logit cannot represent labels 0-9.
    net = Net(num_classes=10)
    # Load data (CIFAR-10)
    trainloader, testloader, num_examples = load_data()

    class CifarClient(AISyncore.client.NumPyClient):
        """NumPy-based client that trains and evaluates ``net`` locally."""

        def get_parameters(self):
            """Return the model parameters as a list of NumPy arrays."""
            return [val.numpy() for _, val in net.named_parameters()]

        def set_parameters(self, parameters):
            """Load ``parameters`` into ``net``.

            ``parameters`` must be in the same order as the list returned by
            ``get_parameters``.

            Raises:
                ValueError: If the number of arrays does not match the number
                    of model parameters.
            """
            # named_parameters() yields (name, tensor) pairs rather than a
            # dict, so the names are collected from the pairs.
            names = [name for name, _ in net.named_parameters()]
            if len(names) != len(parameters):
                # zip() would silently truncate; fail loudly instead.
                raise ValueError(
                    "expected {} parameter arrays, got {}".format(
                        len(names), len(parameters)
                    )
                )
            state_dict = OrderedDict(
                (name, paddle.to_tensor(value))
                for name, value in zip(names, parameters)
            )
            # paddle.nn.Layer restores weights via set_state_dict.
            net.set_state_dict(state_dict)

        def fit(self, parameters, config):
            """Train for one epoch starting from ``parameters``.

            Returns the updated parameters, the training-set size and an
            empty metrics dict.
            """
            self.set_parameters(parameters)
            # train() names its epoch-count parameter ``epoch_num``.
            train(net, trainloader, epoch_num=1)
            return self.get_parameters(), num_examples["trainset"], {}

        def evaluate(self, parameters, config):
            """Evaluate ``parameters`` on the test loader.

            Returns the loss, the test-set size and a dict holding the
            accuracy under the key ``"accuracy"``.
            """
            self.set_parameters(parameters)
            loss, accuracy = test(net, testloader)
            return float(loss), num_examples["testset"], {"accuracy": float(accuracy)}

    # Start client
    server_address = "0.0.0.0"
    AISyncore.client.run_numpy_client(server_address + ":8080", client=CifarClient())


# Run the client only when this file is executed as a script.
if __name__ == "__main__":
    main()
|