Building a Scalable ML Model with Real-Time Inference Endpoint for Customer Segmentation

Abhishek Chandragiri
7 min read · Jun 1, 2024


In this blog, we’ll walk through the development of a scalable machine learning model designed to handle real-time inference via a REST API. This project tackles the business problem of customer segmentation, using customer behavioral data to support tailored marketing strategies. Our solution covers the full workflow: data ingestion, data transformation, model training, deployment with Kubernetes, and CI/CD integration using GitHub Actions. By the end, you’ll have a comprehensive understanding of how to implement and scale an ML model for real-time predictions.

Table of Contents

  1. Introduction
  2. Business Problem
  3. Project Workflow Overview
  4. Data Ingestion
  5. Data Transformation
  6. Model Training
  7. Real-Time Inference with FastAPI
  8. Containerization with Docker
  9. Deployment with Kubernetes
  10. CI/CD Pipeline with GitHub Actions
  11. Testing the Model
  12. Conclusion

1. Introduction

The goal of this project is to develop a scalable machine learning model that can handle real-time inference requests. We use a K-Means clustering algorithm for customer segmentation to help businesses create targeted marketing strategies. By integrating FastAPI, Docker, Kubernetes, and GitHub Actions, we ensure that our model is robust, scalable, and easy to deploy.

2. Business Problem

Customer segmentation is crucial for businesses to understand and cater to the needs of different customer groups. By segmenting customers based on their behaviors and characteristics, businesses can tailor their marketing strategies to improve customer engagement and retention. Our project addresses this need by developing a machine learning model that segments customers and provides real-time predictions through a REST API.

3. Project Workflow Overview

The project workflow consists of several interconnected components (a minimal orchestration sketch follows the list):

  • Data Ingestion: Loading and preparing the dataset.
  • Data Transformation: Cleaning and transforming the data for model training.
  • Model Training: Training the K-Means clustering algorithm.
  • Real-Time Inference: Implementing a FastAPI application to serve the model.
  • Containerization: Using Docker to containerize the application.
  • Deployment: Deploying the application on a Kubernetes cluster.
  • CI/CD Pipeline: Automating the build and deployment process using GitHub Actions.
  • Testing: Verifying the model using Postman and a Gradio interface hosted on Hugging Face Spaces.
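
To make the flow concrete, here is a minimal driver that chains the first three components. It is a sketch under stated assumptions: the class and method names match the snippets in the sections below, the module layout is illustrative, and save_object is the small pickling helper sketched in Section 6.

# train_pipeline.py -- illustrative driver; module paths are assumptions
import pandas as pd

from data_ingestion import DataIngestion
from data_transformation import DataTransformation
from model_trainer import ModelTrainer
from utils import save_object  # hypothetical helper module (see Section 6)

if __name__ == "__main__":
    # 1. Ingest: copy the raw CSV into artifacts/df.csv
    data_path = DataIngestion().start_data_ingestion()

    # 2. Transform: impute, drop the ID column, standardize
    df = pd.read_csv(data_path)
    transformation = DataTransformation()
    scaled_data, scaler = transformation.get_transformed_data(df)

    # 3. Persist the fitted scaler so the API can apply identical scaling
    save_object(scaler, transformation.processor_path)

    # 4. Train: choose k with the elbow heuristic and fit K-Means
    labels = ModelTrainer().train_model(scaled_data)
    print(f"Segmented {len(labels)} customers into {len(set(labels))} clusters")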

4. Data Ingestion

The data ingestion process involves loading the customer data from a CSV file and saving it in a format suitable for further processing.

import pandas as pd
import os

class DataIngestion:
    def __init__(self):
        self.data_path = os.path.join('artifacts', 'df.csv')

    def start_data_ingestion(self):
        df = pd.read_csv('/path/to/Customer Data.csv')
        os.makedirs(os.path.dirname(self.data_path), exist_ok=True)
        df.to_csv(self.data_path, index=False, header=True)
        return self.data_path

5. Data Transformation

Data transformation is essential to clean and prepare the data for model training. This step includes handling missing values, scaling features, and transforming the data.

import os

import pandas as pd
from sklearn.preprocessing import StandardScaler

class DataTransformation:
    def __init__(self):
        self.processor_path = os.path.join("artifacts", "processor.pkl")

    def get_transformed_data(self, df):
        # Impute missing values with the column mean
        df["MINIMUM_PAYMENTS"].fillna(df["MINIMUM_PAYMENTS"].mean(), inplace=True)
        df["CREDIT_LIMIT"].fillna(df["CREDIT_LIMIT"].mean(), inplace=True)
        # Drop the identifier column; it carries no behavioral signal
        df.drop(columns=["CUST_ID"], inplace=True)
        # Standardize features so no single feature dominates the distance metric
        scaler = StandardScaler()
        scaled_df = scaler.fit_transform(df)
        return scaled_df, scaler

6. Model Training

The model training step involves using the K-Means clustering algorithm to segment customers based on their behaviors and characteristics. We also determine the optimal number of clusters using the Elbow Method.

import os

import numpy as np
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt

class ModelTrainer:
    def __init__(self):
        self.model_path = os.path.join("artifacts", "model.pkl")

    def find_optimal_clusters(self, data, max_k=10):
        # Record within-cluster sum of squares (WCSS) for k = 1..max_k
        wcss = []
        for i in range(1, max_k + 1):
            kmeans = KMeans(n_clusters=i, random_state=42)
            kmeans.fit(data)
            wcss.append(kmeans.inertia_)
        plt.plot(range(1, max_k + 1), wcss, marker='o')
        plt.title('Elbow Method For Optimal k')
        plt.savefig(os.path.join("artifacts", "elbow_plot.png"))
        # Approximate the elbow as the k with the largest second difference
        # of WCSS, i.e. where the curve bends most sharply
        optimal_k = np.argmax(np.diff(np.diff(wcss))) + 2
        return optimal_k

    def train_model(self, data):
        optimal_clusters = self.find_optimal_clusters(data)
        kmeans = KMeans(n_clusters=optimal_clusters, random_state=42)
        kmeans.fit(data)
        # save_object is a project helper that pickles the model to disk
        save_object(kmeans, self.model_path)
        return kmeans.labels_
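
Both the trainer above and the API in the next section call save_object and load_object. Their implementation isn’t shown in the excerpt, so here is a minimal pickle-based sketch, assuming that is all they do:

import os
import pickle

def save_object(obj, file_path):
    # Serialize any Python object (model, scaler) to disk with pickle
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "wb") as f:
        pickle.dump(obj, f)

def load_object(file_path):
    # Load a previously pickled object back into memory
    with open(file_path, "rb") as f:
        return pickle.load(f)

The same helper would persist the fitted StandardScaler as artifacts/processor.pkl, so the API applies exactly the scaling learned during training.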

7. Real-Time Inference with FastAPI

We use FastAPI to create a REST API for real-time inference. This allows other systems to send customer data and receive segmentation predictions.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
import os

class InputData(BaseModel):
    CUST_ID: str
    BALANCE: float
    BALANCE_FREQUENCY: float
    PURCHASES: float
    ONEOFF_PURCHASES: float
    INSTALLMENTS_PURCHASES: float
    CASH_ADVANCE: float
    PURCHASES_FREQUENCY: float
    ONEOFF_PURCHASES_FREQUENCY: float
    PURCHASES_INSTALLMENTS_FREQUENCY: float
    CASH_ADVANCE_FREQUENCY: float
    CASH_ADVANCE_TRX: int
    PURCHASES_TRX: int
    CREDIT_LIMIT: float
    PAYMENTS: float
    MINIMUM_PAYMENTS: float
    PRC_FULL_PAYMENT: float
    TENURE: int

app = FastAPI()

# load_object is the project helper sketched in Section 6
model = load_object(os.path.join('artifacts', 'model.pkl'))
transformer = load_object(os.path.join('artifacts', 'processor.pkl'))

@app.post("/predict")
def predict(input_data: InputData):
    try:
        input_df = pd.DataFrame([input_data.dict()])
        # Drop the ID column to match the features used at training time
        input_df.drop(columns=["CUST_ID"], inplace=True)
        data_trans = transformer.transform(input_df)
        cluster_label = model.predict(data_trans)
        return {"cluster_label": int(cluster_label[0])}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
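
Before containerizing, the app can be smoke-tested locally. This assumes the FastAPI code above lives in main.py, consistent with the Dockerfile’s CMD in the next section:

uvicorn main:app --host 0.0.0.0 --port 8000

FastAPI also serves interactive API docs at http://localhost:8000/docs, which makes quick manual checks easy.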

8. Containerization with Docker

We use Docker to containerize our FastAPI application, ensuring consistency across different environments.

FROM python:3.9-slim-buster
WORKDIR /app
COPY . /app
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
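
To verify the image before pushing it to a registry, a local build-and-run works as a sanity check (the image tag here is illustrative):

docker build -t myfastapiapp:latest .
docker run -p 8000:8000 myfastapiapp:latest

With the container running, the /predict endpoint is reachable at http://localhost:8000/predict, exactly as in local development.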

9. Deployment with Kubernetes

We deploy our containerized application to a Kubernetes cluster for scalability and high availability.

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fastapi-app
  template:
    metadata:
      labels:
        app: fastapi-app
    spec:
      containers:
        - name: fastapi-container
          image: customerseg.azurecr.io/myfastapiapp:latest
          ports:
            - containerPort: 8000
      imagePullSecrets:
        - name: acr-secret

# service.yaml
apiVersion: v1
kind: Service
metadata:
  name: fastapi-service
spec:
  selector:
    app: fastapi-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
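
Both manifests are applied with kubectl; the LoadBalancer service then exposes a public IP on port 80 that forwards to the pods on port 8000:

kubectl apply -f deployment.yaml
kubectl apply -f service.yaml
kubectl get service fastapi-service   # the EXTERNAL-IP column is the public endpoint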

10. CI/CD Pipeline with GitHub Actions

We automate the build and deployment process using GitHub Actions, ensuring that changes are automatically tested and deployed.

name: CI/CD Pipeline

on:
  push:
    branches:
      - main

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
        with:
          install: true
          driver: docker-container
          use: true
      - name: Log in to Azure
        uses: azure/login@v1
        with:
          creds: ${{ secrets.AZURE_CREDENTIALS }}
      - name: Build and push Docker image
        run: |
          docker buildx build --platform linux/amd64,linux/arm64 -t ${{ secrets.AZURE_ACR_LOGIN_SERVER }}/myfastapiapp:latest --push .

  deploy:
    runs-on: ubuntu-latest
    needs: build
    steps:
      - name: Azure Login
        uses: azure/login@v1
        with:
          creds: ${{ secrets.AZURE_CREDENTIALS }}
      - name: Deploy to AKS
        run: |
          kubectl set image deployment/fastapi-app fastapi-container=${{ secrets.AZURE_ACR_LOGIN_SERVER }}/myfastapiapp:latest
          kubectl rollout status deployment/fastapi-app
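
One gap to be aware of: as written, the deploy job never points kubectl at the AKS cluster, and pushing to ACR needs a registry login in addition to azure/login. A hedged sketch of the two steps that would typically fill those gaps (the secret names here are placeholders, not from the original workflow):

      # In the build job, before pushing: authenticate Docker against ACR
      - name: Log in to ACR
        run: az acr login --name ${{ secrets.AZURE_ACR_NAME }}

      # In the deploy job, before kubectl: fetch the cluster's kubeconfig
      - name: Set AKS context
        uses: azure/aks-set-context@v3
        with:
          resource-group: ${{ secrets.AKS_RESOURCE_GROUP }}
          cluster-name: ${{ secrets.AKS_CLUSTER_NAME }}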

11. Testing the Model

Testing is a critical step to ensure the reliability and accuracy of our model. We performed two types of testing: using Postman for API endpoint testing and developing a Gradio application hosted on Hugging Face Spaces for user-friendly testing.

Postman Testing

We used Postman to test the REST API endpoint provided by our FastAPI application deployed on Kubernetes. By sending various requests with different customer data, we ensured that the API returned the expected cluster labels accurately and consistently.
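
A representative request body for POST /predict is shown below; the field names match the InputData schema, and the values are purely illustrative:

{
  "CUST_ID": "C10001",
  "BALANCE": 40.9,
  "BALANCE_FREQUENCY": 0.82,
  "PURCHASES": 95.4,
  "ONEOFF_PURCHASES": 0.0,
  "INSTALLMENTS_PURCHASES": 95.4,
  "CASH_ADVANCE": 0.0,
  "PURCHASES_FREQUENCY": 0.17,
  "ONEOFF_PURCHASES_FREQUENCY": 0.0,
  "PURCHASES_INSTALLMENTS_FREQUENCY": 0.08,
  "CASH_ADVANCE_FREQUENCY": 0.0,
  "CASH_ADVANCE_TRX": 0,
  "PURCHASES_TRX": 2,
  "CREDIT_LIMIT": 1000.0,
  "PAYMENTS": 201.8,
  "MINIMUM_PAYMENTS": 139.5,
  "PRC_FULL_PAYMENT": 0.0,
  "TENURE": 12
}

A successful response looks like {"cluster_label": 0}.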

Gradio Application

To make testing more accessible, we developed a Gradio application that interacts with our FastAPI endpoint. The Gradio interface allows users to input customer data through a web form and receive the predicted customer segment.

Here’s a brief overview of the Gradio testing setup:

import gradio as gr
import requests

API_URL = "http://4.186.39.227/predict"

def predict_customer_segment(data):
    # Forward the form data to the FastAPI endpoint and map the
    # numeric cluster label to a human-readable segment name
    response = requests.post(API_URL, json=data)
    if response.status_code == 200:
        cluster_label = response.json()["cluster_label"]
        cluster_descriptions = {
            0: "Low Value Customer",
            1: "Medium Value Customer",
            2: "High Value Customer",
            3: "Premium Customer"
        }
        return cluster_descriptions.get(cluster_label, "Unknown Cluster")
    else:
        return f"Error: {response.json()['detail']}"

def predict_from_form(CUST_ID, BALANCE, BALANCE_FREQUENCY, PURCHASES, ONEOFF_PURCHASES, INSTALLMENTS_PURCHASES, CASH_ADVANCE, PURCHASES_FREQUENCY, ONEOFF_PURCHASES_FREQUENCY, PURCHASES_INSTALLMENTS_FREQUENCY, CASH_ADVANCE_FREQUENCY, CASH_ADVANCE_TRX, PURCHASES_TRX, CREDIT_LIMIT, PAYMENTS, MINIMUM_PAYMENTS, PRC_FULL_PAYMENT, TENURE):
    data = {
        "CUST_ID": CUST_ID,
        "BALANCE": BALANCE,
        "BALANCE_FREQUENCY": BALANCE_FREQUENCY,
        "PURCHASES": PURCHASES,
        "ONEOFF_PURCHASES": ONEOFF_PURCHASES,
        "INSTALLMENTS_PURCHASES": INSTALLMENTS_PURCHASES,
        "CASH_ADVANCE": CASH_ADVANCE,
        "PURCHASES_FREQUENCY": PURCHASES_FREQUENCY,
        "ONEOFF_PURCHASES_FREQUENCY": ONEOFF_PURCHASES_FREQUENCY,
        "PURCHASES_INSTALLMENTS_FREQUENCY": PURCHASES_INSTALLMENTS_FREQUENCY,
        "CASH_ADVANCE_FREQUENCY": CASH_ADVANCE_FREQUENCY,
        "CASH_ADVANCE_TRX": CASH_ADVANCE_TRX,
        "PURCHASES_TRX": PURCHASES_TRX,
        "CREDIT_LIMIT": CREDIT_LIMIT,
        "PAYMENTS": PAYMENTS,
        "MINIMUM_PAYMENTS": MINIMUM_PAYMENTS,
        "PRC_FULL_PAYMENT": PRC_FULL_PAYMENT,
        "TENURE": TENURE
    }
    return predict_customer_segment(data)

with gr.Blocks() as demo:
    gr.Markdown("# Customer Segmentation Model")
    gr.Markdown("### Predict the customer segment based on the provided details")

    with gr.Row():
        CUST_ID = gr.Textbox(label="Customer ID", lines=1, max_lines=1)
        BALANCE = gr.Number(label="Balance")
        BALANCE_FREQUENCY = gr.Number(label="Balance Frequency")
        PURCHASES = gr.Number(label="Purchases")
        ONEOFF_PURCHASES = gr.Number(label="One-off Purchases")
        INSTALLMENTS_PURCHASES = gr.Number(label="Installments Purchases")
        CASH_ADVANCE = gr.Number(label="Cash Advance")
        PURCHASES_FREQUENCY = gr.Number(label="Purchases Frequency")
        ONEOFF_PURCHASES_FREQUENCY = gr.Number(label="One-off Purchases Frequency")
        PURCHASES_INSTALLMENTS_FREQUENCY = gr.Number(label="Purchases Installments Frequency")
        CASH_ADVANCE_FREQUENCY = gr.Number(label="Cash Advance Frequency")
        CASH_ADVANCE_TRX = gr.Number(label="Cash Advance Transactions")
        PURCHASES_TRX = gr.Number(label="Purchases Transactions")
        CREDIT_LIMIT = gr.Number(label="Credit Limit")
        PAYMENTS = gr.Number(label="Payments")
        MINIMUM_PAYMENTS = gr.Number(label="Minimum Payments")
        PRC_FULL_PAYMENT = gr.Number(label="Percent Full Payment")
        TENURE = gr.Number(label="Tenure")

    predict_button = gr.Button("Predict", variant="primary")
    prediction_output = gr.Textbox(label="Predicted Customer Segment", lines=1, max_lines=1)

    predict_button.click(predict_from_form, inputs=[
        CUST_ID, BALANCE, BALANCE_FREQUENCY, PURCHASES, ONEOFF_PURCHASES, INSTALLMENTS_PURCHASES,
        CASH_ADVANCE, PURCHASES_FREQUENCY, ONEOFF_PURCHASES_FREQUENCY, PURCHASES_INSTALLMENTS_FREQUENCY,
        CASH_ADVANCE_FREQUENCY, CASH_ADVANCE_TRX, PURCHASES_TRX, CREDIT_LIMIT, PAYMENTS,
        MINIMUM_PAYMENTS, PRC_FULL_PAYMENT, TENURE
    ], outputs=prediction_output)

demo.launch()

We hosted the Gradio application on Hugging Face Spaces, providing an intuitive web interface for users to test our model.

Link: https://huggingface.co/spaces/Abhishek0323/Customer-Segmentation

12. Conclusion

In this blog, we detailed the end-to-end process of building a scalable machine learning model for customer segmentation with real-time inference capabilities. From data ingestion and transformation to model training and deployment, each step was designed for scalability and robustness. By leveraging modern tools like FastAPI, Docker, Kubernetes, and GitHub Actions, we built a solution that can be integrated into existing systems for real-time predictions, helping businesses tailor their marketing strategies effectively.

We also emphasized the importance of testing, using Postman for API endpoint testing and developing a Gradio application hosted on Hugging Face Spaces for user-friendly testing. By incorporating these testing methods, we ensured the reliability and accuracy of our model.

Github Repository: https://github.com/Abhi0323/Real-Time-Customer-Segmentation-with-Scalable-Kubernetes-Deployment-and-CI-CD-Integration
