Source code for simplegan.autoencoder.vae

import cv2
import os
from tensorflow.keras.layers import Dropout, BatchNormalization
from tensorflow.keras.layers import Lambda, Dense, Reshape, Input
from tensorflow.keras import Model
import imageio
import numpy as np
from ..datasets.load_cifar10 import load_cifar10_AE
from ..datasets.load_mnist import load_mnist_AE
from ..datasets.load_custom_data import load_custom_data_AE
from ..losses.mse_loss import mse_loss
import tensorflow as tf
import datetime
from tqdm.auto import tqdm

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

### Silence Imageio warnings
def silence_imageio_warning(*args, **kwargs):
    pass


imageio.core.util._precision_warn = silence_imageio_warning

"""
References: 
-> https://arxiv.org/abs/1312.6114
-> https://github.com/keras-team/keras/blob/master/examples/variational_autoencoder.py
"""

__all__ = ["VAE"]


[docs]class VAE: r"""`Variational Autoencoder <https://arxiv.org/abs/1312.6114>`_ model Args: interm_dim (int, optional): represents the dimension of the bottleneck layer. Defaults to ``256`` latent_dim (int, optional): represents the dimension of the distribution to sample from. Defaults to ``32`` enc_units (int, list, optional): represents the number of units/neurons in the encoder part of the network. Defaults to ``[256, 128]`` dec_units (int, list, optional): represents the number of units/neurons in the decoder part of the network. Defaults to ``[128, 256]``` activation (str, optional): type of non-linearity to be applied. Defaults to ``relu`` kernel_initializer (str, optional): initialization of kernel weights. Defaults to ``glorot_uniform`` kernel_regularizer (str, optional): type of regularization to be applied to the weights. Defaults to ``None`` """ def __init__( self, interm_dim=256, latent_dim=32, enc_units=[256, 128], dec_units=[128, 256], activation="relu", kernel_initializer="glorot_uniform", kernel_regularizer=None, ): self.model = None self.image_size = None self.config = locals()
[docs] def load_data( self, data_dir=None, use_mnist=False, use_cifar10=False, batch_size=32, img_shape=(64, 64), ): r"""Load data to train the model Args: data_dir (str, optional): string representing the directory to load data from. Defaults to ``None`` use_mnist (bool, optional): use the MNIST dataset to train the model. Defaults to ``False`` use_cifar10 (bool, optional): use the CIFAR10 dataset to train the model. Defaults to ``False`` batch_size (int, optional): mini batch size for training the model. Defaults to ``32`` img_shape (int, tuple, optional): shape of the image when loading data from custom directory. Defaults to ``(64, 64)`` Return: two tensorflow dataset objects representing the train and test datset """ if use_mnist: train_data, test_data = load_mnist_AE() elif use_cifar10: train_data, test_data = load_cifar10_AE() else: train_data, test_data = load_custom_data_AE(data_dir, img_shape) self.image_size = train_data.shape[1:] train_data = ( train_data.reshape( (-1, self.image_size[0] * self.image_size[1] * self.image_size[2]) ) / 255 ) train_ds = ( tf.data.Dataset.from_tensor_slices(train_data).shuffle(10000).batch(batch_size) ) test_data = ( test_data.reshape( (-1, self.image_size[0] * self.image_size[1] * self.image_size[2]) ) / 255 ) test_ds = ( tf.data.Dataset.from_tensor_slices(test_data).shuffle(10000).batch(batch_size) ) return train_ds, test_ds
[docs] def get_sample(self, data=None, n_samples=1, save_dir=None): r"""View sample of the data Args: data (tf.data object): dataset to load samples from n_samples (int, optional): number of samples to load. Defaults to ``1`` save_dir (str, optional): directory to save the sample images. Defaults to ``None`` Return: ``None`` if save_dir is ``not None``, otherwise returns numpy array of samples with shape (n_samples, img_shape) """ assert data is not None, "Data not provided" sample_images = [] data = data.unbatch() for img in data.take(n_samples): img = img.numpy() img = img.reshape((self.image_size[0], self.image_size[1], self.image_size[2])) sample_images.append(img) sample_images = np.array(sample_images) if save_dir is None: return sample_images assert os.path.exists(save_dir), "Directory does not exist" for i, sample in enumerate(sample_images): imageio.imwrite(os.path.join(save_dir, "sample_" + str(i) + ".jpg"), sample)
def sampling(self, distribution): z_mean = distribution[0] z_var = distribution[1] batch = tf.keras.backend.shape(z_mean)[0] dim = tf.keras.backend.int_shape(z_mean)[1] epsilon = tf.keras.backend.random_normal((batch, dim)) return z_mean + tf.keras.backend.exp(0.5 * z_var) * epsilon """ encoder and decoder layers for custom dataset can be reimplemented by inherting this class(vae) """ def vae(self, config): enc_units = config["enc_units"] encoder_layers = len(enc_units) dec_units = config["dec_units"] decoder_layers = len(dec_units) interm_dim = config["interm_dim"] latent_dim = config["latent_dim"] activation = config["activation"] kernel_initializer = config["kernel_initializer"] kernel_regularizer = config["kernel_regularizer"] org_inputs = Input(shape=self.image_size[0] * self.image_size[1] * self.image_size[2]) x = Dense( enc_units[0] * 2, activation=activation, kernel_initializer=kernel_initializer, kernel_regularizer=kernel_regularizer, )(org_inputs) for i in range(encoder_layers): x = Dense( enc_units[i], activation=activation, kernel_initializer=kernel_initializer, kernel_regularizer=kernel_regularizer, )(x) x = Dense( interm_dim, activation=activation, kernel_initializer=kernel_initializer, kernel_regularizer=kernel_regularizer, )(x) z_mean = Dense(latent_dim)(x) z_var = Dense(latent_dim)(x) # Sampling from intermediate dimensiont to get a probability density z = Lambda(self.sampling, output_shape=(latent_dim,))([z_mean, z_var]) # Encoder model enc_model = Model(org_inputs, [z_mean, z_var]) latent_inputs = Input(shape=(latent_dim,)) outputs = Dense( dec_units[0] // 2, activation=activation, kernel_initializer=kernel_initializer, kernel_regularizer=kernel_regularizer, )(latent_inputs) for i in range(decoder_layers): outputs = Dense( dec_units[i], activation=activation, kernel_initializer=kernel_initializer, kernel_regularizer=kernel_regularizer, )(outputs) final_outputs = Dense( self.image_size[0] * self.image_size[1] * self.image_size[2], activation="sigmoid" )(outputs) # Decoder model dec_model = Model(latent_inputs, final_outputs) out = dec_model(z) model = Model(org_inputs, out) kl_loss = -0.5 * tf.math.reduce_mean( z_var - tf.math.square(z_mean) - tf.math.exp(z_var) + 1 ) model.add_loss(kl_loss) return model """ call build_model to intialize the layers before you train the model """ def __load_model(self): self.model = self.vae(self.config)
[docs] def fit( self, train_ds=None, epochs=100, optimizer="Adam", verbose=1, learning_rate=0.001, tensorboard=False, save_model=None, ): r"""Function to train the model Args: train_ds (tf.data object): training data epochs (int, optional): number of epochs to train the model. Defaults to ``100`` optimizer (str, optional): optimizer used to train the model. Defaults to ``Adam`` verbose (int, optional): 1 - prints training outputs, 0 - no outputs. Defaults to ``1`` learning_rate (float, optional): learning rate of the optimizer. Defaults to ``0.001`` tensorboard (bool, optional): if true, writes loss values to ``logs/gradient_tape`` directory which aids visualization. Defaults to ``False`` save_model (str, optional): Directory to save the trained model. Defaults to ``None`` """ assert train_ds is not None, "Initialize training data through train_ds parameter" self.__load_model() kwargs = {} kwargs["learning_rate"] = learning_rate optimizer = getattr(tf.keras.optimizers, optimizer)(**kwargs) if tensorboard: current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") train_log_dir = "logs/gradient_tape/" + current_time + "/train" train_summary_writer = tf.summary.create_file_writer(train_log_dir) steps = 0 train_loss = tf.keras.metrics.Mean() try: total = tf.data.experimental.cardinality(train_ds).numpy() except: total = 0 for epoch in range(epochs): train_loss.reset_states() pbar = tqdm(total=total, desc="Epoch - " + str(epoch + 1)) for data in train_ds: with tf.GradientTape() as tape: data_recon = self.model(data) loss = mse_loss(data, data_recon) gradients = tape.gradient(loss, self.model.trainable_variables) optimizer.apply_gradients(zip(gradients, self.model.trainable_variables)) train_loss(loss) steps += 1 pbar.update(1) if tensorboard: with train_summary_writer.as_default(): tf.summary.scalar("loss", loss.numpy(), step=steps) pbar.close() del pbar if verbose == 1: print("Epoch:", epoch + 1, "reconstruction loss:", train_loss.result().numpy()) if save_model is not None: assert isinstance(save_model, str), "Not a valid directory" if save_model[-1] != "/": self.model.save_weights(save_model + "/variational_autoencoder_checkpoint") else: self.model.save_weights(save_model + "variational_autoencoder_checkpoint")
[docs] def generate_samples(self, test_ds=None, save_dir=None): r"""Generate samples using the trained model Args: test_ds (tf.data object): test data object used to generate samples save_dir (str, optional): directory to save the generated images. Defaults to ``None`` Return: returns ``None`` if save_dir is ``not None``, otherwise returns a numpy array with generated samples """ assert test_ds is not None, "Enter input test dataset" generated_samples = np.array([]) for i, data in enumerate(test_ds): gen_sample = self.model(data, training=False) gen_sample = gen_sample.numpy() if i == 0: generated_samples = gen_sample else: generated_samples = np.concatenate((generated_samples, gen_sample), 0) generated_samples = generated_samples.reshape( (-1, self.image_size[0], self.image_size[1], self.image_size[2]) ) if save_dir is None: return generated_samples assert os.path.exists(save_dir), "Directory does not exist" for i, sample in enumerate(generated_samples): imageio.imwrite(os.path.join(save_dir, "sample_" + str(i) + ".jpg"), sample)