CDC and SCD Approach to Enhancing Expedition Operations

Arkan Nibrastama
Data Engineering Indonesia
Oct 7, 2023

Business Case

Imagine an expedition business that offers shipping or package delivery. To stay competitive, the company needs to optimize its business operations, for example by increasing warehouse capacity in certain areas or relocating employees to busy offices and warehouses.

The key to this optimization is adopting Change Data Capture (CDC), which lets the company track every change an order goes through in the business process. Combined with a Slowly Changing Dimension (SCD) type 2 table in the data warehouse, the company can measure the time span of each step and find out which offices or warehouses need to be optimized.

Build all Docker containers

To build the Change Data Capture (CDC) pipeline, we need to set up several components:

  • Database (with change logging enabled)
  • Debezium (the CDC connector)
  • Kafka (to stream the change data)
# docker-compose.yaml
# make a container for each of the required tools
version: "3.7"
services:
  mysql:
    image: quay.io/debezium/example-mysql:2.3
    container_name: mysql
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=123
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=user123

  zookeeper:
    image: quay.io/debezium/zookeeper:2.3
    container_name: zookeeper
    ports:
      - 2181:2181

  kafka:
    image: quay.io/debezium/kafka:2.3
    container_name: kafka
    ports:
      - 29092:29092
    environment:
      ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    depends_on:
      - zookeeper

  debezium:
    image: quay.io/debezium/connect:2.3
    container_name: debezium
    ports:
      - 8083:8083
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
    depends_on:
      - kafka
      - mysql

We will run a MySQL database inside a Docker container. You can use another database system or a native MySQL installation, but make sure its change log is enabled: the WAL (Write Ahead Log) in PostgreSQL, or the binary log (binlog) in MySQL, since that is what Debezium reads changes from. The quay.io/debezium/example-mysql image used here already has the binlog enabled; on your own instance you can check it with SHOW VARIABLES LIKE 'log_bin';.

Once you have read and understood the docker-compose.yaml file, you can build all the components by running:

docker-compose up -d

You can check Docker Desktop to make sure all the containers are running successfully.

Build the API

Instead of building an entire expedition system, we will use a small API in its place. The API's job is to take the expedition data entered by frontline staff and write it into the database.

We are using Python with the FastAPI framework to build the API. First, set up the connection to MySQL (specifically to the expedition database) by creating an engine with the SQLAlchemy library.

# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# connection string: root user, password 123, database "expedition"
URL_DB = 'mysql+pymysql://root:123@localhost:3306/expedition'

engine = create_engine(url=URL_DB)

# session factory used by the API's database dependency
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# base class for the ORM models
Base = declarative_base()

After that, make an order data model that contains order_id, customer_id, product_category_id, order_date, origin_office, destination_office, and order_status columns.

# models.py
from sqlalchemy import Column, Integer, String, Date
from database import Base

class Order(Base):

    __tablename__ = 'order'

    order_id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    customer_id = Column(Integer)
    product_category_id = Column(Integer)
    order_date = Column(Date)
    origin_office = Column(String(30))
    destination_office = Column(String(30))
    order_status = Column(String(25))

Finally, create the API endpoints. We create two of them: one to create a new expedition order, and one to update the status of an existing order.

# orderAPI.py
from fastapi import FastAPI, HTTPException, Depends, status
from pydantic import BaseModel
from typing import Annotated
import models
from database import engine, SessionLocal
from sqlalchemy.orm import Session

api = FastAPI()
models.Base.metadata.create_all(bind=engine)

# data validation (input schema)
class OrderBase(BaseModel):
    # don't include order_id because it is auto-incremented
    customer_id: int
    product_category_id: int
    order_date: str
    origin_office: str
    destination_office: str
    order_status: str

class UpdateStatusBase(BaseModel):
    order_status: str

# database dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

db_dependency = Annotated[Session, Depends(get_db)]

# endpoint: add a new order transaction
@api.post("/order/", status_code=status.HTTP_201_CREATED)
async def create_order(order: OrderBase, db: db_dependency):
    input_order = models.Order(**order.dict())
    db.add(input_order)
    db.commit()

# endpoint: partial update (just the status)
@api.patch("/status/{order_id}", status_code=status.HTTP_200_OK)
async def update_status(order_id: int, new_status: UpdateStatusBase, db: db_dependency):
    old_record = db.query(models.Order).filter(models.Order.order_id == order_id).first()
    if old_record is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Order not found")
    for var, value in vars(new_status).items():
        setattr(old_record, var, value) if value else None
    db.add(old_record)
    db.commit()

Make sure to create the expedition database in MySQL before running the API:

CREATE DATABASE expedition;

Once that is done, you can run the API with the command:

uvicorn orderAPI:api --reload
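
To simulate frontliner input once the server is running, you can hit both endpoints with a short script. This is just a sketch with made-up sample values; it assumes uvicorn is serving on its default port 8000.

import requests

BASE_URL = "http://localhost:8000"  # default uvicorn port

# create a new order (hypothetical sample values)
new_order = {
    "customer_id": 1,
    "product_category_id": 2,
    "order_date": "2023-10-07",
    "origin_office": "Jakarta",
    "destination_office": "Bandung",
    "order_status": "ORDER RECEIVED",
}
resp = requests.post(f"{BASE_URL}/order/", json=new_order)
print(resp.status_code)  # 201 if the order was created

# later, update only the status of order_id 1
resp = requests.patch(f"{BASE_URL}/status/1", json={"order_status": "IN TRANSIT"})
print(resp.status_code)  # 200 if the update succeeded

Every insert and update made this way ends up in the MySQL binlog, which is exactly what Debezium will pick up in the next step.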

Create Debezium Connector

To create a Debezium connector, you have to describe it in a JSON configuration file:

{
    "name": "expedition-order-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "123",
        "database.server.id": "184054",
        "topic.prefix": "source",
        "database.include.list": "expedition",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schemahistory.expedition",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

  • name: the Debezium connector name
  • tasks.max: the maximum number of tasks the connector runs at one time
  • database.hostname: the hostname of the database. Since we are using Docker containers, we can use the database container name as the hostname
  • database.port: the port the database listens on
  • database.user: the database username to log in with
  • database.password: the database password to log in with
  • database.server.id: a numeric ID that must be unique among all clients of the MySQL cluster; Debezium joins the cluster as a replication client under this ID
  • topic.prefix: a unique prefix for the Kafka topics the connector writes to
  • database.include.list: the list of databases to watch for changes
  • schema.history.internal.kafka.{…}: the connector stores the database schema history in Kafka using this broker and topic name

The rest of the configuration is optional: the unwrap transform (ExtractNewRecordState) flattens each change event so that only the new row state is emitted, and the converters output it as plain JSON without embedded schemas. Next, to establish the connection, we have to POST this configuration to the Kafka Connect REST API that the Debezium container exposes on port 8083. You can use Postman for this, or the short script below.
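
If you prefer to script the registration instead of using Postman, here is a minimal sketch. It assumes the JSON above is saved as expedition-connector.json (a file name chosen for this example) and that Kafka Connect is reachable on localhost:8083.

import json
import requests

# load the connector configuration shown above (hypothetical file name)
with open("expedition-connector.json") as f:
    connector_config = json.load(f)

# register the connector with the Kafka Connect REST API exposed by the debezium container
resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    json=connector_config,
)
print(resp.status_code, resp.json())  # 201 means the connector was created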

Once the connector is created, you can run topics.py to check the available topics

# topics.py
from kafka import KafkaConsumer

bootstrap_servers = ['localhost:29092']

# connect without subscribing, just to list the topics on the broker
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
print(consumer.topics())

There should be a topic named source.expedition.order. If the topic does not appear, use the API to make a new request, then rerun the code and check again. Once the topic is available, run the Kafka consumer code to verify that data changes are detected successfully.

import json
from kafka import KafkaConsumer

topicName = 'source.expedition.order'
bootstrap_servers = ['localhost:29092']

consumer = KafkaConsumer(
    topicName,
    auto_offset_reset='earliest',
    bootstrap_servers=bootstrap_servers,
    group_id='order-transactions'
)

# print every change event captured from the order table
for m in consumer:
    print(json.loads(m.value))

Send Data to BigQuery

After data changes are detected in the Kafka consumer, we can send them to the data warehouse for further analysis. In the warehouse we implement Slowly Changing Dimension (SCD) type 2, keeping a row per status with a start_date and an end_date, so we can analyze the time span of each business process (order_status).
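
To make the idea concrete, here is what the history of a single (hypothetical) order looks like in the warehouse, with only the relevant columns shown. Each time the status changes, the current row is closed by filling its end_date and a new row is opened with end_date left NULL:

order_id  order_status     start_date           end_date
1001      ORDER RECEIVED   2023-10-07 08:00:00  2023-10-07 10:30:00
1001      IN TRANSIT       2023-10-07 10:30:00  2023-10-08 14:00:00
1001      DELIVERED        2023-10-08 14:00:00  NULL

The sendToBQ.py helper below creates this table in BigQuery and implements the two operations the pattern needs: closing the current record and inserting the new one.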

# sendToBQ.py
from google.cloud import bigquery
import os

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './credentials.json'

class BigQuery():

    def __init__(self, table_name) -> None:
        self.client = bigquery.Client()

        # create the SCD type 2 table if it does not exist yet
        query = f'''
            CREATE TABLE IF NOT EXISTS `{table_name}` (
                order_id INTEGER,
                customer_id INTEGER,
                product_category_id INTEGER,
                origin_office STRING(30),
                destination_office STRING(30),
                order_status STRING(25),
                start_date TIMESTAMP,
                end_date TIMESTAMP
            )
        '''
        self.client.query(query=query).result()

    def isAvailable(self, order_id: int, table_name: str) -> bool:
        # check whether the order already has a record in the table
        query = f'''
            SELECT order_id
            FROM `{table_name}`
            WHERE order_id = {order_id}
            LIMIT 1;
        '''
        query_job = self.client.query(query=query)
        result = query_job.result()
        total_row = result.total_rows

        if total_row == 0:
            return False
        else:
            return True

    def updateLastRecord(self, order_id: int, date_now: str, table_name: str) -> None:
        # close the currently open record by filling its end_date
        query = f'''
            UPDATE `{table_name}`
            SET end_date='{date_now}'
            WHERE order_id={order_id}
            AND end_date IS NULL;
        '''
        query_job = self.client.query(query=query)
        query_job.result()
        print("data has been updated!")

    def insertData(self, data: dict, table_name: str) -> None:
        # insert the new row with an open (NULL) end_date
        query = f'''
            INSERT INTO `{table_name}` VALUES (
                {data['order_id']},
                {data['customer_id']},
                {data['product_category_id']},
                '{data['origin_office']}',
                '{data['destination_office']}',
                '{data['order_status']}',
                '{data['date_now']}',
                NULL
            );
        '''
        query_job = self.client.query(query=query)
        query_job.result()
        print("data has been loaded!")

The consumer script below then reads each change event from the topic, applies the SCD type 2 logic, and loads the result into BigQuery.

import json
from kafka import KafkaConsumer
from sendToBQ import BigQuery
from datetime import datetime

# initialize BQ connection
bq_table_name = 'expedition.order'
bq = BigQuery(table_name=bq_table_name)

# initialize consumer
topicName = 'source.expedition.order'
bootstrap_servers = ['localhost:29092']
consumer = KafkaConsumer(
    topicName,
    auto_offset_reset='earliest',
    bootstrap_servers=bootstrap_servers,
    group_id='order-transactions'
)

# read the topic messages (data changes)
print('reading the messages ...')
for m in consumer:

    if m.value:

        print('data enter...')
        data_changes = json.loads(m.value)
        utc_now = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

        # check if the order already exists in the warehouse
        isAvailable = bq.isAvailable(data_changes['order_id'], bq_table_name)
        if isAvailable:
            # close the current record for this order
            bq.updateLastRecord(data_changes['order_id'], utc_now, bq_table_name)

        # insert the new record
        data = {
            'order_id': data_changes['order_id'],
            'customer_id': data_changes['customer_id'],
            'product_category_id': data_changes['product_category_id'],
            'origin_office': data_changes['origin_office'],
            'destination_office': data_changes['destination_office'],
            'order_status': data_changes['order_status'],
            'date_now': utc_now
        }
        bq.insertData(data, bq_table_name)

        print('send to BQ!')

The code listens for change events. For each one, it checks whether the order is new or already exists in the warehouse (meaning its order_status was updated). If the order is new, we load the row straight into BigQuery; if it already exists, we first set the end_date on the order's latest record and then load the new row.
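
With the history in place, measuring the time span of each business process becomes a simple query. Below is an illustrative sketch; the column and table names follow the code above, and grouping by origin office is just one possible way to slice the data.

from google.cloud import bigquery

client = bigquery.Client()

# average duration of each status per origin office, in hours
query = '''
    SELECT
        origin_office,
        order_status,
        AVG(TIMESTAMP_DIFF(end_date, start_date, MINUTE)) / 60 AS avg_hours
    FROM `expedition.order`
    WHERE end_date IS NOT NULL  -- only finished steps
    GROUP BY origin_office, order_status
    ORDER BY avg_hours DESC
'''
for row in client.query(query).result():
    print(row.origin_office, row.order_status, round(row.avg_hours, 2))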

NOTE
This is not a best-practice setup. In production, you could use a Pub/Sub sink connector, process the stream with a tool like Google Dataflow, and then load the results into BigQuery.

Conclusion

In summary, this case study explores how data engineering concepts such as Change Data Capture and Slowly Changing Dimension type 2 can be applied to optimize expedition operations. By adopting these techniques, expedition businesses can streamline their processes, make informed decisions, and ultimately improve efficiency.

Additionally, the integration of APIs simplifies system operations, enabling real-time data exchange and automation. Together, these strategies offer a transformative approach to expedition management, ensuring companies are well-prepared to navigate the challenges of the modern business landscape.

If you like my articles, you can follow me to get updates.

Thank You!
