Managing Kafka and Data Streams at Criteo

H. Egemen Ciritoğlu
Criteo R&D Blog
Published in
8 min readFeb 23, 2023

--

Photo by Chris Liverani on Unsplash

Criteo is a global ad-tech company that powers commerce media platform by activating the world’s largest set of commerce data[1,2]. While data continuously flow in Criteo, Apache Kafka is at the heart of the core business. Kafka powers our business log collection pipeline and streaming infrastructure. Currently, we are ingesting up to 40 million messages per second into ten of Kafka clusters located on three different continents. As a consequence, our messaging and streaming infrastructure carries petabytes of data every day. Maintaining operations like node provision/decommission to manage highly-scaled systems is nothing but ordinary. Yet the nature of data-intensive systems introduces another dimension to the problem around data(i.e. state). These are few challenges to address while running the streaming .

In this blog post, we try to address the following challenges that we faced in Criteo:

1) Monitoring in Kafka Environment
2) Managing partitions in Kafka clusters
3) Managing the flow of datasets

1) Monitoring in Kafka Environment

Providing a highly observable system requires effort at different levels: client-side monitoring, servers-side monitoring, and overall pipeline monitoring.

On the server side, we are monitoring and alerting based on broker metrics(e.g. log size, produce latency queue time and size as well as the availability of IO and network threads) in addition to the cluster metrics(under replicated partitions, under min in-sync replicas, leadership, rack-awareness) all backed up by Prometheus alerting that can trigger a page on OpsGenie, and eventually metrics flow into VictoriaMetrics(as long-term storage).

On the client side, we take advantage of having our own SDK (for JVM and .NET) in Criteo. This brings great operational flexibility. For instance, we have visibility and precise control over on our Kafka users, supporting the mentioned SDK with the central dynamic configuration system allows us to force particular features(e.g. authentication, batching, retry mechanism or slight delay by linger.ms) to provide guarantees like throughput and reliability. For any change in Kafka clients' configs, we can apply the configuration on the fly or on a restart basis. This minimizes having bad clients and utilizes clusters efficiently(for instance, we enable batching by default). Moreover, the SDK allows the authentication of our clients. As long as clients are authenticated, we can simply limit their consumption and even switch them off if required. The SDK enhances monitoring with the central monitoring system[3] and by publishing Kafka client metrics.

In most use cases, we are using Kafka as a part of our pipelines. Thus, we do have metrics that help us oversee the pipelines. Moreover, paging alerts on the pipeline metrics help us to guarantee what we promise! We are taking advantage of watermarks for tracking the lateness of overall pipeline monitoring. Watermarks are special messages that include the event time of records allowing us to decide how late pipelines are. We insert watermarks regardless of a business message inside Kafka topics and thus are expected on the other side of the pipeline. Our usage is similar to the idea of watermarks introduced in the Dataflow Model[4,10].

2) Managing partitions in Kafka clusters

Managing Kafka partitions could be a significant challenge when your cluster has to scale out regularly or have to replace a node, or the change of popularity for a topic can cause unbalanced usage in the cluster. Consequently, these partitions need to be redistributed(i.e. Rebalance). During rebalance, satisfying particular constraints(i.e. rack-awareness) are vital. So the problem will escalate to a multi-objective optimization problem. Another concern that will require attention would be running the maintenance. You need to do such maintenance operations when the cluster is running and ingesting messages normally. This requires strict throttling since we would rather not impact/distract our clients’ ordinary business. Thus, running rebalance smoothly is critical to achieving the “best way” to balance replicas/leaderships in the cluster. Such operation would need to be operated in batches due to minimize state submission overhead. Consequently, it acquaints a limit for the number of on the fly partitions as well as the concurrent number of leadership changes.

We are using the open-source CruiseControl[5] framework to handle such issues. CruiseControl is used as the backbone of our operational capabilities. CruiseControl can sample metrics regularly either from its metric reporter or can pull from Prometheus. Then generates a cluster model. This cluster model is utilized to place each partition by satisfying the “hard constraints” and trying its best with “soft constraints”. CruiseControl is quite flexible and has pluggable modules: metrics, throttling, rebalance strategies, and self-healing for disk-level and broker-level.

The workflow of CruiseControl with Prometheus

We were already heavily using Prometheus in our domain. So, we were already pulling and had our own metrics definition. We extend existing definitions of DefaultPrometheusQuerySupplier[6] to fit our metrics. It allows the mapping of existing names with CruiseControl’s expectations.

TYPE_TO_QUERY.put(BROKER_PRODUCE_LOCAL_TIME_MS_MEAN,
"kafka_network_RequestMetrics_Mean{name=\"LocalTimeMs\",request=\"Produce\"}");

Moreover, the endpoint should be set for Prometheus. It is important to note that Prometheus is pull-based so it may require an adjustment on RESOLUTION_STEP_MS based on the pull setting as pulling the same metrics twice would not be useful at all. Cruise control comes with many strategies for how you would like to prioritise partitions while running rebalance[7]. In our case, the already implemented PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy is exactly what we need.

Rebalancing is a sensitive operation in the cluster. Not only for calculating the “best way” to balance the distribution of replicas and leaders but throttling the network (at the broker level) is also a critical aspect. In addition, we also need to throttle the number of partitions on the fly and even the number of leadership changes[8]. The design of CruiseControl provides such a flexible way to give smoothness controls to our operations. In addition, we crafted our rebalance monitoring dashboard to that track ongoing replica movements, under-replicated partitions, utilization of both disk and network IO, and message loss that comes from the SDK. Moreover, the dashboard can report how long the rebalance would take based on linear regression, the methodology is not quite accurate, but it gives a quiet indication.

Rebalance dashboard

CruiseControl is a game-changer tool, but there is always room for improvement

In Criteo, we are powered by open-source. Also, we ❤️ contributing too! While we were running CruiseControl in production, we realized an edge case of having a dead broker. It can stop and block ongoing rebalance due to CruiseControl throwing a timeout exception while trying to throttle a dead node cause that doesn’t answer back. We made a patch and sent a PR: https://github.com/linkedin/cruise-control/pull/1957.

Another problem we have seen so far is excessive throttling occurs in the case of just order change. For instance, changing the replica set from

Topic: my_test_topic  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3

to

Topic: my_test_topic  Partition: 0  Leader: 2  Replicas: 2,1,3  Isr: 2,1,3

CruiseControl thinks it is INTER_BROKER_REPLICA_MOVEMENT, so it applies throttles, yet all replicas are in-sync. CruiseControl’s throttling can become a bottleneck due to changes required on ZooKeeper. We think this could be done without throttling as all replicas are already in-sync and this can accelerate ongoing rebalance. And there is several more improvement we would like to focus like improving CruiseControl UI, and implement a behavior to pick up the previous started rebalance task in case of restarted (currently we need to start manually).

3) Managing the flow of datasets

Scaling heavily brings many topics to manage. Moreover, datasets can be used to generate other datasets by enriching, sampling or gathering worldwide data to a single location. In addition, datasets can be sampled or copied fully to a preprod environment for testing purposes(e.g. testing features or do PoC streaming). We created a DSL by encapsulating topics and their replication between topics. While having many datasets in different regions, tracking this relationship is critical. Even debugging would be annoying at scale since you would need to recursively track the relationship.

Criteo prioritises developer experience, and understanding data lineage between environment(i.e. prod/preprod/dev) is essential for developers. As published before[9], there is an ongoing effort to observe datasets in HDFS with Datadoc in Criteo. Datadoc is useful to track overall how the particular dataset generated. In other words, it clarifies which dataset feeds which dataset on HDFS. In addition to Datadoc, we can also monitor which user/service account produces into which topic of Kafka as we are using authenticated Kafka clients. Yet, tracking streams is another side of the coin for Stream Processing. In order to address the data lineage problem and enhance self-service streaming in the context of streams, we put forward a web application: StreamingPortal. The figure below depicts a flow of Kafka topics which involves data copied among DCs/environments with replications. The depicted flow can help you with your investigation to track down the branch of the root.

We are using Protobuf; therefore, data are encoded so it is in serialised format. This non-human readability can introduce overhead while debugging. Thus, we added a feature to StreamingPortal that can help developers to understand what data that has been written to Kafka topics for a given time. We are using Kafka REST Proxy for this purpose. This feature provides better visibility for developers to iterate faster.

Conclusion

Managing the messaging system that requires high availability and scalability as a platform is an interesting challenge in various aspects. Eliminating toil continuously is a primary goal while scaling up and automatically maintaining the cluster. Furthermore, making the platform fully transparent to external systems is essential for developer experience and productivity. Such concerns highlight particular issues to address in the context of monitoring and managing the infra and configurations. In this article, we detailed lessons we learnt while trying to solve such problems in Criteo.

Acknowledgement

We sincerely appreciate all reviewers’ feedback to help us to improve the quality of the blog post: Grégoire Seux, Hervé Rivière, Khalil Kooli, Ilyas Toumlilt, Qinghui Xu (sorted by alphabetical order).

References

[1] What is the First-Party Media Network? https://www.criteo.com/blog/what-is-the-first-party-media-network/
[2] What is Commerce Data? https://www.criteo.com/blog/what-is-commerce-data/
[3] Monitoring microservices — Central Monitoring: A tool for a global view of things https://medium.com/criteo-engineering/monitoring-microservices-central-monitoring-a-tool-for-a-global-view-of-things-80e46a810fd5
[4] Akidau, Tyler, et al. “The dataflow model: a practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing.” (2015) https://dl.acm.org/doi/10.14778/2824032.2824076
[5] CruiseControl: https://github.com/linkedin/cruise-control
[6] CruiseControl — Prometheus supplier source code: https://github.com/linkedin/cruise-control/blob/b4e44ec004e6f5e22bd1c4e203d92341ed9e1659/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/prometheus/DefaultPrometheusQuerySupplier.java
[7] CruiseControl — PluggableComponents: https://github.com/linkedin/cruise-control/wiki/Pluggable-Components#replica-movement-strategy
[8] CruiseControl — Configurations: https://github.com/linkedin/cruise-control/wiki/Configurations
[9] DataDoc — The Criteo Data Observability Platform: https://medium.com/criteo-engineering/datadoc-the-criteo-data-observability-platform-2cd826a9a1af
[10] Data Streaming with Flink — Enabling real-time Business Cases: https://medium.com/criteo-engineering/criteo-streaming-flink-31816c08da50

--

--

H. Egemen Ciritoğlu
Criteo R&D Blog

Academically-backed seasoned SRE, enjoying with #DistributedSystems' problems while trying to keep part of internet working. More: https://egemenciritoglu.com