Visual Perception for Self-Driving Cars! Part 4: Road Segmentation

Learn concepts by coding! Explore how deep learning and computer vision are used for different visual tasks in autonomous driving.

Shahrullohon Lutfillohonov
11 min read · Sep 15, 2022

This article is part of a series. Check out the full series: Part 1, Part 2, Part 3, Part 4, Part 5, Part 6!

We continue our blog series on Visual Perception for Self-Driving Cars.

As one of the fundamental components of the perception stack, road segmentation observes the environment, finds the drivable area, and creates an occupancy map for path planning. In self-driving systems, road segmentation not only classifies the drivable area but also contributes to other perception modules.

Left: Road image; Center: Mask data; Right: Masked Road image — Image by Author

Segmentation

Segmentation is the task of clustering the elements of an image that belong to the same object class, which is why it is also known as pixel-level classification. It entails partitioning images (or video frames) into multiple segments of objects. Depending on the nature of the classification, image segmentation falls into two categories: semantic segmentation (labeling pixels with semantic classes) and instance segmentation (partitioning individual objects). For example, in an image with a bunch of cars, semantic segmentation labels every one of them with the single class “car”, while instance segmentation separates each individual car. In today’s post, we will use semantic segmentation.
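As a toy illustration (separate from the project code), a semantic segmentation mask is just a per-pixel map of class indices:

import numpy as np

# 0 = background, 1 = road; every pixel carries exactly one class index
mask = np.array([[0, 0, 1],
                 [0, 1, 1],
                 [1, 1, 1]])
print((mask == 1).sum(), 'road pixels')  # prints: 6 road pixels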

Road Segmentation on PyTorch

We will create a custom segmentation model in PyTorch and train it on a road dataset. Here is our agenda for the task:

  • Create an environment and install dependencies
  • Download the dataset
  • Data processing
  • Create a custom model
  • Model training
  • Run inference on videos with the trained weights

The code for the project should be self-explanatory, but if you have any doubts or face any problems, please feel free to leave a comment or contact me.

Create a new environment and install dependencies

It is helpful to create a virtual environment to manage dependencies and isolate our project.

# Create new conda environment
conda create -n (your env name) python=3.9 jupyter

Then, do not forget to activate it:

# activate the conda environment
conda activate (your env name)

Here are the required dependencies. Please make a new directory for the project and save them as requirements.txt inside it. It is always worth making a list of all of a project’s dependencies (most junior developers forget about it); it saves you and your team from burnout down the road.

# pip install -r requirements.txt
matplotlib>=3.2.2
numpy>=1.20.3
opencv-python>=4.5.5.64
pillow>=9.0.1
torch>=1.11.0  # conda build: pytorch=1.11.0=py3.9_cuda11.3_cudnn8_0
tqdm>=4.63.0

On the terminal, run:

pip install -r requirements.txt

Download the dataset

We will use the road dataset from this link. Please download it and move it into the project directory. It contains 1,300 photos taken in sunny and rainy weather, along with their masks. Three mask types are provided: general, color, and watershed. We chose the watershed masks for our project.
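After unpacking, the folder layout that the loader code below assumes (inferred from the training paths and the _watershed_mask file suffix used later) looks roughly like this:

City_dataset_full/
    City_sunny1/
        images/   # *.jpg road photos
        mask/     # *_watershed_mask segmentation masks
    City_sunny2/
    City_rainy1/
    City_rainy2/
    City_2/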

Jupyter Notebook

You can build the whole project in a Jupyter notebook. Run the following command in the terminal, and Jupyter Notebook should open:

# To the new environment activated terminal
jupyter notebook

Import the dependencies

We use ThreadPoolExecutor from the concurrent.futures module to speed up our program by executing tasks concurrently. It maintains a pool of at most max_workers threads that execute submitted calls asynchronously. To learn more about it, please refer to this link. A minimal sketch follows the imports below.

import os
import time
from datetime import datetime
from tqdm.notebook import trange
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import cv2
from PIL import Image, ImageFilter
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
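As a minimal sketch (separate from the project code), this is the submit/as_completed pattern we rely on later to build batches:

from concurrent.futures import ThreadPoolExecutor, as_completed

def load(i):
    # stand-in for an I/O-bound task such as decoding an image
    return i * i

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(load, i) for i in range(8)]
    for f in as_completed(futures):
        print(f.result())  # results arrive as workers finish, not in order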

Data Processing

First, we create an ImageLoader class to load the dataset for processing.

class ImageLoader:

    def __init__(self, folders_path, data_type, height=480, width=640, channel_first=False, file_mask=None, postprocessing=None):

        self.channels = 3
        self.height = height
        self.width = width
        self.data_type = data_type
        self.postprocessing = postprocessing
        self.file_mask = file_mask

        # collect image file names from all given folders
        self.file_names = []
        for folder in folders_path:
            self.file_names += self._find_files(folder)

        self.file_names.sort()

        self.count = len(self.file_names)

        self.channel_first = channel_first

        if self.channel_first:
            self.images = np.zeros((self.count, self.channels, self.height, self.width), dtype=np.uint8)
        else:
            self.images = np.zeros((self.count, self.height, self.width, self.channels), dtype=np.uint8)

        ptr = 0
        for file_name in self.file_names:
            print('Loading image :', file_name)
            self.images[ptr] = self._load_image(file_name)
            ptr += 1

    def _find_files(self, path):
        files = []
        for (dirpath, dirnames, filenames) in os.walk(path):
            files.append(filenames)

        result = []
        for file_name in files[0]:
            if file_name.endswith('.jpg') or file_name.endswith('.png'):
                # keep only files matching the mask pattern, if one is given
                if self.file_mask is None:
                    result.append(path + file_name)
                elif file_name.find(self.file_mask) != -1:
                    result.append(path + file_name)

        return result

    def _load_image(self, file_name):
        image = Image.open(file_name).convert('RGB')

        if self.data_type == 'mask':
            image = image.crop((0, 1, 640, 479))
        else:
            image = image.resize((640, 478))

        if self.postprocessing is not None:
            image = self.postprocessing(image)
            image_np = np.array(image)
        else:
            image = image.resize((self.width, self.height))
            image_np = np.array(image)

        # HWC -> CHW for RGB data (quantized masks stay 2D)
        if self.channel_first and len(image_np.shape) > 2:
            image_np = np.moveaxis(image_np, 2, 0)

        return image_np
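As a quick, optional sanity check (the path follows the dataset layout shown earlier):

loader = ImageLoader(['City_dataset_full/City_sunny1/images/'], 'image', channel_first=True)
print(loader.count, loader.images.shape)  # (count, 3, 480, 640) with the defaults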

Now we can process the dataset so that it is ready for our PyTorch model.

class DatasetProcess:

    def __init__(self, folders_training, folders_testing, classes_ids, height=480, width=640, augmentation_count=10):

        self.classes_ids = classes_ids
        self.classes_count = len(classes_ids)
        self.height = height
        self.width = width
        self.channels = 3

        self.training_images = []
        self.training_masks = []
        self.training_count = 0

        for folder in folders_training:
            images = ImageLoader([folder + '/images/'], 'image', height, width, channel_first=True)
            masks = ImageLoader([folder + '/mask/'], 'mask', height, width, channel_first=True, file_mask='_watershed_mask', postprocessing=self._mask_postprocessing)

            self.training_images.append(images.images)
            self.training_masks.append(masks.images)

            print('Processing augmentations\n')

            images_aug, masks_aug = self._augmentation(images.images, masks.images, augmentation_count)

            self.training_images.append(images_aug)
            self.training_masks.append(masks_aug)

            self.training_count += images.count * (1 + augmentation_count)

        self.testing_images = []
        self.testing_masks = []
        self.testing_count = 0

        for folder in folders_testing:
            images = ImageLoader([folder + '/images/'], 'image', height, width, channel_first=True)
            masks = ImageLoader([folder + '/mask/'], 'mask', height, width, channel_first=True, file_mask='_watershed_mask', postprocessing=None)
            self.testing_images.append(images.images)
            self.testing_masks.append(masks.images)
            self.testing_count += images.count

        self.input_shape = (self.channels, self.height, self.width)
        self.output_shape = (self.classes_count, self.height, self.width)
        print('\n\n\n\n')
        print('dataset summary : \n')
        print('training_count = ', self.get_training_count())
        print('testing_count = ', self.get_testing_count())
        print('channels = ', self.channels)
        print('height = ', self.height)
        print('width = ', self.width)
        print('classes_count = ', self.classes_count)
        print('\n')

    def get_training_count(self):
        return self.training_count

    def get_testing_count(self):
        return self.testing_count

    def get_training_batch(self, batch_size=32):
        return self._get_batch(self.training_images, self.training_masks, batch_size, augmentation=True)

    def get_testing_batch(self, batch_size=32):
        return self._get_batch(self.testing_images, self.testing_masks, batch_size, augmentation=False)

    def process(self, images, masks, augmentation=True):
        # pick a random image/mask pair from a random group
        group_idx = np.random.randint(len(images))
        image_idx = np.random.randint(len(images[group_idx]))
        image_np = np.array(images[group_idx][image_idx]) / 256.0
        mask_np = np.array(masks[group_idx][image_idx]).mean(axis=0).astype(int)
        if augmentation:
            image_np = self._augmentation_noise(image_np)
            image_np, mask_np = self._augmentation_flip(image_np, mask_np)
        # one-hot encode the mask: (H, W) -> (classes, H, W)
        mask_one_hot = np.eye(self.classes_count)[mask_np]
        mask_one_hot = np.moveaxis(mask_one_hot, 2, 0)
        result_x = torch.from_numpy(image_np).float()
        result_y = torch.from_numpy(mask_one_hot).float()
        return result_x, result_y

    def _get_batch(self, images, masks, batch_size, augmentation=True):
        result_x = torch.zeros((batch_size, self.channels, self.height, self.width)).float()
        result_y = torch.zeros((batch_size, self.classes_count, self.height, self.width)).float()
        # build the batch in parallel, one worker per sample
        with ThreadPoolExecutor(max_workers=batch_size) as executor:
            results = [executor.submit(self.process, images, masks, augmentation=augmentation) for x in range(batch_size)]
            counter = 0
            for f in concurrent.futures.as_completed(results):
                result_x[counter], result_y[counter] = f.result()
                counter += 1
        return result_x, result_y

    def _augmentation(self, images, masks, augmentation_count):
        angle_max, crop_prop = 25, 0.2
        count = images.shape[0]
        total_count = count * augmentation_count
        images_result = np.zeros((total_count, images.shape[1], images.shape[2], images.shape[3]), dtype=np.uint8)
        mask_result = np.zeros((total_count, masks.shape[1], masks.shape[2], masks.shape[3]), dtype=np.uint8)
        ptr = 0
        for j in range(count):
            image_in = Image.fromarray(np.moveaxis(images[j], 0, 2), 'RGB')
            mask_in = Image.fromarray(np.moveaxis(masks[j], 0, 2), 'RGB')
            for i in range(augmentation_count):
                # random rotation, applied to image and mask alike
                angle = self._rnd(-angle_max, angle_max)
                image_aug = image_in.rotate(angle)
                mask_aug = mask_in.rotate(angle)
                # random crop, same box for image and mask
                c_left = int(self._rnd(0, crop_prop) * self.width)
                c_top = int(self._rnd(0, crop_prop) * self.height)
                c_right = int(self._rnd(1.0 - crop_prop, 1.0) * self.width)
                c_bottom = int(self._rnd(1.0 - crop_prop, 1.0) * self.height)
                image_aug = image_aug.crop((c_left, c_top, c_right, c_bottom))
                mask_aug = mask_aug.crop((c_left, c_top, c_right, c_bottom))
                # random PIL filter, applied to the image only
                if np.random.rand() < 0.5:
                    fil = np.random.randint(6)
                    if fil == 0:
                        image_aug = image_aug.filter(ImageFilter.BLUR)
                    elif fil == 1:
                        image_aug = image_aug.filter(ImageFilter.EDGE_ENHANCE)
                    elif fil == 2:
                        image_aug = image_aug.filter(ImageFilter.EDGE_ENHANCE_MORE)
                    elif fil == 3:
                        image_aug = image_aug.filter(ImageFilter.SHARPEN)
                    elif fil == 4:
                        image_aug = image_aug.filter(ImageFilter.SMOOTH)
                    elif fil == 5:
                        image_aug = image_aug.filter(ImageFilter.SMOOTH_MORE)
                image_aug = image_aug.resize((self.width, self.height))
                mask_aug = mask_aug.resize((self.width, self.height))
                image_aug = np.moveaxis(np.array(image_aug), 2, 0)
                mask_aug = np.moveaxis(np.array(mask_aug), 2, 0)
                images_result[ptr] = image_aug
                mask_result[ptr] = mask_aug
                ptr += 1
        return images_result, mask_result

    def _augmentation_noise(self, image_np):
        # random brightness, contrast and pixel noise
        brightness = self._rnd(-0.25, 0.25)
        contrast = self._rnd(0.5, 1.5)
        noise = 0.05 * (2.0 * np.random.rand(self.channels, self.height, self.width) - 1.0)
        result = image_np + brightness
        result = 0.5 + contrast * (result - 0.5)
        result = result + noise
        return np.clip(result, 0.0, 1.0)

    def _augmentation_flip(self, image_np, mask_np):
        # random horizontal flip
        if self._rnd(0, 1) < 0.5:
            image_np = np.flip(image_np, 2)
            mask_np = np.flip(mask_np, 1)
        return image_np.copy(), mask_np.copy()

    def _rnd(self, min_value, max_value):
        return (max_value - min_value) * np.random.rand() + min_value

    def _mask_postprocessing(self, image):
        image = image.resize((self.width, self.height), Image.NEAREST)
        image = image.convert('L')
        # plant one pixel per class id so quantize() always sees every class
        for i in range(len(self.classes_ids)):
            image.putpixel((4 * i + self.width // 2, 4 * i + self.height // 2), self.classes_ids[i])
        return image.quantize(self.classes_count)
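One detail in process() worth unpacking is the one-hot trick np.eye(self.classes_count)[mask_np]: indexing an identity matrix with an (H, W) map of class indices produces an (H, W, classes) one-hot volume in a single vectorized step. A tiny sketch:

import numpy as np

mask = np.array([[0, 1],
                 [1, 0]])               # (H, W) class indices
one_hot = np.eye(2)[mask]               # (H, W, classes)
one_hot = np.moveaxis(one_hot, 2, 0)    # (classes, H, W), as the model expects
print(one_hot.shape)                    # (2, 2, 2)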

Create a Custom Segmentation Model in PyTorch

Data processing is finished, so it is time to create our custom model for segmentation. It is also possible to use pre-trained models, but we wanted a challenge here, and it is good experience for our readers too.

class CustomModel(nn.Module):

    def __init__(self, input_shape=(3, 256, 352), output_shape=(2, 256, 352)):
        super(CustomModel, self).__init__()

        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # first encoder stage, downsamples the input 8x
        self.layers_encoder_0 = [
            self.conv_bn(input_shape[0], 32, 2),
            self.conv_bn(32, 64, 1),
            self.conv_bn(64, 128, 2),
            self.conv_bn(128, 128, 1),
            self.conv_bn(128, 128, 1),
            self.conv_bn(128, 128, 1),
            self.conv_bn(128, 256, 2)
        ]

        # second encoder stage, downsamples 4x more (32x in total)
        self.layers_encoder_1 = [
            self.conv_bn(256, 256, 1),
            self.conv_bn(256, 512, 2),
            self.conv_bn(512, 512, 1),
            self.conv_bn(512, 512, 1),
            self.conv_bn(512, 512, 1),
            self.conv_bn(512, 512, 1),
            self.conv_bn(512, 512, 2)
        ]

        # decoder takes the concatenated encoder features and upsamples back
        self.layers_decoder = [
            self.conv_bn(512 + 256, 256, 1),
            self.conv_bn(256, 256, 1),
            self.conv_bn(256, 128, 1),
            self.conv_bn(128, 128, 1),
            self.conv_bn(128, 128, 1),

            nn.Conv2d(128, output_shape[0], kernel_size=1, stride=1, padding=0),
            nn.Upsample(scale_factor=8, mode='bilinear', align_corners=False)
        ]

        # Xavier init for layers exposing a weight directly (the bare Conv2d);
        # the Sequential conv_bn blocks keep PyTorch's default initialization
        for i in range(len(self.layers_encoder_0)):
            if hasattr(self.layers_encoder_0[i], 'weight'):
                nn.init.xavier_uniform_(self.layers_encoder_0[i].weight)
                nn.init.zeros_(self.layers_encoder_0[i].bias)

        for i in range(len(self.layers_encoder_1)):
            if hasattr(self.layers_encoder_1[i], 'weight'):
                nn.init.xavier_uniform_(self.layers_encoder_1[i].weight)
                nn.init.zeros_(self.layers_encoder_1[i].bias)

        for i in range(len(self.layers_decoder)):
            if hasattr(self.layers_decoder[i], 'weight'):
                nn.init.xavier_uniform_(self.layers_decoder[i].weight)
                nn.init.zeros_(self.layers_decoder[i].bias)

        self.model_encoder_0 = nn.Sequential(*self.layers_encoder_0)
        self.model_encoder_0.to(self.device)

        self.model_encoder_1 = nn.Sequential(*self.layers_encoder_1)
        self.model_encoder_1.to(self.device)

        self.model_decoder = nn.Sequential(*self.layers_decoder)
        self.model_decoder.to(self.device)

        print(self.model_encoder_0)
        print(self.model_encoder_1)
        print(self.model_decoder)

    def forward(self, x):
        encoder_0 = self.model_encoder_0(x)
        encoder_1 = self.model_encoder_1(encoder_0)

        # upsample the deep features to the first encoder's resolution
        encoder_1_up = F.interpolate(encoder_1, scale_factor=4, mode='nearest')

        # skip connection: concatenate both encoder outputs along channels
        d_in = torch.cat([encoder_0, encoder_1_up], dim=1)

        y = self.model_decoder(d_in)

        return y

    def conv_bn(self, inputs, outputs, stride):
        # 3x3 convolution -> batch norm -> LeakyReLU
        return nn.Sequential(
            nn.Conv2d(inputs, outputs, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(outputs),
            nn.LeakyReLU(inplace=True))
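Because the network is fully convolutional, the input height and width only need to be divisible by 32 (the total downsampling factor); the input_shape argument only fixes the channel counts. A quick shape check, as a sketch:

model = CustomModel(input_shape=(3, 384, 512), output_shape=(2, 384, 512))
x = torch.randn(1, 3, 384, 512).to(model.device)
y = model(x)
print(y.shape)  # torch.Size([1, 2, 384, 512])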

Model Training

The model is also ready, so now we can define the parameters and start the training. We train the model for 100 epochs; you can choose the number of epochs and the batch size at your convenience.

Remember, our data is spread across 5 folders, so first we collect them into one list.

# Load training images from folders
folders_training = []
folders_training.append('City_dataset_full/City_sunny1/')
folders_training.append('City_dataset_full/City_sunny2/')
folders_training.append('City_dataset_full/City_rainy1/')
folders_training.append('City_dataset_full/City_rainy2/')
folders_training.append('City_dataset_full/City_2/')

# Assign classes
classes_ids = [8, 12]
classes_count = len(classes_ids)

# Load the model
model = CustomModel()

# Time estimation and plotting variables
epochminus, arrayloss, arrayepoch, lossforavg = 0, [], [], 0

# Number of epochs
N_EPOCHS = 100

# Batch size
BATCH_SIZE = 128

# Load images (height and width must be divisible by 32)
dataset = DatasetProcess(folders_training, folders_training, classes_ids, height=384, width=512, augmentation_count=25)

MAXLOSS = 9999.0

# Print start time
print(time.time())

for epoch in trange(N_EPOCHS):
    # Time estimation variables
    epochminus += 1
    timestart = time.time()

    # Calculate batch count
    batch_count = (dataset.get_training_count() + BATCH_SIZE) // BATCH_SIZE
    print(batch_count, 'BATCH_COUNT')

    # Set optimizer for the model parameters
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

    # Print current epoch
    print('EPOCH - ', epoch)

    # Batch loop
    for batch in trange(batch_count):
        # Get a batch from the dataset loader
        x, y = dataset.get_training_batch(BATCH_SIZE)

        # Put images on the device
        x = x.to(model.device)
        y = y.to(model.device)

        # Push images through the model
        y_pred = model.forward(x)

        # MSE loss between one-hot masks and predictions
        loss = ((y - y_pred) ** 2).mean()

        # Accumulate loss for the epoch average
        lossforavg += float(loss.data.cpu().numpy())

        # Save the best model so far
        if epoch > 10 and MAXLOSS > float(loss.data.cpu().numpy()):
            torch.save(model.state_dict(), 'best_model.pt')
            MAXLOSS = float(loss.data.cpu().numpy())
            print('Best model saved')

        # Reset, compute and apply gradients
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Graphing variables
    arrayepoch.append(epoch)
    arrayloss.append(lossforavg / batch_count)
    print(lossforavg / batch_count, 'Epoch avg loss')
    lossforavg = 0

    # Estimate remaining time
    timetoend = (N_EPOCHS - epochminus) * (time.time() - timestart)
    dt_object = datetime.fromtimestamp(timetoend + time.time())
    print(dt_object, 'time to end')

# Save the final model
PATH = './Model_final.pt'
torch.save(model.state_dict(), PATH)
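Since we collected arrayepoch and arrayloss above, an optional matplotlib plot shows how the average loss evolves over training:

plt.plot(arrayepoch, arrayloss)
plt.xlabel('epoch')
plt.ylabel('average MSE loss')
plt.title('Training loss per epoch')
plt.show()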

Inference

Training is finished, and we have saved weights for both the best and the final model. It is time to use the trained weights for inference. As our aim is self-driving cars, we run the model on video directly, but you can use the weights on single images too (see the sketch after the class below).

class Inference:

    def __init__(self, classes_count):

        # Load model
        self.model = CustomModel()
        self.device = self.model.device

        # Path to trained weights
        self.PATH = 'best_model.pt'

        # Load weights (map_location lets this run on CPU-only machines)
        self.model.load_state_dict(torch.load(self.PATH, map_location=self.device))

        # Turn on evaluation mode
        self.model.eval()
        self.colors = self._make_colors(classes_count)

        print('Segmentation Inference ready')

    def process(self, image_np, channel_first=False, alpha=0.35):

        # Put the image on the device and normalize
        image_t = torch.from_numpy(image_np).float().to(self.device)
        image_t = image_t / 256.0

        # HWC -> CHW if needed
        if not channel_first:
            image_in_t = image_t.transpose(0, 2).transpose(1, 2)
        else:
            image_in_t = image_t

        # Process the image: per-pixel class = argmax over class channels
        prediction_t = self.model(image_in_t.unsqueeze(0)).squeeze(0)
        prediction_t = torch.argmax(prediction_t, dim=0)
        prediction_t = prediction_t.transpose(0, 1)

        # Look up a color for every predicted class
        mask_t = self.colors[prediction_t, :].transpose(0, 1)

        # Blend mask and image with alpha 0.35
        result_t = (1.0 - alpha) * image_t + alpha * mask_t

        # Move results back to the CPU
        prediction = prediction_t.detach().to('cpu').numpy()
        mask = mask_t.detach().to('cpu').numpy()
        result = result_t.detach().to('cpu').numpy()

        return prediction, mask, result

    # Make colors for the mask (class 1, the road, is drawn in blue)
    def _make_colors(self, count):

        result = []

        result.append([0, 0, 0])
        result.append([0, 0, 1])
        result.append([0, 0, 0])
        result.append([0, 0, 0])
        result.append([0, 0, 0])

        result = torch.from_numpy(np.array(result)).to(self.device)

        return result
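For readers who want to try single images, here is a minimal sketch (test.jpg is a hypothetical file name; adjust the path to your own image):

si = Inference(2)
image = cv2.imread('test.jpg')                  # hypothetical input image
image = cv2.resize(image, (640, 480))           # same size as the video pipeline
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR
prediction, mask, result = si.process(image)
plt.imshow(result)                              # blended image and mask
plt.show()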

Video inference built on the Inference class:

# Load the desired video
cap = cv2.VideoCapture('../../RoadSegmentation/testing/challenge.mp4')
show_video = False
save_video = True

# Frame size for inference (must be divisible by 32, like the training size)
height = 480
width = 640

# Load segmentation inference
si = Inference(2)

if save_video:
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    writer = cv2.VideoWriter('output1.avi', fourcc, 25.0, (width, height))

fps_smooth = 0.0
frame_skip = 20
next_frame = 0
cnt = 0

def print_video(image, text):
    cv2.putText(image, text, (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1, lineType=cv2.LINE_AA)

while True:
    ret, frame = cap.read()

    if ret == False:
        break

    frame = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    if cnt > next_frame:
        time_start = time.time()

        # Send the frame to the inference model and get the result
        prediction_np, mask, result = si.process(frame)

        # Count FPS
        time_stop = time.time()
        fps = 1.0 / (time_stop - time_start)
        result = (result * 255).astype(np.uint8)

        # Print FPS
        text = 'fps = ' + str(round(fps, 1))

        # Swap channels back for OpenCV (the RGB<->BGR swap is symmetric)
        im_bgr = cv2.cvtColor(result, cv2.COLOR_BGR2RGB)
        print_video(im_bgr, text)

        if show_video:
            cv2.imshow('frame', im_bgr)

        if save_video:
            writer.write(im_bgr)

        # Skip frames adaptively so the output stays close to 25 fps
        frame_skip = 25 / fps
        frame_skip = int(np.clip(frame_skip, 1, 500))

        next_frame = cnt + frame_skip

    cnt += 1
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
if save_video:
    writer.release()
cv2.destroyAllWindows()

Result

After training for 100 epochs, the results are promising. We could improve the model further by training for more epochs, training with more data, or using pre-trained models. Let’s leave that as an exercise for our readers.

Custom Inference of Road Segmentation Model on Road Video Data

Conclusion

Our custom segmentation model shows fairly promising performance in segmenting the road for our self-driving car. We have walked through the full custom implementation, from data processing to model creation. Our model may not be ready for real-life autonomous driving, but it is a solid baseline for the project we set out to build.

I hope you enjoyed reading. If you have any questions or suggestions, please feel free to leave a comment. You can also find me on LinkedIn or email me directly. I’d love to hear from you!

We will discuss visual perception for self-driving cars further in the following posts.
