Data Drift Detection In Kubeflow Pipelines
We use Kubeflow Pipelines to orchestrate our ML pipelines. This includes training, inference, and backfilling of feature tables. Data drift is the divergence of inference data from the data the model was trained on. It is important that we identify it as early as possible and take the necessary steps to mitigate it.
One of the open-source tools for identifying data drift is Evidently. It runs statistical tests under the hood and provides a simple interface for running them. Evidently also generates interactive reports, which help us debug.
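For instance, with the older Evidently 0.1.x Dashboard API (the same generation of the library as the Profile API used further below), an interactive drift report takes only a few lines. This is a minimal sketch, where reference_df and inference_df are placeholder pandas DataFrames:

from evidently.dashboard import Dashboard
from evidently.dashboard.tabs import DataDriftTab

# reference_df / inference_df are placeholder pandas DataFrames.
dashboard = Dashboard(tabs=[DataDriftTab()])
dashboard.calculate(reference_df, inference_df, column_mapping=None)
dashboard.save("data_drift_report.html")  # interactive HTML report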
Our objective was to build a Kubeflow pipeline component that we could reuse across all our batch inference pipelines. The component takes two inputs:
- Reference Dataset
- Inference Dataset
This component’s output is a report that we can visualize in the Kubeflow pipelines UI.
You can find the component and the pipeline in this repo.
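A minimal sketch of what such a component can look like with the KFP v1 SDK is shown below; the function body, file format, and package list are illustrative assumptions, not the exact component from the repo:

from kfp.components import OutputPath, create_component_from_func

def drift_detection(
    reference_path: str,
    inference_path: str,
    mlpipeline_ui_metadata_path: OutputPath(),
) -> bool:
    """Loads both datasets, runs the drift check, and writes an HTML
    report that the Kubeflow Pipelines UI renders inline."""
    import json
    import pandas as pd

    reference = pd.read_parquet(reference_path)  # assumed storage format
    inference = pd.read_parquet(inference_path)

    # ... run the Evidently drift check here (see below) ...
    html_report = "<html>...</html>"  # Evidently dashboard HTML
    drifted = False

    # KFP v1 renders this metadata file in the pipeline UI.
    metadata = {
        "outputs": [
            {"type": "web-app", "storage": "inline", "source": html_report}
        ]
    }
    with open(mlpipeline_ui_metadata_path, "w") as f:
        json.dump(metadata, f)
    return drifted

drift_detection_op = create_component_from_func(
    drift_detection, packages_to_install=["evidently", "pandas"]
)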
The component checks whether data drift has occurred by running the Kolmogorov–Smirnov (K-S) test, which compares the two distributions (reference and inference dataset). The comparison is performed feature by feature. If a significant share of the features has drifted (in our case, 50% of the features), we conclude that there is significant data drift and we may need to retrain the model. Concretely, with the default confidence of 0.95, a feature counts as drifted when its K-S p-value falls below 0.05; with 20 features and a 0.5 threshold, at least 10 features must drift before the dataset is flagged.
import json

from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection


def _detect_dataset_drift(
    reference,
    production,
    column_mapping,
    confidence=0.95,
    threshold=0.5,
    get_ratio=False,
):
    """
    Returns True if Data Drift is detected, else returns False.
    If get_ratio is True, returns the ratio of drifted features instead.
    The Data Drift detection depends on the confidence level and the threshold.
    For each individual feature, Data Drift is detected with the selected confidence (default value is 0.95).
    Data Drift for the dataset is detected if the share of drifted features is above the selected threshold (default value is 0.5).
    """
    # Build an Evidently profile that runs the drift tests.
    data_drift_profile = Profile(sections=[DataDriftProfileSection()])
    data_drift_profile.calculate(
        reference, production, column_mapping=column_mapping
    )
    json_report = json.loads(data_drift_profile.json())

    num_features = (
        column_mapping.numerical_features
        if column_mapping.numerical_features
        else []
    )
    cat_features = (
        column_mapping.categorical_features
        if column_mapping.categorical_features
        else []
    )

    # Collect the per-feature drift score (a p-value) from the report.
    drifts = []
    for feature in num_features + cat_features:
        drifts.append(
            json_report["data_drift"]["data"]["metrics"][feature]["drift_score"]
        )

    n_features = len(drifts)
    # A feature counts as drifted if its p-value is below the significance level.
    n_drifted_features = sum(1 for x in drifts if x < (1.0 - confidence))

    if get_ratio:
        return n_drifted_features / n_features
    return n_drifted_features / n_features >= threshold
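Calling it looks roughly like this (a sketch assuming the Evidently 0.1.x ColumnMapping API; the file paths and feature names are hypothetical):

import pandas as pd
from evidently.pipeline.column_mapping import ColumnMapping

reference_df = pd.read_parquet("reference.parquet")  # hypothetical paths
inference_df = pd.read_parquet("inference.parquet")

column_mapping = ColumnMapping(
    numerical_features=["age", "income"],  # hypothetical features
    categorical_features=["country"],
)

drifted = _detect_dataset_drift(reference_df, inference_df, column_mapping)
drift_ratio = _detect_dataset_drift(
    reference_df, inference_df, column_mapping, get_ratio=True
)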
When we run this component in a pipeline, we get the dashboard below under the Kubeflow pipeline visualisation:
We can use this component's result to conditionally execute the rest of the pipeline if required.
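With the KFP v1 DSL, that can look like the sketch below (drift_detection_op is the component sketched earlier, and retrain_op is a hypothetical retraining component):

from kfp import dsl

@dsl.pipeline(name="batch-inference-with-drift-check")
def inference_pipeline(reference_path: str, inference_path: str):
    drift_task = drift_detection_op(reference_path, inference_path)

    # KFP v1 serializes the component's boolean output to a string.
    with dsl.Condition(drift_task.output == "True"):
        retrain_op()  # hypothetical retraining component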
A follow-up blog post will showcase how we use Evidently for "real-time monitoring", mainly for our models deployed as a FastAPI service.
Meanwhile, if you find this article helpful and interesting, then maybe you would make a great fit for our team! Join us and be a Katalis!