|
- # Copyright (c) Nanjing University, Vision Lab.
- # Last update: 2019.09.29
-
- #import tensorflow as tf
- import numpy as np
- #from tensorflow.contrib.coder.python.ops import coder_ops
- from tensorlayer.layers import Module
- import tensorlayer as tl
- import torchac
- import torch
-
def myabs(x):
    """Element-wise absolute value built from max/min ops.

    Equivalent to |x|: the positive part minus the negative part.
    """
    positive_part = tl.ops.Maximum()(x, 0)
    negative_part = tl.ops.Minimum()(x, 0)
    return positive_part - negative_part
-
class EntropyBottleneck(Module):
    """Flexible probability density model to estimate the entropy of its
    input tensor, as described in:

    > "Variational image compression with a scale hyperprior"
    > J. Balle, D. Minnen, S. Singh, S. J. Hwang, N. Johnston
    > https://arxiv.org/abs/1802.01436
    """

    def __init__(self, channels, likelihood_bound=1e-9, range_coder_precision=16,
                 init_scale=8, filters=(3, 3, 3)):
        """
        Arguments:
            channels: number of channels of the tensors to be modeled.
            likelihood_bound: lower bound applied to likelihoods, avoiding
                log(0) in the training loss.
            range_coder_precision: precision (in bits) of the range coder.
            init_scale: initial scale of the density model.
            filters: hidden-layer widths of the density network.
        """
        super(EntropyBottleneck, self).__init__()
        self._likelihood_bound = float(likelihood_bound)
        self._range_coder_precision = int(range_coder_precision)
        self._init_scale = float(init_scale)
        self._filters = tuple(int(f) for f in filters)
        self.build(channels)

    def build(self, channels):
        """Build the entropy model.

        Creates the variables of the network modeling the densities; they are
        later used to evaluate the probability mass functions (pmf) and the
        discrete cumulative density functions (cdf) used by the range coder.

        Arguments:
            channels: number of channels of the tensors to be modeled.
        """
        filters = (1,) + self._filters + (1,)
        scale = self._init_scale ** (1 / (len(self._filters) + 1))

        self._matrices = []
        self._biases = []
        self._factors = []
        for i in range(len(self._filters) + 1):
            # Initialize so that the initial cumulative density is roughly
            # linear over the init scale.
            init = np.log(np.expm1(1.0 / scale / filters[i + 1]))

            matrix = self._get_weights(
                "matrix_{}".format(i),
                shape=(channels, filters[i + 1], filters[i]),
                init=tl.initializers.constant(init),
                trainable=True)
            # Register as an attribute (matrix_0, matrix_1, ...) so the
            # framework tracks the parameter; previously done via exec().
            setattr(self, "matrix_{}".format(i), matrix)
            self._matrices.append(matrix)

            # NOTE: the weight name "bais_{}" is a historical typo; it is kept
            # verbatim so existing checkpoints still load.
            bias = self._get_weights(
                "bais_{}".format(i),
                shape=(channels, filters[i + 1], 1),
                init=tl.initializers.random_uniform(minval=-0.5, maxval=0.5),
                trainable=True)
            setattr(self, "bias_{}".format(i), bias)
            self._biases.append(bias)

            factor = self._get_weights(
                "factor_{}".format(i),
                shape=(channels, filters[i + 1], 1),
                init=tl.initializers.zeros(),
                trainable=True)
            setattr(self, "factor_{}".format(i), factor)
            self._factors.append(factor)

    def _logits_cumulative(self, inputs):
        """Evaluate logits of the cumulative densities.

        Arguments:
            inputs: values at which to evaluate the cumulative densities,
                expected to have shape (channels, 1, batch).

        Returns:
            A tensor of the same shape as inputs, containing the logits of
            the cumulative densities evaluated at the given inputs.
        """
        logits = inputs
        for i in range(len(self._filters) + 1):
            # Softplus keeps the matrix positive, so the CDF is monotone.
            matrix = tl.ops.softplus(self._matrices[i])
            logits = tl.ops.matmul(matrix, logits)

            logits += self._biases[i]

            # Bounded residual non-linearity (|factor| < 1 via tanh).
            factor = tl.ops.tanh(self._factors[i])
            logits = logits + factor * tl.ops.tanh(logits)

        return logits

    def _quantize(self, inputs, mode):
        """Add uniform noise ("noise") or round to integers ("symbols").

        Raises:
            ValueError: if mode is neither "noise" nor "symbols".
        """
        half = tl.ops.constant(0.5)

        if mode == "noise":
            # Additive uniform noise in [-0.5, 0.5): a differentiable proxy
            # for rounding during training.
            noise = tl.initializers.random_uniform(minval=-half, maxval=half)(
                shape=inputs.shape, dtype=tl.float32)
            return tl.ops.add_n([inputs, noise])

        if mode == "symbols":
            return tl.ops.round(inputs)

        # Previously an unknown mode silently returned None.
        raise ValueError("Unknown quantization mode: {}".format(mode))

    def _likelihood(self, inputs):
        """Estimate the likelihoods.

        Arguments:
            inputs: tensor with shape (batch, length, width, height, channels).

        Returns:
            likelihoods: tensor of the same shape as inputs.
        """
        ndim = 5
        channel_axes = ndim - 1
        half = tl.ops.constant(0.5)

        # Move channels first, then flatten to shape (channels, 1, -1).
        order = list(range(ndim))          # [0, 1, 2, 3, 4]
        order.pop(channel_axes)            # [0, 1, 2, 3]
        order.insert(0, channel_axes)      # [4, 0, 1, 2, 3]
        inputs = tl.ops.transpose(inputs, order)
        shape = inputs.shape               # (channels, batch, length, width, height)
        inputs = tl.ops.reshape(inputs, (shape[0], 1, -1))

        # Probability mass of the unit bin [x - 0.5, x + 0.5].
        lower = self._logits_cumulative(inputs - half)
        upper = self._logits_cumulative(inputs + half)

        # Flip signs if we can move more towards the left tail of the
        # sigmoid, which is numerically more stable.
        sign = -tl.ops.Sign()(tl.ops.add_n([lower, upper]))
        likelihood = myabs(tl.ops.sigmoid(sign * upper) - tl.ops.sigmoid(sign * lower))

        # Convert back to the input tensor layout.
        order = list(range(1, ndim))       # [1, 2, 3, 4]
        order.insert(channel_axes, 0)      # [1, 2, 3, 4, 0]
        likelihood = tl.ops.reshape(likelihood, shape)
        likelihood = tl.ops.transpose(likelihood, order)

        return likelihood

    def forward(self, inputs, training=True):
        """Pass a tensor through the bottleneck (used during training).

        Arguments:
            inputs: the tensor to be passed through the bottleneck.
            training: if True, perturb with uniform noise; otherwise round.

        Returns:
            values: tensor with the same shape as inputs containing the
                perturbed or quantized input values.
            likelihood: tensor with the same shape as inputs containing the
                likelihood of values under the modeled distributions.
        """
        outputs = self._quantize(inputs, "noise" if training else "symbols")

        likelihood = self._likelihood(outputs)
        likelihood_bound = tl.ops.constant(self._likelihood_bound)
        likelihood = tl.ops.Maximum()(likelihood, likelihood_bound)

        return outputs, likelihood

    def _get_cdf(self, min_v, max_v, channels):
        """Get the quantized CDF used by compress/decompress.

        Arguments:
            min_v, max_v: integer tensors bounding the symbol range.
            channels: number of channels.

        Returns:
            cdf with shape [1, channels, max_v - min_v + 2] (a leading zero
            column is prepended by _pmf_to_cdf).
        """
        # All candidate symbol values, replicated per channel: [C, 1, N].
        a = tl.ops.reshape(tl.ops.range(min_v, max_v + 1),
                           [1, 1, max_v - min_v + 1])
        a = tl.ops.tile(a, [channels, 1, 1])
        a = tl.ops.cast(a, tl.float32)

        # Estimate the pmf of each symbol from the learned density.
        half = tl.ops.constant(.5, dtype=tl.float32)
        lower = self._logits_cumulative(a - half)
        upper = self._logits_cumulative(a + half)
        # Same sign flip as in _likelihood, for numerical stability.
        sign = -tl.ops.Sign()(tl.ops.add_n([lower, upper]))
        likelihood = myabs(tl.ops.sigmoid(sign * upper) - tl.ops.sigmoid(sign * lower))
        likelihood_bound = tl.ops.constant(self._likelihood_bound, dtype=tl.float32)
        pmf = tl.ops.Maximum()(likelihood, likelihood_bound)

        # pmf -> cdf (cumulative probabilities from 0 to 1, leading 0 added).
        pmf = tl.ops.reshape(pmf, [channels, -1])          # (C, N)
        cdf = self._pmf_to_cdf(pmf)                        # (C, N + 1)
        cdf = tl.ops.reshape(cdf, [1, channels, -1])

        return cdf

    def _pmf_to_cdf(self, pmf):
        """Convert a pmf to a cdf with a prepended zero, clipped to 1.0."""
        cdf = tl.ops.cumsum(pmf, axis=-1)
        spatial_dimensions = pmf.shape[:-1] + (1,)
        zeros = tl.ops.zeros(spatial_dimensions, dtype=tl.float32)
        cdf_with_0 = tl.ops.concat([zeros, cdf], axis=-1)
        # Guard against accumulated float error pushing the cdf above 1.
        cdf_with_0 = tl.ops.Minimum()(cdf_with_0, 1.0)

        return cdf_with_0

    def compress(self, inputs):
        """Compress inputs and store their binary representations in strings.

        Called at inference time.

        Arguments:
            inputs: tensor with values to be compressed. Must have shape
                [batch size, length, width, height, channels].

        Returns:
            (strings, min_v, max_v): the compressed byte string plus the
            integer symbol range needed to rebuild the cdf when decoding.
        """
        channels = inputs.shape[-1]

        # Quantize to integer symbols.
        values = self._quantize(inputs, "symbols")

        # Build the cdf over the observed value range.
        min_v = tl.ops.cast(tl.ops.floor(tl.ops.reduce_min(values)), dtype=tl.int32)
        max_v = tl.ops.cast(tl.ops.ceil(tl.ops.reduce_max(values)), dtype=tl.int32)
        cdf = self._get_cdf(min_v, max_v, channels)

        # Range encode. Symbols are shifted to start at 0.
        values = tl.ops.reshape(values, [-1, channels])
        values = tl.ops.cast(values, dtype=tl.int32)
        values = values - min_v
        # torchac expects one cdf row per encoded symbol position, so the
        # per-channel cdf is tiled across all positions.
        out_cdf = tl.ops.tile(cdf, [values.shape[0], 1, 1])
        out_cdf = torch.from_numpy(tl.convert_to_numpy(out_cdf))
        values = torch.from_numpy(tl.convert_to_numpy(values))
        strings = torchac.encode_float_cdf(out_cdf, values.to(torch.int16),
                                           check_input_bounds=True)

        return strings, min_v, max_v

    def decompress(self, strings, min_v, max_v, shape):
        """Decompress values from their compressed string representations.

        Arguments:
            strings: byte string containing the compressed data.
            min_v & max_v: minimum & maximum symbol values from compress().
            shape: int32 vector with the shape of the tensor to be
                decompressed: [batch size, length, width, height, channels].

        Returns:
            The decompressed float32 tensor.
        """
        shape = tl.convert_to_tensor(shape, dtype='int32')
        min_v = tl.convert_to_tensor(min_v, dtype='int32')
        max_v = tl.convert_to_tensor(max_v, dtype='int32')
        channels = shape[-1]

        # Rebuild the same cdf used at encoding time: [1, channels, -1].
        cdf = self._get_cdf(min_v, max_v, channels)
        # One cdf row per decoded symbol position.
        out_cdf = tl.ops.tile(cdf, [shape[0] * shape[1] * shape[2] * shape[3], 1, 1])
        out_cdf = torch.from_numpy(tl.convert_to_numpy(out_cdf))
        values = torchac.decode_float_cdf(out_cdf, strings).numpy()
        values = values + min_v.numpy()  # undo the shift applied in compress()
        values = tl.convert_to_tensor(values, dtype=tl.float32)
        values = tl.ops.reshape(values, shape)

        return values
-
if __name__ == '__main__':
    # Smoke test: compress a random integer tensor and verify the
    # decompressed result matches exactly.
    np.random.seed(108)
    training = False
    # Uniform samples in [0, 1), scaled and rounded to integers in [-10, 10].
    samples = np.random.rand(2, 8, 8, 8, 8).astype("float32")
    samples = np.round(samples * 20 - 10)
    y_gpu = tl.convert_to_tensor(samples, tl.int32)
    print("y_gpu[0,0,0,0]:", y_gpu[0, 0, 0, 0])

    entropy_bottleneck = EntropyBottleneck(channels=8)

    # Encode.
    y_strings, y_min_v, y_max_v = entropy_bottleneck.compress(y_gpu)
    print("y_min_v:", y_min_v)
    print("y_max_v:", y_max_v)

    # Decode and compare element-wise against the original.
    y_decoded = entropy_bottleneck.decompress(y_strings, y_min_v, y_max_v, y_gpu.shape)
    mismatch_mask = tl.ops.cast(
        tl.ops.equal(y_gpu, tl.ops.cast(y_decoded, dtype=tl.int32)), tl.float32)
    mismatches = tl.ops.where(mismatch_mask < 0.1)
    print("compare=False:", mismatches, len(mismatches))
    print("y_decoded[0,0,0,0]:", y_decoded[0, 0, 0, 0])
|