Automated data pipeline using Ceph notifications and KNative Serving

A complete guide to implement the solution and create a demo!

Guillaume Moutier
Jan 9

I recently shared a video in which I demonstrated how to create a fully automated data pipeline that leverages the new notification features of Ceph, along with Kafka and KNative Eventing and Serving. If you haven't watched it yet, it's worth a quick look, as it will help you understand the rest of this post!

If you don’t have time, here is a summary of the scenario we will implement:

Now that we’re all on the same page, let’s see how you can reproduce this demo from the comfort of your (home|cubicle|airport lounge|batcave)…

Note: All the code used here is available at https://github.com/guimou/dataprep-code


Preparing the data

These steps are not strictly necessary, as the code I'm sharing already includes a trained model and some demo images that you can use. But if you want to know how the whole thing works, or want to reproduce everything from scratch, give them a try!

The dataset

This demo is based on the following dataset, available from Kaggle: https://www.kaggle.com/paultimothymooney/chest-xray-pneumonia. It's a set of chest X-ray images from patients presenting signs of pneumonia (~3900 images in the training set) and from healthy ones (~1300 images). The images are already labeled and split into training, validation and test sets. So the first step is to download the dataset and extract it somewhere (~1.2GB). This gives the following structure (demo_base comes from later operations):

.
└── data
    └── chest_xray
        ├── demo_base
        │   ├── NORMAL
        │   └── PNEUMONIA
        ├── test
        │   ├── NORMAL
        │   └── PNEUMONIA
        ├── train
        │   ├── NORMAL
        │   └── PNEUMONIA
        └── val
            ├── NORMAL
            └── PNEUMONIA
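Before training, it can be worth checking that the download and extraction went fine. The following is a minimal sketch that counts the images in each split and class. It assumes the layout above, with `data/chest_xray` relative to the current directory. `count_images` is a hypothetical helper, not something from the repo.

```python
from collections import Counter
from pathlib import Path

# Extensions treated as images (the dataset mostly uses .jpeg).
IMAGE_SUFFIXES = {".jpeg", ".jpg", ".png"}


def count_images(root: Path) -> Counter:
    """Count image files per (split, class) under root/<split>/<class>/."""
    counts: Counter = Counter()
    for split_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        for class_dir in sorted(p for p in split_dir.iterdir() if p.is_dir()):
            counts[(split_dir.name, class_dir.name)] = sum(
                1
                for f in class_dir.iterdir()
                if f.is_file() and f.suffix.lower() in IMAGE_SUFFIXES
            )
    return counts


if __name__ == "__main__":
    root = Path("data/chest_xray")
    if root.is_dir():
        for (split, label), n in sorted(count_images(root).items()):
            print(f"{split:10} {label:10} {n:5}")
```

Files with other extensions are skipped. This keeps stray archive leftovers out of the counts.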

Training the model

The notebook I used to train the model, “x-ray-predict-pneumonia-keras-training.ipynb”, is available under examples/xray/utils in the repo. You can use it from the root of the project if you have the above directory structure.

It's just a matter of running the notebook and being patient: about 20 minutes for me with the help of a GTX 1070 Ti (yeah, I'm not only doing demos and posts, though I should definitely get an RTX… 😁). If everything goes well, you should obtain the "pneumonia_model.h5" file that is used for predictions.

Creating demo images

Of course, the images from the original dataset are completely anonymized. But for the anonymization demo, we want to upload images that contain personal information… Well, let's just create them! That's done with the "make_demo_images.ipynb" notebook, in the same folder as the previous one. No rocket science here: we just take images from the original dataset (here from the val/PNEUMONIA folder), generate some random personal information, and paste it on top of the images.
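The fake personal information can be generated with the standard library alone. The following is a sketch of one way to do it, not the notebook's actual code. The name lists, the ID range and all helper names are hypothetical. The `demo_<name>_<id>_<birth date>_<exam date>.jpeg` filename pattern is inferred from the object key that appears in the notification message later in this post. Drawing the text onto the image, for example with Pillow's `ImageDraw.text`, is left out so the sketch stays dependency-free.

```python
import random
from dataclasses import dataclass
from datetime import date, timedelta

# Hypothetical name pools; the real notebook may use another source.
FIRST_NAMES = ["Anna", "John", "Maria", "David", "Sofia", "James"]
LAST_NAMES = ["Mcgehee", "Smith", "Garcia", "Nguyen", "Brown", "Martin"]


@dataclass
class FakePatient:
    name: str
    patient_id: int
    birth_date: date
    exam_date: date


def random_date(rng: random.Random, start: date, end: date) -> date:
    """Uniformly pick a date between start and end (inclusive)."""
    return start + timedelta(days=rng.randint(0, (end - start).days))


def make_fake_patient(rng: random.Random) -> FakePatient:
    return FakePatient(
        name=f"{rng.choice(FIRST_NAMES)} {rng.choice(LAST_NAMES)}",
        patient_id=rng.randint(1000, 9999),
        birth_date=random_date(rng, date(1930, 1, 1), date(2000, 12, 31)),
        exam_date=random_date(rng, date(2015, 1, 1), date(2019, 12, 31)),
    )


def demo_filename(p: FakePatient) -> str:
    # Pattern inferred from a key such as
    # demo_Anna Mcgehee_3677_1941-11-26_2015-02-10.jpeg
    return (f"demo_{p.name}_{p.patient_id}_"
            f"{p.birth_date.isoformat()}_{p.exam_date.isoformat()}.jpeg")


def overlay_text(p: FakePatient) -> str:
    """Text block that would be drawn on top of the X-ray image."""
    return (f"Name: {p.name}\nID: {p.patient_id}\n"
            f"Born: {p.birth_date.isoformat()}\nExam: {p.exam_date.isoformat()}")
```

To get reproducible demo images, pass a seeded generator, for example `make_fake_patient(random.Random(42))`.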

That’s it for the data part, we now have a model, and some images that we can use in the demo!


Some appetizers before the main course (sounds better than prerequisites)

Ceph

You will of course need a Ceph cluster for this demo, but a complete walk-through of its installation is another story… Quick hint though: since the rest of the demo also requires OpenShift (another Kubernetes distribution should work too, though I have not tested it), you can use Rook-Ceph to deploy your Ceph instance (that's what I'm using here). As of now, the Kafka notifications I am using are only available on the master branch of Ceph (HTTP and AMQP notifications are already in the latest release). So if you're using Rook-Ceph, make sure your cluster YAML file contains:

cephVersion:
  image: ceph/daemon-base:latest-master-devel
  allowUnsupported: true

(I will update the post when the feature is available in a standard release).

Kafka

I won't go into much detail here, as deploying Kafka and creating topics is also a story in itself. So I will assume that you have a Kafka cluster ready and all the necessary endpoint information, and that you have created a topic, called "storage" in our example. Another quick hint though: you can use the Strimzi operator to deploy Kafka quickly and easily on OpenShift.

KNative / OpenShift Serverless

Finally, KNative must be installed on your cluster. Again, detailing this operation is beyond the scope of this article, and there is already plenty of documentation covering it.


Creating the pipeline

If you remember from the video (or not if you’ve not watched it), this is the pipeline that we will be building:

You can start by creating the 3 buckets we will use for this demo: xray-data-in, xray-data-in-processed and xray-data-in-anonymized.

Setting up Bucket notifications to Kafka

Now to bucket notifications! To help you with these steps, you will find a Postman collection of all the requests, "Ceph notifications pipeline.postman_collection.json", in the utils folder. Whether you use it or not, don't forget to set your credentials (user Access Key and Secret Key) so that you can interact with the gateway. In Postman, you can either edit the collection parameters (Authorization tab) or set them request by request.

There are two different steps involved:

  • Create a topic: that's a simple POST request to the gateway, passing the required parameters, including the Kafka endpoint that notifications will be pushed to:
POST http://your.ceph.rgw.url/?Action=CreateTopic&Name=storage&push-endpoint=kafka://my-cluster-kafka-bootstrap.kafka:9092&kafka-ack-level=broker

It returns the topic ARN, in the form arn:aws:sns:my-store::storage
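If you'd rather script this than use Postman, you can build the request URL with the standard library. This is a sketch under a few assumptions. `your.ceph.rgw.url` is the placeholder from above, and the helper names are hypothetical. The request must still be signed with your Access Key and Secret Key (AWS Signature, which Postman handles for you) before the gateway accepts it. URL-encoding every value keeps the query string valid whatever characters the endpoint contains.

```python
from urllib.parse import urlencode


def create_topic_url(rgw_url: str, topic: str, kafka_endpoint: str,
                     ack_level: str = "broker") -> str:
    """Build the (unsigned) CreateTopic request URL for the RADOS Gateway."""
    params = {
        "Action": "CreateTopic",
        "Name": topic,
        "push-endpoint": kafka_endpoint,  # e.g. kafka://host:9092
        "kafka-ack-level": ack_level,
    }
    return f"{rgw_url.rstrip('/')}/?{urlencode(params)}"


def topic_name_from_arn(arn: str) -> str:
    """Extract the topic name from an ARN like arn:aws:sns:my-store::storage."""
    # Assumed layout: arn:aws:sns:<zonegroup>:<tenant>:<topic>
    parts = arn.split(":")
    if len(parts) != 6 or parts[:3] != ["arn", "aws", "sns"]:
        raise ValueError(f"not an SNS topic ARN: {arn!r}")
    return parts[5]
```

The returned ARN is what goes into the `<Topic>` element of the notification request in the next step.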

  • Create the notification: that's a PUT request to the gateway on the bucket where you want notifications enabled, xray-data-in in our example:
PUT http://your.ceph.rgw.url/xray-data-in?notification

with the parameters passed as XML in the body of the request (including the topic you just created):

<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <TopicConfiguration>
    <Id>storage</Id>
    <Topic>arn:aws:sns:my-store::storage</Topic>
  </TopicConfiguration>
</NotificationConfiguration>

As simple as that!
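If you configure notifications on several buckets, it can help to generate this XML body in code. The following minimal sketch uses the standard library's `xml.etree.ElementTree` and reproduces the single `TopicConfiguration` above. `notification_xml` is a hypothetical helper name.

```python
import xml.etree.ElementTree as ET

S3_NS = "http://s3.amazonaws.com/doc/2006-03-01/"


def notification_xml(config_id: str, topic_arn: str) -> str:
    """Build a NotificationConfiguration body with one TopicConfiguration."""
    root = ET.Element("NotificationConfiguration", xmlns=S3_NS)
    topic_cfg = ET.SubElement(root, "TopicConfiguration")
    ET.SubElement(topic_cfg, "Id").text = config_id
    ET.SubElement(topic_cfg, "Topic").text = topic_arn
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    # Body for: PUT http://your.ceph.rgw.url/xray-data-in?notification
    print(notification_xml("storage", "arn:aws:sns:my-store::storage"))
```

As with the topic creation, the PUT request itself still needs to be signed with your credentials.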

Deploying the KNative Serving container

This is the container that does the image analysis and, when needed, the anonymization. The source code is available in the examples/xray/xray-event-container folder of the repo. Here is the Python code itself:

It's pretty much self-explanatory and commented, but if you have any questions please leave a comment. You can build the image yourself, or use quay.io/guimou/xray:master

After creating a project/namespace “xray”, we will create a secret with the credentials needed to access the different buckets that the container will interact with (don’t forget to create them! You should have xray-data-in, xray-data-processed and xray-data-anonymized for this demo). Here is the “secret.yaml” file (you will have to enter actual key/secret values):

apiVersion: v1
kind: Secret
metadata:
  name: s3-secret
  namespace: xray
stringData:
  AWS_ACCESS_KEY_ID: replace_me
  AWS_SECRET_ACCESS_KEY: replace_me

We deploy it with:

oc create -f secret.yaml

Then we define our service with “service-xray.yaml” (you will have to enter the right service_point URL):

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: xray
  namespace: xray
spec:
  template:
    spec:
      containers:
        - image: quay.io/guimou/xray:master
          env:
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: s3-secret
                  key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: s3-secret
                  key: AWS_SECRET_ACCESS_KEY
            - name: service_point
              value: http://your.gateway.url

We deploy it with:

oc create -f service-xray.yaml
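Inside the container, the values injected by this Service definition are plain environment variables. The following sketch shows one way to read them and fail early if one is missing. The `S3Config` dataclass and `load_s3_config` helper are hypothetical names, not taken from the repo. From there, something like `boto3.client("s3", endpoint_url=cfg.endpoint_url, aws_access_key_id=cfg.access_key, aws_secret_access_key=cfg.secret_key)` would give you a client pointed at the Ceph gateway.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

# Variable names as defined in secret.yaml and service-xray.yaml.
REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "service_point")


@dataclass(frozen=True)
class S3Config:
    access_key: str
    secret_key: str
    endpoint_url: str


def load_s3_config(env: Optional[Mapping[str, str]] = None) -> S3Config:
    """Read the S3 settings injected by the KNative Service definition."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return S3Config(
        access_key=env["AWS_ACCESS_KEY_ID"],
        secret_key=env["AWS_SECRET_ACCESS_KEY"],
        endpoint_url=env["service_point"],
    )
```

If a variable is missing, this raises a clear error at startup. Without that check, the first failure would only appear on the first S3 call.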

Deploying the KNative Eventing source

That’s the component that will consume our Kafka topic and pass the message to the Service we just created.

It is defined by “kafkasource.yaml”:

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  name: kafka-source
  namespace: xray
spec:
  consumerGroup: xray
  bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092
  topics: storage
  sink:
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    name: xray

And again, we deploy it with:

oc create -f kafkasource.yaml

Everything is now ready and should work!


Reviewing the whole pipeline

To better understand what's going on, or if you need to do some debugging, here is what happens in this pipeline, step by step.

  • When an image is pushed to the xray-data-in bucket, Ceph will send a notification to Kafka.
Original image
  • This notification will be a message like this:
{"Records": [{"eventVersion": "2.2", "eventSource": "ceph:s3", "awsRegion": "", "eventTime": "2019-12-12T02:42:43.692977Z", "eventName": "s3:ObjectCreated:Put", "userIdentity": {"principalId": "ceph-user-bp3UfKOG"}, "requestParameters": {"sourceIPAddress": ""}, "responseElements": {"x-amz-request-id": "645c86a7-8062-43b1-972b-f6c6ccd4bc6f.134312.15301", "x-amz-id-2": "20ca8-my-store-my-store"}, "s3": {"s3SchemaVersion": "1.0", "configurationId": "storage", "bucket": {"name": "xray-data-in", "ownerIdentity": {"principalId": "ceph-user-bp3UfKOG"}, "arn": "arn:aws:s3:::xray-data-in", "id": "645c86a7-8062-43b1-972b-f6c6ccd4bc6f.29026.2"}, "object": {"key": "data/chest_xray/demo/PNEUMONIA/demo_Anna Mcgehee_3677_1941-11-26_2015-02-10.jpeg", "size": 253130, "etag": "a7d65d30ba7a639a017aa7d2c6fdc28a", "versionId": "", "sequencer": "23A9F15D4FBC632A", "metadata": [{"key": "x-amz-content-sha256", "val": "95fe6dd24fc010ee526e40457ff34ef057d257c71b22ac9fd312094d4d7b2a37"}, {"key": "x-amz-date", "val": "20191212T024243Z"}]}}, "eventId": "1576118563.711179.a7d65d30ba7a639a017aa7d2c6fdc28a"}]}
  • Kafka stores this message in the "storage" topic, since that's the topic the notification publishes to.
  • The KNative Eventing source we defined consumes the "storage" topic. It fetches the message and passes it to its "sink", our KNative Serving Service.
  • The Service launches an image-processing container (if one is not already running), scales it depending on the workload if needed, and passes it the content of the message.
  • The image-processing container will parse the message, retrieve the image, do its magic (assessing pneumonia risk and printing it on the image), then save it.
risk-assessed image, PII still there
  • Then, depending on the risk, our KNative service anonymizes the image and saves it to another bucket, as per our scenario.
Anonymized image
  • When there are no more events to process, KNative scales the image-processing container back down to zero after a short idle period.
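To make the parsing step concrete, here is a minimal sketch, not the repo's code. It extracts what the processing container needs from a notification message like the one above: the event name, bucket, object key and size of each record. `parse_notification` and `is_new_object` are hypothetical helpers. The sketch assumes the key arrives as in the sample, with a literal space. S3-style notifications may URL-encode keys depending on the gateway, and in that case `urllib.parse.unquote_plus` would be needed.

```python
import json
from typing import List, NamedTuple


class ObjectEvent(NamedTuple):
    event_name: str
    bucket: str
    key: str
    size: int


def parse_notification(message: str) -> List[ObjectEvent]:
    """Extract bucket/object information from a Ceph bucket notification."""
    payload = json.loads(message)
    events = []
    for record in payload.get("Records", []):
        s3 = record["s3"]
        events.append(ObjectEvent(
            event_name=record["eventName"],
            bucket=s3["bucket"]["name"],
            key=s3["object"]["key"],
            size=int(s3["object"].get("size", 0)),
        ))
    return events


def is_new_object(event: ObjectEvent) -> bool:
    # Only react to uploads; ignore deletions and other event types.
    return event.event_name.startswith("s3:ObjectCreated:")
```

From each event, the container would download `key` from `bucket`, run the prediction, and write the results to the other buckets.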

Final notes

A problem worth mentioning

The tricky part of creating this demo was figuring out how to listen to KNative Eventing messages. It's well documented in theory, but I could not find any simple implementation in Python (at least not one simple enough for me to understand, as I'm not a real developer 😁). Until I found this post and this project. It's a really nice and simple package that will get you going quickly if you want to play with KNative and Python.

But if you've read all the code, you will ask: "So why didn't you use it?" Well, that's exactly why I wanted to mention it here: this package uses ThreadedHTTPServer to listen to events. But Keras seems to have problems when predictions are launched from within a thread (and again, I must confess I don't fully understand the issue), so the prediction function would just fail. That's why, instead of just calling the CloudeventsServer class from the package, I reimplemented it with ForkedHTTPServer.
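For reference, Python's standard library makes the threads-versus-processes switch straightforward. Combining `socketserver.ForkingMixIn` with `http.server.HTTPServer` handles each request in a forked child process instead of a thread. The sketch below is not the author's reimplementation of CloudeventsServer. It's a minimal, hypothetical receiver that assumes KNative delivers each event as an HTTP POST in CloudEvents binary mode, with attributes in `ce-*` headers and the Kafka message as the body. `handle_event` is a placeholder for the prediction logic. KNative Serving passes the port to listen on in the `PORT` environment variable (8080 by default). `ForkingMixIn` is only available on POSIX platforms.

```python
import os
import socketserver
from http.server import BaseHTTPRequestHandler, HTTPServer


class ForkingHTTPServer(socketserver.ForkingMixIn, HTTPServer):
    """HTTPServer that serves each request in a forked child process."""


def handle_event(attributes: dict, body: bytes) -> None:
    # Placeholder: parse the notification, fetch the image, run the model...
    print(attributes.get("ce-type"), len(body))


class CloudEventHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        # In binary mode, CloudEvents attributes travel as ce-* headers.
        attributes = {k.lower(): v for k, v in self.headers.items()
                      if k.lower().startswith("ce-")}
        try:
            handle_event(attributes, body)
        except Exception as exc:  # report the failure to the sender
            self.send_response(500)
            self.end_headers()
            self.wfile.write(str(exc).encode())
            return
        self.send_response(202)
        self.end_headers()


def serve() -> None:
    # KNative Serving tells the container which port to listen on via $PORT.
    port = int(os.environ.get("PORT", "8080"))
    with ForkingHTTPServer(("", port), CloudEventHandler) as server:
        server.serve_forever()
```

Because each prediction runs in its own process, the main process never calls into Keras from a worker thread. This is the property the workaround described above relies on.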

Perspectives

This simple demo shows that it's now easier than ever to build automated data pipelines without a separate orchestrator or other tooling. We can simply use native functions from the storage tier and the container/application tier, and bind them together in almost any way we can think of. Stay tuned for other implementations! And if you have use cases that you think would benefit from this kind of interaction, please feel free to reach out.

Analytics Vidhya

Analytics Vidhya is a community of Analytics and Data Science professionals. We are building the next-gen data science ecosystem https://www.analyticsvidhya.com

Written by Guillaume Moutier

Hi! I am a Senior Principal Technical Evangelist working @ Red Hat. Containers, Storage, Data Science, AI/ML, that’s what it’s all about!

