(Abstract) Factories and SOLID for Machine Learning Engineering
Unleashing the power of design patterns, which are often neglected in machine learning engineering. Here is a short but comprehensive guide to using a simple yet powerful design pattern in your ML applications.
Imagine a product or project you are working on that requires object detection (or any other model). If you are in the ML space, you know how quickly new models appear, so the obvious instinct is to try all of them in your pipeline. But imagine that each model has different pre-processing code, a different input format, and a different output format. If you want to use these models as they are, you have to change your pipeline's code every time, which is a headache and does not scale. What if you want to quickly switch from model A to model B, or occasionally swap models to gather user feedback? A design pattern solves this: it makes your code clearer, more concise, and more scalable, and you'll be able to experiment quickly.
Remember the Open/Closed Principle, the O in SOLID:
Your code should be open for extension but closed for modification.
If you implement abstract factories in a case like the one above, you can extend your code by adding as many models as you want, without changing (modifying) your original pipeline.
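As a minimal sketch of what that means (the names here are hypothetical, for illustration only): the pipeline depends on an abstract interface, and adding a model means adding a subclass, not editing the pipeline.
from abc import ABC, abstractmethod

class Detector(ABC):                    # hypothetical interface
    @abstractmethod
    def detect(self, frame): ...

def run_pipeline(detector: Detector, frame):
    # closed for modification: this function is never edited...
    return detector.detect(frame)       # ...yet open for extension via new Detector subclasses
We will build exactly this structure, step by step, for real object detectors below.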
Example
Keep the above diagram in mind; we will come back to it at the end of the article.
Let’s say you are working on an object detection problem. The basic pipeline is:
import cv2

cap = cv2.VideoCapture("video.mp4")  # note: cv2.VideoCapture, not cv2.imread, for video files
while True:
    ret, frame = cap.read()
    if not ret:  # stop when the stream ends
        break
    detections = model.predict(frame)  # where model can be any object detector
    x_min, y_min, x_max, y_max, score = detections
    # ... Do something with these values
But imagine you are using a model that returns detections in the format x_cen, y_cen, w, h. In that case, you would have to change your whole pipeline to get it to run. So how can you avoid that?
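To make the mismatch concrete, here is a minimal sketch (the function name is hypothetical) of the glue code you would otherwise have to scatter through your pipeline to convert center-based boxes into the corner-based format the rest of the code expects:
import numpy as np

def center_to_corner(box: np.ndarray) -> np.ndarray:
    """Convert a hypothetical (x_cen, y_cen, w, h) box to (x_min, y_min, x_max, y_max)."""
    x_cen, y_cen, w, h = box
    return np.array([x_cen - w / 2, y_cen - h / 2, x_cen + w / 2, y_cen + h / 2])
With the pattern below, conversions like this live inside each model's own post-processing method instead of leaking into the pipeline.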
Step 1: Create a base class for every model to use.
Create an abstract base class, and have each of your different models inherit from it. Now the inputs and outputs of each function will be consistent across models.
from abc import ABC, abstractmethod
import numpy as np

class Model(ABC):
    def __init__(self, config):
        self.config = config

    @abstractmethod
    def load_model(self):
        pass

    @abstractmethod
    def pre_process(self, image, **kwargs):
        pass

    @abstractmethod
    def post_process(self, results, **kwargs):
        pass

    @abstractmethod
    def detect(self, frame) -> np.ndarray:
        pass

    @abstractmethod
    def draw_detections(self, frame, results):
        pass
Now if you are working with YOLOv8, you can simply create a class that inherits from our base class and implements all the abstract methods.
Step 2: Inherit every model from the base class and implement it
import numpy as np
import ultralytics
from ultralytics import YOLO
import cv2

ultralytics.checks()

class Yolov8(Model):
    def __init__(self, config) -> None:
        super().__init__(config)
        self.model = None

    def load_model(self):
        self.model = YOLO(self.config["weights_path"])
        return self

    def pre_process(self, image, **kwargs):
        """
        BGR to RGB conversion
        """
        return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    def post_process(self, results, **kwargs):
        """
        Results should be converted to NumPy from Torch Tensor
        """
        return results[0].cpu().numpy().boxes.xyxy

    def detect(self, image) -> np.ndarray:
        """
        :param image: image to detect
        :return: list of detections
        """
        assert isinstance(image, np.ndarray)
        image = self.pre_process(image)
        results = self.model(image)
        results = self.post_process(results)
        return results

    def draw_detections(self, frame, results):
        # draw detections on the frame
        for detection in results:
            x1, y1, x2, y2 = detection.astype(int)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        return frame
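A quick sanity check of the unified interface (a minimal sketch: the weights and image paths are placeholders, and we assume the Yolov8 class above is in scope):
import cv2

config = {"weights_path": "/path/to/weights.pt"}  # placeholder path
detector = Yolov8(config).load_model()

frame = cv2.imread("sample.jpg")                  # placeholder image
boxes = detector.detect(frame)                    # always corner-format (x_min, y_min, x_max, y_max)
frame = detector.draw_detections(frame, boxes)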
Or if you want to use YOLOv7, you can create a class for it too. The YOLOv7 implementation is rougher and needs much more setup: device selection, letterboxing, half precision, and NMS. If you dropped it into your pipeline as-is, it would require a lot of changes. But now, all we need to do is “extend” our code.
import torch
import cv2
from detectors.model import Model
from numpy import random
import numpy as np
import sys

from yolov7_utils.models.experimental import attempt_load
from yolov7_utils.utils.general import check_img_size, non_max_suppression, set_logging, scale_coords
from yolov7_utils.utils.torch_utils import select_device, time_synchronized

class Yolov7(Model):
    def __init__(self, config):
        super().__init__(config)
        sys.path.append("detectors/yolov7_utils")
        self.colors = None
        self.names = None
        self.classes = None
        self.half = None
        self.device = None
        self.stride = None
        self.img_size = None
        self.model = None

    def load_model(self):
        self.model, self.img_size, self.stride, self.device, self.half, self.classes, self.names, self.colors = self.__initialize_yolov7()
        return self

    def __initialize_yolov7(self):
        """ Initializing YOLOv7 For Logo Predictions """
        with torch.no_grad():
            weights, img_size = self.config['weights_path'], self.config['img_size']
            set_logging()
            device = select_device(self.config['device'])
            half = device.type != 'cpu'
            print("Weights path is:", weights)
            model = attempt_load(weights, map_location=device)  # load FP32 model
            stride = int(model.stride.max())  # model stride
            img_size = check_img_size(img_size, s=stride)  # check img_size
            if half:
                model.half()
            names = model.module.names if hasattr(model, 'module') else model.names
            colors = [[random.randint(0, 255) for _ in range(3)] for _ in names]
            if device.type != 'cpu':
                model(torch.zeros(1, 3, img_size, img_size).to(device).type_as(next(model.parameters())))
            classes = self.config['classes']
        return model, img_size, stride, device, half, classes, names, colors

    def pre_process(self, image, **kwargs):
        img = self.letterbox(image, self.img_size, stride=self.stride)[0]
        img3 = img.copy()
        img = img[:, :, ::-1].transpose(2, 0, 1)  # BGR to RGB, HWC to CHW
        img = np.ascontiguousarray(img)
        img = torch.from_numpy(img).to(self.device)
        img = img.half() if self.half else img.float()  # uint8 to fp16/32
        img /= 255.0  # 0 - 255 to 0.0 - 1.0
        if img.ndimension() == 3:
            img = img.unsqueeze(0)
        return img, img3

    def post_process(self, results, **kwargs):
        # read img and img0 from kwargs
        img = kwargs['img']
        img0 = kwargs['img0']
        processed_results = []
        print("Actual frame size", self.config["frame_size"])
        for result in results:
            if result.numel() == 0:
                print("Empty Tensor")
            else:
                print("Result shape", result.shape)
                print("Result", result)
                result[:, :4] = scale_coords(img.shape[2:], result[:, :4], img0.shape).round()
                processed_results.append(result[0].cpu().detach().numpy()[0:4])
        return processed_results

    def detect(self, frame) -> np.ndarray:
        """ Catching Logos From Yolo """
        img, img3 = self.pre_process(frame)
        # Inference
        time_synchronized()
        self.model.conf = self.config['conf_thresh']
        pred = self.model(img, augment=False)[0]
        pred = non_max_suppression(pred, self.config['conf_thresh'], self.config['iou_thresh'], classes=self.classes,
                                   agnostic=False)
        detections = self.post_process(pred, img=img, img0=frame)
        return detections

    def draw_detections(self, frame, results):
        """ Drawing Bounding Boxes Around Logos """
        for detection in results:
            x1, y1, x2, y2 = detection.astype(int)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        return frame

    def letterbox(self, img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True,
                  stride=32):
        """ Resize Frames For Better Yolov7 Inference """
        # Resize and pad image while meeting stride-multiple constraints
        shape = img.shape[:2]  # current shape [height, width]
        if isinstance(new_shape, int):
            new_shape = (new_shape, new_shape)
        # Scale ratio (new / old)
        r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
        if not scaleup:  # only scale down, do not scale up (for better test mAP)
            r = min(r, 1.0)
        # Compute padding
        ratio = r, r  # width, height ratios
        new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
        dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
        if auto:  # minimum rectangle
            dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
        elif scaleFill:  # stretch
            dw, dh = 0.0, 0.0
            new_unpad = (new_shape[1], new_shape[0])
            ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios
        dw /= 2  # divide padding into 2 sides
        dh /= 2
        if shape[::-1] != new_unpad:  # resize
            img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
        top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
        left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
        img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
        return img, ratio, (dw, dh)
Step 3: Create a Factory class for dynamic imports and use it everywhere
Now we can create a Model Factory which can instantiate the models for us. We perform dynamic importing of the module, based on our config file, using importlib. Whenever we call our ModelFactory, it returns, in essence, whichever model we have specified in the config file. We use getattr to fetch the class object whose name is specified in the config as class_name.
import importlib
import yaml

class ModelFactory:
    @staticmethod
    def create_model(config_file):
        with open(config_file, 'r') as f:
            config = yaml.safe_load(f)
        model_name = config['running_model']
        model_config = config['models'][model_name]
        # Load the appropriate module based on the model name
        module = importlib.import_module(f"detectors.{model_name}")
        # Instantiate the model class and pass the config to the constructor
        model_class = getattr(module, model_config['class_name'])
        detector = model_class(model_config)
        # Call the load_model method to load the actual model
        detector.load_model()
        return detector
Step 4: Set the config for tunable parameters and hyper-parameters
And the config file:
models:
  yolov7:
    class_name: Yolov7
    img_size: 1024  # read by Yolov7 as config['img_size']
    classes:
      - classA
      - classB
    weights_path: /path/to/weights.pt
    conf_thresh: 0.3  # confidence threshold for inference
    iou_thresh: 0.5  # NMS IoU threshold for inference
    device: cpu  # device to run our model, i.e. 0 or 0,1,2,3 or cpu
    frame_size:
      - 720
      - 1280
      - 3
  yolov8:
    class_name: Yolov8
    input_size: 640
    classes:
      - classA
      - classB
    weights_path: /path/to/weights.pt
running_model: yolov8
And now, if you test it, you’ll get a dynamically created Yolov8 object with the same methods we implemented. Simply by replacing the model name in the config, we can get the Yolov7 model with the same methods and input/output formats as well.
In [1]: model = ModelFactory.create_model('config.yaml')
In [2]: model
Out[2]: <detectors.yolov8.Yolov8 at 0x7fc4c4ac6850>
Now let's talk about the diagram that I shared at the start of this article.
If you keep the code we just wrote in mind, the architecture now makes sense. Our program logic should not break regardless of our choice of model. The Abstract Model Factory is responsible for dynamically creating the Model we want to run in our pipeline. Using the SOLID principles, we ensured that our code is open for extension and that the input and output formats of every Model are the same. Now our pipeline will not break regardless of which model we choose.
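Putting it all together, here is a minimal sketch of the final pipeline (the video path is a placeholder, and we assume ModelFactory is importable). Note that nothing in this loop mentions a concrete model:
import cv2

detector = ModelFactory.create_model("config.yaml")  # Yolov7 or Yolov8, per the config
cap = cv2.VideoCapture("video.mp4")                  # placeholder video path
while True:
    ret, frame = cap.read()
    if not ret:
        break
    detections = detector.detect(frame)              # always (x_min, y_min, x_max, y_max)
    frame = detector.draw_detections(frame, detections)
cap.release()
Swapping models is now a one-line change in config.yaml; the loop above never changes.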
Conclusion
You have learned how to leverage the SOLID principles and use Abstract Model Factories to create a scalable object detection pipeline for your use case. This is a very simplified version, of course; you can extend it to fit your own needs.