Realtime Data Drift Detection

Ok, maybe not “real-time” but mini online batch-based data drift detection.

Varun Mallya
DKatalis
Jul 27, 2022


In the last article, we showed how to build a data drift component that could then be part of a Kubeflow pipeline. But what if your model is deployed as an API and you wish to perform data drift detection alongside your inference? This is possible with Evidently too!

Basically, for this use case, we would still need a reference dataset (the data the model was trained on). But, for the inference dataset, we will store samples of data we see during inference.

For example, let’s say we want a sample of size 100 to compare against the reference dataset. When the app starts, we will store 100 samples which may correspond to 100 API calls to a predict endpoint. We would compare it to the reference dataset and check for data drift only once we have 100 samples. From there, we would take the most recent 100 samples by dropping the least recent sample from our set for every subsequent API call.
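Under the hood, this is just a fixed-size rolling window over incoming requests. Here is a minimal sketch of the idea (Evidently's monitoring service handles this bookkeeping for us; the deque below is purely illustrative):

from collections import deque

import pandas as pd

WINDOW_SIZE = 100
window = deque(maxlen=WINDOW_SIZE)  # the oldest sample is dropped automatically once the window is full

def on_request(features: dict):
    # Called for every request to the predict endpoint with the raw feature dict.
    window.append(features)
    if len(window) == WINDOW_SIZE:
        current = pd.DataFrame(list(window))
        # compare `current` against the reference dataset and record drift metrics here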

Data drift is computed using this latest sample for every API call. The data drift metrics are exposed at the desired endpoint which can then be scraped by Prometheus and displayed via Grafana.

An example should make this easier to understand. The code can be found in this repo.

We have a simple FastAPI app which performs prediction. An endpoint for such an app could look something like what's shown below 👇🏽

@app.post("/api/v1/predict")
async def predict_v1(data: PersonData):
features = pd.DataFrame(data.dict(), index=[0])
response = model.predict(features)
return response

PersonData holds the features, which are fed to the model for prediction, and the model response is returned. To identify whether this data has drifted from the data we saw in training, we have to set up drift monitoring.
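For context, a PersonData model could look something like the following. The exact schema lives in the repo; the fields here simply mirror the columns referenced in the drift config below, plus the person_id we drop before computing drift:

from pydantic import BaseModel

class PersonData(BaseModel):
    # Hypothetical schema, for illustration only.
    person_id: str
    job_title: str
    industry: str
    occupation: str
    age: int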

To do this, first, we write a simple config file, mainly to initialise the drift-monitoring service:

column_mapping:
  categorical_features:
    - job_title
    - industry
    - occupation
  numerical_features:
    - age
data_format:
  header: true
  separator: ','
service:
  calculation_period_sec: 10
  monitors:
    - data_drift
  moving_reference: false
  reference_path: reference_df.csv
  use_reference: true
  window_size: 30

This config file has column mappings that tell us which features are numerical and which are categorical. We set reference_path, which points to the training dataset (reference_df.csv). We also set a minimum window size and calculation_period_sec.

The minimum window size is the number of inference rows that will be compared against the reference data, while calculation_period_sec is the recurring interval at which drift is computed (i.e. compute drift detection every 10 seconds).

Once we have set up this config file, we need to initialise the data drift monitoring in the FastAPI app by adding the snippet of code below:

import os
from typing import Optional

import yaml
from realtime_data_drift.realtime_data_drift.data_drift import (
    MonitoringService,
    getDriftMonitoringService,
)
from starlette_exporter import PrometheusMiddleware, handle_metrics

app.add_middleware(
    PrometheusMiddleware,
    app_name="sample_fast_api",
    prefix="sample_fast_api",
)
app.add_route("/metrics", handle_metrics)

SERVICE: Optional[MonitoringService] = None

@app.on_event("startup")
async def startup_event():
    global SERVICE
    config_file_name = "data-drift-config.yaml"
    if not os.path.exists(config_file_name):
        exit("Cannot find config file for the metrics service. Try to check README.md for setup instructions.")
    with open(config_file_name, "rb") as config_file:
        config = yaml.safe_load(config_file)
    SERVICE = getDriftMonitoringService(config)

We set up the Prometheus middleware so we can expose our metrics at the /metrics endpoint; this is achieved with starlette_exporter. Once we have that, we initialise the monitoring service on application startup. A good way to reuse the data drift monitoring across all API deployments is to package it and publish it to a private PyPI index, so that it can be imported by every model deployed via FastAPI.

The last step is to use this service in the predict endpoint. So the endpoint we defined earlier would look something like this:

@app.post("/api/v1/predict")
async def predict_v1(data: PersonData, background_tasks: BackgroundTasks):
features = pd.DataFrame(data.dict(), index=[0])
if SERVICE is None:
logging.error("data drift service not found!")
else:
# drift will be computed when there is 30 rows of data
background_tasks.add_task(SERVICE.iterate, features.drop("person_id", axis=1))
response = model.predict(features)
return response

We use a FastAPI background task to compute data drift so that it does not impact the response time of the model prediction.

Once we run this app, we will be able to see data drift metrics as soon as we have enough data (a window size of 30), i.e. after 30 requests to the /api/v1/predict endpoint. The metrics will be available at the /metrics endpoint.
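To sanity-check this locally, you could fill the window and then read the metrics endpoint. A rough sketch, assuming the app runs on localhost:8000 and the hypothetical PersonData schema from earlier:

import requests

BASE_URL = "http://localhost:8000"  # assumption: the FastAPI app is running locally on port 8000

# Hypothetical payload matching the PersonData sketch above.
payload = {
    "job_title": "engineer",
    "industry": "banking",
    "occupation": "software",
    "age": 34,
}

# Send enough requests to fill the 30-row window.
for i in range(30):
    requests.post(f"{BASE_URL}/api/v1/predict", json={**payload, "person_id": f"p-{i:03d}"})

# The drift metrics should now appear alongside the standard request metrics.
print(requests.get(f"{BASE_URL}/metrics").text)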

This metrics endpoint then has to be scraped by a Prometheus scrape job. Once the metrics have been scraped, we can visualise them in Grafana.
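A minimal scrape job in prometheus.yml could look like the one below; the job name and target are placeholders for wherever the FastAPI app is actually reachable:

scrape_configs:
  - job_name: 'sample_fast_api'      # placeholder job name
    scrape_interval: 10s             # roughly aligned with calculation_period_sec
    static_configs:
      - targets: ['localhost:8000']  # assumption: host:port of the FastAPI app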

The Grafana dashboard is available in the repo and can be imported.

Do let me know what you think of Evidently’s drift detection package!

If you like to experiment with new ways to detect data drift, you might want to take a peek at our jobs board!
