Unified Flink Source at Pinterest: Streaming Data Processing

Jul 15, 2021

Lu Niu & Chen Qin | Software Engineers, Stream Processing Platform Team

To best serve Pinners, creators, and advertisers, Pinterest leverages Flink as its stream processing engine. Flink is a data processing engine for stateful computation over data streams. It provides rich streaming APIs, exactly-once support, and state checkpointing, which are essential for building stable and scalable streaming applications. Today, Flink is widely used at companies like Alibaba, Netflix, and Uber for mission-critical use cases.

Xenon is the Flink-based stream processing platform at Pinterest. Its mission is to build a reliable, scalable, easy-to-use, and efficient streaming platform that enables data-driven products and timely decision making. This system includes:

  • Reliable and up-to-date Flink compute engine
  • Improved Pinterest dev velocity on stream application development
  • Platform reliability and efficiency
  • Security & Compliance
  • Documentation, User Education, and Community

Xenon has enabled several critical use cases, including:

  • Ads real-time ingestion, spending, and reporting — Calculate spending against budget limits in real time to quickly adjust budget pacing and update advertisers with more timely reporting results.
  • Fast signals — Make content signals available quickly after content creation and use these signals in ML pipelines for a customized user experience.
  • Realtime Trust & Safety — Reduce levels of unsafe content as close to content creation time as possible.
  • Content activation — Distribute fresh creator content and surface engagement metrics to creators so they can refine their content with minimal feedback delay.
  • Shopping — Deliver a trustworthy shopping product experience by updating product metadata in near real time.
  • Experimentation — Accurately deliver metrics to engineers for faster experiment setup, verification, and evaluation.

Streaming Data Architecture

Figure 1: The streaming data infrastructure at Pinterest. Data collected through the Pinterest app is published to Kafka; Merced keeps a copy of the Kafka data with longer retention, and Flink applications can read from either path.
  1. Singer, a highly performant logging agent, uploads log messages from serving hosts to Kafka. Singer supports multiple log formats and guarantees at-least-once delivery for log messages.
  2. Merced, a variant of Secor, moves data from Kafka to S3. Merced guarantees exactly-once message persistence from Kafka to S3.
  3. Most of our Flink applications consume from Kafka and output to Kafka, Druid, or RocksStore, depending on the use case.

Although all our use cases consume data from Kafka, accessing Kafka alone cannot satisfy all user requirements:

  1. Some use cases need to access historical data; however, data in Kafka has a short retention, ranging from three days down to less than eight hours.
  2. We require all use cases to go through load testing before going into production. Simulating load by rewinding Kafka doesn’t scale.
  3. Replaying historical data via Kafka is one option, but that comes with added operational costs and a 20x increase in infrastructure costs.

Thanks to Merced, we have historical data in hand. Hence, we are able to provide a single API that concatenates historical data with real time data — the concept of an unlimited log. Users are able to seek any offset or timestamp without worrying about which storage system holds the data.
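As an illustration only (Pinterest's actual API is not shown in this post), consuming such an unlimited log could look roughly like the sketch below: the caller picks a start timestamp and never specifies whether the bytes come from S3 or Kafka. UnifiedSource's constructor arguments, PinEvent, and the topic name are all placeholders, not the real API.

```java
import java.time.Duration;
import java.time.Instant;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnlimitedLogExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Start seven days in the past: the source reads Merced files on S3 first,
        // then hands off to Kafka once it catches up to the live stream.
        // The UnifiedSource constructor below is illustrative, not the real signature.
        DataStream<PinEvent> pins = env.addSource(
            new UnifiedSource<PinEvent>(
                "published_pins",                          // topic name (illustrative)
                event -> event.getCreatedAtMillis(),       // required timestamp extractor (see next section)
                Instant.now().minus(Duration.ofDays(7)))); // seek by timestamp, not by storage system

        pins.print();
        env.execute("unlimited-log-example");
    }
}
```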

This design brings several benefits:

  1. The encoding and schema of a topic are the same in Merced (aka bounded stream) and Kafka (aka unbounded stream). No extra effort is needed to convert them into a consistent view.
  2. Merced stores the data on S3 and preserves the original event ordering and partitioning at a small fraction of the infrastructure cost. Replaying data through Merced behaves just like reading the original data from Kafka.

Features in Unified Source

One API for Both Realtime and Historical Data

Here is a brief summary of the implementation:

Figure 2: Unified Source implementation. UnifiedSource consists of FlinkKafkaConsumer and MercedFileSource and ensures that partitions from both sides are aligned.
  1. UnifiedSource extends RichParallelSourceFunction. Each subtask runs one instance of UnifiedSource, and each instance starts both a FlinkKafkaConsumer and a MercedFileSource. FlinkKafkaConsumer is provided by Flink out of the box; MercedFileSource transforms Merced output into a stream format. (A simplified skeleton of this pattern is sketched after this list.)
  2. During partition scheduling, the FlinkKafkaConsumer and MercedFileSource in a single subtask are guaranteed to receive data belonging to the same partition, which enables a seamless transition from reading files to reading from Kafka.
  3. In order to get all files belonging to the same partition, MercedFileSource listens to a Kafka topic published by the Merced notification system. This also lets us handle late-arriving events.
  4. Merced outputs are hourly partitioned, and the file naming encodes the Kafka creation time and partition. Therefore, MercedFileSource is able to read files and recreate the same event ordering and partitioning.
  5. UnifiedSource follows Flink best practices for generating watermarks from Kafka. The API requires users to provide a timestamp extractor, which the Kafka source uses to generate watermarks at the source. In this way, we standardize watermark generation across Flink applications. Similarly, MercedFileSource combines the watermark updates of multiple partitions into a single watermark stream.
  6. Merced observes late-arriving events in Kafka and writes them back to previous data partitions that have already been consumed by UnifiedSource.
  7. Sometimes event time skew across Merced partitions produces an ever-growing watermark gap, which puts pressure on downstream checkpointing. We implemented watermark synchronization across subtasks using GlobalAggregateManager to keep watermarks within a global range.
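As a rough illustration of steps 1 and 2 above (not Pinterest's actual code), the skeleton below replays the file data for a subtask's partitions and then hands off to Kafka at the next offset. MercedFileReader, KafkaPartitionReader, and TimestampedRecord are hypothetical stand-ins for MercedFileSource and FlinkKafkaConsumer.

```java
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Simplified sketch of the UnifiedSource pattern; helper types are hypothetical.
public class UnifiedSourceSketch extends RichParallelSourceFunction<byte[]> {

    private final String topic;
    private volatile boolean running = true;

    public UnifiedSourceSketch(String topic) {
        this.topic = topic;
    }

    @Override
    public void run(SourceContext<byte[]> ctx) throws Exception {
        // Each subtask owns a fixed set of Kafka partitions; the file reader is given
        // the same partitions so the handoff below stays within one partition set.
        int subtask = getRuntimeContext().getIndexOfThisSubtask();

        // Bounded phase: replay Merced files on S3 in the original event order.
        MercedFileReader files = new MercedFileReader(topic, subtask);
        long lastFileOffset = -1L;
        while (running && files.hasNext()) {
            TimestampedRecord record = files.next();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collectWithTimestamp(record.payload(), record.eventTimeMillis());
                lastFileOffset = record.kafkaOffset();
            }
        }

        // Unbounded phase: continue from Kafka right after the last offset the files
        // covered, so the transition skips nothing and duplicates nothing.
        KafkaPartitionReader kafka = new KafkaPartitionReader(topic, subtask, lastFileOffset + 1);
        while (running) {
            TimestampedRecord record = kafka.poll();
            if (record == null) {
                continue;
            }
            synchronized (ctx.getCheckpointLock()) {
                ctx.collectWithTimestamp(record.payload(), record.eventTimeMillis());
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```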

Traffic Control & Sources Synchronization

Unlike processing unbounded streams, processing and reprocessing historical data require extra features like traffic control and source synchronization to achieve stable operation.

Here is a simplified use case to explain this problem:

A product team wants near-real-time insight into how audiences view Pins posted within the last seven days. The team built a Flink job with XenonUnifiedSource that consumes both the published-Pins Kafka logs and the Pin-views Kafka logs. For the job to produce meaningful results, the unified source first pulls the last seven days of published Pins in sequence.

Figure 3: UnifiedSource pulls the last seven days of published Pins and joins them with the stream of impressions

There are two issues with using the file source or the Kafka source alone here:

The views topic is much larger than the published-Pins topic, so it consumes most of the bandwidth and slows down the backfill of published Pins. Because the pipeline is written in event time, this slows the progression of the low watermark in the inner-join operator. Over time, more and more events are buffered in the join waiting to be cleared, which causes backpressure and checkpointing failures.

In response to the traffic control issue, we developed per-topic rate limiting. The Pin-views topic is throttled at a lower rate, which makes room for the published-Pins topic to progress quickly.
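The post does not show the rate limiter itself; a minimal sketch of the per-topic throttling idea, assuming Guava's RateLimiter and illustrative topic names and rates, could look like this:

```java
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;

import java.util.Map;

// A minimal sketch of per-topic rate limiting, assuming one Guava RateLimiter per topic.
// Topic names and permits-per-second values are illustrative only.
public class TopicRateLimiters {

    private final Map<String, RateLimiter> limiters = ImmutableMap.of(
        "published_pins", RateLimiter.create(50_000),  // small topic: let the backfill run fast
        "pin_views", RateLimiter.create(5_000));       // much larger topic: throttled

    /** Blocks until the given topic is allowed to emit one more record. */
    public void acquire(String topic) {
        limiters.get(topic).acquire();
    }

    /** Lets a coordinator (e.g. the watermark synchronizer below) adjust a topic's throughput. */
    public void setRate(String topic, double permitsPerSecond) {
        limiters.get(topic).setRate(permitsPerSecond);
    }
}
```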

The views topic should also “wait” until the entire seven days of published Pins has been consumed; otherwise, a low match ratio is almost certain. In Flink terms, the watermark of the published-Pins source should reach “current” (the time the job is launched) before the watermark of the views topic progresses beyond “current.”

In response to the synchronization issue, we developed an allreduce-based watermark synchronization mechanism that periodically updates each topic’s rate limiter threshold. The rate limiter of the Pin-views source is only granted quota to pull from its Kafka topic once the watermarks from all subtasks reach “current.”
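As a minimal sketch of that idea (an approximation, not Pinterest's implementation), each subtask could report its local published-Pins watermark through Flink's GlobalAggregateManager, take the minimum across subtasks, and only open up the views-topic rate limiter once that minimum reaches “current.” The class names, rates, and aggregate name below are all illustrative.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;

import com.google.common.util.concurrent.RateLimiter;

// Sketch: subtasks report watermarks, the JobMaster-side aggregate keeps the minimum,
// and the views-topic rate limiter opens up once every subtask has reached "current".
public class WatermarkSynchronizer {

    // Keeps the latest watermark per subtask and returns the minimum across all of them.
    private static class MinAcrossSubtasks
            implements AggregateFunction<Tuple2<Integer, Long>, Map<Integer, Long>, Long> {
        @Override public Map<Integer, Long> createAccumulator() { return new HashMap<>(); }
        @Override public Map<Integer, Long> add(Tuple2<Integer, Long> report, Map<Integer, Long> acc) {
            acc.put(report.f0, report.f1);
            return acc;
        }
        @Override public Long getResult(Map<Integer, Long> acc) {
            return acc.values().stream().min(Long::compare).orElse(Long.MIN_VALUE);
        }
        @Override public Map<Integer, Long> merge(Map<Integer, Long> a, Map<Integer, Long> b) {
            b.forEach(a::put);
            return a;
        }
    }

    private final GlobalAggregateManager aggregateManager; // via StreamingRuntimeContext#getGlobalAggregateManager
    private final RateLimiter viewsLimiter;                // throttles the much larger views topic
    private final long currentMillis;                      // "current" = time the job was launched
    private static final double BACKFILL_RATE = 5_000;     // permits/sec while published Pins backfill
    private static final double STEADY_RATE = 50_000;      // permits/sec once all subtasks catch up

    public WatermarkSynchronizer(GlobalAggregateManager aggregateManager,
                                 RateLimiter viewsLimiter,
                                 long currentMillis) {
        this.aggregateManager = aggregateManager;
        this.viewsLimiter = viewsLimiter;
        this.currentMillis = currentMillis;
    }

    /** Called periodically by each subtask with its local published-Pins watermark. */
    public void report(int subtaskIndex, long localWatermark) throws IOException {
        long globalMin = aggregateManager.updateGlobalAggregate(
            "published-pins-watermark", Tuple2.of(subtaskIndex, localWatermark), new MinAcrossSubtasks());

        // Grant the views topic full quota only after every subtask has reached "current".
        viewsLimiter.setRate(globalMin >= currentMillis ? STEADY_RATE : BACKFILL_RATE);
    }
}
```

Each source subtask would call report(...) from its fetch loop or a periodic timer; because the aggregate is maintained on the JobMaster, all subtasks observe the same global minimum.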

Building Reliable Flink Applications Using UnifiedSource

The benefits of UnifiedSource are not limited to easy access to real-time and historical data:

  1. UnifiedSource provides out-of-the-box features such as deserialization, watermark generation, traffic control, and corrupted-message metrics, which lets Flink developers focus on business logic.
  2. Users are now able to generate deterministic results by loading fixed datasets and verify correctness against code changes or framework upgrades.
  3. At Pinterest, critical Flink jobs are required to go through load testing before moving to production. Simulating traffic load through UnifiedSource is more scalable and cost effective than rewinding Kafka offsets.
Figure 4: Throughput of reading from Merced is 6x that of reading from Kafka (simulating a large load through UnifiedSource)
Figure 5: The watermark increases smoothly during the transition from file to Kafka

We’re hiring

We are always evolving our stream processing platform, and there are interesting challenges and opportunities ahead. If you’re interested in tackling big data challenges with us, join us!

Acknowledgments

Thanks to Hannah Chen and Ang Zhang for reviewing. Thanks to Divye Kapoor and Karthik Anantha Padmanabhan for the integration with the Near Real-Time Galaxy framework. Thanks to our client teams, Ads Measurement, Core Product Indexing, and Content Quality, for early adoption and feedback. Thanks to the whole Stream Processing Platform Team for their support.

References

GitHub: Pinterest/Singer

LogDevice: a distributed data store for logs

Keystone Real-time Stream Processing Platform | by Netflix Technology Blog

Distributed Time Travel for Feature Generation | by Netflix Technology Blog

Designing a Production-Ready Kappa Architecture for Timely Data Stream Processing

Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications

FLIP-134: Batch execution for the DataStream API
