|
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
-
- import os
- import unittest
-
- os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
-
- import numpy as np
- import tensorflow as tf
- import tensorlayer as tl
- from tqdm import tqdm
- from sklearn.utils import shuffle
- from tensorlayer.models.seq2seq import Seq2seq
- from tests.utils import CustomTestCase
- from tensorlayer.cost import cross_entropy_seq
-
-
- class Model_SEQ2SEQ_Test(CustomTestCase):
-
- @classmethod
- def setUpClass(cls):
-
- cls.batch_size = 16
-
- cls.vocab_size = 20
- cls.embedding_size = 32
- cls.dec_seq_length = 5
- cls.trainX = np.random.randint(20, size=(50, 6))
- cls.trainY = np.random.randint(20, size=(50, cls.dec_seq_length + 1))
- cls.trainY[:, 0] = 0 # start_token == 0
-
- # Parameters
- cls.src_len = len(cls.trainX)
- cls.tgt_len = len(cls.trainY)
-
- assert cls.src_len == cls.tgt_len
-
- cls.num_epochs = 100
- cls.n_step = cls.src_len // cls.batch_size
-
- @classmethod
- def tearDownClass(cls):
- pass
-
- def test_basic_simpleSeq2Seq(self):
- model_ = Seq2seq(
- decoder_seq_length=5,
- cell_enc=tf.keras.layers.GRUCell,
- cell_dec=tf.keras.layers.GRUCell,
- n_layer=3,
- n_units=128,
- embedding_layer=tl.layers.Embedding(vocabulary_size=self.vocab_size, embedding_size=self.embedding_size),
- )
-
- optimizer = tf.optimizers.Adam(learning_rate=0.001)
-
- for epoch in range(self.num_epochs):
- model_.train()
- trainX, trainY = shuffle(self.trainX, self.trainY)
- total_loss, n_iter = 0, 0
- for X, Y in tqdm(tl.iterate.minibatches(inputs=trainX, targets=trainY, batch_size=self.batch_size,
- shuffle=False), total=self.n_step,
- desc='Epoch[{}/{}]'.format(epoch + 1, self.num_epochs), leave=False):
-
- dec_seq = Y[:, :-1]
- target_seq = Y[:, 1:]
-
- with tf.GradientTape() as tape:
- ## compute outputs
- output = model_(inputs=[X, dec_seq])
-
- output = tf.reshape(output, [-1, self.vocab_size])
-
- loss = cross_entropy_seq(logits=output, target_seqs=target_seq)
-
- grad = tape.gradient(loss, model_.all_weights)
- optimizer.apply_gradients(zip(grad, model_.all_weights))
-
- total_loss += loss
- n_iter += 1
-
- model_.eval()
- test_sample = trainX[0:2, :].tolist()
-
- top_n = 1
- for i in range(top_n):
- prediction = model_([test_sample], seq_length=self.dec_seq_length, start_token=0, top_n=1)
- print("Prediction: >>>>> ", prediction, "\n Target: >>>>> ", trainY[0:2, 1:], "\n\n")
-
- # printing average loss after every epoch
- print('Epoch [{}/{}]: loss {:.4f}'.format(epoch + 1, self.num_epochs, total_loss / n_iter))
-
-
- if __name__ == '__main__':
- unittest.main()
|