|
- import mindspore
-
- from mindspore import nn as nn, context
-
- import network.Resnet as Resnet
- from loss import JointEdgeSegLoss
- from network.wider_resnet import WiderResNetA2
- from config import cfg
- from network.mynn import initialize_weights, Norm2d
- import argparse
-
- from my_functionals import GatedSpatialConv as gsc
- from mindspore.ops.functional import stop_gradient
- import cv2
- import numpy as np
- import math
- from mindspore.common.tensor import Tensor
-
def calculate_gain(nonlinearity, param=None):
    """Return the recommended gain value for the given nonlinearity.

    Args:
        nonlinearity: activation name ('linear', 'convNd',
            'conv_transposeNd', 'sigmoid', 'tanh', 'relu', 'leaky_relu').
        param: optional negative slope for 'leaky_relu' (defaults to 0.01).

    Returns:
        float gain factor used by the Kaiming-style initializers below.

    Raises:
        ValueError: on an unknown nonlinearity or a non-numeric param.
    """
    unit_gain_fns = ('linear', 'conv1d', 'conv2d', 'conv3d', 'conv_transpose1d',
                     'conv_transpose2d', 'conv_transpose3d', 'sigmoid')
    if nonlinearity in unit_gain_fns:
        return 1
    if nonlinearity == 'tanh':
        return 5.0 / 3
    if nonlinearity == 'relu':
        return math.sqrt(2.0)
    if nonlinearity == 'leaky_relu':
        if param is None:
            negative_slope = 0.01
        elif not isinstance(param, bool) and isinstance(param, int) or isinstance(param, float):
            # True/False are instances of int, hence the explicit bool check.
            negative_slope = param
        else:
            raise ValueError("negative_slope {} not a valid number".format(param))
        return math.sqrt(2.0 / (1 + negative_slope ** 2))
    raise ValueError("Unsupported nonlinearity {}".format(nonlinearity))
-
-
- def _calculate_fan_in_and_fan_out(tensor):
- """
- _calculate_fan_in_and_fan_out
- """
- dimensions = len(tensor)
- if dimensions < 2:
- raise ValueError("Fan in and fan out can not be computed for tensor"
- " with fewer than 2 dimensions")
- if dimensions == 2: # Linear
- fan_in = tensor[1]
- fan_out = tensor[0]
- else:
- num_input_fmaps = tensor[1]
- num_output_fmaps = tensor[0]
- receptive_field_size = 1
- if dimensions > 2:
- receptive_field_size = tensor[2] * tensor[3]
- fan_in = num_input_fmaps * receptive_field_size
- fan_out = num_output_fmaps * receptive_field_size
- return fan_in, fan_out
-
-
def _calculate_correct_fan(tensor, mode):
    """Select fan_in or fan_out of *tensor* (a shape) according to *mode*."""
    mode = mode.lower()
    valid_modes = ['fan_in', 'fan_out']
    if mode not in valid_modes:
        raise ValueError("Mode {} not supported, please use one of {}".format(mode, valid_modes))
    # Map both mode names onto the corresponding fan and pick the requested one.
    fans = dict(zip(valid_modes, _calculate_fan_in_and_fan_out(tensor)))
    return fans[mode]
-
-
def kaiming_normal(inputs_shape, a=0, mode='fan_in', nonlinearity='leaky_relu'):
    """He-normal init: N(0, gain/sqrt(fan)) samples of *inputs_shape*, float32."""
    fan = _calculate_correct_fan(inputs_shape, mode)
    sigma = calculate_gain(nonlinearity, a) / math.sqrt(fan)
    samples = np.random.normal(0, sigma, size=inputs_shape)
    return samples.astype(np.float32)
-
-
def kaiming_uniform(inputs_shape, a=0., mode='fan_in', nonlinearity='leaky_relu'):
    """He-uniform init: U(-b, b) with b = sqrt(3) * gain / sqrt(fan), float32."""
    fan = _calculate_correct_fan(inputs_shape, mode)
    sigma = calculate_gain(nonlinearity, a) / math.sqrt(fan)
    # Uniform bound chosen so the samples have standard deviation *sigma*.
    limit = math.sqrt(3.0) * sigma
    samples = np.random.uniform(-limit, limit, size=inputs_shape)
    return samples.astype(np.float32)
-
def _conv3x3(in_channel, out_channel, stride=1):
    """3x3 'same' convolution initialized with Kaiming-uniform (fan_out, relu)."""
    init = Tensor(kaiming_uniform((out_channel, in_channel, 3, 3),
                                  mode="fan_out", nonlinearity='relu'))
    return nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride,
                     padding=0, pad_mode='same', weight_init=init)
-
def _newconv3x3(in_channel, out_channel, stride=1):
    """3x3 'same' convolution initialized with Kaiming-normal (fan_out, relu)."""
    init = Tensor(kaiming_normal((out_channel, in_channel, 3, 3),
                                 mode="fan_out", nonlinearity='relu'))
    return nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride,
                     padding=0, pad_mode='same', weight_init=init)
-
def _conv1x1(in_channel, out_channel, stride=1):
    """1x1 'same' convolution initialized with Kaiming-uniform (fan_out, relu)."""
    init = Tensor(kaiming_uniform((out_channel, in_channel, 1, 1),
                                  mode="fan_out", nonlinearity='relu'))
    return nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=stride,
                     padding=0, pad_mode='same', weight_init=init)
-
def _bn(channel):
    """BatchNorm2d with this project's standard eps/momentum and unit init."""
    return nn.BatchNorm2d(channel,
                          eps=1e-4,
                          momentum=0.95,
                          gamma_init=1,
                          beta_init=0,
                          moving_mean_init=0,
                          moving_var_init=1)
-
-
class AtrousSpatialPyramidPoolingModule(nn.Cell):
    '''
    ASPP head fused with an edge-attention stream.

    operations performed:
    1x1 x depth
    3x3 x depth dilation 6
    3x3 x depth dilation 12
    3x3 x depth dilation 18
    image pooling
    concatenate all together
    Final 1x1 conv
    '''

    def __init__(self, in_dim, reduction_dim=256, output_stride=16, rates=(6, 12, 18)):
        """
        Args:
            in_dim: channels of the incoming feature map.
            reduction_dim: channels produced by each branch.
            output_stride: 8 doubles the rates; 16 keeps them; anything else
                is rejected.
            rates: atrous rates (tuple default fixes the original mutable
                list default argument).
        """
        super(AtrousSpatialPyramidPoolingModule, self).__init__()

        # Check if we are using distributed BN and use the nn from encoding.nn
        # library rather than using standard pytorch.nn

        if output_stride == 8:
            rates = [2 * r for r in rates]
        elif output_stride == 16:
            pass
        else:
            # The original code raised a bare string, which is itself a
            # TypeError in Python 3; raise a proper exception instead.
            raise ValueError('output stride of {} not supported'.format(output_stride))

        self.features = []
        # 1x1 projection branch.
        self.features.append(
            nn.SequentialCell(_conv1x1(in_dim, reduction_dim),
                              _bn(reduction_dim), nn.ReLU()))
        # "Atrous" branches.
        # NOTE(review): the rate is never passed to _conv3x3, so every branch
        # runs with dilation 1 — presumably a deviation from the ASPP design;
        # left unchanged here to preserve behavior.  Confirm against the
        # reference implementation.
        for _ in rates:
            self.features.append(nn.SequentialCell(
                _conv3x3(in_dim, reduction_dim),
                _bn(reduction_dim),
                nn.ReLU()
            ))
        self.features = mindspore.nn.CellList(self.features)

        # Global-context branch: average pooling then a 1x1 projection.
        # NOTE(review): kernel_size=90 assumes a 90x90 input feature map —
        # verify against the training crop size.
        self.img_pooling = nn.AvgPool2d(kernel_size=90, stride=90)
        self.img_conv = nn.SequentialCell(
            _conv1x1(in_dim, reduction_dim),
            _bn(reduction_dim), nn.ReLU())
        # Edge-attention branch (single-channel input).
        self.edge_conv = nn.SequentialCell(
            _conv1x1(1, reduction_dim),
            _bn(reduction_dim), nn.ReLU())
        self.resize_bilinear = nn.ResizeBilinear()
        self.concat_op = mindspore.ops.Concat(axis=1)

    def construct(self, x, edge):
        """Concatenate pooled context, edge features and all pyramid branches.

        Args:
            x: backbone feature map (N, in_dim, H, W).
            edge: edge attention map (N, 1, h, w); resized to (H, W).

        Returns:
            Channel-wise concatenation of every branch output.
        """
        x_size = x.shape

        # Image-level features, upsampled back to the feature-map size.
        img_features = self.img_pooling(x)
        img_features = self.img_conv(img_features)
        img_features = self.resize_bilinear(img_features, x_size[2:])
        out = img_features

        # Edge stream resized to match x, then projected.
        edge_features = self.resize_bilinear(edge, x_size[2:])
        edge_features = self.edge_conv(edge_features)
        out = self.concat_op((out, edge_features))

        # Pyramid branches.
        for f in self.features:
            y = f(x)
            out = self.concat_op((out, y))
        return out
-
-
class GSCNN(nn.Cell):
    '''
    Gated Shape CNN: Wide_resnet version of DeepLabV3.
    mod1
    pool2
    mod2 str2
    pool3
    mod3-7

    structure: [3, 3, 6, 3, 1, 1]
    channels = [(128, 128), (256, 256), (512, 512), (512, 1024), (512, 1024, 2048),
                (1024, 2048, 4096)]
    '''

    def __init__(self, num_classes, trunk=None, criterion=None):
        '''
        Args:
            num_classes: number of segmentation classes.
            trunk: unused; kept for signature compatibility with callers.
            criterion: loss function; stored but not applied inside construct.
        '''
        super(GSCNN, self).__init__()
        self.criterion = criterion
        self.num_classes = num_classes

        # Wide ResNet backbone: the "regular" (texture) stream.
        wide_resnet = WiderResNetA2(structure=[3, 3, 6, 3, 1, 1], classes=1000, dilation=True)

        self.mod1 = wide_resnet.mod1
        self.mod2 = wide_resnet.mod2
        self.mod3 = wide_resnet.mod3
        self.mod4 = wide_resnet.mod4
        self.mod5 = wide_resnet.mod5
        self.mod6 = wide_resnet.mod6
        self.mod7 = wide_resnet.mod7
        self.pool2 = wide_resnet.pool2
        self.pool3 = wide_resnet.pool3
        self.interpolate = mindspore.nn.ResizeBilinear()

        # Side-output taps feeding the shape stream (each reduces to 1 channel).
        self.dsn3 = _conv1x1(256, 1)
        self.dsn4 = _conv1x1(512, 1)
        self.dsn7 = _conv1x1(4096, 1)

        # Shape stream: residual blocks, channel reductions and gated convs.
        self.res1 = Resnet.BasicBlock(64, 64, stride=1, downsample=None)
        self.d1 = _conv1x1(64, 32)
        self.res2 = Resnet.BasicBlock(32, 32, stride=1, downsample=None)
        self.d2 = _conv1x1(32, 16)
        self.res3 = Resnet.BasicBlock(16, 16, stride=1, downsample=None)
        self.d3 = _conv1x1(16, 8)
        self.fuse = _conv1x1(8, 1)

        # Fuses the predicted edge map with the external Canny map (2 -> 1).
        self.cw = _conv1x1(2, 1)

        self.gate1 = gsc.GatedSpatialConv2d(32, 32)
        self.gate2 = gsc.GatedSpatialConv2d(16, 16)
        self.gate3 = gsc.GatedSpatialConv2d(8, 8)

        self.aspp = AtrousSpatialPyramidPoolingModule(4096, 256, output_stride=8)

        self.bot_fine = _conv1x1(128, 48)
        self.bot_aspp = _conv1x1(1280 + 256, 256)

        self.final_seg = nn.SequentialCell(
            _newconv3x3(256 + 48, 256),
            _bn(256),
            nn.ReLU(),
            _newconv3x3(256, 256),
            _bn(256),
            nn.ReLU(),
            _conv1x1(256, num_classes))

        self.sigmoid = nn.Sigmoid()
        self.resize_bilinear = nn.ResizeBilinear()
        self.concat_op = mindspore.ops.Concat(axis=1)

    def construct(self, inp, mask, edgemap, newcanny, label_panduan):
        """Forward pass.

        Args:
            inp: input image batch (N, 3, H, W).
            mask, edgemap, label_panduan: loss targets; unused here — the
                loss computation was moved outside this cell (see the
                removed commented-out JointEdgeSegLoss code in history).
            newcanny: external Canny edge map, presumably (N, 1, H, W) —
                it is concatenated with the 1-channel edge_out below.

        Returns:
            (seg_out, edge_out): segmentation logits and sigmoid edge map,
            both resized to the input resolution.
        """
        x_size = inp.shape

        # Regular stream.
        m1 = self.mod1(inp)
        m2 = self.mod2(self.pool2(m1))
        m3 = self.mod3(self.pool3(m2))
        m4 = self.mod4(m3)
        m5 = self.mod5(m4)
        m6 = self.mod6(m5)
        m7 = self.mod7(m6)

        # Side outputs upsampled to input size.  The original code evaluated
        # self.dsn3(m3) twice (once discarded); compute it once and reuse.
        dsn3 = self.dsn3(m3)
        s3 = self.resize_bilinear(dsn3, x_size[2:], align_corners=True)
        s4 = self.resize_bilinear(self.dsn4(m4), x_size[2:], align_corners=True)
        s7 = self.resize_bilinear(self.dsn7(m7), x_size[2:], align_corners=True)

        m1f = self.resize_bilinear(m1, x_size[2:], align_corners=True)

        canny = newcanny

        # Shape stream: progressively reduce channels, gating on s3/s4/s7.
        cs = self.res1(m1f)
        cs = self.resize_bilinear(cs, x_size[2:], align_corners=True)
        cs = self.d1(cs)
        cs = self.gate1(cs, s3)
        cs = self.res2(cs)
        cs = self.resize_bilinear(cs, x_size[2:], align_corners=True)
        cs = self.d2(cs)
        cs = self.gate2(cs, s4)
        cs = self.res3(cs)
        cs = self.resize_bilinear(cs, x_size[2:], align_corners=True)
        cs = self.d3(cs)
        cs = self.gate3(cs, s7)
        cs = self.fuse(cs)
        cs = self.resize_bilinear(cs, x_size[2:], align_corners=True)
        edge_out = self.sigmoid(cs)

        # Fuse learned edges with the external Canny map into one attention map.
        cat = self.concat_op((edge_out, canny))
        acts = self.cw(cat)
        acts = self.sigmoid(acts)

        # ASPP over the deepest features, conditioned on the edge attention.
        x = self.aspp(m7, acts)
        dec0_up = self.bot_aspp(x)
        dec0_fine = self.bot_fine(m2)
        dec0_up = self.interpolate(dec0_up, m2.shape[2:], align_corners=True)

        dec0 = self.concat_op((dec0_fine, dec0_up))
        dec1 = self.final_seg(dec0)
        seg_out = self.interpolate(dec1, x_size[2:])

        return seg_out, edge_out
-
-
if __name__ == '__main__':
    # Smoke test: build the network and run one forward pass on dummy data.
    # context.set_context(mode=context.GRAPH_MODE, device_target="GPU")

    model = GSCNN(19).set_train(True)
    print(model)

    inputs = mindspore.Tensor(np.ones((2, 3, 24, 24)).astype("float32"))
    # Canny edge maps are computed per image on the uint8 NHWC view.
    im_arr = inputs.asnumpy().transpose((0, 2, 3, 1)).astype(np.uint8)
    x_size = inputs.shape
    canny = np.zeros((x_size[0], 1, x_size[2], x_size[3]))
    for i in range(x_size[0]):
        canny[i] = cv2.Canny(im_arr[i], 10, 100)
    # Single conversion replaces the original from_numpy + Tensor round trip.
    canny = mindspore.Tensor(canny, dtype=mindspore.float32)

    # GSCNN.construct takes (inp, mask, edgemap, newcanny, label_panduan).
    # The original call `model(inputs, canny)` was missing three arguments
    # and raised a TypeError.  mask/edgemap/label_panduan are unused by
    # construct, so zero-filled dummies suffice for this smoke test.
    mask = mindspore.Tensor(np.zeros((2, 24, 24)).astype("int32"))
    edgemap = mindspore.Tensor(np.zeros((2, 1, 24, 24)).astype("int32"))
    label_panduan = mindspore.Tensor(np.ones((2,)).astype("int32"))
    seg_out, edge_out = model(inputs, mask, edgemap, canny, label_panduan)
    print(seg_out.shape)
    print(edge_out.shape)
|