Stream enrichment with Flink SQL

--

In today’s world, real-time data processing is essential for businesses that want to remain competitive and responsive. The ability to obtain results in near real-time, as opposed to traditional batch-oriented methods, is incredibly appealing. Apache Flink, with its SQL API, has opened up new possibilities for building real-time applications using the widely understood SQL language. Combined with Flink’s well-proven stream processing capabilities, this allows developers and data engineers to perform complex operations on streaming data.

Stream data enrichment is a critical aspect of real-time data processing, that involves augmenting incoming data streams with additional context or information. This could mean enhancing raw data with metadata, joining streams with external databases, or transforming data into more meaningful insights.

In this article, we’ll delve into the essentials of stream data enrichment in Flink SQL. We’ll explore the concepts behind stream processing whilst highlighting the challenges related to implementing data enrichment effectively and efficiently. Primarily, we will focus on comparing the different types of joinsavailable in the Flink SQL engine in the context of stream enrichment.

Is streaming SQL different?

In recent years, streaming engines have evolved tremendously. The unification of batch and streaming APIs has facilitated the adoption of SQL for both execution modes. But does this mean that a well-written SQL query, typically executed in a relational database, can be seamlessly transformed into Streaming SQL?

To answer this question, we need to consider that the table used in traditional SQL must somehow be transformed into a stream, or more accurately, put into motion. Change Data Capture (CDC) libraries can help to achieve this by capturing and streaming changes from databases. While we won’t focus on CDC in this article, we’ll assume that a stream of events is already in motion and ready for streaming SQL to be applied.

Writing a correct left join in streaming SQL requires an understanding of the streaming nature of data and the differences between various join operators. Streaming SQL running on an unbounded stream, differs significantly from SQL running on a bounded stream. Let’s dive deeper into streaming joins to explore the available options and how they work in a streaming context.

Regular join

The most natural way to reflect batch SQL is to use Flink regular join. In some cases, the original SQL does not even need to be changed! Let’s look at a very simplified example:

SELECT *
FROM orders AS o
LEFT JOIN deliveries AS d
ON d.order_id = o.order_id

The code example above can be easily executed in a streaming scenario, and both batch and streaming engines should semantically return the same results. However, there are important considerations related to their execution.

First, the most obvious difference is that the result in a batch scenario will be a table, whereas in streaming it will be a stream. Every row modification in streaming is represented as a change event. Conversely, similar to the Change Data Capture (CDC) ingestion phase, a stream can be converted back into a static table later on.

In a batch scenario, it is certain that all data is present at the time of execution and job results are complete. This is what we call a “bounded stream” in the streaming world. In contrast, an “unbounded stream” does not have a defined end, hence streaming job results will not be complete at any point in time. The unbounded nature of true streams necessitates the production of updates from each join operator. In streaming, data is constantly flowing, requiring a continuously executed query to produce real-time results. This implies that every regular join operator maintains the state of events. When an update or insert event flows into the system, the join operator reads all correlated events from the opposite side of the join and produces the corresponding events.

This simple example highlights two potential issues when executing regular joins in a streaming scenario: the oversized internal state management and the explosion of update events.

State management

In Apache Flink SQL, the retract mechanism manages updates and deletions in streaming data by generating retract messages to remove old records before inserting updated ones. This ensures accurate state management in streaming tables, reflecting the latest data changes, and is essential for maintaining consistency and correctness in dynamic data environments. The Flink engine handles the production of correct updates automatically, requiring no additional work from the developer.

In the regular join operator, it’s crucial to maintain the entire state of ALL incoming events to produce correct results. However, this can lead to significant performance issues when dealing with very large streams. One way to mitigate this problem is by using a state time-to-live (TTL) strategy, which sets limits on data retention. While this strategy can help to improve performance, it may again impact the completeness of the data.

More on the topic of state management and why big states may be problematic can be read in another article from GetInData blog.

Updates explosion

It is quite common in stream enrichment scenarios, that a large set of main stream events is correlated with the same “hot” record on the right side. If the probe side history kept in the operator is large, processing this hot right event may lead to a bottleneck. This one hot event will trigger multiple updates that downstream operators must propagate, potentially leading to a cascade scenario where the entire system gets stuck while processing a single event.

This is even more troubling when combined with the retract mechanism. A hot right-side event will not have this negative impact on performance only once. In fact, every time it is updated, the regular join will need to produce -U and +U events for all correlated main stream events in order to maintain data correctness. This may lead to a significant increase in traffic and overwhelm the whole system.

This performance issue is further reinforced by the checkpointing mechanism, whether aligned or unaligned. When building a distributed snapshot, the checkpointing barrier may need to wait for that hot event to finish processing before it can progress to the next operator. If this takes too long, the checkpoint may timeout. Consequently, timing out checkpoints can impact job stability, leading to a job restart.

Choosing the right join strategy

To ensure exact results in a streaming job, updates need to be processed and propagated to the job output. However, it’s rarely the case that all of the updates need to be propagated to ensure reasonable data completeness and great business value. There are usually acceptable trade-offs, as streaming jobs don’t often need to update very old data. For example, a dictionary record update (build side) from a few seconds ago doesn’t necessarily need to update a join result that was published months ago. In fact, it often shouldn’t.

In the enrichment scenario described in this article, it is crucial to decide whether updates should be propagated to already published events. In enrichment scenarios, updates from the main stream (left side) are usually required to be propagated. This is not always the case for right side updates and a few cases can be distinguished for the right-hand stream (the build side). Let’s give names to those cases for a clearer view and further reference:

  • fully-updateable — new events as well as updates are processed, old released joined results will be updated
  • semi-updateable — new events as well as updates are processed and considered in the join for the following probe events; already released joined results won’t be retracted
  • append-only — only new events for a new key are processed, no updates are processed

The first option is the most desirable in most cases. This is due to the fact that it promises that all result data is up-to-date eventually. Different paces of joined streams or their unalignment do not affect the semantic results of the join. It is semantically similar to what can be achieved when running SQL statements on a static table in batch mode. It is, though, executed as a continuous query in a streaming fashion.

However, a fully-updateable strategy is not always feasible when the stream size is significant. Due to the problems described in the previous paragraphs, for some sources, it is worth considering a semi-updateable strategy. That strategy ensures that updates coming from the build side are processed and reflected in the following joined results, but do not trigger updates for already published results. This is essentially a trade-off between performance and old data completeness.

Consider a slow-changing dimension (SCD) source as one of the possible use cases. The business justification for that choice is clear when you consider joining a big stream with SCD. Updates from the build side rarely need to be applied for old, historical data. At the same time, setting global TTL is not possible, as SCD by design, may not be updated for a long time and still be relevant in the enrichment process. Those old events from the build side should be joined with fresh events coming from the left side to ensure result completeness.

In this scenario, we essentially want to keep a whole, small history for the build side and not keep any history for the probe side. This trade-off allows minimizing the state size of the Flink operator and eliminates the risk of causing an event cascade explosion every time the hot key right side event gets updated.

Another, more drastic option is to follow an append-only for a right side strategy. All updates on the right-hand side will be ignored in the join operator. This can be accomplished by introducing deduplication logic before the right side of the join operator and only propagating the first event for a given key downstream.

Non-regular joins to the rescue

As explained in the above section, it is rarely recommended to use regular joins for SQL data enrichment. Let’s review some other options that are available in the Flink framework. After a brief introduction to different types of joins, we will compare their capabilities.

Lookup join

A lookup join operator uses external service data to enrich left stream events. External data does not need to be transformed into a stream for other join operators. A lookup join can directly query an external database when processing each event. For better performance, it is recommended to configure join operator cache, especially for big streams.

SELECT *

FROM orders AS o

LEFT JOIN deliveries FOR SYSTEM_TIME AS OF o.proc_time AS d

ON d.order_id = o.order_id

This follows a semi-updateable strategy. In a lookup join it is impossible to observe or push update events from an external database. Processing is triggered by incoming probe side events. Having said that, the lookup join does not need to maintain any state besides the temporarily kept cached results from the external service.

Temporal join

A temporal join fits very well with the streaming nature of data. It respects the time attribute of each event when performing a join operation. It is possible to maintain multiple versions of build side data for different periods and include that time constraint in the join condition next to the join key condition. A temporal join will build and keep a history of the build side for the time the data is relevant. Watermark heuristics are used to determine the actual operator time and align the two streams that are flowing at different paces.

FROM orders AS oLEFT JOIN deliveries FOR SYSTEM_TIME AS OF o.order_time AS d ON d.order_id = o.order_id

Similar to a lookup join, it follows a semi-updateable strategy. Changes in the build side will affect events that are processed afterwards, but won’t trigger updates for already published data. A temporal join will maintain the build side stream state, keeping all relevant versions of data. Besides that, events from the probe side will be temporarily kept in Flink state, then processed and eventually cleaned after the watermark passes by.

Interval join

An interval join allows for the definition of a time frame for which data can be joined. Including a time constraint in a join condition allows the interval join to determine and discard old events. This helps to keep the state size smaller, compared to a regular join.

SELECT *

FROM orders AS o

LEFT JOIN deliveries FOR SYSTEM_TIME AS OF o.order_time AS d

ON d.order_id = o.order_id

As for Flink 1.16, an interval join cannot consume updateable streams. When applying an interval join for an upsert-kafka connector or Kafka with debezium format sources, an error will be thrown:

org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin

doesn't support consuming update and delete changes which is produced

by node (...)

That problem can be mitigated by introducing the preceding deduplicationoperator with “keeping only first-row” logic. The deduplication operator will transform the updateable stream to append-only mode. Please keep in mind that this will greatly change the semantics of your job.

Join comparison

Understanding the various types of joins available in Flink SQL is crucial for optimizing your real-time data processing tasks. Each join type, whether it’s a regular, lookup, temporal or interval join, has its unique characteristics and peculiarities. In the below table, we compare these different Flink SQL joins to highlight crucial differences in the context of data enrichment.

This table compares the different join types available in Flink version 1.16. Selecting the right join type can be challenging, as both business and technical implications should be investigated. Here are some general guidelines for choosing the most appropriate join type:

  • Regular Join: Use this type of join when you need a fully-updatable right stream. However, be aware of the extensive state problem. Configuring a global TTL can help mitigate this issue, although it will apply to all stateful operators*
  • Lookup Join and Temporal Join: These are suitable when a semi-updatable strategy for the right stream is acceptable.
  • Lookup Join: Recommended when the size of the right table is small enough to fit in the cache. Otherwise, calling an external service for every left stream record can become a performance bottleneck. Enabling asynchronous lookups can help mitigate this issue to some extent.
  • Temporal Join: Allows joining the main stream with a versioned table, enriching records with corresponding right values at a specific point in time. Even if supporting multiple versions is not essential, a temporal join might still be the best choice if a lookup join is not feasible due to costly external system calls. Additionally, temporal joins support stream alignment within a configured time frame, ensuring greater result completeness than lookup joins.
  • Combination Strategies: When requirements are less strict and an append-only approach is acceptable for the right stream, you can combine a deduplicate-first-row operator with a temporal join operator. This ensures that only the first version of a key is propagated to the join operator. The deduplicate operator can also be used with a regular join operator with an appropriately set TTL*.
  • Interval Join: This can be used when both sides of the join are append-only streams and no updates are meant to be propagated. Additionally, the requested correlation time frame should be reflected in the join condition.

Unwanted reordering

When working with updateable source streams, it is natural that some rows will be updated over time. Sometimes it may happen that the value in the column included in the join condition changes.

To harness the power of parallel processing, which is essential for handling large-scale data,

Flink jobs are usually executed in a highly distributed environment. To ensure data correctness, join operators must be preceded with appropriate stream redistribution. Whenever a stateful join is executed, Flink needs to redistribute both streams using the joining key upfront. Correlated data from both sides will be processed by the same parallel task of the distributed operator to properly maintain the operator internal state.

Whenever the join key value changes, it may happen that events related to the same source PK will be redirected to different parallel tasks of the join operator. This, in consequence, can lead to race conditions as Flink does not promise to keep an order of events for different parallel instances of the same operator. This race condition can result in unwanted events being reordered, which breaks the correctness of the streaming job result.

Flink tries to automatically deal with reordering by applying a special strategy just before the sink operator. Its logic is encapsulated in SinkUpsertMaterializer and current implementation requires that operators keep a full changelog in its state. This is theoretically possible when using regular joins without data retention, although rarely feasible. In other cases, stream results may need to be ordered manually.

We will cover more on events reordering in the next blog post: Flink SQL — changelog and races. Sign up for our newsletter to get notified.

SQL to streaming SQL migration

It is becoming more and more common that this industry migrates old-fashioned batch-oriented SQL jobs to more modern streaming SQL solutions. Indeed, in many cases, executing continuous queries will provide a real-time experience to the end user, which in turn will boost business value. One should be aware that streaming SQL brings new challenges to the table.

For simple use cases, it may be sufficient to use regular joins in implementation. However, regular joins, while simple, may struggle with dealing with the dynamic nature of streaming data due to their reliance on processing time. In real scenarios it may not be possible to maintain a complete state and update the entire event history. More often, the unbounded nature of the stream requires a different strategy, usually some trade-off is inevitable. Usually, migration will require redesigning the existing batch SQL and introducing a time dimension to the events processing. In combination with watermark heuristics, a temporal join will provide better performance without compromising the business value of the SQL job.

Final words

In this article, we’ve explored the different types of joins available in Flink SQL. Stream enrichment jobs can leverage various joins depending on the specific scenario, and sometimes it’s beneficial to use multiple join types within a single job.

It’s essential to determine the data enrichment completeness requirements before starting implementation. Understanding which updates need to be propagated is crucial, especially since keeping the entire history of a large stream in Flink state and using regular joins may not be desirable.

The comparison of the joins presented in this article provides a foundational framework for analyzing the challenges and potential issues associated with data enrichment. Defining your business requirements and assessing the appropriate join strategy are vital steps for ensuring both optimal business value and excellent performance.

Disclaimer

All observations were made using Flink 1.16.1.* Note that in newer versions of Flink, it’s possible to configure TTL at per operator level. Different TTL values can be configured for both sides of the join using SQL hints.

Blog Author: Marek Maj — Big Data Engineer

Originally published at https://getindata.com.

--

--

GetInData | Part of Xebia TechTeam

We are Data & AI experts working with international clients, creating and leading innovative projects related to the Data & AI environment.