Seamless data exchange with Kafka Connect and Strimzi on Kubernetes at Decathlon

Thomas Dangleterre
Decathlon Digital
Published in
9 min readJan 30, 2024

Decathlon Supply Chain and data streaming

Decathlon is the largest French sporting goods retailer, with considerable supply chain activities spanning 56 countries across the globe.

Since 2020, the Digital Supply chain at Decathlon has undergone an evolution in terms of transforming our data pipelines in order to ensure seamless data sharing across all of Decathlon’s business units.

This was done by transforming supply projects, to make use of streaming technologies enabling them to share events in real time: Stock Management, Logistics, Replenishment, Ecommerce…

Our team’s main activity is to develop data streaming solutions for the supply chain. One such solution is a data streaming platform connecting the supply side to the rest of Decathlon, called “VCStream”.

VCStream leverages the power of Apache Kafka for a resilient, scalable, and persistent data streaming platform. This platform interfaces with our core ERPs, processing upwards of 50 million events per day and distributing them across 60 systems all over the world.

The woes of Systems Interconnections

Making data available within Kafka topics was not enough though:

  • Our data lake needed a way to access VCStream Data
  • Event tracing throughout VCStream requires a connection to a log management system
  • Change data Capture pattern was under study

On the other hand, developing customized services for every new requirement was not very efficient, especially since we already knew that many of the connections could be managed by Kafka Connect components.

It is possible to deploy a Kafka Connect cluster manually, or get a Kafka connect cluster provided by a cloud platform.

The first approach may be complex and difficult to maintain. The second one is easier to support but is limited to what the provider provides. And in our case, not all the needed connectors were available and we wanted to be able to use in-house plugins that will help us add features such as encryption of user’s personal data and eventually include SMTs (Single Message Transforms) plugin developed at Decathlon to apply specific transformation to the data .

The questions we came up with were: How do we manage all the connectors? How to deploy them? How to make their deployment and maintenance easy?

We then drafted our prerequisites to find the best answer and here’s what we came up for:

  • A secure and production ready solution: Security is by far our first priority, we must be able to enable end-to-end TLS and manage certificates with our existing secret management platform. Therefore, we have to rely on a tested and resilient tool.
  • A cloud native experience: All of our infrastructure is on Kubernetes, we heavily rely on cloud native technologies in our team that grant us scalability, flexibility and high availability. Therefore, we needed a solution that integrates well with Kubernetes.
  • An open source solution: We love open source softwares which foster collaboration, innovation, and cost-effectiveness, leading to greater transparency, security, and a wide range of tried and tested applications.

Strimzi meets all the requirements listed above.

Strimzi in production at Decathlon

Use cases

Data lake

The first request we received was to store all the events related to logistic movements for each and every item across all of our stores and warehouses. Individual Items are tracked thanks to a unique RFID chip. For instance, we can track when an item arrives at a warehouse, when it is stored, when it is sent to a store…

All these events have to be stored in our data lake for many uses: traceability, BI, Analytics…

We decided to rely on the Kafka Connect S3 Sink to get events from Kafka topics and drop them in AWS S3 buckets.

This plugin comes with nice features used in production such as :

  • Conversion from Apache Avro to Apache Parquet
  • Time based partitioner: messages are split in subdirectories (e.g. /year=2015/month=12/day=07/hour=15/ )
  • Time interval flushing: files are committed periodically to the data lake to avoid buckets overloading
  • Number of records threshold configuration to trigger a sending in order to free heap size on the workers and to keep under control S3 costs

After this approach proved successful, we applied the model to a growing number of event types.

We can now send an average of 30 Million messages a day from +200 topics (~ 600 partitions) to the data lake!

Event sourcing

Furthermore, we use connectors to publish events in MongoDB instantiated with the MongoDB Kafka Connector plugin. Documents stored in the database will then be queried by a service in read-only.

We are using a central event log (Kafka topic) to capture all state changes and allowing for event replay to reconstruct the system state.

In this approach the system is eventually consistent as the events may take some time to propagate and update the database in favor of fault tolerance, and scalability.

We use transforms to use the Kafka message key as the MongoDB document id and we use the ReplaceOneDefaultStrategy to insert messages into Mongo: if a document with an existing key is inserted, it will replace the old document with the matching key.

spec:
config:
transforms.hk.type: org.apache.kafka.connect.transforms.HoistField$Key
transforms.hk.field: _id
Writemodel.strategy: com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy

Thanks to this connector an average of 5 millions of messages a day are sent to our MongoDB.

Telemetry

As a platform, we were required to offer observability to users connected to VCStream. Thus we implemented tracing for all messages flowing through the platform, more specifically at the terminal components:

  • When they are collected by the system
  • When they are exposed by the system

The analytic messages are sent to our observability tool, meaning roughly 100 millions trace events every day.

Moreover all the messages rejected by VCstream are also transmitted to be checked. We call them anomalies. In case of failure, for example when a deserialization error occurs, we send those messages to an anomaly topic.

Traces and anomalies are important for us as we continuously improve by analyzing them, we can:

  • Automate validation during test phase
  • Improve our Error management automating corner use cases
  • Boost performance when it’s required as we measure latency
  • Generate report to teams to better understand the processes
  • Alert when something unexpected happens

In order to send traces from the kafka topics we use a connector that perform http call to send messages to the telemetry platform :

Deployment

Strimzi follows the Operator design pattern: it provides custom resources definition that extends Kubernetes API and brings a lot of automation out of the box to manage them.

The 2 custom resources using declarative configurations that we rely on are:

  • KafkaConnect: it represents a Kafka Connect cluster and allows the user to specify crucial parameters such as the number of workers, connector plugins, topic configuration, Kafka Broker Authentication …
  • KafkaConnector: It represents a high abstraction that coordinates data streaming by managing tasks. The tasks are processes in charge of sending data to/from Kafka and are deployed on the workers.

Aside from Connect, what about other Strimzi Kafka resources ?

As from now, Decathlon relies on third parties platforms to deploy Kafka brokers, MirrorMaker connectors and schema registries. Furthermore, we do not use Topic and User operators provided by Strimzi yet.

In order to deploy and update Kafka Connect resources we use a GitOps approach. All of our configuration files are stored in a git repository. Every change on a kubernetes resource file will be applied by flux periodically. Strimzi Operator will then reconcile updated resources by adjusting state by calling Kubernetes API. This GitOps workflow allows us to deploy resources in a simple way and to keep the history of all the updates applied to our infrastructure.

Kubernetes objects instantiated by CRD can be queried like any other, all the CRUD operations (create, get, delete…) can be executed to them :

kubectl get KafkaConnect
NAME DESIRED REPLICAS READY
vcstream 3 True

> Here we can see a Kafka Connect cluster that is horizontally scaled to 3 workers if we execute a “kubectl get pods” we can get the 3 workers :

vcstream-connect-5b7bdcf8c9-hh9zg   1/1     Running   1 (6d20h ago)   6d20h
vcstream-connect-5b7bdcf8c9-jjkvs 1/1 Running 0 10d
vcstream-connect-5b7bdcf8c9-w7l4v 1/1 Running 0 11d

Furthermore we can also query the KafkaConnectors thanks to the Kubernetes API:

kubectl get KafkaConnectors 
NAME CLUSTER CONNECTOR CLASS MAX TASKS READY
articlereferencementstatus-s3 vcstream io.confluent.connect.s3.S3SinkConnector 3 True
cessionprice-s3 vcstream io.confluent.connect.s3.S3SinkConnector 3 True
container-update-s3-parquet vcstream io.confluent.connect.s3.S3SinkConnector 3 True
contractprice-s3 vcstream io.confluent.connect.s3.S3SinkConnector 3 True

As we have two Kafka Connect clusters split in two different namespaces, we can deploy the Strimzi Kafka operator in its own namespace and have it watch the Kafka Connect namespaces.

When a new KafkaConnector is created, Strimzi will call the Kafka Connect API to instantiate the connector and tasks on the workers.

Secret management

Strimzi also offers a built-in config provider that we use to load sensitive data to configure Kafka Connectors.

Once configured in the Kafka Connect descriptor file:

KafkaConnect :

spec:
# ...
config:
# ...
config.providers: secrets,configmaps
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
config.providers.configmaps.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider

We can retrieve Kubernetes secrets that store sensitive information injected by our Vault directly in our connectors configurations :

spec:
config:
key.converter.basic.auth.user.info: ${secrets:connect-vcstream-dev/connectors-credentials:VCSTREAM_BASIC_AUTH_USER_INFO}
value.converter.basic.auth.user.info: ${secrets:connect-vcstream-dev/connectors-credentials:VCSTREAM_BASIC_AUTH_USER_INFO}

Observability

Thanks to an embedded JMX Exporter in the KafkaConnect CRD, we can easily export metrics from the Kafka Connect Cluster and scrape them with our existing Prometheus.

Once in Prometheus, metrics can be graphed and give us real time information such as availability, volumetry of data, CPU and memory usage…

We also created alerts when some metrics exceed a certain threshold. For instance, if there is a connector with failed tasks : we will be alerted thanks to a Slack notification in order to check what is happening.

A continuously evolving solution

When we started deploying the solution, one of our main issues was that we could frequently lose the connection between a Kafka sink connector deployed with Strimzi, and a data bucket (usually caused by transient plugin issues). The fix was pretty easy, but also annoying: we had to manually restart the connector. Since it occurred randomly and sometimes more than once a day, we had to find a way to automate those restarts.

We analyzed several possible solutions, one of which was to create a cronjob to periodically check connector status and execute the restart if needed.

Looking at the Strimzi GitHub repository, we found out there was already an issue raised about this subject.

So…

Let’s contribute !

After asking on the Strimzi’s Slack channel on how we could help to get this feature developed, we proposed a Pull Request to implement the Automatically restarting FAILED connectors and tasks proposal.

You can find a great article about our contribution on the Strimzi blog written by Jakub Scholz, Strimzi maintainer.

We are proud to have contributed to the Strimzi project, and to have solved this problem for many potential Strimzi users. This feature is now available since Strimzi version 0.33.0. No more manual restarts.

Conclusion

We are thrilled with Strimzi in production at Decathlon, this has drastically improved our data flows deployment lead time. Now, we have less operations and our maintenance is done with ease.

Strimzi has been a wonderful tool for us and we are thinking of testing it for new applications such as making change data capture on our ERP database or building CQRS based solutions or improving our backup on critical topics using cloud storage…

A last, a word of thanks to Strimzi maintainers who had welcomed us and helped us to contribute to the project.

It has been a tremendous experience to be part of this open source project.

Thank you for reading !

Interested in joining the Decathlon Digital Team? Check out https://digital.decathlon.net/

--

--

Thomas Dangleterre
Decathlon Digital

Software engineer at Decathlon and open-source software contributor, with a particular interest in areas such as development, cloud and streaming.