Are You Still Not Processing Data Streams with Flink?
I recently started learning stream processing with Apache Flink. I already worked with Apache Spark and Hadoop (tools for big data processing) so I expected I’ll quickly pick up with Flink.
To my surprise, Flink’s Getting Started Guide is really poorly written. IMO the most effective way of learning a new tool is by following simple exercises which hopefully give you a sense of the main concepts.
Flink’s official Getting Started Guide ends at the exact spot on which it should present the main concepts in detail.
I’ve spent a considerable amount of time solving the first exercise. This is also why I’ve decided to write this article so you don’t have to do the same.
By reading this article, you’ll learn:
- What is Apache Flink?
- A solution to Fraud Detection exercise in Getting Started guide
- How to use states in Flink?
Here are few links that might interest you:
- Free skill tests for Data Scientists & Machine Learning Engineers- Intro to Machine Learning with PyTorch- Correlation Analysis 101 in Python
Some of the links above are affiliate links and if you go through them to make a purchase I’ll earn a commission. Keep in mind that I link courses because of their quality and not because of the commission I receive from your purchases.
Meet Apache Flink
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
See, What is Apache Flink? — Architecture to learn more about it.
Why should you learn Flink?
Many big companies (like Uber and Airbnb) use it for stream processing. I also expect that stream processing will become mainstream in the future, because of real-time data from various sensors (which are getting cheaper).
A Fraud Detection App
A Fraud Detection App is the first official exercise when you get started with Flink. The app should output an alert for any account that makes a small transaction immediately followed by a large one. Where small is anything less than $1.00 and large is more than $500.
I couldn’t solve this exercise by myself. After spending an hour or two googling, I found Flink official document notes 07 blog from ProgrammerSought, which helped me to solve the exercise. The reason I wrote this article is also that the mentioned blog post is unorganized and hard to follow.
In each section below I share my thoughts and explain the solution.
Initialize the project
I suggest you initialize the Fraud Detection App with maven:
$ mvn archetype:generate \
I’m using Intellij to open the project. It sets everything automatically.
In case you would like to jump right to the solution, you can clone my flink-fraud-detection project.
In case you get Apache Flink: java.lang.NoClassDefFoundError when you run the project, you simply need to change the scope of Flink dependency from provided to compile in pom.xml file:
Developing a Fraud Detection App with Flink
The description of the first exercise is simple:
The fraud detector should output an alert for any account that immediately conducts a small transaction and then conducts a large transaction.
Suppose you get the following transaction flow for a specific account:
$0.01 -> $20 -> $800
This is not a fraud, as a large amount doesn’t precede a small amount.
We would like to find accounts that have a similar sequence of transactions:
$0.01 -> $800
My (wrong) solution
My initial expectation was that Flink will route data for a certain account to a specific task because I used key by account ID. But that wasn’t the case. Multiple accounts were routed to the same task.
Flink memorize information with states
The solution should remember the information between events. A large transaction is only considered fraud if the previous transaction is small in size.
We can achieve that by using KeyedProcessFunction, which memorizes information across events (it keeps the state).
How NOT to use state with Flink?
I implemented the most straightforward implementation, which is to set a Boolean flag when handling small transactions. When a large transaction is completed, I simply did a check whether a flag is set for the account.
However, simply implementing the flag as a member variable in the FraudDetector class doesn’t work.
Flink uses the same task of FraudDetector for transactions of multiple accounts, which means that if accounts A and B are routed through the same instance, a transaction account can set the flag to true, then trading account B can trigger a false alarm.
One solution would be to use a data structure like Map to track account key, but we would lose fault-tolerance, which we get from Flink out-of-the-box. If a failure occurs, all its information would be lost.
Therefore, if we restart the application to recover from the failure, the fraud detector may miss the alert.
How to use state with Flink?
In order to solve the problems above, Flink provides primitives for fault-tolerant states, which are almost as easy to use as regular member variables.
The most basic state type in Flink is ValueState, which is a data type that can add fault tolerance to any variable it wraps.
ValueState is a form of keying state, which means that it is only available in the context of keying context operators (eg. when we use the account’s key in the keyBy function).
Flink automatically limits the keying state of the operator to the key of the currently processed record.
Flink maintains an independent state for each account as the key is the account id of the current transaction (declared by keyBy()).
How to impelement state with Flink?
ValueState is created using ValueStateDescriptor, which contains metadata about how Flink should manage variables.
The state should be registered before the function starts processing data.
You can register the state with the open() method as shown in the example below:
You can interact with the ValueState content by using:
- update to set the value,
- value to get the current value
- clear to delete the content.
When you start the application (or after calling clear), the ValueState value will return null.
Be sure to perform ValueState’s updates with the update method as the system cannot track modifications made by direct assigment.
Flink automatically manages fault tolerance of Value State behind the scenes, so you can interact with it like any standard variable.
Take a look at the code below, which shows how to use flag status to track potential fraudulent transactions.
First, we fetch the flagState value, which can be null or true in our example. But more generally, it can have 3 values: null, true or false.
When the last transaction is not null, it means that the previous transaction for the current account was small.
Then we check if the current transaction is large and output an alert if it is. Remember, a small transaction followed by a large one signifies fraud. After this, we also clear the flag.
In the next code block, we check if the transaction amount is small and we set the flag in that case.
Flink has many useful concepts, which may seem hard to grasp at first. IMO this is because most of us are used to batch processing.
One of such concepts in Flink is timers, which enables us to trigger an action in the future. To keep this article short, I’m going to describe timers in the next article.