|
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- from torch.autograd import Variable
-
-
class ConvLSTMCell(nn.Module):
    """Single convolutional LSTM cell operating on 4-D feature maps.

    One convolution over the channel-wise concatenation of the input and the
    previous hidden state produces all four gate pre-activations at once.
    """

    def __init__(self, input_size, input_dim, hidden_dim, kernel_size, bias):
        """
        Initialize ConvLSTM cell.

        Parameters
        ----------
        input_size: (int, int)
            Height and width of input tensor as (height, width).
        input_dim: int
            Number of channels of input tensor.
        hidden_dim: int
            Number of channels of hidden state.
        kernel_size: (int, int)
            Size of the convolutional kernel.
        bias: bool
            Whether or not to add the bias.
        """
        super(ConvLSTMCell, self).__init__()

        self.height, self.width = input_size
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

        self.kernel_size = kernel_size
        # Half-kernel padding: keeps height/width unchanged for odd kernel
        # sizes (an even kernel size would grow the map by one pixel).
        self.padding = kernel_size[0] // 2, kernel_size[1] // 2
        self.bias = bias

        # 4 * hidden_dim output channels: one slice per gate (i, f, o, g).
        self.conv = nn.Conv2d(in_channels=self.input_dim + self.hidden_dim,
                              out_channels=4 * self.hidden_dim,
                              kernel_size=self.kernel_size,
                              padding=self.padding,
                              bias=self.bias)

    def forward(self, input, prev_state):
        """
        Advance the cell by one time step.

        Parameters
        ----------
        input: Tensor
            Current input, concatenated with the hidden state along dim 1,
            i.e. (batch, input_dim, height, width).
        prev_state: (Tensor, Tensor)
            Previous hidden state and cell state ``(h_prev, c_prev)``.

        Returns
        -------
        (h_cur, c_cur): the new hidden state and cell state.
        """
        h_prev, c_prev = prev_state
        combined = torch.cat((input, h_prev), dim=1)  # concatenate along channel axis

        gates = self.conv(combined)
        cc_i, cc_f, cc_o, cc_g = torch.split(gates, self.hidden_dim, dim=1)

        i = torch.sigmoid(cc_i)  # input gate
        f = torch.sigmoid(cc_f)  # forget gate
        # Output gate. Author's note: this behaves like a soft-attention mask
        # and could be reused for soft-attention fusion.
        o = torch.sigmoid(cc_o)
        g = torch.tanh(cc_g)     # candidate cell update

        c_cur = f * c_prev + i * g
        h_cur = o * torch.tanh(c_cur)

        return h_cur, c_cur

    def init_hidden(self, batch_size, cuda=True):
        """
        Build an all-zero ``(h, c)`` state pair.

        Parameters
        ----------
        batch_size: int
            Size of the leading (batch) dimension of the state tensors.
        cuda: bool
            When true the state is created on the device that holds this
            cell's convolution weights, so it works for CPU-only models and
            for models placed on a non-default GPU. When false the state is
            created on the CPU.

        Returns
        -------
        (h, c): two zero tensors of shape
        (batch_size, hidden_dim, height, width).
        """
        weight = self.conv.weight
        device = weight.device if cuda else torch.device('cpu')
        shape = (batch_size, self.hidden_dim, self.height, self.width)
        h = torch.zeros(shape, dtype=weight.dtype, device=device)
        c = torch.zeros(shape, dtype=weight.dtype, device=device)
        return h, c
-
-
class ConvLSTM(nn.Module):
    """Stack of ``ConvLSTMCell`` layers unrolled over a sequence.

    Layer ``k`` consumes the full output sequence of layer ``k - 1``.
    """

    def __init__(self, input_size, input_dim, hidden_dim, kernel_size, num_layers,
                 batch_first=False, bias=True, return_all_layers=False):
        """
        Parameters
        ----------
        input_size: (int, int)
            Height and width of the input frames as (height, width).
        input_dim: int
            Number of channels of the input frames.
        hidden_dim: int or list of int
            Hidden channels per layer; a single value is repeated for
            every layer.
        kernel_size: (int, int) or list of (int, int)
            Kernel size per layer; a single tuple is repeated for every layer.
        num_layers: int
            Number of stacked cells.
        batch_first: bool
            If true the input/output layout is (b, t, c, h, w), otherwise
            (t, b, c, h, w).
        bias: bool
            Whether the cell convolutions use a bias term.
        return_all_layers: bool
            Stored on the module only; ``forward`` does not consult it and
            always returns the output sequence of the last layer.

        Raises
        ------
        ValueError
            If ``kernel_size`` is not a tuple / list of tuples, or if the
            per-layer lists do not have ``num_layers`` entries.
        """
        super(ConvLSTM, self).__init__()

        self._check_kernel_size_consistency(kernel_size)

        # Make sure that both `kernel_size` and `hidden_dim` are lists having len == num_layers
        kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
        hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)
        if not len(kernel_size) == len(hidden_dim) == num_layers:
            raise ValueError('Inconsistent list length.')

        self.height, self.width = input_size

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.num_layers = num_layers
        self.batch_first = batch_first
        self.bias = bias
        self.return_all_layers = return_all_layers

        cells = []
        for i in range(self.num_layers):
            # The first layer sees the raw input; deeper layers see the
            # hidden channels of the layer below.
            cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]
            cells.append(ConvLSTMCell(input_size=(self.height, self.width),
                                      input_dim=cur_input_dim,
                                      hidden_dim=self.hidden_dim[i],
                                      kernel_size=self.kernel_size[i],
                                      bias=self.bias))

        self.cell_list = nn.ModuleList(cells)

    def forward(self, input, hidden_state=None):
        """
        Run the whole stack over a sequence.

        Parameters
        ----------
        input: Tensor
            5-D tensor of shape (t, b, c, h, w), or (b, t, c, h, w) when
            ``batch_first`` is true.
        hidden_state: list or None
            One ``(h, c)`` pair per layer used as the initial state. When
            None, zero states are created on the device and dtype of
            ``input``.

        Returns
        -------
        layer_output, last_state_list
            ``layer_output`` is the hidden-state sequence of the last layer
            in the same layout as ``input``; ``last_state_list`` holds the
            final ``(h, c)`` of every layer.
        """
        if not self.batch_first:
            # (t, b, c, h, w) -> (b, t, c, h, w)
            input = input.permute(1, 0, 2, 3, 4)

        if hidden_state is None:
            # Request CUDA states only when the data actually lives on a GPU,
            # then align device/dtype with the input so the channel-wise
            # concatenation inside the cells cannot mismatch.
            initial = self.get_init_states(batch_size=input.size(0),
                                           cuda=input.is_cuda)
            hidden_state = [
                (h0.to(device=input.device, dtype=input.dtype),
                 c0.to(device=input.device, dtype=input.dtype))
                for h0, c0 in initial
            ]

        layer_output_list = []
        last_state_list = []

        seq_len = input.size(1)
        cur_layer_input = input

        for layer_idx in range(self.num_layers):
            cell = self.cell_list[layer_idx]
            h, c = hidden_state[layer_idx]
            output_inner = []
            for t in range(seq_len):
                h, c = cell(input=cur_layer_input[:, t, :, :, :],
                            prev_state=[h, c])
                output_inner.append(h)

            # Re-assemble the time axis; this sequence feeds the next layer.
            layer_output = torch.stack(output_inner, dim=1)
            cur_layer_input = layer_output

            layer_output_list.append(layer_output)
            last_state_list.append((h, c))

        layer_output = layer_output_list[-1]
        if not self.batch_first:
            # Back to the caller's (t, b, c, h, w) layout.
            layer_output = layer_output.permute(1, 0, 2, 3, 4)

        return layer_output, last_state_list

    def get_init_states(self, batch_size, cuda=True):
        """Return one initial state per layer, as produced by each cell's ``init_hidden``."""
        return [self.cell_list[i].init_hidden(batch_size, cuda)
                for i in range(self.num_layers)]

    @staticmethod
    def _check_kernel_size_consistency(kernel_size):
        """Raise ValueError unless ``kernel_size`` is a tuple or a list of tuples."""
        is_tuple = isinstance(kernel_size, tuple)
        is_tuple_list = (isinstance(kernel_size, list) and
                         all(isinstance(elem, tuple) for elem in kernel_size))
        if not (is_tuple or is_tuple_list):
            raise ValueError('`kernel_size` must be tuple or list of tuples')

    @staticmethod
    def _extend_for_multilayer(param, num_layers):
        """Repeat a non-list ``param`` ``num_layers`` times; lists pass through unchanged."""
        if not isinstance(param, list):
            param = [param] * num_layers
        return param
-
class GRUCell(nn.Module):
    """Single convolutional GRU cell operating on 4-D feature maps.

    ``conv1`` produces the reset and update gates from the concatenated
    input and hidden state; ``conv2`` produces the candidate state from the
    input and the reset-gated hidden state.
    """

    def __init__(self, input_size, input_dim, hidden_dim, kernel_size, bias):
        """
        Initialize GRU cell.

        Parameters
        ----------
        input_size: (int, int)
            Height and width of input tensor as (height, width).
        input_dim: int
            Number of channels of input tensor.
        hidden_dim: int
            Number of channels of hidden state.
        kernel_size: (int, int)
            Size of the convolutional kernel.
        bias: bool
            Whether or not to add the bias.
        """
        super(GRUCell, self).__init__()

        self.height, self.width = input_size
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

        self.kernel_size = kernel_size
        # Half-kernel padding: keeps height/width unchanged for odd kernel
        # sizes (an even kernel size would grow the map by one pixel).
        self.padding = kernel_size[0] // 2, kernel_size[1] // 2
        self.bias = bias

        # 2 * hidden_dim output channels: reset gate and update gate.
        self.conv1 = nn.Conv2d(in_channels=self.input_dim + self.hidden_dim,
                               out_channels=2 * self.hidden_dim,
                               kernel_size=self.kernel_size,
                               padding=self.padding,
                               bias=self.bias)

        # Candidate hidden state.
        self.conv2 = nn.Conv2d(in_channels=self.input_dim + self.hidden_dim,
                               out_channels=self.hidden_dim,
                               kernel_size=self.kernel_size,
                               padding=self.padding,
                               bias=self.bias)

    def forward(self, input, prev_state):
        """
        Advance the cell by one time step.

        Parameters
        ----------
        input: Tensor
            Current input, concatenated with the hidden state along dim 1,
            i.e. (batch, input_dim, height, width).
        prev_state: Tensor
            Previous hidden state.

        Returns
        -------
        h_cur: the new hidden state, ``z * h_prev + (1 - z) * candidate``.
        """
        h_prev = prev_state
        combined = torch.cat((input, h_prev), dim=1)  # concatenate along channel axis

        r, z = torch.split(self.conv1(combined), self.hidden_dim, dim=1)
        r = torch.sigmoid(r)  # reset gate
        z = torch.sigmoid(z)  # update gate (weight kept on the old state)

        # The candidate is computed from the input and the reset-gated state.
        h_prev_reset = r * h_prev
        combined_candidate = torch.cat((input, h_prev_reset), dim=1)
        candidate = torch.tanh(self.conv2(combined_candidate))

        h_cur = z * h_prev + (1 - z) * candidate

        return h_cur

    def init_hidden(self, batch_size, cuda=True):
        """
        Build an all-zero hidden state.

        Parameters
        ----------
        batch_size: int
            Size of the leading (batch) dimension of the state tensor.
        cuda: bool
            When true the state is created on the device that holds this
            cell's convolution weights, so it works for CPU-only models and
            for models placed on a non-default GPU. When false the state is
            created on the CPU.

        Returns
        -------
        Zero tensor of shape (batch_size, hidden_dim, height, width).
        """
        weight = self.conv1.weight
        device = weight.device if cuda else torch.device('cpu')
        shape = (batch_size, self.hidden_dim, self.height, self.width)
        return torch.zeros(shape, dtype=weight.dtype, device=device)
-
class convGRU(nn.Module):
    """Stack of convolutional ``GRUCell`` layers unrolled over a sequence.

    Layer ``k`` consumes the full output sequence of layer ``k - 1``.
    """

    def __init__(self, input_size, input_dim, hidden_dim, kernel_size, num_layers,
                 batch_first=False, bias=True, return_all_layers=False):
        """
        Parameters
        ----------
        input_size: (int, int)
            Height and width of the input frames as (height, width).
        input_dim: int
            Number of channels of the input frames.
        hidden_dim: int or list of int
            Hidden channels per layer; a single value is repeated for
            every layer.
        kernel_size: (int, int) or list of (int, int)
            Kernel size per layer; a single tuple is repeated for every layer.
        num_layers: int
            Number of stacked cells.
        batch_first: bool
            If true the input/output layout is (b, t, c, h, w), otherwise
            (t, b, c, h, w).
        bias: bool
            Whether the cell convolutions use a bias term.
        return_all_layers: bool
            Stored on the module only; ``forward`` does not consult it and
            always returns the output sequence of the last layer.

        Raises
        ------
        ValueError
            If ``kernel_size`` is not a tuple / list of tuples, or if the
            per-layer lists do not have ``num_layers`` entries.
        """
        super(convGRU, self).__init__()

        self._check_kernel_size_consistency(kernel_size)

        # Make sure that both `kernel_size` and `hidden_dim` are lists having len == num_layers
        kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
        hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)
        if not len(kernel_size) == len(hidden_dim) == num_layers:
            raise ValueError('Inconsistent list length.')

        self.height, self.width = input_size

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.num_layers = num_layers
        self.batch_first = batch_first
        self.bias = bias
        self.return_all_layers = return_all_layers

        cells = []
        for i in range(self.num_layers):
            # The first layer sees the raw input; deeper layers see the
            # hidden channels of the layer below.
            cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]
            cells.append(GRUCell(input_size=(self.height, self.width),
                                 input_dim=cur_input_dim,
                                 hidden_dim=self.hidden_dim[i],
                                 kernel_size=self.kernel_size[i],
                                 bias=self.bias))

        self.cell_list = nn.ModuleList(cells)

    def forward(self, input, hidden_state=None):
        """
        Run the whole stack over a sequence.

        Parameters
        ----------
        input: Tensor
            5-D tensor of shape (t, b, c, h, w), or (b, t, c, h, w) when
            ``batch_first`` is true.
        hidden_state: list or None
            One hidden-state tensor per layer used as the initial state.
            When None, zero states are created on the device and dtype of
            ``input``.

        Returns
        -------
        layer_output, last_state_list
            ``layer_output`` is the hidden-state sequence of the last layer
            in the same layout as ``input``; ``last_state_list`` holds the
            final hidden state of every layer.
        """
        if not self.batch_first:
            # (t, b, c, h, w) -> (b, t, c, h, w)
            input = input.permute(1, 0, 2, 3, 4)

        if hidden_state is None:
            # Request CUDA states only when the data actually lives on a GPU,
            # then align device/dtype with the input so the channel-wise
            # concatenation inside the cells cannot mismatch.
            initial = self.get_init_states(batch_size=input.size(0),
                                           cuda=input.is_cuda)
            hidden_state = [h0.to(device=input.device, dtype=input.dtype)
                            for h0 in initial]

        layer_output_list = []
        last_state_list = []

        seq_len = input.size(1)
        cur_layer_input = input

        for layer_idx in range(self.num_layers):
            cell = self.cell_list[layer_idx]
            h = hidden_state[layer_idx]
            output_inner = []
            for t in range(seq_len):
                h = cell(input=cur_layer_input[:, t, :, :, :],
                         prev_state=h)
                output_inner.append(h)

            # Re-assemble the time axis; this sequence feeds the next layer.
            layer_output = torch.stack(output_inner, dim=1)
            cur_layer_input = layer_output

            layer_output_list.append(layer_output)
            last_state_list.append(h)

        layer_output = layer_output_list[-1]
        if not self.batch_first:
            # Back to the caller's (t, b, c, h, w) layout.
            layer_output = layer_output.permute(1, 0, 2, 3, 4)

        return layer_output, last_state_list

    def get_init_states(self, batch_size, cuda=True):
        """Return one initial state per layer, as produced by each cell's ``init_hidden``."""
        return [self.cell_list[i].init_hidden(batch_size, cuda)
                for i in range(self.num_layers)]

    @staticmethod
    def _check_kernel_size_consistency(kernel_size):
        """Raise ValueError unless ``kernel_size`` is a tuple or a list of tuples."""
        is_tuple = isinstance(kernel_size, tuple)
        is_tuple_list = (isinstance(kernel_size, list) and
                         all(isinstance(elem, tuple) for elem in kernel_size))
        if not (is_tuple or is_tuple_list):
            raise ValueError('`kernel_size` must be tuple or list of tuples')

    @staticmethod
    def _extend_for_multilayer(param, num_layers):
        """Repeat a non-list ``param`` ``num_layers`` times; lists pass through unchanged."""
        if not isinstance(param, list):
            param = [param] * num_layers
        return param
-
class ModGRUCell(nn.Module):
    """Modified convolutional GRU cell with a single-channel gate.

    ``conv1`` produces one spatial mask shared by all hidden channels;
    ``conv2`` produces the candidate state from the input alone. The mask
    blends the previous hidden state with the candidate.
    """

    def __init__(self, input_size, input_dim, hidden_dim, kernel_size, bias):
        """
        Initialize modified GRU cell.

        Parameters
        ----------
        input_size: (int, int)
            Height and width of input tensor as (height, width).
        input_dim: int
            Number of channels of input tensor.
        hidden_dim: int
            Number of channels of hidden state.
        kernel_size: (int, int)
            Size of the convolutional kernel.
        bias: bool
            Whether or not to add the bias.
        """
        super(ModGRUCell, self).__init__()

        self.height, self.width = input_size
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

        self.kernel_size = kernel_size
        # Half-kernel padding: keeps height/width unchanged for odd kernel
        # sizes (an even kernel size would grow the map by one pixel).
        self.padding = kernel_size[0] // 2, kernel_size[1] // 2
        self.bias = bias

        # Single output channel: one gate value per spatial location.
        self.conv1 = nn.Conv2d(in_channels=self.input_dim + self.hidden_dim,
                               out_channels=1,
                               kernel_size=self.kernel_size,
                               padding=self.padding,
                               bias=self.bias)

        # Candidate state depends on the input only (not on the hidden state).
        self.conv2 = nn.Conv2d(in_channels=self.input_dim,
                               out_channels=self.hidden_dim,
                               kernel_size=self.kernel_size,
                               padding=self.padding,
                               bias=self.bias)

    def forward(self, input, prev_state):
        """
        Advance the cell by one time step.

        Parameters
        ----------
        input: Tensor
            Current input, concatenated with the hidden state along dim 1,
            i.e. (batch, input_dim, height, width).
        prev_state: Tensor
            Previous hidden state.

        Returns
        -------
        h_cur: the new hidden state, ``m * h_prev + (1 - m) * candidate``
        where ``m`` is the single-channel mask.
        """
        h_prev = prev_state
        combined = torch.cat((input, h_prev), dim=1)  # concatenate along channel axis

        # (batch, 1, h, w) mask; broadcasting over the channel axis gives the
        # same values as explicitly repeating it hidden_dim times, without
        # materialising the copy.
        m = torch.sigmoid(self.conv1(combined))
        candidate = torch.tanh(self.conv2(input))

        h_cur = h_prev * m + (1 - m) * candidate

        return h_cur

    def init_hidden(self, batch_size, cuda=True):
        """
        Build an all-zero hidden state.

        Parameters
        ----------
        batch_size: int
            Size of the leading (batch) dimension of the state tensor.
        cuda: bool
            When true the state is created on the device that holds this
            cell's convolution weights, so it works for CPU-only models and
            for models placed on a non-default GPU. When false the state is
            created on the CPU.

        Returns
        -------
        Zero tensor of shape (batch_size, hidden_dim, height, width).
        """
        weight = self.conv1.weight
        device = weight.device if cuda else torch.device('cpu')
        shape = (batch_size, self.hidden_dim, self.height, self.width)
        return torch.zeros(shape, dtype=weight.dtype, device=device)
-
class ModconvGRU(nn.Module):
    """Stack of ``ModGRUCell`` layers unrolled over a sequence.

    Layer ``k`` consumes the full output sequence of layer ``k - 1``.
    """

    def __init__(self, input_size, input_dim, hidden_dim, kernel_size, num_layers,
                 batch_first=False, bias=True, return_all_layers=False):
        """
        Parameters
        ----------
        input_size: (int, int)
            Height and width of the input frames as (height, width).
        input_dim: int
            Number of channels of the input frames.
        hidden_dim: int or list of int
            Hidden channels per layer; a single value is repeated for
            every layer.
        kernel_size: (int, int) or list of (int, int)
            Kernel size per layer; a single tuple is repeated for every layer.
        num_layers: int
            Number of stacked cells.
        batch_first: bool
            If true the input/output layout is (b, t, c, h, w), otherwise
            (t, b, c, h, w).
        bias: bool
            Whether the cell convolutions use a bias term.
        return_all_layers: bool
            Stored on the module only; ``forward`` does not consult it and
            always returns the output sequence of the last layer.

        Raises
        ------
        ValueError
            If ``kernel_size`` is not a tuple / list of tuples, or if the
            per-layer lists do not have ``num_layers`` entries.
        """
        super(ModconvGRU, self).__init__()

        self._check_kernel_size_consistency(kernel_size)

        # Make sure that both `kernel_size` and `hidden_dim` are lists having len == num_layers
        kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
        hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)
        if not len(kernel_size) == len(hidden_dim) == num_layers:
            raise ValueError('Inconsistent list length.')

        self.height, self.width = input_size

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.num_layers = num_layers
        self.batch_first = batch_first
        self.bias = bias
        self.return_all_layers = return_all_layers

        cells = []
        for i in range(self.num_layers):
            # The first layer sees the raw input; deeper layers see the
            # hidden channels of the layer below.
            cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]
            cells.append(ModGRUCell(input_size=(self.height, self.width),
                                    input_dim=cur_input_dim,
                                    hidden_dim=self.hidden_dim[i],
                                    kernel_size=self.kernel_size[i],
                                    bias=self.bias))

        self.cell_list = nn.ModuleList(cells)

    def forward(self, input, hidden_state=None):
        """
        Run the whole stack over a sequence.

        Parameters
        ----------
        input: Tensor
            5-D tensor of shape (t, b, c, h, w), or (b, t, c, h, w) when
            ``batch_first`` is true.
        hidden_state: list or None
            One hidden-state tensor per layer used as the initial state.
            When None, zero states are created on the device and dtype of
            ``input``.

        Returns
        -------
        layer_output, last_state_list
            ``layer_output`` is the hidden-state sequence of the last layer
            in the same layout as ``input``; ``last_state_list`` holds the
            final hidden state of every layer.
        """
        if not self.batch_first:
            # (t, b, c, h, w) -> (b, t, c, h, w)
            input = input.permute(1, 0, 2, 3, 4)

        if hidden_state is None:
            # Request CUDA states only when the data actually lives on a GPU,
            # then align device/dtype with the input so the channel-wise
            # concatenation inside the cells cannot mismatch.
            initial = self.get_init_states(batch_size=input.size(0),
                                           cuda=input.is_cuda)
            hidden_state = [h0.to(device=input.device, dtype=input.dtype)
                            for h0 in initial]

        layer_output_list = []
        last_state_list = []

        seq_len = input.size(1)
        cur_layer_input = input

        for layer_idx in range(self.num_layers):
            cell = self.cell_list[layer_idx]
            h = hidden_state[layer_idx]
            output_inner = []
            for t in range(seq_len):
                h = cell(input=cur_layer_input[:, t, :, :, :],
                         prev_state=h)
                output_inner.append(h)

            # Re-assemble the time axis; this sequence feeds the next layer.
            layer_output = torch.stack(output_inner, dim=1)
            cur_layer_input = layer_output

            layer_output_list.append(layer_output)
            last_state_list.append(h)

        layer_output = layer_output_list[-1]
        if not self.batch_first:
            # Back to the caller's (t, b, c, h, w) layout.
            layer_output = layer_output.permute(1, 0, 2, 3, 4)

        return layer_output, last_state_list

    def get_init_states(self, batch_size, cuda=True):
        """Return one initial state per layer, as produced by each cell's ``init_hidden``."""
        return [self.cell_list[i].init_hidden(batch_size, cuda)
                for i in range(self.num_layers)]

    @staticmethod
    def _check_kernel_size_consistency(kernel_size):
        """Raise ValueError unless ``kernel_size`` is a tuple or a list of tuples."""
        is_tuple = isinstance(kernel_size, tuple)
        is_tuple_list = (isinstance(kernel_size, list) and
                         all(isinstance(elem, tuple) for elem in kernel_size))
        if not (is_tuple or is_tuple_list):
            raise ValueError('`kernel_size` must be tuple or list of tuples')

    @staticmethod
    def _extend_for_multilayer(param, num_layers):
        """Repeat a non-list ``param`` ``num_layers`` times; lists pass through unchanged."""
        if not isinstance(param, list):
            param = [param] * num_layers
        return param
|