Data Engineering and ML Platform — Part 2 — Codebase

Susmit
Published in zeza_tech
11 min read · Feb 19, 2024

Platform Architecture, Codebase, and Interesting Features

Image Credits: Pradnyesh Mali

Introduction

Welcome back to our blog series on XBoost, the data engineering and ML platform we developed for Xcellen PTE Ltd. This second part focuses on the Codebase and builds on the Platform Architecture covered in the first part (click here to read it). After this, we invite you to explore the final part of the series, which covers some of the Interesting Features we developed.

In this article, we discuss the libraries we used, the folder structures, the design patterns we followed, and the framework-like code we set up for the frontend, backend, and ML codebases. We also touch upon the DevOps codebase through which the infrastructure is deployed.

Table Of Contents

Feel free to jump around.

  • Frontend
  • Backend
  • ML
  • DevOps
  • Conclusion

Frontend

The frontend is developed in Next.js with TypeScript.

For state management, keeping developer familiarity and the complexity of Redux in mind, we opted for SWR for data fetching and caching, along with Formik for maintaining form state. For validation, we chose Yup because of the way it handles complex nested objects. Being a data platform, we needed to show large amounts of data to the user efficiently, and for this we chose AG Grid. For plots, we opted for Plotly, as we needed something that could generate plots in both Python and JavaScript and that supports loading plots either as HTML files or as JSON data. For showing a loader while an API call is in progress, we kept it simple with NProgress.
For styling, instead of adopting a framework, we wrote plain CSS and SCSS by hand. Testing is handled with Jest. For one feature, where users create formulas similar to Excel formulae, we used Slate for React.

The folder structure for nested components is fairly straightforward.

[ComponentName]
- index.tsx: Component-specific logic and rendering.
- services.tsx: Component services/hooks.
- utils.ts: Component-specific utility functions.
- [ComponentName].module.scss: CSS module file for styling.
- [SubComponentName]
  - index.tsx: Sub-component-specific logic and rendering.
  - services.tsx: Sub-component services/hooks.
  - utils.ts: Sub-component-specific utility functions.
  - [SubComponentName].module.scss: CSS module file for styling.

Styling

For styling, the variables/ directory contains SCSS files that define reusable variables for consistent styling throughout the application.

  • _index.scss: Forwards the individual variable files.
  • _color.scss: Defines color variables.
  • _border-radius.scss: Defines border-radius variables.

Here is some sample code:

File: styles/variables/_color.scss

$primary: #5BCED2;

$primary-text: #1B3D3F;

$black: #000;


File: styles/variables/_border-radius.scss

$radius-small: .6rem;
$radius-large: 2.4rem;


File: styles/variables/_index.scss

@forward './color';
@forward './border-radius';

And here is how we use it:

File: styles/global.scss

@use "./variables" as *;
@use "./ag-grid";
@use "./react-select";
@use "./mcc-styles";

html,
body {
  button {
    font-size: $body-text;
    border: none;
    border-radius: $radius-small;
  }
}

To learn about some of the best practices we followed in the frontend, check out this blog: https://medium.com/zeza-tech/best-practices-for-your-react-project-258d41a9bbd2


Backend

The backend is developed in Python, with strict adherence to type hints.

We developed the API using the Django REST framework. To communicate with the Postgres database, the Django ORM was sufficient. Since we had to perform operations on large amounts of data, we used the Modin library, a drop-in replacement for pandas that uses all available cores for processing instead of just one (the pandas behaviour). To perform fuzzy matching for one of the features, we used rapidfuzz. For testing, we used pytest.

We chose a feature-based folder structure so that the codebase scales horizontally with each new feature. A new developer assigned to the codebase also spends less time understanding any particular feature, because all the required files are grouped together, following the Common Closure Principle.

Each feature has the following files:

  • views: Contains the API endpoints. All data I/O (interacting with the database and S3) is performed here.
  • view_models: Contains the view models for the API endpoints. A view model is simply the user request, plus anything else that is required, encapsulated in an object and passed forward. The benefit is that we know what the keys are, as well as their data types, through type hints. Any request-related logic can also be added here.
  • services: Contains the business logic. This is where the actual logic of the API lives. No data I/O is performed here, so the business logic is not tightly coupled to the data I/O, which also makes the business logic easier to unit test. The service code is also reused in the recipe run, which lets us apply multiple enrich techniques to a dataset one after the other in memory, without unnecessary data I/O.
  • utils: Contains the helper functions. Keeping them here means small functions can be unit tested in isolation and reused where required.
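The layering described in this list can be sketched in a few lines of plain Python. This is a minimal illustration and not code from the platform: the FAKE_STORAGE dict stands in for the database and S3, and the names AddTotalViewModel, add_total_column, and post are hypothetical.

```python
from dataclasses import dataclass

# Hypothetical in-memory storage standing in for the database and S3.
FAKE_STORAGE: dict[int, list[dict[str, float]]] = {
    1: [{"price": 10.0, "qty": 2.0}, {"price": 4.0, "qty": 5.0}],
}


@dataclass
class AddTotalViewModel:
    """View model: the typed form of the user request."""

    data_id: int
    new_col_name: str


def clean_column_name(col_name: str) -> str:
    """Util: a small helper that can be tested in isolation."""
    return col_name.strip().lower().replace(" ", "_")


def add_total_column(
    view_model: AddTotalViewModel, rows: list[dict[str, float]]
) -> list[dict[str, float]]:
    """Service: pure business logic, no data I/O."""
    col = clean_column_name(view_model.new_col_name)
    return [{**row, col: row["price"] * row["qty"]} for row in rows]


def post(request_data: dict) -> dict:
    """View: the only layer that reads from and writes to storage."""
    view_model = AddTotalViewModel(**request_data)
    rows = FAKE_STORAGE[view_model.data_id]  # I/O: load
    updated = add_total_column(view_model, rows)  # business logic
    FAKE_STORAGE[view_model.data_id] = updated  # I/O: save
    return {"status": "success", "data": updated}
```

Because add_total_column never touches storage, it can be unit tested with a hand-built list of rows, and several such services can be chained in memory before anything is written back.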

Here is some sample code from the backend:

# View
def post(self, request: Request, company_id: int, project_id: int) -> Response:
    """
    Calculate Numerical Data

    request.data => {
        "data_id": str :: The id of the data,
        "formula": str :: Formula,
        "new_col_name": str :: New column name,
        "description" : str :: Description of new column
    }
    """
    view_model = CalculateNumDataViewModel(**request.data)

    data_obj, df = ViewIO.get_data_obj_and_df(data_id=view_model.data_id)

    updated_data_obj, df = self.service.calculate_num_data(
        view_model=view_model, data_obj=data_obj, df=df
    )
    updated_data_serializer = ViewIO.update_data_df_as_enriched(
        data_obj=updated_data_obj, df=df
    )

    return Response(
        {"status": "success", "data": updated_data_serializer.data},
        status=status.HTTP_200_OK,
    )


# View Model
class CalculateNumDataViewModel:
    data_id: int
    formula: str
    new_col_name: str
    description: str
    new_display_name: str

    def __init__(
        self,
        data_id: int,
        formula: str,
        new_col_name: str,
        description: str,
        *args,
        **kwargs
    ):
        self.data_id = data_id
        self.formula = formula
        self.new_col_name = core_utils.clean_column_name(col_name=new_col_name)
        self.description = description
        self.new_display_name = new_col_name

    def accept(self, visitor: ViewModelVisitor):
        return visitor.visit_calculation_num_view_model(self)


# Formation step
@dataclass
class CalculationNumFormationStep(SingleNewColFormationStep):
    formula: str

    def accept(self, visitor: FormationStepVisitor, *args, **kwargs):
        return visitor.visit_calculation_num_formation_step(self, *args, **kwargs)

As shown in the view model and formation step code, we used the Visitor design pattern for a part of our codebase. It avoids an if/else-if ladder for deciding which function to invoke, and it keeps together the code for different techniques that perform the same type of operation. Here is a good article explaining the pattern: https://refactoring.guru/design-patterns/visitor
We have also made heavy use of dataclasses, which is part of the Python standard library. Where you don’t need powerful validation of the data model, you can use it instead of pydantic.
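For readers new to the Visitor pattern, here is a minimal, self-contained sketch of the double dispatch it relies on. The step and visitor classes below are hypothetical stand-ins and are far simpler than the platform's view models and formation steps.

```python
from dataclasses import dataclass


@dataclass
class UppercaseStep:
    column: str

    def accept(self, visitor):
        # Each step knows which visitor method handles it.
        return visitor.visit_uppercase_step(self)


@dataclass
class RenameStep:
    column: str
    new_name: str

    def accept(self, visitor):
        return visitor.visit_rename_step(self)


class DescribeVisitor:
    """One operation (describing a step) implemented for every step type."""

    def visit_uppercase_step(self, step: UppercaseStep) -> str:
        return f"uppercase values in '{step.column}'"

    def visit_rename_step(self, step: RenameStep) -> str:
        return f"rename '{step.column}' to '{step.new_name}'"


class ApplyVisitor:
    """A second operation over the same steps: apply them to a row."""

    def __init__(self, row: dict):
        self.row = dict(row)

    def visit_uppercase_step(self, step: UppercaseStep) -> dict:
        self.row[step.column] = str(self.row[step.column]).upper()
        return self.row

    def visit_rename_step(self, step: RenameStep) -> dict:
        self.row[step.new_name] = self.row.pop(step.column)
        return self.row


steps = [UppercaseStep("city"), RenameStep("city", "town")]
descriptions = [step.accept(DescribeVisitor()) for step in steps]

applier = ApplyVisitor({"city": "pune"})
for step in steps:
    step.accept(applier)
```

Adding a new operation means writing one new visitor class, with no type checks on the steps and no changes to the step classes themselves.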


ML

The ML worker codebase is developed in Python. We didn’t opt for a standard data science project folder structure, such as the well-known Cookiecutter Data Science template, and instead structured our project based on our requirements.

Apart from standard libraries such as pandas, scikit-learn, plotly, joblib (for storing fitted transformers and models), category-encoders (which includes a variety of encoders), and kneed (for finding the optimal k in clustering), we used kmodes for its KPrototypes clustering algorithm. KPrototypes combines the KMeans approach for numerical data with the KModes approach for categorical data. This is more appropriate than automatically encoding categorical columns without any domain knowledge. The shap library was used to compute Shapley values, which are at the heart of explainable AI.
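To make the KPrototypes idea concrete, the sketch below computes the mixed dissimilarity that K-Prototypes is built on: squared Euclidean distance over the numerical features plus a weighted count of mismatches over the categorical features. It is a hand-written illustration and not the kmodes API; the function name and the gamma value are assumptions chosen for the example.

```python
from typing import Sequence


def kprototypes_dissimilarity(
    point_num: Sequence[float],
    point_cat: Sequence[str],
    proto_num: Sequence[float],
    proto_cat: Sequence[str],
    gamma: float,
) -> float:
    """Mixed dissimilarity between a data point and a cluster prototype."""
    # KMeans-style part: squared Euclidean distance on numerical features.
    numeric_part = sum((x - p) ** 2 for x, p in zip(point_num, proto_num))
    # KModes-style part: number of categorical features that do not match.
    categorical_part = sum(1 for x, p in zip(point_cat, proto_cat) if x != p)
    # gamma balances the influence of the categorical part (assumed value below).
    return numeric_part + gamma * categorical_part


distance = kprototypes_dissimilarity(
    point_num=[1.0, 2.0],
    point_cat=["red", "suv"],
    proto_num=[0.0, 0.0],
    proto_cat=["red", "sedan"],
    gamma=0.5,
)
print(distance)  # 1 + 4 from the numerical part, 0.5 * 1 from the categorical part
```

Categorical values are compared only for equality, so no artificial ordering or numeric encoding has to be imposed on them.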

The folder structure has largely stabilized.

src/
  config - Folder for configuration files
  external - Folder for queues and task signals
  ml/
    - executors/ One folder for each executor task
      - imputer - Folder for imputer tasks
      - encoder - Folder for encoder tasks
    - file handlers - For I/O, more on this below
    - repo/
      - transformers/ Contains the sklearn based custom transformers
        - encoders/
          - processors/ (more on this later in the blog)
            - time encoder processor
          - time encoder
        - scalers
      - models
  ml inf - dataclasses that serve as the exposed interface for ML results

One of the many places where we used abstraction and the Bridge design pattern was in our implementation of queues. We defined an abstract Queue class and two implementations: SQSQueue for use in the cloud and MockQueue for local development. MockQueue helped us immensely. We just had to prepare JSON files in the same format as the messages delivered by AWS SQS, which meant the ML codebase could be developed locally as a standalone project, without the backend or frontend running locally, and without even Postman! In case you are wondering, communication with the backend is abstracted in the same way, through the TaskSignal abstract class.

# base classes file
from abc import ABC, abstractmethod

from models.job_models import JobMessage


class Queue(ABC):
    """Represents a queue which will contain messages"""

    @abstractmethod
    def poll_for_mssg(self) -> JobMessage:
        """Polls a queue for messages"""

    @abstractmethod
    def stop_poll(self):
        """Stops the polling for messages"""


# mock queue file
import json

from models.job_models import JobMessage

from .base_classes import Queue


class MockQueue(Queue):
    """Represents a mock queue which will contain messages"""

    def poll_for_mssg(self) -> JobMessage:
        """Polls a queue for messages"""
        # Load a JSON file shaped like the message delivered by AWS SQS
        with open("mock_pipelines/class_titanic_train.json", "r") as fp:
            body = json.load(fp)
        queue_message = JobMessage(
            mssg_id="mock_mssg_id", receipt_handle="mock_mssg_handler", body=body
        )
        return queue_message

    def stop_poll(self):
        """Stops the polling for messages"""

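To show why this abstraction pays off, here is a simplified, self-contained sketch of a worker loop that depends only on the abstract queue. The JobMessage stand-in, the ListQueue class, and the run_worker function are hypothetical, and unlike the original interface this sketch returns None when the queue is empty so that the loop can end.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class JobMessage:
    """Simplified stand-in for the platform's JobMessage model."""

    mssg_id: str
    body: dict


class Queue(ABC):
    @abstractmethod
    def poll_for_mssg(self) -> Optional[JobMessage]:
        """Returns the next message, or None when nothing is left."""


class ListQueue(Queue):
    """Hypothetical in-memory queue, playing the role MockQueue plays locally."""

    def __init__(self, bodies: list[dict]):
        self._bodies = list(bodies)
        self._served = 0

    def poll_for_mssg(self) -> Optional[JobMessage]:
        if not self._bodies:
            return None
        self._served += 1
        return JobMessage(mssg_id=f"mock-{self._served}", body=self._bodies.pop(0))


def run_worker(queue: Queue, handle: Callable[[JobMessage], None]) -> int:
    """Processes messages until the queue is empty; knows nothing about SQS."""
    processed = 0
    while (message := queue.poll_for_mssg()) is not None:
        handle(message)
        processed += 1
    return processed


seen_tasks: list[str] = []
count = run_worker(
    ListQueue([{"task": "impute"}, {"task": "encode"}]),
    lambda message: seen_tasks.append(message.body["task"]),
)
```

Swapping ListQueue for a cloud-backed implementation would not require any change to run_worker, which is the property that made local development convenient.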
To handle environment-specific configuration seamlessly, we use a simple environment variable to dynamically load the proper class, which is then inherited by the Config class that is accessed all across the system.

import os

# Pick the base configuration depending on where the worker is running
if os.environ.get("LOCAL", "false") == "true":
    print("Running in local mode")
    from .local_config import Config as Conf
else:
    print("Running in production mode")
    from .prod_config import Config as Conf


class Config(Conf):
    AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "")
    AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
    AWS_REGION = os.environ.get("AWS_REGION", "ap-south-1")

To handle the I/O of data, pipelining it from one task to another and saving it in S3, we used three file handlers: one for storing in S3, one for storing on a local disk, and one for storing in memory. All three share the same interface and are instantiated under a single file handler that the usage code accesses. Here is the code of the MemoryStore we developed, which is used by the in-memory file handler.

from typing import Any


class MemoryStore:
    """
    A static class that handles the data in memory.

    Attributes:
        objs (dict): A dictionary that holds the objects.

    Methods:
        store_obj(obj, key): Stores the object under the given key.
        get_obj(key): Returns the object with the given key.
        has_obj(key): Returns True if the object with the given key exists.
        remove_obj(key): Deletes the object with the given key.
    """

    objs: dict[str, Any] = {}

    @staticmethod
    def store_obj(obj: Any, key: str) -> None:
        MemoryStore.objs[key] = obj

    @staticmethod
    def get_obj(key: str) -> Any:
        return MemoryStore.objs[key]

    @staticmethod
    def has_obj(key: str) -> bool:
        return key in MemoryStore.objs

    @staticmethod
    def remove_obj(key: str) -> None:
        del MemoryStore.objs[key]
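The shared interface mentioned above could look roughly like the following sketch. The FileHandler, InMemoryFileHandler, and LocalDiskFileHandler names and the use of pickle are assumptions made for illustration; the S3 variant is left out so that the example runs without cloud credentials.

```python
import pickle
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any


class FileHandler(ABC):
    """Common interface, so usage code does not care where data lives."""

    @abstractmethod
    def save(self, obj: Any, key: str) -> None:
        """Stores the object under the given key."""

    @abstractmethod
    def load(self, key: str) -> Any:
        """Returns the object stored under the given key."""


class InMemoryFileHandler(FileHandler):
    def __init__(self) -> None:
        self._objs: dict[str, Any] = {}

    def save(self, obj: Any, key: str) -> None:
        self._objs[key] = obj

    def load(self, key: str) -> Any:
        return self._objs[key]


class LocalDiskFileHandler(FileHandler):
    def __init__(self, root: Path) -> None:
        self._root = root

    def save(self, obj: Any, key: str) -> None:
        with open(self._root / key, "wb") as fh:
            pickle.dump(obj, fh)

    def load(self, key: str) -> Any:
        with open(self._root / key, "rb") as fh:
            return pickle.load(fh)


def round_trip(handler: FileHandler, obj: Any, key: str) -> Any:
    """Usage code written once against the interface."""
    handler.save(obj, key)
    return handler.load(key)


payload = {"cols": ["age", "fare"], "rows": 3}
memory_result = round_trip(InMemoryFileHandler(), payload, "imputed.pkl")
with tempfile.TemporaryDirectory() as tmp:
    disk_result = round_trip(LocalDiskFileHandler(Path(tmp)), payload, "imputed.pkl")
```

With this shape, passing data between tasks in memory and persisting it at the end of a pipeline differ only in which handler is chosen.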

For the transformers, we took inspiration from the Composite design pattern and tweaked it to suit our purpose. We kept two concepts: the primary transformer and its processor.

  • The primary transformer is accessed by the usage code and instantiates one processor for each column. For example, if SimpleImputer is to be used, the transformer instantiates one SimpleImputer object for each column.
  • In this case, no processor code needs to be written, since scikit-learn already provides it. For custom transformers, we write the processor ourselves.
  • This allows us to use the interface maintained by us and not directly expose transformers from scikit-learn and category-encoders to the rest of the codebase, ensuring that later it is easy to utilize some other library as well.

Here is the code of a time encoder as an example.

from datetime import time
from typing import Optional

import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class TimeEncoderProcessor(BaseEstimator, TransformerMixin, InversibleInplaceProcessor):
    """
    Encodes times by converting to military time (integer)
    and scaling with max time (2359)
    Encodes missing values by -1
    """

    def fit(self, x: pd.DataFrame, y: Optional[pd.DataFrame] = None):
        # Stateless encoder: there is nothing to learn from the data
        return self

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        return x.applymap(
            lambda t: (t.hour * 100 + t.minute) / 2359 if not pd.isnull(t) else -1
        )

    def inverse_transform(self, x: pd.DataFrame) -> pd.DataFrame:
        def decode(value: float) -> Optional[time]:
            if value == -1:
                return None
            # Round first so floating-point error cannot shift the minute by one
            hhmm = int(round(value * 2359))
            return time(hour=hhmm // 100, minute=hhmm % 100)

        return x.applymap(decode)


# TE is presumably an alias for TimeEncoderProcessor; its import is not shown here
class TimeEncoder(
    PostTransformScaledNumericColDTypeInplaceTransformer, InversibleInplaceTransformer
):
    """
    Encodes times using TE

    Attributes:
        processors: Mapping of column name to transformer
        cols_dtypes: The columns data types
        cols_to_fit_on: The columns to fit on
    """

    processors: dict[str, TE]

    def get_processor(self) -> TE:
        """
        Returns the processor
        """

        return TE()

    @property
    def dtypes_to_fit_on(self) -> list[ColDType]:
        """
        Returns the data types to fit on
        """

        return [ColDType.TIME]
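The split between a primary transformer and its per-column processors can be illustrated without pandas or scikit-learn. In the sketch below, ColumnwiseTransformer and TimeProcessor are hypothetical names, a table is represented as a plain dict of column lists, and the time encoding follows the same military-time scaling as the code above.

```python
from datetime import time
from typing import Optional


class TimeProcessor:
    """Processor: encodes and decodes the values of a single column."""

    MAX_TIME = 2359

    def fit(self, values: list) -> "TimeProcessor":
        return self  # nothing to learn for this encoding

    def transform(self, values: list) -> list:
        return [
            -1 if v is None else (v.hour * 100 + v.minute) / self.MAX_TIME
            for v in values
        ]

    def inverse_transform(self, values: list) -> list:
        decoded: list[Optional[time]] = []
        for v in values:
            if v == -1:
                decoded.append(None)
            else:
                hhmm = round(v * self.MAX_TIME)  # round to undo float error
                decoded.append(time(hour=hhmm // 100, minute=hhmm % 100))
        return decoded


class ColumnwiseTransformer:
    """Primary transformer: owns one processor instance per column."""

    def __init__(self, processor_cls: type):
        self._processor_cls = processor_cls
        self.processors: dict = {}

    def fit(self, table: dict) -> "ColumnwiseTransformer":
        self.processors = {
            col: self._processor_cls().fit(values) for col, values in table.items()
        }
        return self

    def transform(self, table: dict) -> dict:
        return {c: self.processors[c].transform(v) for c, v in table.items()}

    def inverse_transform(self, table: dict) -> dict:
        return {c: self.processors[c].inverse_transform(v) for c, v in table.items()}


table = {"opens": [time(9, 30), None], "closes": [time(23, 59), time(0, 0)]}
encoder = ColumnwiseTransformer(TimeProcessor).fit(table)
encoded = encoder.transform(table)
decoded = encoder.inverse_transform(encoded)
```

The usage code only ever talks to ColumnwiseTransformer, so the processor class behind it can be replaced without touching callers.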

We used the Adapter design pattern to set up the code to transform one task’s output to match the next task’s input. We defined an adapter class for each pair of input-output and then set up the mapping that returns the proper adapter.

from dataclasses import dataclass, field
from typing import Optional, Tuple, Type


# Output of imputer task
@dataclass
class ClsImputerTaskOP(ImputerTaskOP, IClsImputerTaskOP):
    train_data_path: str
    test_data_path: str


# Input of encoder task
@dataclass
class ClsEncoderTaskIP(EncoderTaskIP, IClsEncoderTaskIP):
    train_data_path: str = field(init=False)
    test_data_path: str = field(init=False)


# Common adapter for classification tasks
class ClsTaskAdapter(TaskAdapter):
    """
    Task Adapter for tasks in classification pipeline
    """

    def __call__(
        self,
        prev_task_output_params: ClsTaskOutputParams,
        task_to_execute_input_params: ClsTaskInputParams,
    ) -> TaskInputParams:
        # Copy the fields the next task needs from the previous task's output
        task_to_execute_input_params.train_data_path = (
            prev_task_output_params.train_data_path
        )
        task_to_execute_input_params.test_data_path = (
            prev_task_output_params.test_data_path
        )
        task_to_execute_input_params.cols_dtypes = prev_task_output_params.cols_dtypes
        return task_to_execute_input_params


# The adapter
class ClsImputerOPToClsEncoderIP(ClsTaskAdapter):
    pass


# Keys are (output class, input class) pairs, so they are typed as classes
adapters_mapping: dict[
    Tuple[Type[TaskOutputParams], Type[TaskInputParams]],
    TaskAdapter,
] = {
    (
        ClsImputerTaskOP,
        ClsEncoderTaskIP,
    ): ClsImputerOPToClsEncoderIP()
}


# Stitching it together
def adapt_task_output_to_task_input(
    prev_task_output_params: TaskOutputParams,
    task_to_execute_input_params: TaskInputParams,
) -> TaskInputParams:
    """
    Adapts the output of a task to the input of another task
    Args:
        prev_task_output_params: Output of the previous task
        task_to_execute_input_params: Input of the task to be executed

    Returns:
        TaskInputParams: Input of the task to be executed
    """

    adapter: Optional[TaskAdapter] = adapters_mapping.get(
        (type(prev_task_output_params), type(task_to_execute_input_params))
    )
    if not adapter:
        # No adapter registered for this pair: pass the input through unchanged
        return task_to_execute_input_params

    adapted_input_params = adapter(
        prev_task_output_params=prev_task_output_params,
        task_to_execute_input_params=task_to_execute_input_params,
    )
    return adapted_input_params

For the models, we had two different abstract classes: one for the actual model and one for its parameters. We used the Abstract Factory design pattern here, as the model object and its parameters object are related to each other. Here is an example:

from abc import ABC, abstractmethod
from typing import Optional


class ModelFactory(ABC):
    @abstractmethod
    def createModel(self, params: ModelParams, cols_dtypes: ColsDTypes) -> Model:
        """Create a model"""

    @abstractmethod
    def createModelParams(self, **kwargs) -> ModelParams:
        """Create a model params"""


def get_model_factory(ml_model: MLModel) -> ModelFactory:
    """Get a model factory"""
    match ml_model:
        case MLModel.KPrototype:
            return KPrototypesModelFactory()

        case MLModel.XGBNNEnsembleClassifier:
            return XGBNNEnsembleClassifierModelFactory()

        case MLModel.ElasticNetReg:
            return ElasticNetRegModelFactory()

        case _:
            # Fail loudly instead of silently returning None
            raise ValueError(f"Unsupported model: {ml_model}")


class KPrototypesModelFactory(ModelFactory):
    def createModel(
        self, params: KPrototypesModelParams, cols_dtypes: ColsDTypes
    ) -> KPrototypes:
        """Create a model"""
        return KPrototypes(
            params=params,
            cols_dtypes=cols_dtypes,
        )

    def createModelParams(
        self, num_clusters: Optional[int], **kwargs
    ) -> KPrototypesModelParams:
        """Create a model params"""
        return KPrototypesModelParams(n_clusters=num_clusters)


# Usage code
model_factory: ModelFactory = get_model_factory(ml_model=self.ml_model)
model_params = model_factory.createModelParams(**self.ip_params.__dict__)
model = model_factory.createModel(
    params=model_params, cols_dtypes=self.cols_dtypes
)


DevOps

We manage our infrastructure as code using Terraform. To manage different environments and for better code readability, we use Terragrunt on top of it. The folder structure shown below consists of environment-independent modules and one folder per environment, each of which uses the modules.

live/
- dev
  - vpc
    - main.tf
    - backend.tf
    - outputs.tf
    - vars.tf
    - terragrunt.hcl
- demo
- prod
modules/
- vpc
  - main.tf
  - outputs.tf
  - vars.tf

  • main.tf: The primary file where resources are defined for Terraform.
  • outputs.tf: Defines the outputs available after the Terraform apply command completes.
  • vars.tf: Contains variable definitions for Terraform to customize deployments.
  • backend.tf: Specifies the remote state configuration for Terraform. This file is auto-generated by Terragrunt.
  • terragrunt.hcl: Provides additional configuration for Terragrunt to manage the Terraform code.

Here is part of the code used to manage the VPC.

File: modules/vpc/main.tf

resource "aws_vpc" "vpc" {
  enable_dns_hostnames = true
  enable_dns_support   = true
  # VPC CIDR Block
  cidr_block = var.cidr_block
  # Shared Tenancy
  instance_tenancy = var.vpc_tenancy
  # VPC Tags
  tags = {
    Name = var.vpc_name
    env  = var.env
  }
}

resource "aws_internet_gateway" "igw" {
  # VPC ID
  vpc_id = aws_vpc.vpc.id
  # Tags
  tags = {
    Name = var.igw_name
    env  = var.env
  }
}


File: modules/vpc/outputs.tf

output "vpc_id" {
  description = "The ID of the VPC"
  value       = aws_vpc.vpc.id
}


File: modules/vpc/vars.tf

variable "vpc_name" {
  description = "Name of the VPC"
  type        = string
}

variable "igw_name" {
  type        = string
  description = "Internet Gateway Name Tag"
}

variable "env" {
  description = "Environment, e.g. 'dev', 'demo', 'prod' etc"
  type        = string
}

# The cidr_block and vpc_tenancy variables referenced in main.tf are not shown in this excerpt.

The above module is used by each environment as shown in the example given below.

File: live/dev/vpc/main.tf

provider "aws" {
  region = "ap-south-1"
}

module "xboost_vpc" {
  source = "../../../modules/vpc"

  env      = "dev"
  vpc_name = "XBoost VPC"
  igw_name = "XBoost IGW"
}

File: live/dev/vpc/outputs.tf

output "vpc_id" {
  description = "The ID of the VPC"
  value       = module.xboost_vpc.vpc_id
}


Conclusion

As we conclude our deep dive into the Codebase, we hope the insights shared here enrich your understanding of effective coding practices for data engineering and ML. The series concludes with its final part, where we explore some of the interesting features we built into the platform (click here to read it).

You can reach out to me on LinkedIn (susmit-vengurlekar) or send me an email at susmit.py@gmail.com. You can see what I am up to on GitHub (susmit.py).

Interested in data engineering and ML? You’re in the right place! Keep an eye out for open roles at Zeza Tech.

Susmit
zeza_tech

Data Scientist and Full Stack Software Developer. Certified Neo4j Professional.