Generating images with DDPMs: A PyTorch Implementation

Brian Pulfer
15 min read · Jul 10, 2022


Introduction

Denoising Diffusion Probabilistic Models (DDPMs) are deep generative models that have recently received a lot of attention due to their impressive performance. Brand-new models like OpenAI’s DALL-E 2 and Google’s Imagen are based on DDPMs: they condition the generator on text, making it possible to generate photo-realistic images from an arbitrary text prompt.

For example, inputting “A photo of a Shiba Inu dog with a backpack riding a bike. It is wearing sunglasses and a beach hat” to the new Imagen model and “a corgi’s head depicted as an explosion of a nebula” to the DALL-E 2 model produces the following images:

Image generated by Imagen (left) and DALL-E 2 (right)

These models are simply mind-blowing, but understanding how they work requires understanding the original work of Ho et al., “Denoising Diffusion Probabilistic Models”.

In this brief post, I will focus on creating a simple version of DDPM from scratch in PyTorch. In particular, I will be re-implementing the original paper by Ho et al. We will work with the classic and non-resource-hungry MNIST and Fashion-MNIST datasets and try to generate images out of thin air. Let’s start with a little bit of theory.

Denoising Diffusion Probabilistic Models

Denoising Diffusion Probabilistic Models (DDPMs) first appeared in this paper.

The idea is quite simple: given a dataset of images, we add a little bit of noise step by step. With each step, the image becomes less and less clear, until all that is left is noise. This is called the “forward process”. Then, we learn a machine learning model that can undo each of these steps; we call this the “backward process”. If we can successfully learn the backward process, we have a model that can generate images from pure random noise.

The main idea of DDPM: map images x0 to noisier and noisier images with probability distribution q, then learn the inverse function p parametrized by parameters theta. Image taken from “Denoising Diffusion Probabilistic Models” by Ho et al.

A step in the forward process consists in making the image noisier: the image at step t is obtained by sampling from a multivariate Gaussian distribution whose mean is a scaled-down version of the previous image (the image at step t-1) and whose covariance matrix is diagonal and fixed. In other words, we perturb each pixel in the image independently by adding some normally distributed value.

Forward process: we sample from a normal distribution whose mean is a scaled version of the current image and whose covariance matrix is diagonal, with all variance terms equal to beta_t.
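In the paper’s notation, this forward step reads:

q(x_t \mid x_{t-1}) = \mathcal{N}\big(x_t;\ \sqrt{1-\beta_t}\, x_{t-1},\ \beta_t \mathbf{I}\big)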

For each step, there is a different coefficient beta_t that tells how much we distort the image in that step. The higher beta_t is, the more noise is added to the image. We are free to pick the coefficients, but we should avoid steps where too much noise is added all at once, so that the overall forward process remains “smooth”. In the original work by Ho et al., the betas are linearly spaced between 0.0001 and 0.02.

A nice property of the Gaussian distribution is that we can sample from it by adding to the mean vector a standard normal noise vector scaled by the standard deviation. This results in:

Forward process again, but now sampling is done by taking the mean and adding standard normal noise (epsilon) scaled by the standard deviation.
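Written out (the reparametrization trick), this is:

x_t = \sqrt{1-\beta_t}\, x_{t-1} + \sqrt{\beta_t}\, \epsilon, \qquad \epsilon \sim \mathcal{N}(0, \mathbf{I})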

We now know how to get the next sample in the forward process by just scaling what we already have and adding some scaled noise. If we now consider that the formula is recursive, we can write:

The formula of the forward process is recursive, so we can start expanding it.

If we keep doing this and apply some simplifications, we can go all the way back and obtain a formula for the noisy sample at step t directly from the original non-noisy image x0:

The equation of the forward process that allows us to directly obtain any desired noise level starting from the original non-noisy image.
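With alpha_t = 1 - beta_t and alpha_bar_t the product of the alphas up to step t, the closed form is:

x_t = \sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1-\bar{\alpha}_t}\, \epsilon, \qquad \bar{\alpha}_t = \prod_{s=1}^{t} \alpha_s, \quad \alpha_s = 1 - \beta_s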

Great. Now, no matter how many steps our forward process has, we always have a way to get the noisy image at step t directly from the original image.

For the backward process, we model each step as a Gaussian distribution as well, so we would just need the model to predict the distribution’s mean and standard deviation given the noisy image and the time step. In practice, in this first paper on DDPMs the covariance matrix is kept fixed, so we only really need to predict the mean of the Gaussian (given the noisy image and the time step we are currently at):

Backward process: we try to go back to a less noisy image (the image at step t-1) using a Gaussian distribution whose mean is predicted by a model.

Now, it turns out that the optimal mean value to be predicted is just a function of terms that we are already familiar with:

The optimal mean value to be predicted to reverse the noising process. Given the noisier image at step t, we can make it less noisy by subtracting a scaled version of the added noise and then rescaling the result.
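Concretely, replacing the true noise with the model’s prediction eps_theta, the mean used in the backward step is:

\mu_\theta(x_t, t) = \frac{1}{\sqrt{\alpha_t}}\left(x_t - \frac{\beta_t}{\sqrt{1-\bar{\alpha}_t}}\, \epsilon_\theta(x_t, t)\right)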

So, we can further simplify our model and just predict the noise epsilon as a function of the noisy image and the time step.

Our model just predicts the noise that was added, and we use this to recover a less noisy image using the information for the particular time step.

And our loss function is just going to be a scaled version of the Mean Squared Error (MSE) between the real noise that was added and the noise predicted by our model.

Final loss function: we minimize the MSE between the noise actually added to the images and the noise predicted by the model, for all images in our dataset and all time steps.
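This is the simplified objective from the paper:

L_{\text{simple}}(\theta) = \mathbb{E}_{t,\, x_0,\, \epsilon}\left[\left\| \epsilon - \epsilon_\theta\!\left(\sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1-\bar{\alpha}_t}\, \epsilon,\ t\right)\right\|^2\right]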

Once the model is trained (algorithm 1), we can use the denoising model to sample new images (algorithm 2).

Training and sampling algorithms. Once the model is trained, we can use it to generate brand new samples starting from gaussian noise.

Let’s get coding

Now that we have a rough understanding of how diffusion models work, it’s time to implement something of our own. You can run the following code yourself in this Google Colab Notebook or with this GitHub repository.

As usual, imports are our first step.

# Import of libraries
import random
import imageio
import numpy as np
from argparse import ArgumentParser

from tqdm.auto import tqdm
import matplotlib.pyplot as plt

import einops
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader

from torchvision.transforms import Compose, ToTensor, Lambda
from torchvision.datasets.mnist import MNIST, FashionMNIST

# Setting reproducibility
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Definitions
STORE_PATH_MNIST = f"ddpm_model_mnist.pt"
STORE_PATH_FASHION = f"ddpm_model_fashion.pt"

Next, we define a few parameters for our experiment. In particular, we decide whether we want to run the training loop, whether to use the Fashion-MNIST dataset (rather than MNIST), and some training hyper-parameters.

no_train = False
fashion = True
batch_size = 128
n_epochs = 20
lr = 0.001
store_path = "ddpm_fashion.pt" if fashion else "ddpm_mnist.pt"

Next, we would really like to display images: both the training images and those generated by the model are of interest to us. We write a utility function that, given some images, displays them in a (roughly) square grid of sub-figures:

def show_images(images, title=""):
    """Shows the provided images as sub-pictures in a square"""

    # Converting images to CPU numpy arrays
    if type(images) is torch.Tensor:
        images = images.detach().cpu().numpy()

    # Defining number of rows and columns
    fig = plt.figure(figsize=(8, 8))
    rows = int(len(images) ** (1 / 2))
    cols = round(len(images) / rows)

    # Populating figure with sub-plots
    idx = 0
    for r in range(rows):
        for c in range(cols):
            fig.add_subplot(rows, cols, idx + 1)

            if idx < len(images):
                plt.imshow(images[idx][0], cmap="gray")
                idx += 1
    fig.suptitle(title, fontsize=30)

    # Showing the figure
    plt.show()

To test this utility function, we load our dataset and show the first batch. Important: images must be normalized to the range [-1, 1], as our network will have to predict noise values that are normally distributed:

# Shows the first batch of images
def show_first_batch(loader):
    for batch in loader:
        show_images(batch[0], "Images in the first batch")
        break

# Loading the data (converting each image into a tensor and normalizing between [-1, 1])
transform = Compose([
    ToTensor(),
    Lambda(lambda x: (x - 0.5) * 2)
])
ds_fn = FashionMNIST if fashion else MNIST
dataset = ds_fn("./datasets", download=True, train=True, transform=transform)
loader = DataLoader(dataset, batch_size, shuffle=True)
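We can then display the first batch with the helper just defined:

# Optionally, show images of the first batch
show_first_batch(loader)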
Images in our first batch. If you kept the same random seed, you should get the exact same batch.

Great! Now that we have this nice utility function, we will use it later on for images generated by our model as well. Before actually dealing with the DDPM model, we just get a GPU device from Colab (typically a Tesla T4 for non-Pro users):

Getting a device and, if it is a GPU, printing its name
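A minimal sketch of that cell, defining the device variable used in the rest of the code (assuming a CUDA-capable runtime may or may not be available), could look like this:

# Getting a device and, if it is a GPU, printing its name
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}" + (f" ({torch.cuda.get_device_name(device)})" if torch.cuda.is_available() else ""))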

The DDPM Model

Now that we have the trivial stuff out of the way, it’s time to work on the DDPM. We will create a MyDDPM PyTorch module that is responsible for storing the beta and alpha values and for applying the forward process. For the backward process, the MyDDPM module simply relies on the network it was constructed with:

# DDPM class
class MyDDPM(nn.Module):
    def __init__(self, network, n_steps=200, min_beta=10 ** -4, max_beta=0.02, device=None, image_chw=(1, 28, 28)):
        super(MyDDPM, self).__init__()
        self.n_steps = n_steps
        self.device = device
        self.image_chw = image_chw
        self.network = network.to(device)
        self.betas = torch.linspace(min_beta, max_beta, n_steps).to(
            device)  # Number of steps is typically in the order of thousands
        self.alphas = 1 - self.betas
        self.alpha_bars = torch.tensor([torch.prod(self.alphas[:i + 1]) for i in range(len(self.alphas))]).to(device)

    def forward(self, x0, t, eta=None):
        # Make input image more noisy (we can directly skip to the desired step)
        n, c, h, w = x0.shape
        a_bar = self.alpha_bars[t]

        if eta is None:
            eta = torch.randn(n, c, h, w).to(self.device)

        noisy = a_bar.sqrt().reshape(n, 1, 1, 1) * x0 + (1 - a_bar).sqrt().reshape(n, 1, 1, 1) * eta
        return noisy

    def backward(self, x, t):
        # Run each image through the network for each timestep t in the vector t.
        # The network returns its estimation of the noise that was added.
        return self.network(x, t)

Note that the forward process is independent of the network used to denoise, so technically we could already visualize its effect. At the same time, we can also create a utility function that applies Algorithm 2 (the sampling procedure) to generate new images. We do so with two DDPM-specific utility functions:

def show_forward(ddpm, loader, device):
    # Showing the forward process
    for batch in loader:
        imgs = batch[0]

        show_images(imgs, "Original images")

        for percent in [0.25, 0.5, 0.75, 1]:
            show_images(
                ddpm(imgs.to(device),
                     [int(percent * ddpm.n_steps) - 1 for _ in range(len(imgs))]),
                f"DDPM Noisy images {int(percent * 100)}%"
            )
        break

To generate images, we start with random noise and let t go from T back to 0. At each step, we estimate the noise as eta_theta and apply the denoising function. Finally, extra noise is added as in Langevin dynamics.

def generate_new_images(ddpm, n_samples=16, device=None, frames_per_gif=100, gif_name="sampling.gif", c=1, h=28, w=28):
    """Given a DDPM model, a number of samples to be generated and a device, returns some newly generated samples"""
    frame_idxs = np.linspace(0, ddpm.n_steps, frames_per_gif).astype(np.uint)
    frames = []

    with torch.no_grad():
        if device is None:
            device = ddpm.device

        # Starting from random noise
        x = torch.randn(n_samples, c, h, w).to(device)

        for idx, t in enumerate(list(range(ddpm.n_steps))[::-1]):
            # Estimating noise to be removed
            time_tensor = (torch.ones(n_samples, 1) * t).to(device).long()
            eta_theta = ddpm.backward(x, time_tensor)

            alpha_t = ddpm.alphas[t]
            alpha_t_bar = ddpm.alpha_bars[t]

            # Partially denoising the image
            x = (1 / alpha_t.sqrt()) * (x - (1 - alpha_t) / (1 - alpha_t_bar).sqrt() * eta_theta)

            if t > 0:
                z = torch.randn(n_samples, c, h, w).to(device)

                # Option 1: sigma_t squared = beta_t
                beta_t = ddpm.betas[t]
                sigma_t = beta_t.sqrt()

                # Option 2: sigma_t squared = beta_tilda_t
                # prev_alpha_t_bar = ddpm.alpha_bars[t-1] if t > 0 else ddpm.alphas[0]
                # beta_tilda_t = ((1 - prev_alpha_t_bar) / (1 - alpha_t_bar)) * beta_t
                # sigma_t = beta_tilda_t.sqrt()

                # Adding some more noise like in Langevin Dynamics fashion
                x = x + sigma_t * z

            # Adding frames to the GIF
            if idx in frame_idxs or t == 0:
                # Putting digits in range [0, 255]
                normalized = x.clone()
                for i in range(len(normalized)):
                    normalized[i] -= torch.min(normalized[i])
                    normalized[i] *= 255 / torch.max(normalized[i])

                # Reshaping batch (n, c, h, w) to be a (as much as it gets) square frame
                frame = einops.rearrange(normalized, "(b1 b2) c h w -> (b1 h) (b2 w) c", b1=int(n_samples ** 0.5))
                frame = frame.cpu().numpy().astype(np.uint8)

                # Rendering frame
                frames.append(frame)

    # Storing the gif
    with imageio.get_writer(gif_name, mode="I") as writer:
        for idx, frame in enumerate(frames):
            writer.append_data(frame)
            if idx == len(frames) - 1:
                for _ in range(frames_per_gif // 3):
                    writer.append_data(frames[-1])
    return x

Everything that concerns the DDPM is now on the table. We simply need to define the model that will actually do the job of predicting the noise in an image given the image and the current time step. To do that, we will create a custom U-Net model. It goes without saying that you are free to use any other model of your choice.

The U-Net

We start the creation of our U-Net by creating a block that will keep spatial dimensionality unchanged. This block will be used at every level of our U-Net.

class MyBlock(nn.Module):
    def __init__(self, shape, in_c, out_c, kernel_size=3, stride=1, padding=1, activation=None, normalize=True):
        super(MyBlock, self).__init__()
        self.ln = nn.LayerNorm(shape)
        self.conv1 = nn.Conv2d(in_c, out_c, kernel_size, stride, padding)
        self.conv2 = nn.Conv2d(out_c, out_c, kernel_size, stride, padding)
        self.activation = nn.SiLU() if activation is None else activation
        self.normalize = normalize

    def forward(self, x):
        out = self.ln(x) if self.normalize else x
        out = self.conv1(out)
        out = self.activation(out)
        out = self.conv2(out)
        out = self.activation(out)
        return out

The tricky thing about DDPMs is that our image-to-image model has to be conditioned on the current time step. To do so in practice, we use a sinusoidal embedding and small one-hidden-layer MLPs. The resulting tensors are added channel-wise to the input of the network at every level of the U-Net.

def sinusoidal_embedding(n, d):
    # Returns the standard positional embedding
    embedding = torch.zeros(n, d)
    wk = torch.tensor([1 / 10_000 ** (2 * j / d) for j in range(d)])
    wk = wk.reshape((1, d))
    t = torch.arange(n).reshape((n, 1))
    embedding[:, ::2] = torch.sin(t * wk[:, ::2])
    embedding[:, 1::2] = torch.cos(t * wk[:, ::2])

    return embedding

We create a small utility method that builds an MLP with one hidden layer, which will be used to map the time embeddings at each level.

def _make_te(self, dim_in, dim_out):
    return nn.Sequential(
        nn.Linear(dim_in, dim_out),
        nn.SiLU(),
        nn.Linear(dim_out, dim_out)
    )

Now that we know how to deal with the time information, we can create a custom U-Net network. We will have 3 down-sample parts, a bottleneck in the middle of the network, and 3 up-sample steps with the usual U-Net residual connections (concatenations).

class MyUNet(nn.Module):
    def __init__(self, n_steps=1000, time_emb_dim=100):
        super(MyUNet, self).__init__()

        # Sinusoidal embedding
        self.time_embed = nn.Embedding(n_steps, time_emb_dim)
        self.time_embed.weight.data = sinusoidal_embedding(n_steps, time_emb_dim)
        self.time_embed.requires_grad_(False)

        # First half
        self.te1 = self._make_te(time_emb_dim, 1)
        self.b1 = nn.Sequential(
            MyBlock((1, 28, 28), 1, 10),
            MyBlock((10, 28, 28), 10, 10),
            MyBlock((10, 28, 28), 10, 10)
        )
        self.down1 = nn.Conv2d(10, 10, 4, 2, 1)

        self.te2 = self._make_te(time_emb_dim, 10)
        self.b2 = nn.Sequential(
            MyBlock((10, 14, 14), 10, 20),
            MyBlock((20, 14, 14), 20, 20),
            MyBlock((20, 14, 14), 20, 20)
        )
        self.down2 = nn.Conv2d(20, 20, 4, 2, 1)

        self.te3 = self._make_te(time_emb_dim, 20)
        self.b3 = nn.Sequential(
            MyBlock((20, 7, 7), 20, 40),
            MyBlock((40, 7, 7), 40, 40),
            MyBlock((40, 7, 7), 40, 40)
        )
        self.down3 = nn.Sequential(
            nn.Conv2d(40, 40, 2, 1),
            nn.SiLU(),
            nn.Conv2d(40, 40, 4, 2, 1)
        )

        # Bottleneck
        self.te_mid = self._make_te(time_emb_dim, 40)
        self.b_mid = nn.Sequential(
            MyBlock((40, 3, 3), 40, 20),
            MyBlock((20, 3, 3), 20, 20),
            MyBlock((20, 3, 3), 20, 40)
        )

        # Second half
        self.up1 = nn.Sequential(
            nn.ConvTranspose2d(40, 40, 4, 2, 1),
            nn.SiLU(),
            nn.ConvTranspose2d(40, 40, 2, 1)
        )

        self.te4 = self._make_te(time_emb_dim, 80)
        self.b4 = nn.Sequential(
            MyBlock((80, 7, 7), 80, 40),
            MyBlock((40, 7, 7), 40, 20),
            MyBlock((20, 7, 7), 20, 20)
        )

        self.up2 = nn.ConvTranspose2d(20, 20, 4, 2, 1)
        self.te5 = self._make_te(time_emb_dim, 40)
        self.b5 = nn.Sequential(
            MyBlock((40, 14, 14), 40, 20),
            MyBlock((20, 14, 14), 20, 10),
            MyBlock((10, 14, 14), 10, 10)
        )

        self.up3 = nn.ConvTranspose2d(10, 10, 4, 2, 1)
        self.te_out = self._make_te(time_emb_dim, 20)
        self.b_out = nn.Sequential(
            MyBlock((20, 28, 28), 20, 10),
            MyBlock((10, 28, 28), 10, 10),
            MyBlock((10, 28, 28), 10, 10, normalize=False)
        )

        self.conv_out = nn.Conv2d(10, 1, 3, 1, 1)

    def forward(self, x, t):
        # x is (N, 1, 28, 28); the time embedding is mapped per level and added (broadcast) to the feature maps
        t = self.time_embed(t)
        n = len(x)
        out1 = self.b1(x + self.te1(t).reshape(n, -1, 1, 1))  # (N, 10, 28, 28)
        out2 = self.b2(self.down1(out1) + self.te2(t).reshape(n, -1, 1, 1))  # (N, 20, 14, 14)
        out3 = self.b3(self.down2(out2) + self.te3(t).reshape(n, -1, 1, 1))  # (N, 40, 7, 7)

        out_mid = self.b_mid(self.down3(out3) + self.te_mid(t).reshape(n, -1, 1, 1))  # (N, 40, 3, 3)

        out4 = torch.cat((out3, self.up1(out_mid)), dim=1)  # (N, 80, 7, 7)
        out4 = self.b4(out4 + self.te4(t).reshape(n, -1, 1, 1))  # (N, 20, 7, 7)

        out5 = torch.cat((out2, self.up2(out4)), dim=1)  # (N, 40, 14, 14)
        out5 = self.b5(out5 + self.te5(t).reshape(n, -1, 1, 1))  # (N, 10, 14, 14)

        out = torch.cat((out1, self.up3(out5)), dim=1)  # (N, 20, 28, 28)
        out = self.b_out(out + self.te_out(t).reshape(n, -1, 1, 1))  # (N, 10, 28, 28)

        out = self.conv_out(out)  # (N, 1, 28, 28)

        return out

    def _make_te(self, dim_in, dim_out):
        return nn.Sequential(
            nn.Linear(dim_in, dim_out),
            nn.SiLU(),
            nn.Linear(dim_out, dim_out)
        )

Now that we have defined our denoising network, we can proceed to instantiate a DDPM model and play with some visualizations.

Some visualizations

We instantiate the DDPM model using our custom U-Net as follows.

# Defining model
n_steps, min_beta, max_beta = 1000, 10 ** -4, 0.02 # Originally used by the authors
ddpm = MyDDPM(MyUNet(n_steps), n_steps=n_steps, min_beta=min_beta, max_beta=max_beta, device=device)

Let’s check what the forward process looks like:

# Optionally, show the diffusion (forward) process
show_forward(ddpm, loader, device)
Result of running the forward process. Images get noisier and noisier with each step until just noise is left.

We haven't trained the model yet, but we can already use the function that allows us to generate new images and see what happens:
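Using the functions defined above, this boils down to something like the following (the gif_name here is arbitrary):

# Generating images with the untrained model (we expect pure noise)
generated = generate_new_images(ddpm, device=device, gif_name="before_training.gif")
show_images(generated, "Images generated before training")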

Generating new images with a non-trained model. Noise is produced.

Not surprisingly, all we get is noise when we do so. However, we will re-use this same method later on, once the model has finished training.

Training loop

We now implement Algorithm 1 to learn a model that will know how to denoise images. This corresponds to our training loop.

def training_loop(ddpm, loader, n_epochs, optim, device, display=False, store_path="ddpm_model.pt"):
    mse = nn.MSELoss()
    best_loss = float("inf")
    n_steps = ddpm.n_steps

    for epoch in tqdm(range(n_epochs), desc=f"Training progress", colour="#00ff00"):
        epoch_loss = 0.0
        for step, batch in enumerate(tqdm(loader, leave=False, desc=f"Epoch {epoch + 1}/{n_epochs}", colour="#005500")):
            # Loading data
            x0 = batch[0].to(device)
            n = len(x0)

            # Picking some noise for each of the images in the batch, a timestep and the respective alpha_bars
            eta = torch.randn_like(x0).to(device)
            t = torch.randint(0, n_steps, (n,)).to(device)

            # Computing the noisy image based on x0 and the time-step (forward process)
            noisy_imgs = ddpm(x0, t, eta)

            # Getting model estimation of noise based on the images and the time-step
            eta_theta = ddpm.backward(noisy_imgs, t.reshape(n, -1))

            # Optimizing the MSE between the noise plugged and the predicted noise
            loss = mse(eta_theta, eta)
            optim.zero_grad()
            loss.backward()
            optim.step()

            epoch_loss += loss.item() * len(x0) / len(loader.dataset)

        # Display images generated at this epoch
        if display:
            show_images(generate_new_images(ddpm, device=device), f"Images generated at epoch {epoch + 1}")

        log_string = f"Loss at epoch {epoch + 1}: {epoch_loss:.3f}"

        # Storing the model
        if best_loss > epoch_loss:
            best_loss = epoch_loss
            torch.save(ddpm.state_dict(), store_path)
            log_string += " --> Best model ever (stored)"

        print(log_string)

As you can see, in our training loop we simply sample some images and some random time steps for each of them. We then make them noisy with the forward process and run the backward process on those noisy images. The MSE between the actual noise added and the one predicted by the model is optimized.
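With everything in place, launching training is then a matter of creating an optimizer and calling the loop, roughly along these lines (using the flags and objects defined earlier in the post):

# Training the model (skipped when no_train is set)
if not no_train:
    training_loop(ddpm, loader, n_epochs, optim=Adam(ddpm.parameters(), lr), device=device, store_path=store_path)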

Training takes roughly 24 seconds per epoch on the Colab GPU, so the default 20 epochs complete in roughly 8 minutes. Note that it is possible to obtain even better performance with more epochs, a better U-Net, and other tricks. In this post, I omit those for simplicity.

Testing the model

Now that the job is done, we can simply enjoy the results. We load the best model obtained during training (according to the MSE loss), set it to evaluation mode, and use it to generate new samples:

# Loading the trained model
best_model = MyDDPM(MyUNet(), n_steps=n_steps, device=device)
best_model.load_state_dict(torch.load(store_path, map_location=device))
best_model.eval()
print("Model loaded")

print("Generating new images")
generated = generate_new_images(
    best_model,
    n_samples=100,
    device=device,
    gif_name="fashion.gif" if fashion else "mnist.gif"
)
show_images(generated, "Final result")
Final result on the MNIST dataset

The cherry on top is that our generation function automatically creates a nice GIF of the sampling process. We visualize that GIF in Colab with the following command:

Showing the generated gif image
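One way to do this in Colab (a sketch, assuming IPython’s display utilities) is:

from IPython.display import Image

# Displaying the GIF produced by generate_new_images
Image(open("fashion.gif" if fashion else "mnist.gif", "rb").read())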
Obtained GIFs for Fashion-MNIST and MNIST datasets.

And we are done! We finally have our DDPM model working!

Further Improvements

Further improvements have since been made to DDPMs, allowing the generation of higher-resolution images, faster sampling, and better sample quality and likelihood. The Imagen and DALL-E 2 models are based on improved versions of the original DDPM.

More references

For more references on DDPMs, I strongly recommend the outstanding post by Lilian Weng and the amazing Hugging Face blog post by Niels Rogge and Kashif Rasul. Other authors are also mentioned at the end of the Colab Notebook.

Conclusion

Diffusion models are generative models that learn to denoise images iteratively. Starting from pure noise, it is then possible to ask the model to denoise the sample repeatedly until a realistic image is obtained.

We created a DDPM from scratch in PyTorch and had it learn to de-noise MNIST / Fashion-MNIST images. The model, after training, was finally capable of generating new images out of random noise. Quite magical, right?

The Colab Notebook with the shown implementation is freely accessible at this link, while the GitHub repository contains .py files. If you found this story useful, consider giving it a clap 👏. If you feel like something is unclear, don’t hesitate to contact me directly! I’d be glad to discuss it with you.


Brian Pulfer

BSc in CS, MSc in AI. Currently Ph.D. Student in Machine Learning & Computer Vision. Passionate about AI and how to use it against climate change.