|
- import mindspore as ms
- import mindspore.nn as nn
- from mindspore import context, ms_function
-
- from mindspore.common.initializer import initializer, HeNormal, HeUniform, One, Zero
- import numpy as np
-
-
class ResidualBlock(nn.Cell):
    """Two-layer 3x3 residual block with a configurable normalization layer.

    Args:
        in_planes (int): number of input channels.
        planes (int): number of output channels; should be a multiple of 8
            when ``norm_fn == 'group'`` so the group count divides evenly.
        norm_fn (str): one of ``'group'``, ``'batch'``, ``'instance'``,
            ``'none'``.
        stride (int): stride of the first convolution. When != 1, a strided
            1x1 convolution (plus ``norm3``) downsamples the shortcut so the
            residual shapes match.

    Raises:
        ValueError: if ``norm_fn`` is not one of the supported names.
    """

    def __init__(self, in_planes, planes, norm_fn='group', stride=1):
        super(ResidualBlock, self).__init__()

        # pad_mode='pad' + padding=1 keeps spatial size for the 3x3 kernels;
        # has_bias=True matches the PyTorch reference (bias defaults on there).
        self.conv1 = nn.Conv2d(in_channels=in_planes, out_channels=planes,
                               kernel_size=3, pad_mode='pad', padding=1,
                               stride=stride, has_bias=True)
        self.conv2 = nn.Conv2d(in_channels=planes, out_channels=planes,
                               kernel_size=3, pad_mode='pad', padding=1,
                               has_bias=True)
        self.relu = nn.ReLU()

        num_groups = planes // 8

        if norm_fn == 'group':
            self.norm1 = nn.GroupNorm(num_groups=num_groups, num_channels=planes)
            self.norm2 = nn.GroupNorm(num_groups=num_groups, num_channels=planes)
            if stride != 1:
                self.norm3 = nn.GroupNorm(num_groups=num_groups, num_channels=planes)
        elif norm_fn == 'batch':
            self.norm1 = nn.BatchNorm2d(planes)
            self.norm2 = nn.BatchNorm2d(planes)
            if stride != 1:
                self.norm3 = nn.BatchNorm2d(planes)
        elif norm_fn == 'instance':
            # nn.InstanceNorm2d is GPU-only in MindSpore; GroupNorm with one
            # group per channel performs the same per-sample, per-channel
            # normalization on any backend.
            self.norm1 = nn.GroupNorm(num_groups=planes, num_channels=planes, affine=False)
            self.norm2 = nn.GroupNorm(num_groups=planes, num_channels=planes, affine=False)
            if stride != 1:
                self.norm3 = nn.GroupNorm(num_groups=planes, num_channels=planes, affine=False)
        elif norm_fn == 'none':
            # Empty SequentialCell acts as an identity mapping.
            self.norm1 = nn.SequentialCell()
            self.norm2 = nn.SequentialCell()
            if stride != 1:
                self.norm3 = nn.SequentialCell()
        else:
            # Fail fast instead of crashing later with an AttributeError
            # on the missing self.norm1.
            raise ValueError("Unknown norm_fn: {}".format(norm_fn))

        if stride == 1:
            self.downsample = None
        else:
            self.downsample = nn.SequentialCell(
                nn.Conv2d(in_channels=in_planes, out_channels=planes,
                          kernel_size=1, stride=stride, has_bias=True),
                self.norm3)

    def construct(self, x):
        """Apply conv-norm-relu twice, then add the (possibly downsampled)
        shortcut and apply a final ReLU."""
        y = x
        y = self.relu(self.norm1(self.conv1(y)))
        y = self.relu(self.norm2(self.conv2(y)))

        if self.downsample is not None:
            x = self.downsample(x)

        return self.relu(x + y)
-
-
class BottleneckBlock(nn.Cell):
    """1x1 -> 3x3 -> 1x1 bottleneck residual block with configurable norm.

    The inner channel count is ``planes // 4``; the 3x3 convolution carries
    the stride.

    Args:
        in_planes (int): number of input channels.
        planes (int): number of output channels.
        norm_fn (str): one of ``'group'``, ``'batch'``, ``'instance'``,
            ``'none'``.
        stride (int): stride of the 3x3 convolution. When != 1, a strided
            1x1 convolution (plus ``norm4``) downsamples the shortcut.

    Raises:
        ValueError: if ``norm_fn`` is not one of the supported names.
    """

    def __init__(self, in_planes, planes, norm_fn='group', stride=1):
        super(BottleneckBlock, self).__init__()

        # pad_mode='pad' is required for explicit padding in MindSpore: the
        # default pad_mode='same' rejects any padding != 0, so the original
        # conv2 (padding=1 without pad_mode) failed to construct.
        # has_bias=True matches ResidualBlock and the PyTorch reference,
        # where Conv2d has a bias by default.
        self.conv1 = nn.Conv2d(in_planes, planes // 4, kernel_size=1,
                               pad_mode='pad', padding=0, has_bias=True)
        self.conv2 = nn.Conv2d(planes // 4, planes // 4, kernel_size=3,
                               pad_mode='pad', padding=1, stride=stride,
                               has_bias=True)
        self.conv3 = nn.Conv2d(planes // 4, planes, kernel_size=1,
                               pad_mode='pad', padding=0, has_bias=True)
        self.relu = nn.ReLU()

        num_groups = planes // 8

        if norm_fn == 'group':
            self.norm1 = nn.GroupNorm(num_groups=num_groups, num_channels=planes // 4)
            self.norm2 = nn.GroupNorm(num_groups=num_groups, num_channels=planes // 4)
            self.norm3 = nn.GroupNorm(num_groups=num_groups, num_channels=planes)
            if stride != 1:
                self.norm4 = nn.GroupNorm(num_groups=num_groups, num_channels=planes)
        elif norm_fn == 'batch':
            self.norm1 = nn.BatchNorm2d(planes // 4)
            self.norm2 = nn.BatchNorm2d(planes // 4)
            self.norm3 = nn.BatchNorm2d(planes)
            if stride != 1:
                self.norm4 = nn.BatchNorm2d(planes)
        elif norm_fn == 'instance':
            # GroupNorm with one group per channel substitutes for the
            # GPU-only nn.InstanceNorm2d (same per-sample normalization).
            self.norm1 = nn.GroupNorm(num_groups=planes // 4, num_channels=planes // 4, affine=False)
            self.norm2 = nn.GroupNorm(num_groups=planes // 4, num_channels=planes // 4, affine=False)
            self.norm3 = nn.GroupNorm(num_groups=planes, num_channels=planes, affine=False)
            if stride != 1:
                self.norm4 = nn.GroupNorm(num_groups=planes, num_channels=planes, affine=False)
        elif norm_fn == 'none':
            # Empty SequentialCell acts as an identity mapping.
            self.norm1 = nn.SequentialCell()
            self.norm2 = nn.SequentialCell()
            self.norm3 = nn.SequentialCell()
            if stride != 1:
                self.norm4 = nn.SequentialCell()
        else:
            # Fail fast instead of crashing later with an AttributeError
            # on the missing self.norm1.
            raise ValueError("Unknown norm_fn: {}".format(norm_fn))

        if stride == 1:
            self.downsample = None
        else:
            self.downsample = nn.SequentialCell(
                nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride,
                          has_bias=True),
                self.norm4)

    def construct(self, x):
        """Apply the three conv-norm-relu stages, then add the (possibly
        downsampled) shortcut and apply a final ReLU."""
        y = x
        y = self.relu(self.norm1(self.conv1(y)))
        y = self.relu(self.norm2(self.conv2(y)))
        y = self.relu(self.norm3(self.conv3(y)))

        if self.downsample is not None:
            x = self.downsample(x)

        return self.relu(x + y)
-
-
class BasicEncoder(nn.Cell):
    """RAFT-style feature encoder: a strided 7x7 stem followed by three
    residual stages (1/2 -> 1/4 -> 1/8 resolution) and a 1x1 output
    projection.

    Args:
        output_dim (int): channel count of the returned feature map.
        norm_fn (str): ``'group'`` | ``'batch'`` | ``'instance'`` | ``'none'``.
        dropout (float): dropout probability applied before the output
            (training only); 0 disables it.

    Raises:
        ValueError: if ``norm_fn`` is not one of the supported names.
    """

    def __init__(self, output_dim=128, norm_fn='batch', dropout=0.0):
        super(BasicEncoder, self).__init__()
        self.norm_fn = norm_fn

        if self.norm_fn == 'group':
            self.norm1 = nn.GroupNorm(num_groups=8, num_channels=64)
        elif self.norm_fn == 'batch':
            self.norm1 = nn.BatchNorm2d(num_features=64, affine=True)
        elif self.norm_fn == 'instance':
            # nn.InstanceNorm2d is GPU-only in MindSpore; GroupNorm with one
            # group per channel is an equivalent per-sample normalization.
            self.norm1 = nn.GroupNorm(num_groups=64, num_channels=64, affine=False)
        elif self.norm_fn == 'none':
            self.norm1 = nn.SequentialCell()
        else:
            raise ValueError("Unknown norm_fn: {}".format(norm_fn))

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64,
                               kernel_size=7, stride=2, pad_mode='pad',
                               padding=3, has_bias=True)
        self.relu1 = nn.ReLU()

        # _make_layer reads and advances self.in_planes, so the call order
        # below matters.
        self.in_planes = 64
        self.layer1 = self._make_layer(64, stride=1)
        self.layer2 = self._make_layer(96, stride=2)
        self.layer3 = self._make_layer(128, stride=2)

        # Output convolution projecting to the requested channel count.
        self.conv2 = nn.Conv2d(in_channels=128, out_channels=output_dim,
                               kernel_size=1, has_bias=True)

        self.dropout = None
        if dropout > 0:
            # TODO: should be a channel-wise (2d) dropout to match
            # torch.nn.Dropout2d in the reference implementation.
            self.dropout = nn.Dropout(keep_prob=1 - dropout)

        # Concat/Split instantiated as operators so they work in graph mode.
        # NOTE(review): output_num=2 assumes a list input always holds exactly
        # two tensors (e.g. the two frames) -- confirm against callers.
        self.cat = ms.ops.Concat(axis=0)
        self.split = ms.ops.Split(axis=0, output_num=2)

        # Parameter initialization: He-normal conv weights, identity norms.
        for _, m in self.cells_and_names():
            if isinstance(m, nn.Conv2d):
                # set_data updates the registered Parameter in place; plain
                # attribute assignment would replace it with a Tensor and
                # detach it from the optimizer.
                m.weight.set_data(initializer(
                    HeNormal(mode='fan_out', nonlinearity='relu'),
                    m.weight.shape, ms.float32))
            elif isinstance(m, (nn.BatchNorm2d, nn.InstanceNorm2d, nn.GroupNorm)):
                # gamma/beta correspond to PyTorch's weight/bias:
                # y = x * gamma + beta
                if m.gamma is not None:
                    m.gamma.set_data(initializer(One(), m.gamma.shape, ms.float32))
                if m.beta is not None:
                    m.beta.set_data(initializer(Zero(), m.beta.shape, ms.float32))

    def _make_layer(self, dim, stride=1):
        """Build one stage of two ResidualBlocks (the first may stride) and
        advance self.in_planes to ``dim`` for the next stage."""
        layer1 = ResidualBlock(self.in_planes, dim, self.norm_fn, stride=stride)
        layer2 = ResidualBlock(dim, dim, self.norm_fn, stride=1)

        self.in_planes = dim
        return nn.SequentialCell(layer1, layer2)

    def construct(self, x):
        """Encode ``x``; accepts a Tensor or a list/tuple of two Tensors,
        which are batched along axis 0 and split again before returning."""
        is_list = isinstance(x, (tuple, list))
        if is_list:
            x = self.cat(x)

        x = self.conv1(x)
        x = self.norm1(x)
        x = self.relu1(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        x = self.conv2(x)

        if self.training and self.dropout is not None:
            x = self.dropout(x)

        if is_list:
            x = self.split(x)

        return x
-
-
- # if __name__=='__main__':
- # args=None
- # # context.set_context(mode=context.GRAPH_MODE, device_target='GPU')
- # context.set_context(mode=context.PYNATIVE_MODE, device_target='CPU')
- # model=BasicEncoder()
- # arr=np.random.random((3,3,368,768)).astype(np.float32)
- # arr=ms.Tensor(arr)
- # input=[arr,arr.copy()]
- # output=model(input)
-
- # print('input:')
- # for el in input:
- # print(el.shape)
- # print('output:')
- # for el in output:
- # print(el.shape)
|