Generating images with generative adversarial networks (GANs), using ECG images as an example.
To create images with a GAN, I will use TensorFlow.
A generative adversarial network (GAN) is a machine learning (ML) model in which two neural networks compete with each other to be more accurate in their predictions.
How do GANs work?
The first step in building a GAN is to define the desired end result and collect an initial training dataset based on those requirements. This data is then shuffled and fed to the generator until it produces results with basic accuracy.
After that, the generated images are fed into the discriminator together with real samples from the original dataset. The discriminator evaluates each image and returns a probability between 0 and 1 representing its authenticity (1 maps to real, 0 maps to fake). This feedback is used to update both networks, and the cycle repeats until the desired result is reached.
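To make the 0-to-1 convention concrete, here is a tiny toy computation (hypothetical numbers, unrelated to the ECG model built below): the discriminator's error is small when it assigns high scores to real images and low scores to fakes.
import tensorflow as tf

# Toy illustration of the 0/1 labels: real images should score near 1, fakes near 0
bce = tf.keras.losses.BinaryCrossentropy()
real_scores = tf.constant([[0.9], [0.8]])   # hypothetical discriminator outputs for real images
fake_scores = tf.constant([[0.2], [0.1]])   # hypothetical outputs for generated (fake) images

loss = bce(tf.ones_like(real_scores), real_scores) + bce(tf.zeros_like(fake_scores), fake_scores)
print(loss)  # a small value, because this toy discriminator is "right" about both batches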
Why generate an ECG image?
I previously created the coronarography.ai project: it takes an ECG image as input and outputs whether a pathology of the main coronary arteries is present. I became curious whether ECG images could be generated at all, and how the generated images would compare with real ones.
Later I was also able to generate synthetic tabular data and ECG images simultaneously, with consistent dependencies between them (I will describe that in the next article).
Libraries used
import pandas as pd
import glob
import imageio
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
from PIL import Image
from tensorflow.keras import layers
import time
import tensorflow as tf
from IPython import display
import seaborn as sns
import matplotlib.mlab as mlab
import matplotlib
from matplotlib.pyplot import figure
from sklearn.preprocessing import MinMaxScaler
import joblib
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model
print(tf.__version__)
%matplotlib inline
matplotlib.rcParams['figure.figsize'] = (18,10)
import moviepy.editor as mpe
Loading and preparing a dataset
Here we load the ECG images from a folder into an array: each image is converted to a single-channel (grayscale) image, resized, and normalized to the range [-1, 1].
data_image = []

for k in os.listdir('../AI_coronarography/DATA_WORK/DATA_WORK/ЭКГ'):
    if k.endswith('.jpg'):
        img = Image.open('../AI_coronarography/DATA_WORK/DATA_WORK/ЭКГ/'+k)
        img = img.convert('L')
        img = img.resize((800, 800))
        data_image += [(np.array(img) - 127.5) / 127.5]
An example of a loaded ECG image.
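The example above can be reproduced with a one-line check (assuming data_image has already been filled by the loop above):
# Show the first loaded, normalized ECG image (pixel values are in the range [-1, 1])
plt.imshow(data_image[0], cmap='gray');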
Define the batch size and shuffle the images.
train_images = np.array(data_image).reshape(np.array(data_image).shape[0], 800, 800, 1).astype('float32')
BUFFER_SIZE = 100
BATCH_SIZE = 10
train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
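As a quick sanity check (not part of the original code), we can take one batch from the dataset and confirm its shape:
# Each batch should have shape (BATCH_SIZE, 800, 800, 1)
for image_batch in train_dataset.take(1):
    print(image_batch.shape)  # expected: (10, 800, 800, 1)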
We define the structure of the generator.
def make_generator_model():
    input_1 = Input(shape=(100,), name="Input_image")
    x = Dense(25*25*256, use_bias=False)(input_1)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Reshape((25, 25, 256))(x)
    x = Conv2DTranspose(256, (1, 1), strides=(1, 1), padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Conv2DTranspose(128, (3, 3), strides=(2, 2), padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Conv2DTranspose(32, (7, 7), strides=(2, 2), padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Conv2DTranspose(16, (9, 9), strides=(2, 2), padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Conv2DTranspose(1, (11, 11), strides=(2, 2), padding='same', use_bias=False, activation='tanh')(x)
    model = Model(inputs=input_1, outputs=x)
    return model
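To verify that the stack of Conv2DTranspose layers upsamples the 25×25 feature map to the full 800×800 resolution, the model summary can be printed (a small check, not present in the original):
# Layer output shapes should go 25x25 -> 50x50 -> 100x100 -> 200x200 -> 400x400 -> 800x800
make_generator_model().summary()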
Let’s check the untrained generator.
generator = make_generator_model()
noise = tf.random.normal([1, 100])
generated_image = generator(noise, training=False)
plt.imshow(generated_image[0, :, :, 0], cmap='gray');
Let’s create a discriminator.
def make_discriminator_model():
    input_1 = Input(shape=(800, 800, 1), name="Input_image")
    x = Conv2D(16, (11, 11), strides=(2, 2), padding='same')(input_1)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)
    x = Conv2D(32, (9, 9), strides=(2, 2), padding='same')(x)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)
    x = Conv2D(64, (7, 7), strides=(2, 2), padding='same')(x)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)
    x = Conv2D(128, (5, 5), strides=(2, 2), padding='same')(x)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)
    x = Conv2D(256, (3, 3), strides=(2, 2), padding='same')(x)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)
    x = Conv2D(256, (1, 1), strides=(1, 1), padding='same')(x)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)
    x = Flatten()(x)
    x = Dense(1)(x)
    model = Model(inputs=input_1, outputs=x)
    return model
We use the not yet trained discriminator to classify the generated images as real or fake. The model will be trained to output positive values for real images and negative values for fake images.
discriminator = make_discriminator_model()
decision = discriminator(generated_image)
print(decision)
#result
tf.Tensor([[1.6453338e-05]], shape=(1, 1), dtype=float32)
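Note that the discriminator returns a raw logit rather than a probability (hence from_logits=True in the loss below). If you want the 0-to-1 probability mentioned earlier, you can pass the logit through a sigmoid (an extra step, not in the original code):
# Convert the raw logit to a probability in [0, 1]; values above 0.5 lean towards "real"
print(tf.sigmoid(decision))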
Let us define the loss functions and optimizers for both models.
Discriminator loss.
This method quantifies how well the discriminator distinguishes real images from fakes. It compares the discriminator's predictions on real images to an array of 1s, and its predictions on fake (generated) images to an array of 0s.
Generator loss.
The generator's loss quantifies how well it was able to fool the discriminator. Intuitively, if the generator is doing well, the discriminator will classify the fake images as real (i.e. output 1).
The discriminator and generator optimizers are different because you will be training the two networks separately.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)
def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)
Save checkpoints
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)
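To resume training later from the most recent checkpoint (for example after a restart), the standard restore call can be used; this line is not part of the original training code:
# Restore the latest checkpoint from ./training_checkpoints, if one exists
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))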
Defining the training loop
The training loop begins with the generator receiving a random noise vector as input, from which it produces an image. The discriminator is then used to classify real images (drawn from the training set) and fake images (produced by the generator). The loss is calculated for each of the models, and the gradients are used to update the generator and the discriminator.
EPOCHS = 6000
noise_dim = 100
num_examples_to_generate = 1
seed = tf.random.normal([num_examples_to_generate, noise_dim])
@tf.function
def train_step(images):
    noise = tf.random.normal([BATCH_SIZE, noise_dim])

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(noise, training=True)

        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))
def train(dataset, epochs):
    for epoch in range(epochs):
        start = time.time()

        for image_batch in dataset:
            train_step(image_batch)

        display.clear_output(wait=True)
        generate_and_save_images(generator,
                                 epoch + 1,
                                 seed)

        if (epoch + 1) % 500 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

        print('Time for epoch {} is {} sec'.format(epoch + 1, time.time() - start))

    display.clear_output(wait=True)
    generate_and_save_images(generator,
                             epochs,
                             seed)
Create and save images
def generate_and_save_images(model, epoch, test_input):
    # training=False so that layers such as BatchNormalization run in inference mode
    predictions = model(test_input, training=False)

    fig = plt.figure(figsize=(12, 12))
    plt.imshow(predictions[0, :, :, 0] * 127.5 + 127.5, cmap='gray')
    plt.axis('off')
    plt.savefig('image_per_epoch/image_at_epoch_{:04d}.png'.format(epoch), bbox_inches='tight', pad_inches=0)
    plt.show()
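plt.savefig will fail if the image_per_epoch folder does not exist, so it is worth creating it before training starts (a small addition not shown in the original):
# Create the output folder for the per-epoch images if it does not exist yet
os.makedirs('image_per_epoch', exist_ok=True)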
Train Model
Call the train() method defined above to train the generator and the discriminator simultaneously. It is important that the generator and the discriminator do not overpower each other (for example, that they learn at a similar rate).
At the beginning of training, the generated images look like random noise. As training progresses, the generated ECG images will look more and more realistic.
train(train_dataset, EPOCHS)
Video of the neural network learning to generate ECG images.
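Such a video can be assembled from the images saved by generate_and_save_images; here is a minimal sketch using the moviepy import from above (the file pattern follows generate_and_save_images, and the frame rate is an arbitrary choice):
# Collect the per-epoch frames and write them out as an mp4 video
# note: all frames must have the same dimensions
frames = sorted(glob.glob('image_per_epoch/image_at_epoch_*.png'))
clip = mpe.ImageSequenceClip(frames, fps=10)
clip.write_videofile('ecg_gan_training.mp4')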
Comparison of real and generated ECG images.