|
# --- Training configuration ---
batch_size = 18    # number of images per batch
image_size = 224   # spatial size of the training images
num_epochs = 10    # number of training epochs
lr = 0.001         # learning rate
momentum = 0.9     # SGD momentum
workers = 4        # number of parallel data-loading workers
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms

# Dataset directory paths (wolf/dog "Canidae" dataset)
data_path_train = "/dataset/data/Canidae/train/"
data_path_val = "/dataset/data/Canidae/val/"
-
# Create the training dataset
-
def create_dataset_canidae(dataset_path, usage):
    """Build a DataLoader over an ImageFolder-style dataset.

    Args:
        dataset_path: root directory laid out for torchvision ImageFolder
            (one subdirectory per class).
        usage: "train" applies random-crop/flip augmentation; anything else
            uses the deterministic resize + center-crop evaluation pipeline.

    Returns:
        torch.utils.data.DataLoader yielding (images, labels) batches.
    """
    # ImageNet statistics in the [0, 1] range. ToTensor() already scales
    # pixels to [0, 1], so these must NOT be multiplied by 255 (the *255
    # variants are only correct for MindSpore's uint8 vision pipeline).
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    scale = 32  # extra margin when resizing before the center crop

    if usage == "train":
        # Augmentation pipeline for the training split.
        trans = transforms.Compose([
            transforms.RandomResizedCrop(image_size),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.ToTensor(),
            transforms.Normalize(mean=mean, std=std),
        ])
    else:
        # Deterministic pipeline for evaluation / inference.
        trans = transforms.Compose([
            transforms.Resize(image_size + scale),
            transforms.CenterCrop(image_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=mean, std=std),
        ])

    data_set = datasets.ImageFolder(dataset_path, transform=trans)
    # Shuffle only during training; use the module-level `workers` setting
    # instead of the previously hard-coded worker count.
    return torch.utils.data.DataLoader(
        data_set,
        batch_size=batch_size,
        shuffle=(usage == "train"),
        num_workers=workers,
    )
-
# Build the training/validation loaders and sanity-check the batch shapes.
dataset_train = create_dataset_canidae(data_path_train, "train")

print(dataset_train.__dict__.items())
dataset_val = create_dataset_canidae(data_path_val, "val")
print(dataset_val.__dict__.items())
# The original loop bound `id`, shadowing the builtin, and never used it;
# iterate the loader directly instead.
for train_features, train_labels in dataset_train:
    print(f"Feature batch shape: {train_features.size()}")
    print(f"Labels batch shape: {train_labels.size()}")
- import torch.nn as nn
- from typing import Type, Union, List, Optional
class ResidualBlock(nn.Module):
    """ResNet bottleneck block: 1x1 reduce -> 3x3 -> 1x1 expand, plus shortcut."""

    expansion = 4  # the last conv has 4x the channels of the first

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, down_sample: Optional[nn.Module] = None) -> None:
        """Args:
            in_channel: channels of the incoming feature map.
            out_channel: bottleneck width; the block outputs
                ``out_channel * expansion`` channels.
            stride: stride of the 3x3 conv (2 downsamples the stage).
            down_sample: optional projection applied to the shortcut when
                the channel count or stride changes.
        """
        super(ResidualBlock, self).__init__()

        self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=1)
        self.norm1 = nn.BatchNorm2d(out_channel)
        # padding=1 keeps the 3x3 conv shape-preserving (apart from stride);
        # without it the residual addition below fails with a size mismatch.
        self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=3,
                               stride=stride, padding=1)
        self.norm2 = nn.BatchNorm2d(out_channel)
        self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion, kernel_size=1)
        self.norm3 = nn.BatchNorm2d(out_channel * self.expansion)
        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def forward(self, x):
        identity = x  # shortcut branch

        out = self.conv1(x)    # main branch, layer 1: 1x1 conv
        out = self.norm1(out)
        out = self.relu(out)
        out = self.conv2(out)  # main branch, layer 2: 3x3 conv
        out = self.norm2(out)
        out = self.relu(out)
        out = self.conv3(out)  # main branch, layer 3: 1x1 conv
        out = self.norm3(out)

        # Project the shortcut when channel count / spatial size changed.
        if self.down_sample is not None:
            identity = self.down_sample(x)

        out += identity  # sum of main branch and shortcut
        out = self.relu(out)

        return out
-
def make_layer(last_out_channel, block: Type[Union[ResidualBlock]],
               channel: int, block_nums: int, stride: int = 1):
    """Stack ``block_nums`` residual blocks into one sequential stage.

    Args:
        last_out_channel: channels produced by the previous stage.
        block: residual block class to instantiate.
        channel: bottleneck width of this stage.
        block_nums: number of blocks in the stage.
        stride: stride of the first block (2 halves the spatial size).
    """
    # A projection shortcut is needed whenever the stage changes the
    # spatial stride or the channel count.
    down_sample = None
    if stride != 1 or last_out_channel != channel * block.expansion:
        down_sample = nn.Sequential(
            nn.Conv2d(last_out_channel, channel * block.expansion,
                      kernel_size=1, stride=stride),
            nn.BatchNorm2d(channel * block.expansion),
        )

    # The first block carries the stride and the optional projection; the
    # remaining blocks operate at the expanded channel width.
    stage = [block(last_out_channel, channel, stride=stride, down_sample=down_sample)]
    expanded = channel * block.expansion
    stage.extend(block(expanded, channel) for _ in range(1, block_nums))

    return nn.Sequential(*stage)
-
class ResNet(nn.Module):
    """ResNet backbone built from bottleneck residual blocks.

    Args:
        block: residual block class (e.g. ResidualBlock).
        layer_nums: number of blocks in each of the four stages.
        num_classes: size of the final classification layer.
        input_channel: flattened feature size fed to the fully-connected
            head (512 * block.expansion for bottleneck ResNets).
    """

    def __init__(self, block: Type[Union[ResidualBlock]],
                 layer_nums: List[int], num_classes: int, input_channel: int) -> None:
        super(ResNet, self).__init__()

        self.relu = nn.ReLU()
        # Stem: 7x7/2 conv on 3-channel (RGB) input. padding=3 matches the
        # standard ResNet stem and gives the exact 224 -> 112 halving.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.norm = nn.BatchNorm2d(64)
        # 3x3/2 max pool; padding=1 gives the canonical 112 -> 56 halving.
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # The four residual stages; each later stage halves spatial size.
        self.layer1 = make_layer(64, block, 64, layer_nums[0])
        self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
        self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
        self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
        # Global average pooling to 1x1, so flatten always yields exactly
        # `input_channel` features regardless of input resolution (the
        # original AvgPool2d(1) was a no-op and made `fc` fail on 224x224).
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten = nn.Flatten()
        # Classification head.
        self.fc = nn.Linear(input_channel, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.norm(x)
        x = self.relu(x)
        x = self.max_pool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avg_pool(x)
        x = self.flatten(x)
        x = self.fc(x)

        return x
-
-
def _resnet(model_url: str, block: Type[Union[ResidualBlock]],
            layers: List[int], num_classes: int, pretrained: bool, pretrianed_ckpt: str,
            input_channel: int):
    """Construct a ResNet and optionally load pretrained weights.

    Args:
        model_url: URL of the checkpoint to fetch when ``pretrained``.
        block: residual block class.
        layers: per-stage block counts.
        num_classes: classifier output size.
        pretrained: download ``model_url`` and load it into the model.
        pretrianed_ckpt: local path for the downloaded checkpoint
            (parameter name kept, typo and all, for compatibility).
        input_channel: flattened feature size fed to the fc head.

    Returns:
        The constructed (and possibly weight-initialized) ResNet.
    """
    model = ResNet(block, layers, num_classes, input_channel)

    if pretrained:
        # The original called MindSpore helpers (download / load_checkpoint /
        # load_param_into_net) that are undefined in this file and would
        # raise NameError. Download with the stdlib and load with torch.
        import os
        import urllib.request

        if not os.path.exists(pretrianed_ckpt):
            ckpt_dir = os.path.dirname(pretrianed_ckpt)
            if ckpt_dir:
                os.makedirs(ckpt_dir, exist_ok=True)
            urllib.request.urlretrieve(model_url, pretrianed_ckpt)
        # NOTE(review): the default URL points to a MindSpore .ckpt, which
        # torch.load cannot parse — supply a torch-format checkpoint or
        # convert the weights first.
        state_dict = torch.load(pretrianed_ckpt, map_location="cpu")
        model.load_state_dict(state_dict)

    return model
-
-
- def resnet50(num_classes: int = 1000, pretrained: bool = False):
- "ResNet50模型"
- resnet50_url = "https://obs.dualstack.cn-north-4.myhuaweicloud.com/mindspore-website/notebook/models/application/resnet50_224_new.ckpt"
- resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
- return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes,
- pretrained, resnet50_ckpt, 2048)
-
# Build the backbone and adapt it for binary (wolf vs. dog) classification.
network = resnet50()

# Read the fc input width from the model instead of hard-coding 2048
# (the commented-out `network.fc.in` hinted at this intent).
in_channels = network.fc.in_features

# Output size 2: the two Canidae classes (wolf / dog).
head = nn.Linear(in_channels, 2)
# Replace the fully-connected head.
network.fc = head

# Average pooling with kernel size 7 — assumes the feature map entering the
# pool is 7x7, i.e. a 224x224 input; TODO confirm for other resolutions.
avg_pool = nn.AvgPool2d(kernel_size=7)
# Replace the average pooling layer.
network.avg_pool = avg_pool
|