Eyes on the road

David Perez Vicens
Published in Saturdays.AI · 10 min read · Aug 12, 2023

Driver Drowsiness model prediction. Saturdays AI

Eyes on the road is a project developed as part of Saturdays AI in Alicante in the first half of 2023. Over six weeks, students learned different approaches to artificial intelligence, such as supervised and unsupervised learning, random forests, regression and natural language processing tools.

After the training sessions, students had to develop a social project with all the concepts learned, and we thought: what is more social than saving lives? In Spain, drowsy driving is the cause of four out of ten accidents. That’s why we decided to develop a tool that keeps drivers from falling asleep at the wheel, using a low-cost device like a Raspberry Pi.

After several unsuccessful attempts, we took a direct approach: use a model that estimates how closed your eyes are, so we can play a sound or an alarm when they close.

The first step was to acquire and process all the images from the dataset. We used the DMD (Driver Monitoring Dataset), more specifically the fatigue-related subset.

The Driver Monitoring Dataset is the largest visual dataset for real driving actions, with footage from synchronized multiple cameras (body, face, hands) and multiple streams (RGB, Depth, IR) recorded in two scenarios (real car, driving simulator). Different annotated labels related to distraction, fatigue and gaze-head pose can be used to train Deep Learning models for Driver Monitor Systems.

The DMD content consists of hundreds of videos for each subset; a sample video frame is shown below.

Each video comes with a JSON file that labels each frame with a specific category, using the following structure.

...
"actions": {
  "0": {
    "name": "",
    "type": "eyes_state/open",
    "frame_intervals": [
      { "frame_start": 23, "frame_end": 33 },
      { "frame_start": 45, "frame_end": 51 },
      ...
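
To make the annotation format concrete, here is a minimal sketch of how such intervals can be expanded into individual frame numbers. The annotation fragment is made up for illustration, the `frames_by_label` helper is hypothetical, and both ends of each interval are assumed to be inclusive.

```python
import json

# Made-up annotation fragment with the same shape as the snippet above,
# nested under an "openlabel" key (assumed top-level key of the DMD files).
annotation = json.loads("""
{
  "openlabel": {
    "actions": {
      "0": {
        "name": "",
        "type": "eyes_state/open",
        "frame_intervals": [
          {"frame_start": 23, "frame_end": 33},
          {"frame_start": 45, "frame_end": 51}
        ]
      }
    }
  }
}
""")


def frames_by_label(data):
    """Map each action type to the sorted list of frame numbers it covers."""
    frames = {}
    for action in data["openlabel"]["actions"].values():
        numbers = frames.setdefault(action["type"], set())
        for interval in action["frame_intervals"]:
            # Assumption: frame_start and frame_end are both inclusive.
            numbers.update(range(interval["frame_start"], interval["frame_end"] + 1))
    return {label: sorted(numbers) for label, numbers in frames.items()}


labelled = frames_by_label(annotation)
print(len(labelled["eyes_state/open"]))
```

With these two intervals the label covers 11 + 7 = 18 frames.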

Because the original DMD dataset consists entirely of videos, we had to process every video and extract the specific frames labeled as awake or drowsy, so that the model could work more efficiently on individual images like those coming from the webcam. To accomplish this, we created a script that processes all the labels in JSON format and extracts the frames into one folder per label.

import os
import cv2
import json
import re

# Path to the dataset/raw folder
json_folder = 'dmd/raw'

# Create output directory if it doesn't exist
output_dir = 'dmd/labels'
os.makedirs(output_dir, exist_ok=True)


# Function to sanitize folder names
def sanitize_folder_name(folder_name):
    # Replace invalid characters with underscores
    folder_name = re.sub(r'[<>:"/\\|?*]', '_', folder_name)
    # Remove leading and trailing whitespaces
    folder_name = folder_name.strip()
    return folder_name


# Process each JSON file in the folder
for filename in os.listdir(json_folder):
    if filename.endswith('.json'):
        json_path = os.path.join(json_folder, filename)

        # Load JSON data
        with open(json_path) as json_file:
            data = json.load(json_file)

        # Set video_path based on the JSON filename
        video_filename = filename.replace('ann_drowsiness.json', 'face.mp4')
        video_path = os.path.join(json_folder, video_filename)
        # Prefix frame names with the video name so frames with the same
        # number from different videos do not overwrite each other
        video_stem = os.path.splitext(video_filename)[0]

        # Process each action in the JSON data
        for action_key, action_value in data['openlabel']['actions'].items():
            action_type = action_value['type']
            action_type_folder = sanitize_folder_name(action_type)
            frame_intervals = action_value['frame_intervals']

            # Create subfolder for the action type if it doesn't exist
            action_folder = os.path.join(output_dir, action_type_folder)
            os.makedirs(action_folder, exist_ok=True)

            # Process frame intervals
            for interval in frame_intervals:
                frame_start = interval['frame_start']
                frame_end = interval['frame_end']

                # Open video file for reading
                video = cv2.VideoCapture(video_path)

                # Set video frame position to the start frame
                video.set(cv2.CAP_PROP_POS_FRAMES, frame_start)

                # Read and save frames until the end frame is reached
                for frame_num in range(frame_start, frame_end + 1):
                    ret, frame = video.read()
                    if ret:
                        # Save frame as a JPG image
                        output_filename = f'{video_stem}_{frame_num}.jpg'
                        output_path = os.path.join(action_folder, output_filename)
                        cv2.imwrite(output_path, frame)

                        print(f'Saved frame: {output_path}')
                    else:
                        break

                # Release video resources
                video.release()

With this script we processed each video and extracted every frame that was labeled in each interval. You end up with a structure of folders named by label (blinks_blinking, eyes_state_close, eyes_state_closing, eyes_state_open, eyes_state_opening, eyes_state_undefined), but we needed to simplify the labels even further and ended up using a binary approach with only “awake” and “drowsy” labels.
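
As an illustration of that simplification, here is a minimal sketch of a folder-to-label mapping. The article does not state which DMD labels were merged into each class, so the mapping below (open eyes as “awake”, closed eyes as “drowsy”, everything else left out) is purely an assumption, and `BINARY_LABELS` and `to_binary_label` are hypothetical names.

```python
# Hypothetical mapping: which folders feed each binary class is an assumption,
# not something stated in the article.
BINARY_LABELS = {
    "eyes_state_open": "awake",
    "eyes_state_close": "drowsy",
}


def to_binary_label(folder_name):
    """Return 'awake', 'drowsy', or None when the folder is left out."""
    return BINARY_LABELS.get(folder_name)


folders = [
    "blinks_blinking",
    "eyes_state_close",
    "eyes_state_closing",
    "eyes_state_open",
    "eyes_state_opening",
    "eyes_state_undefined",
]

# Keep only the folders that map to one of the two classes.
kept = {name: to_binary_label(name) for name in folders if to_binary_label(name)}
print(kept)
```

Transitional and undefined states are dropped here only to keep the sketch small; they could just as well be assigned to one of the two classes.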

Now that we have turned video processing (heavy CPU load) into image processing, we can focus on the next step.

Face landmarks detection

We could have started training our model with these images, but our first tests were pretty inaccurate because all the images came from a small, not very diverse group of people recorded under the same lighting conditions. To solve this, we processed all the images with an existing open source library called dlib to detect facial landmarks such as the mouth outline, eye outlines and face outline.

Dlib is a modern C++ toolkit containing machine learning algorithms and tools for creating complex software in C++ to solve real world problems. It is used in both industry and academia in a wide range of domains including robotics, embedded devices, mobile phones, and large high performance computing environments.

These are the face landmarks points that we can get for each frame.

For blink detection, we are only interested in two sets of facial structures (the eyes).

For each eye, we have 6 (x, y) coordinates. These start at the left corner of the eye, as if you were looking at the person, and then go clockwise around the rest of the region:

According to Soukupová and Čech’s 2016 paper titled “Real-Time Eye Blink Detection using Facial Landmarks,” there’s a relationship between the width and height of certain coordinates. This relationship can be expressed through an equation called the “eye aspect ratio” (EAR).
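
Written out in the paper's notation, where p1 to p6 are the six eye landmarks in the order described above (p1 and p4 being the two eye corners), the ratio reads:

```latex
\mathrm{EAR} = \frac{\lVert p_2 - p_6 \rVert + \lVert p_3 - p_5 \rVert}{2\,\lVert p_1 - p_4 \rVert}
```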

The equation uses 2D facial landmark locations to measure the distance between vertical and horizontal eye landmarks. The numerator measures vertical eye landmark distance and the denominator measures horizontal eye landmark distance. The equation is useful because it can detect blinking by noticing when the eye aspect ratio drops towards zero. With this equation, we don’t need to use image processing; we can just rely on the ratio of eye landmark distances. The figure below from Soukupová and Čech can help illustrate this better.

In the top left, there is an open eye that stays mostly the same size over time.

When the person blinks (top right), the eye gets much smaller.

The bottom graph shows how the eye changes size over time in a video clip. It starts out mostly the same, then quickly becomes very small, and then gets big again, which shows that the person blinked once.
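
To see the effect in numbers, here is a small sketch that computes the ratio for two made-up eyes using only the Python standard library. The coordinates are invented for illustration and do not come from the dataset.

```python
from math import dist


def eye_aspect_ratio(eye):
    """Eye aspect ratio for six (x, y) landmarks ordered p1..p6."""
    vertical_a = dist(eye[1], eye[5])  # p2 to p6
    vertical_b = dist(eye[2], eye[4])  # p3 to p5
    horizontal = dist(eye[0], eye[3])  # p1 to p4 (the two eye corners)
    return (vertical_a + vertical_b) / (2.0 * horizontal)


# Made-up landmarks: same eye width, different eyelid opening.
open_eye = [(0, 0), (2, 1), (4, 1), (6, 0), (4, -1), (2, -1)]
closed_eye = [(0, 0), (2, 0.25), (4, 0.25), (6, 0), (4, -0.25), (2, -0.25)]

open_ear = eye_aspect_ratio(open_eye)
closed_ear = eye_aspect_ratio(closed_eye)
print(round(open_ear, 3), round(closed_ear, 3))
```

The open eye gives a ratio of about 0.333 and the nearly closed one about 0.083, so the value falls sharply as the eyelids approach each other while the eye width stays the same.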

Our goal is to transform all images into a CSV dataset that binds every image to a 0/1 value, depending on whether the person is awake or drowsy, together with the ratios obtained by processing the face landmarks, such as the “eye aspect ratio” (EAR).

import os
import cv2
import imutils
from imutils import face_utils
import dlib
import glob2
import pandas as pd
from scipy.spatial import distance as dist

DATASET = './dmd/binary_labels'
CSV_FILE = 'landmarks.csv'
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor('dlib/shape_predictor_68_face_landmarks.dat')

EYE_AR_THRESH = 0.3
MOUTH_AR_THRESH = 0.2

def get_landmarks_ratios(frame):

    frame = imutils.resize(frame, width=640)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # detect faces
    faces = detector(gray)
    print(f'Faces: {len(faces)}')

    if len(faces) == 0:
        return None

    # get the largest face (the original loop reset the area on every
    # iteration, so it always kept the last face instead of the largest)
    largest_face = max(faces, key=lambda face: face.area())

    shape = predictor(gray, largest_face)

    # Extracting the indices of the facial features
    (lStart, lEnd) = face_utils.FACIAL_LANDMARKS_IDXS["left_eye"]
    (rStart, rEnd) = face_utils.FACIAL_LANDMARKS_IDXS["right_eye"]
    (mStart, mEnd) = face_utils.FACIAL_LANDMARKS_IDXS["inner_mouth"]

    # Get coordinates for left eye, right eye, and mouth
    left_eye = [(shape.part(i).x, shape.part(i).y) for i in range(lStart, lEnd)]
    right_eye = [(shape.part(i).x, shape.part(i).y) for i in range(rStart, rEnd)]
    mouth = [(shape.part(i).x, shape.part(i).y) for i in range(mStart, mEnd)]

    # Compute aspect ratios for the eyes and mouth
    def eye_aspect_ratio(eye):
        A = dist.euclidean(eye[1], eye[5])
        B = dist.euclidean(eye[2], eye[4])
        C = dist.euclidean(eye[0], eye[3])
        ear = (A + B) / (2.0 * C)
        return ear

    def mouth_aspect_ratio(mouth):
        A = dist.euclidean(mouth[2], mouth[6])
        C = dist.euclidean(mouth[0], mouth[4])
        mar = A / C
        return mar

    left_ear = eye_aspect_ratio(left_eye)
    right_ear = eye_aspect_ratio(right_eye)
    mouth_ar = mouth_aspect_ratio(mouth)

    return left_ear, right_ear, mouth_ar

Dataset creation

Finally, we can create our CSV file using all the processed information. The CSV file will include the Image name, Left Eye AR, Right Eye AR, Mouth AR, and a binary value of 0/1 depending on the awareness state.

COLUMNS = ["Image", "Left_Eye_AR", "Right_Eye_AR", "Mouth_AR", "Awake"]

if os.path.exists(CSV_FILE):
    # Reuse the dataset generated by a previous run
    df = pd.read_csv(CSV_FILE)
else:
    file_paths = glob2.glob(DATASET + '/**/*.jpg')
    data = []

    for file_path in file_paths:

        # The label comes from the folder name in the image path
        awake = 1 if 'awake' in file_path else 0
        print(f'Awake: {awake}')

        img = cv2.imread(file_path)
        if img is None:
            # Skip files that could not be read as images
            continue

        ratios = get_landmarks_ratios(img)
        if ratios is None:
            # Skip images where no face was detected
            continue

        print(f"{ratios}\n")
        data.append({
            "Image": file_path,
            "Left_Eye_AR": ratios[0],
            "Right_Eye_AR": ratios[1],
            "Mouth_AR": ratios[2],
            "Awake": awake
        })

    # Create a DataFrame to hold the data and save it for later runs
    df = pd.DataFrame(data, columns=COLUMNS)
    df.to_csv(CSV_FILE, index=False)

Sample of the CSV generated.

The next step is to conduct a study of all available model options and review all dataset information.

Using the describe method, we can precisely adjust the minimum and maximum thresholds for the eyes and mouth to achieve accurate detection.

Looking at the class distribution plot, we observed that our dataset is well balanced.
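
As a rough illustration of these two checks, here is a sketch that uses only the standard library on a few made-up rows with the same columns as the CSV. The values are invented, and `summarize` is a hypothetical helper; with pandas, `df.describe()` and `df["Awake"].value_counts()` give similar information.

```python
import csv
import io
from collections import Counter
from statistics import mean

# Made-up rows in the same column layout as the generated CSV.
SAMPLE_CSV = """Image,Left_Eye_AR,Right_Eye_AR,Mouth_AR,Awake
a.jpg,0.32,0.30,0.05,1
b.jpg,0.28,0.30,0.10,1
c.jpg,0.12,0.14,0.40,0
d.jpg,0.16,0.14,0.30,0
"""

rows = list(csv.DictReader(io.StringIO(SAMPLE_CSV)))

# Class balance: how many rows are awake (1) and how many drowsy (0).
class_counts = Counter(int(row["Awake"]) for row in rows)


def summarize(rows, column, awake):
    """Min, mean and max of one column for a single class."""
    values = [float(row[column]) for row in rows if int(row["Awake"]) == awake]
    return {"min": min(values), "mean": mean(values), "max": max(values)}


awake_ear = summarize(rows, "Left_Eye_AR", awake=1)
drowsy_ear = summarize(rows, "Left_Eye_AR", awake=0)
print(class_counts, awake_ear, drowsy_ear)
```

In this toy sample the lowest awake value (0.28) sits well above the highest drowsy value (0.16), which is the kind of gap that helps when choosing a threshold.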

Model selection and training

We tested three model approaches, tuning their hyperparameters with a grid search to find the best-fitting model. Each has its own advantages and disadvantages:

Support Vector Machines (SVM)

This algorithm can model complex, non-linear relationships. It may work better than Logistic Regression if such relationships are present in the data. However, it may take longer to train, especially with large datasets.

import time
import joblib
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

# X_train, y_train, MODEL_PATH and evaluate_model are defined elsewhere in the project
start = time.time()
svm_params = {
    'gamma': ['scale', 'auto', 1, 0.1, 0.01, 0.001],
    'C': [0.1, 1],
    'kernel': ['rbf', 'sigmoid']
}
svm_grid_search = GridSearchCV(SVC(probability=True), svm_params, cv=5)

# Fit the model
svm_grid_search.fit(X_train, y_train)

end = time.time()
print(f'Time taken to train model: {end - start} seconds')
print('Best parameters for SVM: ', svm_grid_search.best_params_)

joblib.dump(svm_grid_search, MODEL_PATH + '/svm.pkl')

evaluate_model(svm_grid_search)

Random Forest Classifier

This is an ensemble method that builds multiple decision trees and combines their predictions. It often performs well out-of-the-box and can handle non-linear relationships.

import time
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# X_train, y_train, MODEL_PATH and evaluate_model are defined elsewhere in the project
start = time.time()
rf_params = {
    'n_estimators': [100, 200],
    'max_depth': [10, 20],
    'min_samples_split': [2, 5],
    'min_samples_leaf': [1, 2],
}
rf_grid_search = GridSearchCV(RandomForestClassifier(random_state=0), rf_params, cv=5)

# Fit the model
rf_grid_search.fit(X_train, y_train)

end = time.time()

joblib.dump(rf_grid_search, MODEL_PATH + '/rfgrid.pkl')

print('Best parameters for RandomForestClassifier: ', rf_grid_search.best_params_)
print(f'Time taken to train model: {end - start} seconds')

evaluate_model(rf_grid_search)

Gradient Boosting Classifier

This is another powerful ensemble method, which builds trees sequentially, each trying to correct the mistakes of the previous one.

import time
import joblib
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV

# X_train, y_train, MODEL_PATH and evaluate_model are defined elsewhere in the project
start = time.time()

gb_params = {
    'n_estimators': [100, 200],
    'learning_rate': [0.01, 0.1],
    'max_depth': [3, 5],
    'min_samples_split': [2, 5],
    'min_samples_leaf': [1, 2],
}

gb_grid_search = GridSearchCV(GradientBoostingClassifier(random_state=42), gb_params, cv=5)

# Fit the model
gb_grid_search.fit(X_train, y_train)

end = time.time()

joblib.dump(gb_grid_search, MODEL_PATH + '/gbgrid.pkl')

print('Best parameters for GradientBoostingClassifier: ', gb_grid_search.best_params_)
print(f'Time taken to train model: {end - start} seconds')

evaluate_model(gb_grid_search)
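
Since the training times depend heavily on the size of each grid, a quick sketch can count how many model fits every search implies. It copies the three parameter grids above and uses the same 5-fold cross-validation; `count_fits` is a hypothetical helper, not part of scikit-learn.

```python
from math import prod

CV_FOLDS = 5  # same value as cv=5 in the grid searches above

grids = {
    "svm": {
        "gamma": ["scale", "auto", 1, 0.1, 0.01, 0.001],
        "C": [0.1, 1],
        "kernel": ["rbf", "sigmoid"],
    },
    "random_forest": {
        "n_estimators": [100, 200],
        "max_depth": [10, 20],
        "min_samples_split": [2, 5],
        "min_samples_leaf": [1, 2],
    },
    "gradient_boosting": {
        "n_estimators": [100, 200],
        "learning_rate": [0.01, 0.1],
        "max_depth": [3, 5],
        "min_samples_split": [2, 5],
        "min_samples_leaf": [1, 2],
    },
}


def count_fits(grid, folds):
    """Return (parameter combinations, cross-validation fits) for one grid."""
    combinations = prod(len(values) for values in grid.values())
    return combinations, combinations * folds


fit_counts = {name: count_fits(grid, CV_FOLDS) for name, grid in grids.items()}
for name, (combinations, fits) in fit_counts.items():
    print(f"{name}: {combinations} combinations, {fits} fits")
```

That is 120 fits for the SVM, 80 for the random forest and 160 for gradient boosting, not counting the final refit of the best model that GridSearchCV performs by default.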

Result evaluation

The confusion matrix shows the false positives and false negatives produced by our model; their number is quite low compared to the correct predictions.
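
The `evaluate_model` helper is not shown in this article, so as a rough sketch of what a confusion matrix counts, here is a standard-library version working on made-up labels. The function name `confusion_counts` and the choice of 0 (drowsy) as the class to detect are assumptions for illustration.

```python
def confusion_counts(y_true, y_pred, positive=0):
    """Count TP, FP, FN and TN, treating `positive` (0 = drowsy) as the class to detect."""
    tp = fp = fn = tn = 0
    for truth, pred in zip(y_true, y_pred):
        if pred == positive and truth == positive:
            tp += 1  # drowsy frame correctly flagged
        elif pred == positive:
            fp += 1  # awake frame flagged as drowsy
        elif truth == positive:
            fn += 1  # drowsy frame missed
        else:
            tn += 1  # awake frame correctly left alone
    return {"tp": tp, "fp": fp, "fn": fn, "tn": tn}


# Made-up labels: 1 = awake, 0 = drowsy.
y_true = [0, 0, 0, 1, 1, 1, 1, 0]
y_pred = [0, 0, 1, 1, 1, 0, 1, 0]

counts = confusion_counts(y_true, y_pred)
accuracy = (counts["tp"] + counts["tn"]) / len(y_true)
recall = counts["tp"] / (counts["tp"] + counts["fn"])
print(counts, accuracy, recall)
```

For a drowsiness alarm, the missed drowsy frames (false negatives) are usually the costlier error, which is why recall on the drowsy class is worth checking alongside accuracy.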

Raspberry Pi Integration

The next step is to develop a program that can process real-time images from a Raspberry Pi Webcam, extract the coordinates of facial landmarks, and analyze them using the trained random forest model.

Here is the workflow for the program.

  1. Boot and initialize Pi & Webcam.
  2. Detect face and face landmark points.
  3. Extract and clean landmark points.
  4. Transform and normalize landmark points into CSV format.
  5. Predict 0/1 (awake status) using transformed data.
  6. Store prediction in a timeline buffer.
  7. Trigger Pi sound if buffer algorithm predicts an increment in drowsiness.

Here is the code for the prediction step of the program, shown here running over a folder of test images.

import dlib
import glob2
import cv2
import imutils
import joblib
import numpy as np
from imutils import face_utils
from scipy.spatial import distance as dist

DATASET = 'datasets/dmd/test'
# Assumption: folder holding the trained model; adjust to your own setup
MODEL_PATH = 'models'
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor('datasets/dlib/shape_predictor_68_face_landmarks.dat')

def get_landmarks_ratios(frame):

    frame = imutils.resize(frame, width=640)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # detect faces
    faces = detector(gray)

    if len(faces) == 0:
        return None

    # get the largest face (the original loop reset the area on every
    # iteration, so it always kept the last face instead of the largest)
    largest_face = max(faces, key=lambda face: face.area())

    shape = predictor(gray, largest_face)

    # Extracting the indices of the facial features
    (lStart, lEnd) = face_utils.FACIAL_LANDMARKS_IDXS["left_eye"]
    (rStart, rEnd) = face_utils.FACIAL_LANDMARKS_IDXS["right_eye"]
    (mStart, mEnd) = face_utils.FACIAL_LANDMARKS_IDXS["inner_mouth"]

    # Get coordinates for left eye, right eye, and mouth
    left_eye = [(shape.part(i).x, shape.part(i).y) for i in range(lStart, lEnd)]
    right_eye = [(shape.part(i).x, shape.part(i).y) for i in range(rStart, rEnd)]
    mouth = [(shape.part(i).x, shape.part(i).y) for i in range(mStart, mEnd)]

    # Compute aspect ratios for the eyes and mouth
    def eye_aspect_ratio(eye):
        A = dist.euclidean(eye[1], eye[5])
        B = dist.euclidean(eye[2], eye[4])
        C = dist.euclidean(eye[0], eye[3])
        ear = (A + B) / (2.0 * C)
        return ear

    def mouth_aspect_ratio(mouth):
        A = dist.euclidean(mouth[2], mouth[6])
        C = dist.euclidean(mouth[0], mouth[4])
        mar = A / C
        return mar

    left_ear = eye_aspect_ratio(left_eye)
    right_ear = eye_aspect_ratio(right_eye)
    mouth_ar = mouth_aspect_ratio(mouth)

    return left_ear, right_ear, mouth_ar

file_paths = glob2.glob(DATASET + '/**/*.jpg')
model = joblib.load(MODEL_PATH + '/pimodel.pkl')
for file_path in file_paths:

    img = cv2.imread(file_path)
    # Only look for landmarks when the image was read correctly
    ratios = get_landmarks_ratios(img) if img is not None else None

    print('\n')
    print(f'image: {file_path}')
    if ratios is not None:
        # One row with the three ratios, in the same order as the CSV columns
        features = np.array([ratios])
        features = features.reshape(1, -1)
        predictions = model.predict(features)
        print(predictions)
        print('Awake' if predictions[0] else 'Drowsy')
    else:
        print('No faces detected in the image.')
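
The listing above stops at the prediction (step 5 of the workflow). Steps 6 and 7 are not shown, so here is a minimal sketch of one way a timeline buffer could work: it keeps the most recent predictions and signals an alarm when the share of drowsy ones crosses a threshold. The class name `DrowsinessBuffer`, the window size and the threshold are all assumptions, not the values used in the project.

```python
from collections import deque


class DrowsinessBuffer:
    """Sliding window over the latest predictions (1 = awake, 0 = drowsy)."""

    def __init__(self, size=30, threshold=0.7):
        # Hypothetical defaults: 30 predictions, alarm at 70% drowsy frames.
        self.predictions = deque(maxlen=size)
        self.threshold = threshold

    def update(self, awake):
        """Store one prediction and return True when the alarm should sound."""
        self.predictions.append(awake)
        if len(self.predictions) < self.predictions.maxlen:
            return False  # not enough history yet
        drowsy_share = self.predictions.count(0) / len(self.predictions)
        return drowsy_share >= self.threshold


# Made-up stream of predictions with a small window to keep the example short.
buffer = DrowsinessBuffer(size=5, threshold=0.5)
stream = [1, 1, 1, 0, 0, 0, 0]
alarms = [buffer.update(prediction) for prediction in stream]
print(alarms)
```

Requiring a full window before raising the alarm keeps a single blink or a missed detection from triggering the sound; on the Pi, a `True` result would be the point at which to play it.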

Demo

Here you can find a video we recorded of the setup with the application we developed running and playing a sound when we were getting sleepy.

Conclusion

With additional time, you could refine the prediction model and incorporate more variables. It may be beneficial to use a multi-class classification model rather than a binary one, as it allows for more flexibility in processing predictions.

Additional info

You can find more information about the project at:

David Perez Vicens
Saturdays.AI

Software Architect specialist interested in app development, DDD, machine and deep learning, SOLID principles and Agile management.