import cv2
import imageio
import os
from tensorflow.keras.layers import Conv2D, Dropout, BatchNormalization
from tensorflow.keras.layers import LeakyReLU, Conv2DTranspose, Dense
from tensorflow.keras.layers import Reshape, Flatten
from tensorflow.keras import Model
import numpy as np
from ..datasets.load_cifar10 import load_cifar10_AE
from ..datasets.load_mnist import load_mnist_AE
from ..datasets.load_custom_data import load_custom_data_AE
from ..losses.mse_loss import mse_loss
import datetime
import tensorflow as tf
from tqdm.auto import tqdm
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
### Silence Imageio warnings
def silence_imageio_warning(*args, **kwargs):
pass
imageio.core.util._precision_warn = silence_imageio_warning
__all__ = ["VanillaAutoencoder"]
[docs]class VanillaAutoencoder:
r"""Vanilla Autoencoder model
Args:
interm_dim (int, optional): represents the dimension of the bottleneck layer. Defaults to ``64``
enc_units (int, list, optional): represents the number of units/neurons in the encoder part of the network. Defaults to ``[256, 128]``
dec_units (int, list, optional): represents the number of units/neurons in the decoder part of the network. Defaults to ``[128, 256]```
activation (str, optional): type of non-linearity to be applied. Defaults to ``relu``
kernel_initializer (str, optional): initialization of kernel weights. Defaults to ``glorot_uniform``
kernel_regularizer (str, optional): type of regularization to be applied to the weights. Defaults to ``None``
"""
def __init__(
self,
interm_dim=64,
enc_units=[256, 128],
dec_units=[128, 256],
activation="relu",
kernel_initializer="glorot_uniform",
kernel_regularizer=None,
):
self.model = tf.keras.Sequential()
self.image_size = None
self.config = locals()
[docs] def load_data(
self,
data_dir=None,
use_mnist=False,
use_cifar10=False,
batch_size=32,
img_shape=(64, 64),
):
r"""Load data to train the model
Args:
data_dir (str, optional): string representing the directory to load data from. Defaults to ``None``
use_mnist (bool, optional): use the MNIST dataset to train the model. Defaults to ``False``
use_cifar10 (bool, optional): use the CIFAR10 dataset to train the model. Defaults to ``False``
batch_size (int, optional): mini batch size for training the model. Defaults to ``32``
img_shape (int, tuple, optional): shape of the image when loading data from custom directory. Defaults to ``(64, 64)``
Return:
two tensorflow dataset objects representing the train and test datset
"""
if use_mnist:
train_data, test_data = load_mnist_AE()
elif use_cifar10:
train_data, test_data = load_cifar10_AE()
else:
train_data, test_data = load_custom_data_AE(data_dir, img_shape)
self.image_size = train_data.shape[1:]
train_data = (
train_data.reshape(
(-1, self.image_size[0] * self.image_size[1] * self.image_size[2])
)
/ 255
)
train_ds = (
tf.data.Dataset.from_tensor_slices(train_data).shuffle(10000).batch(batch_size)
)
test_data = (
test_data.reshape(
(-1, self.image_size[0] * self.image_size[1] * self.image_size[2])
)
/ 255
)
test_ds = (
tf.data.Dataset.from_tensor_slices(test_data).shuffle(10000).batch(batch_size)
)
return train_ds, test_ds
[docs] def get_sample(self, data=None, n_samples=1, save_dir=None):
r"""View sample of the data
Args:
data (tf.data object): dataset to load samples from
n_samples (int, optional): number of samples to load. Defaults to ``1``
save_dir (str, optional): directory to save the sample images. Defaults to ``None``
Return:
``None`` if save_dir is ``not None``, otherwise returns numpy array of samples with shape (n_samples, img_shape)
"""
assert data is not None, "Data not provided"
sample_images = []
data = data.unbatch()
for img in data.take(n_samples):
img = img.numpy()
img = img.reshape((self.image_size[0], self.image_size[1], self.image_size[2]))
sample_images.append(img)
sample_images = np.array(sample_images)
if save_dir is None:
return sample_images
assert os.path.exists(save_dir), "Directory does not exist"
for i, sample in enumerate(sample_images):
imageio.imwrite(os.path.join(save_dir, "sample_" + str(i) + ".jpg"), sample)
def encoder(self, config):
enc_units = config["enc_units"]
encoder_layers = len(enc_units)
interm_dim = config["interm_dim"]
activation = config["activation"]
kernel_initializer = config["kernel_initializer"]
kernel_regularizer = config["kernel_regularizer"]
model = tf.keras.Sequential()
model.add(
Dense(
enc_units[0] * 2,
activation=activation,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
input_dim=self.image_size[0] * self.image_size[1] * self.image_size[2],
)
)
for i in range(encoder_layers):
model.add(
Dense(
enc_units[i],
activation=activation,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
)
)
model.add(Dense(interm_dim, activation="sigmoid"))
return model
def decoder(self, config):
dec_units = config["dec_units"]
decoder_layers = len(dec_units)
interm_dim = config["interm_dim"]
activation = config["activation"]
kernel_initializer = config["kernel_initializer"]
kernel_regularizer = config["kernel_regularizer"]
model = tf.keras.Sequential()
model.add(
Dense(
dec_units[0] // 2,
activation=activation,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
input_dim=interm_dim,
)
)
for i in range(decoder_layers):
model.add(
Dense(
dec_units[i],
activation=activation,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
)
)
model.add(
Dense(
self.image_size[0] * self.image_size[1] * self.image_size[2],
activation="sigmoid",
)
)
return model
def __load_model(self):
self.model.add(self.encoder(self.config))
self.model.add(self.decoder(self.config))
[docs] def fit(
self,
train_ds=None,
epochs=100,
optimizer="Adam",
verbose=1,
learning_rate=0.001,
tensorboard=False,
save_model=None,
):
r"""Function to train the model
Args:
train_ds (tf.data object): training data
epochs (int, optional): number of epochs to train the model. Defaults to ``100``
optimizer (str, optional): optimizer used to train the model. Defaults to ``Adam``
verbose (int, optional): 1 - prints training outputs, 0 - no outputs. Defaults to ``1``
learning_rate (float, optional): learning rate of the optimizer. Defaults to ``0.001``
tensorboard (bool, optional): if true, writes loss values to ``logs/gradient_tape`` directory
which aids visualization. Defaults to ``False``
save_model (str, optional): Directory to save the trained model. Defaults to ``None``
"""
assert train_ds is not None, "Initialize training data through train_ds parameter"
self.__load_model()
kwargs = {}
kwargs["learning_rate"] = learning_rate
optimizer = getattr(tf.keras.optimizers, optimizer)(**kwargs)
if tensorboard:
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_dir = "logs/gradient_tape/" + current_time + "/train"
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
steps = 0
train_loss = tf.keras.metrics.Mean()
try:
total = tf.data.experimental.cardinality(train_ds).numpy()
except:
total = 0
for epoch in range(epochs):
train_loss.reset_states()
pbar = tqdm(total=total, desc="Epoch - " + str(epoch + 1))
for data in train_ds:
with tf.GradientTape() as tape:
recon_data = self.model(data)
loss = mse_loss(data, recon_data)
gradients = tape.gradient(loss, self.model.trainable_variables)
optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
train_loss(loss)
steps += 1
pbar.update(1)
if tensorboard:
with train_summary_writer.as_default():
tf.summary.scalar("loss", loss.numpy(), step=steps)
pbar.close()
del pbar
if verbose == 1:
print("Epoch:", epoch + 1, "reconstruction loss:", train_loss.result().numpy())
if save_model is not None:
assert isinstance(save_model, str), "Not a valid directory"
if save_model[-1] != "/":
self.model.save_weights(save_model + "/vanilla_autoencoder_checkpoint")
else:
self.model.save_weights(save_model + "vanilla_autoencoder_checkpoint")
[docs] def generate_samples(self, test_ds=None, save_dir=None):
r"""Generate samples using the trained model
Args:
test_ds (tf.data object): test data object used to generate samples
save_dir (str, optional): directory to save the generated images. Defaults to ``None``
Return:
returns ``None`` if save_dir is ``not None``, otherwise returns a numpy array with generated samples
"""
assert test_ds is not None, "Enter input test dataset"
generated_samples = np.array([])
for i, data in enumerate(test_ds):
gen_sample = self.model(data, training=False)
gen_sample = gen_sample.numpy()
if i == 0:
generated_samples = gen_sample
else:
generated_samples = np.concatenate((generated_samples, gen_sample), 0)
generated_samples = generated_samples.reshape(
(-1, self.image_size[0], self.image_size[1], self.image_size[2])
)
if save_dir is None:
return generated_samples
assert os.path.exists(save_dir), "Directory does not exist"
for i, sample in enumerate(generated_samples):
imageio.imwrite(os.path.join(save_dir, "sample_" + str(i) + ".jpg"), sample)