Data Streaming with Flink

Enabling real-time Business Cases

Summary

At Criteo, we need real-time Data to be able to detect and solve critical incidents faster. To achieve this, we have introduced an Apache Flink pipeline in our production environment. The first dataset to become real-time at Criteo is the revenue pipeline. In addition to the Incident Management use case, real-time data has proven key in increasing Operational Safety and developing new Business approaches. In this blog post, we will describe how we built a real-time calculation pipeline.

I) Initial Business Case

During a critical incident, time is of the essence. Every minute lost solving the issue is money you are not making if the incident has a financial impact, and you run the risk of losing clients if it has a reputational impact. And there are a lot of opportunities to lose time during an incident. First, time is spent between the incident start time and the incident detection time. Then, once an investigation starts, you may be missing some of the data needed to pinpoint the root cause. Finally, when you have identified the root cause and applied a fix, you need the next data point from the monitored time series to either validate the resolution or reopen the investigation.

Prior to the introduction of Flink, the fastest time series in our Analytical Infrastructure with basic business metrics at a fine granularity had a three- to four-hour latency against reality. This means that if there was an abnormal revenue variation in your scope of responsibility at 10:00, you could only start investigating at 14:00 and, provided you fixed the issue immediately, you could only validate the resolution at 18:00. Doesn’t sound very effective, does it?

In addition to that, special business events such as Black Friday are crucial in the ad industry. Not reaching the target set by the client on such a day can have both an immediate financial impact and lasting commercial consequences. Without a low-latency dataset, you can only prepare for the event and hope for the best, while with real-time data you can actually drive the campaign.

Thus, as part of a Criteo Hackathon, an initiative to provide a low-latency dataset was launched. Over the years, the scope and the technologies have changed a lot from the initial project. After testing several data streaming solutions, we found that Apache Flink, with its great implementation of the Dataflow model, was the best suited to our use case.

II) How we used Flink to solve the problem

Since our revenue data is time-series, we needed event-time processing to be able to group revenue over time windows. Several approaches are available to perform event-time processing in streaming, but the most powerful is the Dataflow model. Apache Flink has one of the best implementations of this approach on the market, which was the main reason we chose this framework to perform real-time revenue computation. Also, watermark tracking in Flink can easily be integrated with the watermarks we were already using to export data from Kafka to HDFS.
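To make this concrete, here is a minimal sketch of how revenue could be grouped per advertiser over event-time windows with Flink's DataStream API. The RevenueEvent type, the field names, and the inline test source are hypothetical stand-ins for our real schema and Kafka consumer, and the timestamp/watermark assigner shown is the pre-Flink-1.11 API that matches the versions discussed here:

```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RevenueAggregationSketch {

    // Hypothetical event type: one record per billable event, with its revenue.
    public static class RevenueEvent {
        public String advertiserId;
        public long eventTimestamp; // epoch millis, set by the producing service
        public double revenue;

        public RevenueEvent() {}

        public RevenueEvent(String advertiserId, long eventTimestamp, double revenue) {
            this.advertiserId = advertiserId;
            this.eventTimestamp = eventTimestamp;
            this.revenue = revenue;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Windows are driven by the timestamps carried inside the events, not by the wall clock.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // In the real pipeline the source would be a FlinkKafkaConsumer on the merged topics.
        env.fromElements(
                new RevenueEvent("advertiser-1", 1_000L, 0.42),
                new RevenueEvent("advertiser-1", 2_000L, 0.13),
                new RevenueEvent("advertiser-2", 1_500L, 0.99))
            // Extract event timestamps and emit watermarks, tolerating 1 minute of out-of-orderness.
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<RevenueEvent>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(RevenueEvent e) {
                        return e.eventTimestamp;
                    }
                })
            // Group revenue per advertiser over one-minute event-time windows.
            .keyBy("advertiserId")
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .sum("revenue")
            .print();

        env.execute("real-time revenue sketch");
    }
}
```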

Flink also has good non-functional characteristics that facilitate running it in production: easy integration with different monitoring back-ends (e.g. Graphite, Prometheus) and a UI that provides good introspection capabilities. The fast development cycle of Flink jobs, along with a simple execution model, makes the learning curve smooth and development very productive.

The original use case for Apache Kafka at Criteo was moving data from different data centers around the globe to our Hadoop clusters. Data is written from the real-time web services to a “Front” Apache Kafka cluster within the same data center. Later, this data is exported to HDFS by map/reduce jobs based on Camus.

Since most of the data at Criteo is time-series and Kafka does not guarantee global order over a topic with more than a single partition (though it does guarantee order within a partition), we need to track global time within a Kafka topic in order to know up to which timestamp data has been exported to HDFS. Because of this ordering problem, we cannot use the timestamp of a message to derive the global time of the topic, so we use a special trick to track time: we inject special messages, which we call watermarks, into each partition of a given topic. These messages carry the last timestamp written to the topic. Since messages are ordered within a partition, a Kafka consumer can track the watermark timestamp for each partition, and the minimum across all partitions gives the global time of the whole topic.

In the image above, watermarks are shown in purple and data messages in orange, with the timestamp as the label. If the consumer reads all the data, the global time would be equal to 2.
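As a simplified illustration of this bookkeeping (the class below is illustrative, not our actual implementation), a consumer only has to remember the last watermark timestamp seen on each partition and take the minimum as the topic's global time:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Simplified sketch of per-partition watermark tracking for a single topic.
 * The names are illustrative; the real consumer code is more involved.
 */
public class TopicTimeTracker {

    // Last watermark timestamp observed on each partition of the topic.
    private final Map<Integer, Long> lastWatermarkPerPartition = new ConcurrentHashMap<>();

    /** Called whenever a watermark message is read from a partition. */
    public void onWatermark(int partition, long watermarkTimestamp) {
        lastWatermarkPerPartition.merge(partition, watermarkTimestamp, Math::max);
    }

    /**
     * Global time of the topic: the minimum watermark across all partitions,
     * assuming every partition has produced at least one watermark. Every
     * message with a smaller timestamp is guaranteed to have been read.
     */
    public long globalTime() {
        return lastWatermarkPerPartition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```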

Most of the batch pipelines at Criteo are platform-oriented, so we should be able to have platform-level processing in streaming pipelines too. What we call a platform is a part of the world, e.g. Europe, America, Asia. A single platform is represented by several data centers located in the region. Since a single “Front” Kafka cluster only stores data-center-wide data, we need to merge several topics from different data centers in order to have platform-level data processing. Thus, we have built a special “Streaming” Kafka cluster which contains merged topics from the different “Front” Kafka clusters.

The problem is that during replication from the “Front” to the “Streaming” Kafka clusters we randomly shuffle data, including watermarks. Flink consumers of the “Streaming” cluster therefore could not build a global time over the topic, because there is no guarantee that the order within a partition is kept and that watermark messages retain their initial semantics. To overcome this issue, we filter out all watermarks during replication in Kafka Connect and delegate watermarking to a dedicated service, which we call the Watermark Reinjector. This service builds a consensus over all the source topics being replicated and injects new watermarks into every partition of the destination topic every minute. Thus our “Streaming” cluster contains new watermarks, which can be used to recover the global time over the merged topic.
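A very rough sketch of that reinjection loop might look as follows. The broker address, topic name, watermark encoding, and consensus computation are all placeholders; the real service also has to track the source topics' watermarks, handle failures, and deal with partition discovery:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/** Simplified sketch of a watermark-reinjection main loop. */
public class WatermarkReinjectorSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "streaming-kafka:9092"); // hypothetical broker address
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        String destinationTopic = "revenue-merged"; // hypothetical merged topic

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            while (true) {
                // Consensus: the new watermark is the minimum of the global times
                // of all source topics being replicated (tracking is elided here).
                long consensusTimestamp = consensusOverSourceTopics();

                // Inject the watermark into every partition of the destination topic.
                List<PartitionInfo> partitions = producer.partitionsFor(destinationTopic);
                byte[] payload = encodeWatermark(consensusTimestamp);
                for (PartitionInfo p : partitions) {
                    producer.send(new ProducerRecord<>(destinationTopic, p.partition(), null, payload));
                }
                producer.flush();

                Thread.sleep(60_000); // reinject once per minute
            }
        }
    }

    // Placeholder: in the real service this comes from tracking the source topics' watermarks.
    private static long consensusOverSourceTopics() {
        return System.currentTimeMillis() - 60_000;
    }

    // Placeholder: the actual watermark message format is internal.
    private static byte[] encodeWatermark(long timestamp) {
        return ("WATERMARK:" + timestamp).getBytes(StandardCharsets.UTF_8);
    }
}
```

Because the watermark is written to every partition, any consumer of the merged topic can again take the per-partition minimum to recover the global time.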

Of course, such replication and watermark reinjection add some latency, but since most of our streaming pipelines compete with batch pipelines, a one-minute latency is not critical for us.

III) Challenges and their solutions

One still-open infrastructure question is that Kafka Connect can introduce a significant number of duplicates during topic replication. Since Criteo uses Kafka 0.10.x, we cannot leverage exactly-once delivery, and in case of a significant number of failures during replication, duplicates can end up in the streaming Kafka topics. Any attempt to deduplicate such topics can only heuristically reduce the number of duplicates.
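One common heuristic, shown here only as a sketch and not necessarily what runs at Criteo, is to remember recently seen event identifiers in Flink keyed state and drop any repeat within a retention window:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Heuristic deduplication sketch: emit an event only the first time its key is seen
 * within a retention window. The stream must be keyed by a unique event identifier
 * and have event timestamps assigned.
 */
public class DedupFunction<T> extends KeyedProcessFunction<String, T, T> {

    private static final long RETENTION_MS = 60 * 60 * 1000L; // keep ids for one hour

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void processElement(T event, Context ctx, Collector<T> out) throws Exception {
        if (seen.value() == null) {
            // First occurrence of this id within the retention window: keep the event.
            seen.update(true);
            out.collect(event);
            if (ctx.timestamp() != null) {
                // Schedule state cleanup so that the dedup state stays bounded.
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + RETENTION_MS);
            }
        }
        // Otherwise this is a duplicate within the window: drop it.
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) {
        seen.clear();
    }
}
```

It would be applied with something like events.keyBy(e -> e.uniqueId).process(new DedupFunction<>()), assuming each event carries some unique identifier; duplicates that arrive further apart than the retention window still slip through, which is why the batch comparison described below remains necessary.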

So, in order to measure the duplicates introduced in Kafka and validate the general logic of the watermarking, we run a Hadoop job which constantly measures the difference between the deduplicated batch computation and the Flink-based streaming computation. This ensures we validate the data constantly, and any doubt about accuracy can be confirmed by Data Analysts and Scientists.

Since we run Flink on YARN, and this was the first use case of a long-running YARN application at Criteo, we needed to solve several problems along the way. Flink requires keytab authentication for long-running applications. We also needed to put the Flink application into a dedicated resource pool, because we could not compete for resources with batch jobs, which have a totally different resource utilization pattern. And since the probability of losing a YARN container/Task Manager is high, we allocate some additional Task Managers, so that Flink can restart jobs much faster without waiting for new YARN containers to be allocated.

Finally, a catch-up strategy is very important for real-time use cases. If for some reason you are late in event consumption (e.g. an infrastructure issue on YARN or Kafka), do you want to process already-outdated data and risk increasing the delay further? Considering that Flink’s state can grow during catch-up and that dynamic scaling has only been available since version 1.5, in our revenue calculation use case we have chosen to drop late data and keep processing real-time data in order to minimize latency.
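As an illustration of that choice, late events can be discarded with a simple guard against the current watermark; this is only a sketch, and the real job also has to account for the allowed out-of-orderness:

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Sketch of a late-data filter: any event whose timestamp is already behind the
 * current watermark is dropped instead of processed, keeping latency low during
 * catch-up at the cost of completeness.
 */
public class DropLateEvents<T> extends ProcessFunction<T, T> {

    @Override
    public void processElement(T event, Context ctx, Collector<T> out) {
        Long eventTime = ctx.timestamp();                       // requires assigned timestamps
        long watermark = ctx.timerService().currentWatermark(); // event-time progress so far

        if (eventTime != null && eventTime >= watermark) {
            out.collect(event); // on time: keep processing it
        }
        // else: the event is late; drop it (a side output could count such drops instead)
    }
}
```

Note that Flink’s event-time windows already discard elements that arrive after the watermark has passed the end of their window plus any allowed lateness; an explicit filter like this simply makes the same policy available outside windowed operators.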

IV) Emerging Business Cases

In addition to the initial business case, we’ve seen emerging uses for low-latency data being implemented all across the organization, in both technical and operational functions. As a reminder, the initial business case was to enable analysts to detect and investigate anomalies in the revenue and cost of the organization.

In the Sales & Operations teams, low-latency data is firstly being used to increase Operational Safety. Some changes in an advertiser’s setup can be hard to assess, as their effect depends on the advertiser’s reach. With our previous High-Latency Data, if someone responsible for a campaign made a high-risk change for a client, they would have to wait around four hours to know the precise impact on the advertising campaign. Let’s say the client asks for this change in the middle of the afternoon: a faulty change could be left running overnight, which can have financial and reputational consequences for the organization. With Low-Latency Data, we can afford to systematically monitor the impact one hour after a change, at every level of the organization.

Aside from preventing loss of earnings, low-latency data supports business initiatives that were not possible before. One such example is Flash Sales. Say you are an online retailer and you know a high-profile new product is going to be announced today at the manufacturer’s conference, with immediate availability on your website. You want to capitalize on the coverage generated by this event by spending a given amount of advertising budget in the two or three hours that follow it. Real-time data enables such a use case, as the person in charge of the ad campaign can precisely adjust the run rate to reach the delivery objective. With a higher latency, we could end up underspending or overspending the advertising budget. Thus, by strengthening flash sales, Low-Latency Data creates more business opportunities for Criteo.

Ensuring correct budget delivery is a business optimization opportunity that is currently being tried out in technical teams. Each of our clients can give us a budget for a fixed period of time. If we are below this budget at the end of the period, we are missing out on potential revenue. If we go above this budget, we usually won’t bill our client for the extra but will still pay the cost. At the scale of a single budget, this optimization may seem small, but scaled to thousands of budgets it can reach a significant amount for the company. Thus, ensuring precise budget delivery for all our products would be a significant improvement in the operational lifecycle.

Following the release and promotion of Low-Latency Data, we’ve been able to fulfill existing business uses and foster new emerging ones. The underlying impact of this is a new mindset for every data user at Criteo: If I had this other dataset in real-time, what could I do? Which product could I create or which process could I optimize?

If you are interested in scaling large-scale distributed systems, consider applying for one of the available roles in Criteo Labs!

Guillaume Paillot, Team Lead, Business Escalation, Data Science & Analytics

&

Oleksandr Nitavskyi, Software Engineer at Criteo Labs