diff --git a/recognition/45853757-VQVAE/README.MD b/recognition/45853757-VQVAE/README.MD
new file mode 100644
index 0000000000..c597b0c3c1
--- /dev/null
+++ b/recognition/45853757-VQVAE/README.MD
@@ -0,0 +1,8 @@
+# Implementing a VQ-VAE for the OASIS dataset
+VQ-VAEs are a variant of the variational autoencoder (hence VAE) that uses vector quantisation (VQ): the encoder's continuous outputs are snapped to the nearest vector in a learned codebook, giving a discrete latent representation.
+
+My VQ-VAE model is based on the Keras implementation at: https://keras.io/examples/generative/vq_vae/
+
+The OASIS dataset was downloaded through Blackboard, and preprocessing consisted of normalising the images to the range [0, 1].
+
+I did not get very conclusive results, as my GPU cannot handle this many data samples and I could not find an adequate workaround.
diff --git a/recognition/45853757-VQVAE/dataset.py b/recognition/45853757-VQVAE/dataset.py
new file mode 100644
index 0000000000..47f220926e
--- /dev/null
+++ b/recognition/45853757-VQVAE/dataset.py
@@ -0,0 +1,62 @@
+import os
+import numpy as np
+from tensorflow.keras.utils import load_img, img_to_array
+
+
+def preprocess_data(training_data, validation_data, testing_data):
+    """
+    Normalises each data set and finds the variance of the training data
+
+    Parameters:
+        training_data (list): list of arrays representing the images to train the model on
+        validation_data (list): list of arrays representing the images to validate the model on
+        testing_data (list): list of arrays representing the images to test the model on
+
+    Returns:
+        training_data (ndarray): normalised array of training images
+        validation_data (ndarray): normalised array of validation images
+        testing_data (ndarray): normalised array of testing images
+        variance (float): variance of the normalised training data
+    """
+    training_data = np.array(training_data)
+    training_data = training_data.astype('float16') / 255.
+
+    validation_data = np.array(validation_data)
+    validation_data = validation_data.astype('float16') / 255.
+
+    testing_data = np.array(testing_data)
+    testing_data = testing_data.astype('float16') / 255.
+
+    # The training data is already scaled to [0, 1], so take its variance directly
+    variance = float(np.var(training_data, dtype=np.float32))
+
+    return training_data, validation_data, testing_data, variance
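+
+
+# A rough illustration of what preprocess_data returns, assuming the OASIS slices
+# are 256x256 grayscale PNGs (the input size create_encoder in modules.py expects):
+#   train, val, test, var = preprocess_data(train_imgs, val_imgs, test_imgs)
+#   train.shape == (n_train, 256, 256, 1), with pixel values in [0, 1]
+#   var is a single float, later used to scale the reconstruction loss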
+ """ + # Initialise three empty lists for our data to be stored appropriately + training_data = [] + validation_data = [] + testing_data = [] + + # Create list pairs of the directories for the files with the images, and + # which list they should be sorted into + location_and_data_category = [["D:/keras_png_slices_data/keras_png_slices_train", \ + training_data], ["D:/keras_png_slices_data/keras_png_slices_vaildate", \ + validation_data], ["D:/keras_png_slices_data/keras_png_slices_test", \ + testing_data]] + + # Find and store each image in each file into the correct list + for dataset in location_and_data_category: + for file_name in os.listdir(dataset[0]): + dataset[1].append(img_to_array(load_img(os.path.join(dataset[0], file_name), color_mode="grayscale"))) + + return preprocess_data(training_data, validation_data, testing_data) diff --git a/recognition/45853757-VQVAE/modules.py b/recognition/45853757-VQVAE/modules.py new file mode 100644 index 0000000000..b8b2b16206 --- /dev/null +++ b/recognition/45853757-VQVAE/modules.py @@ -0,0 +1,118 @@ +import tensorflow as tf +from tensorflow import keras +from tensorflow.keras import layers, models +import numpy as np + +length = 256 +depth = 16 +kernel = 3 + +def create_encoder(latent_dim=16): + """ Create a simple encoder sequential layer """ + encoder = tf.keras.Sequential(name="encoder") + encoder.add(layers.Conv2D(depth, kernel, activation="relu", strides=2, padding="same", input_shape=(length, length, 1))) + encoder.add(layers.Conv2D(depth*2, kernel, activation="relu", strides=2, padding="same")) + encoder.add(layers.Conv2D(depth*4, kernel, activation="relu", strides=2, padding="same")) + encoder.add(layers.Conv2D(depth*8, kernel, activation="relu", strides=2, padding="same")) + encoder.add(layers.Conv2D(latent_dim, 1, padding="same")) + return encoder + +def create_decoder(): + """ Create a simple decoder sequential layer """ + decoder = tf.keras.Sequential(name="decoder") + decoder.add(layers.Conv2D(depth*8, kernel, activation="relu", strides=2, padding="same")) + decoder.add(layers.Conv2D(depth*4, kernel, activation="relu", strides=2, padding="same")) + decoder.add(layers.Conv2D(depth*2, kernel, activation="relu", strides=2, padding="same")) + decoder.add(layers.Conv2D(depth, kernel, activation="relu", strides=2, padding="same")) + decoder.add(layers.Conv2D(1, kernel, padding="same")) + return decoder + + +class VQLayer(layers.Layer): + def __init__(self, n_embeddings, embedding_dim, beta=0.25, **kwargs): + super().__init__(**kwargs) + self.embedding_dim = embedding_dim + self.n_embeddings = n_embeddings + self.beta = beta + + # Initialise embeddings + w_init = tf.random_uniform_initializer() + self.embeddings = tf.Variable( + initial_value=w_init(shape=(self.embedding_dim, self.n_embeddings), + dtype="float32"), trainable=True, name="vqvae_embeddings" + ) + + def call(self, x): + # Calc then flatten inputs, not incl embedding dimension + input_shape = tf.shape(x) + flattened = tf.reshape(x, [-1, self.embedding_dim]) + + # Perform quantisation then reshape quantised values to orig shape + encoding_indices = self.get_code_indices(flattened) + encodings = tf.one_hot(encoding_indices, self.n_embeddings) + quantised = tf.matmul(encodings, self.embeddings, transponse_b=True) + quantised = tf.reshape(quantised, input_shape) + + # Vector quntisation loss is added to the layer + commitment_loss = tf.reduce_mean((tf.stop_gradient(quantised) - x) ** 2) + codebook_loss = tf.reduce_mean((quantised - tf.stop_gradient(x)) ** 2) + 
+        self.add_loss(self.beta * commitment_loss + codebook_loss)
+
+        # Straight-through estimator: gradients flow back to x as if quantisation
+        # were the identity
+        return x + tf.stop_gradient(quantised - x)
+
+    def get_code_indices(self, flattened_inputs):
+        # Calculate the squared L2 distance between the inputs and the codes
+        similarity = tf.matmul(flattened_inputs, self.embeddings)
+        distances = (
+            tf.reduce_sum(flattened_inputs ** 2, axis=1, keepdims=True) +
+            tf.reduce_sum(self.embeddings ** 2, axis=0) - 2 * similarity
+        )
+
+        # Pick the code with the minimum distance for each input vector
+        encoding_indices = tf.argmin(distances, axis=1)
+        return encoding_indices
+
+
+class VQVAEModel(tf.keras.Sequential):
+    def __init__(self, variance, latent_dim, n_embeddings, **kwargs):
+        super(VQVAEModel, self).__init__(**kwargs)
+        self.variance = variance
+        self.latent_dim = latent_dim
+        self.n_embeddings = n_embeddings
+
+        # Build our model: encode, quantise the latents, then decode
+        self.add(create_encoder(latent_dim))
+        self.add(VQLayer(n_embeddings, latent_dim, name="vector_quantiser"))
+        self.add(create_decoder())
+
+        # Measure our losses
+        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
+        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
+        self.vq_loss_tracker = keras.metrics.Mean(name="vq_loss")
+
+    @property
+    def metrics(self):
+        return [self.total_loss_tracker, self.reconstruction_loss_tracker, self.vq_loss_tracker]
+
+    def train_step(self, x):
+        # The data sets are stored as float16 to save memory, so cast the batch up
+        # before comparing it against the float32 reconstructions
+        x = tf.cast(x, tf.float32)
+
+        # Calculate the losses from the VQ-VAE
+        with tf.GradientTape() as tape:
+            reconstructions = self.call(x)
+            reconstruction_loss = (tf.reduce_mean((x - reconstructions) ** 2) / self.variance)
+            total_loss = reconstruction_loss + sum(self.losses)
+
+        # Backpropagate
+        grads = tape.gradient(total_loss, self.trainable_variables)
+        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
+
+        # Update the loss trackers
+        self.total_loss_tracker.update_state(total_loss)
+        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
+        self.vq_loss_tracker.update_state(sum(self.losses))
+
+        return {
+            "loss": self.total_loss_tracker.result(),
+            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
+            "vqvae_loss": self.vq_loss_tracker.result()
+        }
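+
+
+# Rough shape walk-through, assuming 256x256 grayscale inputs as in create_encoder
+# (four stride-2 convolutions downsample 256 -> 16):
+#   encoder : (batch, 256, 256, 1)        -> (batch, 16, 16, latent_dim)
+#   VQLayer : (batch, 16, 16, latent_dim) -> same shape, with each latent vector
+#             replaced by its nearest codebook entry
+#   decoder : (batch, 16, 16, latent_dim) -> (batch, 256, 256, 1)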
""" + ssim = tf.image.ssim(original_data, predicted_data, max_val=1) + print("SSIM of data sets:", ssim) + +def compare_predicted(original_data, predicted_data, index): + """ Plots the original and predicted image at the given index """ + fig = plt.figure() + ax = fig.add_subplot(1, 2, 1) + imgplot = plt.imshow(original_data[index]) + ax.set_title("Original Image") + + ax = fig.add_subplot(1, 2, 2) + imgplot = plt.imshow(predicted_data[index]) + ax.set_title("Reconstructed Image") + fig.show() + +def plot_loss(model): + return None + +training_data, validation_data, testing_data, data_variance = load_data() +model = train_vqvae() + +predictions = model.predict(testing_data) + +calculate_ssim(testing_data, predictions) +compare_predicted(testing_data, predictions, 8) \ No newline at end of file diff --git a/recognition/45853757-VQVAE/train.py b/recognition/45853757-VQVAE/train.py new file mode 100644 index 0000000000..2764910240 --- /dev/null +++ b/recognition/45853757-VQVAE/train.py @@ -0,0 +1,17 @@ +import tensorflow as tf +from modules import * +from dataset import * + +batch_size = 64 +epochs = 100 + +def train_vqvae(): + # Load our data + training_data, validation_data, testing_data, data_variance = load_data() + + # Construct and train our model + vqvae_model = VQVAEModel(variance=data_variance, latent_dim=16, n_embeddings=128) + vqvae_model.compile(optimizer=keras.optimizers.Adam()) + print(vqvae_model.summary()) + + return vqvae_model.fit(training_data, training_data, validation_data=(validation_data, validation_data), epochs=epochs, batch_size=batch_size)