How Flink Sources Work and How to Implement One

Selim Abidin
6 min read · Aug 28, 2022


Apache Flink is a real-time (or near-real-time) stream and batch processing framework. In this article, I will explain how Flink Sources work, give tips for writing a custom source, and cover some necessary Flink concepts along the way. You can also find the source code link at the bottom of the article.

Checkpointing

Checkpointing is a failover strategy in Flink. If enabled, Flink saves the whole application state at a configurable interval and keeps it in a backend such as RocksDB or HDFS. Besides saving state, sources like KafkaSource and sinks like BulkWriters use checkpointing for committing. For example, a KafkaSource reads a Kafka stream and sends records downstream, but it does not commit the offsets until Flink completes the checkpoint, ensuring the data has been read and successfully processed.
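As a quick illustration, checkpointing is enabled on the execution environment. This fragment uses Flink's StreamExecutionEnvironment API; the interval and mode below are arbitrary example values, not recommendations:

```java
// Illustrative configuration fragment (interval and mode are example choices):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // take a checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```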

Source Functions

Flink has the legacy SourceFunction and RichSourceFunction interfaces, which help you create simple non-parallel and parallel sources. You implement a run method and collect input data. Source functions are unbounded (endless) sources, and on most days they will save the day.

A rich source function that supports parallelism
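The code screenshot from the original post is not reproduced here, so below is a sketch of the same idea. To stay runnable without Flink on the classpath, it defines minimal stand-in interfaces that only mirror the shape of the legacy API; in a real job you would implement org.apache.flink.streaming.api.functions.source.SourceFunction or extend RichParallelSourceFunction instead, and the source would run until cancelled rather than stopping at a fixed bound:

```java
import java.util.ArrayList;
import java.util.List;

public class LegacySourceSketch {
    // Minimal stand-ins for Flink's legacy interfaces (illustrative, not the real API).
    interface SourceContext<T> { void collect(T record); }
    interface SourceFunction<T> {
        void run(SourceContext<T> ctx) throws Exception;
        void cancel();
    }

    // A simple counting source: emits increasing integers until cancelled.
    static class CountingSource implements SourceFunction<Integer> {
        private volatile boolean running = true;
        private int current = 0;

        @Override
        public void run(SourceContext<Integer> ctx) {
            while (running && current < 5) { // bounded here only so the demo terminates
                ctx.collect(current++);
            }
        }

        @Override
        public void cancel() { running = false; }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        new CountingSource().run(out::add); // SourceContext supplied as a lambda
        System.out.println(out);            // [0, 1, 2, 3, 4]
    }
}
```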

As the example shows, ten lines of code are enough to create a source with Source Functions. However, this article's main topic is the newer Flink Source API, so we will focus on Flink Sources rather than the legacy interfaces.

Flink Source Implementation

A Flink Source has three main components: SplitEnumerator, SourceReader, and Split. Besides these, you also need serializers for the states and splits, used for messaging between components and for state saving.

A Split is the smallest unit of work in a source. It holds a piece of information or data that gives directives to SourceReaders. A file path, a range, or anything serializable can be a Split. I recommend keeping splits clean and small.

A SourceReader is the worker of a Source. Your application needs to read files, listen to an HTTP service, or generate some data, and that is what the SourceReader does, following the directives carried in its Splits.

The SplitEnumerator is the brain of a Source. It distributes tasks (Splits) to SourceReaders, keeps statistics, and tracks the SourceReaders. It runs as a single instance.

The Source itself provides all the factory methods for creating these components and running a source.

Split Assignment

The “createEnumerator” method is the first one called in your source after the serializer methods; then the Flink app creates the readers.

A trivial source would not need a brain: readers would know their tasks and start producing data immediately. The real world does not work like this, though. Readers are just actors running in parallel, so someone has to organize their tasks to avoid duplicate or out-of-order processing.

Let’s create a source that produces sequential integer data with a parallelism of three. Since the data is sequential, the first reader starts producing from 0 to 1000, but the second and third readers don’t know where to start. If they all start from 0, all three readers will produce duplicate data. For this example, we can keep a state holding the current value and increase it on every split assignment. Each reader requests a task, or a split in Flink jargon, from the enumerator. (Figure-1)

SourceReaders request split from SplitEnumerator (Figure-1)

The enumerator receives a source split request and starts distributing splits by task ids. The owner of the first split request gets a split with the 0–1000 range, and the enumerator advances its state to 1000. The second request gets 1000–2000, and the third gets 2000–3000. The enumerator completes the first round with its state at 3000. (Figure-2)
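The assignment logic above can be sketched in plain Java. The class and method names here are illustrative stand-ins, not Flink's SplitEnumerator API; they only model the state-advancing behavior from Figure-2:

```java
import java.util.HashMap;
import java.util.Map;

public class RangeEnumeratorSketch {
    // Illustrative split: a half-open integer range [from, until).
    record RangeSplit(int from, int until) { }

    // Minimal model of the enumerator's assignment logic:
    // keep a "next value" state and advance it on every request.
    static class RangeEnumerator {
        private static final int SPLIT_SIZE = 1000;
        private int next = 0; // enumerator state
        private final Map<Integer, RangeSplit> assigned = new HashMap<>();

        RangeSplit handleSplitRequest(int subtaskId) {
            RangeSplit split = new RangeSplit(next, next + SPLIT_SIZE);
            next += SPLIT_SIZE; // advance state so ranges never overlap
            assigned.put(subtaskId, split);
            return split;
        }
    }

    public static void main(String[] args) {
        RangeEnumerator enumerator = new RangeEnumerator();
        System.out.println(enumerator.handleSplitRequest(0)); // RangeSplit[from=0, until=1000]
        System.out.println(enumerator.handleSplitRequest(1)); // RangeSplit[from=1000, until=2000]
        System.out.println(enumerator.handleSplitRequest(2)); // RangeSplit[from=2000, until=3000]
    }
}
```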

Enumerator assigns splits to readers and increases state. (Figure-2)

Splits travel between the enumerator and the readers, and they are also persisted during checkpointing and snapshots. For those reasons, splits must be serializable.
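A split serializer can be as simple as writing the split's fields to bytes and reading them back. The sketch below hand-rolls this with standard Java I/O; it is similar in spirit to, but not an implementation of, Flink's SimpleVersionedSerializer interface:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RangeSplitSerializerSketch {
    // Illustrative split: a half-open integer range [from, until).
    record RangeSplit(int from, int until) { }

    // Write the two ints that fully describe the split.
    static byte[] serialize(RangeSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(split.from());
            out.writeInt(split.until());
        }
        return bytes.toByteArray();
    }

    // Read them back in the same order.
    static RangeSplit deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new RangeSplit(in.readInt(), in.readInt());
        }
    }

    public static void main(String[] args) throws IOException {
        RangeSplit original = new RangeSplit(1000, 2000);
        RangeSplit restored = deserialize(serialize(original));
        System.out.println(original.equals(restored)); // true
    }
}
```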

SourceReaders

A SourceReader takes a SourceSplit and acts accordingly. The main work happens in the “pollNext” method, which takes a ReaderOutput as a parameter and returns an “InputStatus” enum; the InputStatus tells Flink what to do next.
If the method returns NOTHING_AVAILABLE, Flink checks the reader’s availability. The “pollNext” method will be called again even though no data was available, so you are expected to build that logic yourself. The happy path for the NOTHING_AVAILABLE state would be requesting a split and letting Flink know the reader is available for the next round.

After receiving a split, the “pollNext” method may start producing data. The splits have “from” and “until” properties. We can use the “from” property as the current value and increase it to emit the produced data. A long-running method would block checkpointing, so emitting should be done once per “pollNext” call, returning MORE_AVAILABLE while there is still data.
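This one-record-per-call pattern can be modeled in plain Java. The InputStatus and ReaderOutput types below are small stand-ins mirroring the shape of Flink's API, not the real classes:

```java
public class PollNextSketch {
    // Stand-ins for Flink's InputStatus and ReaderOutput (illustrative, not the real API).
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }
    interface ReaderOutput<T> { void collect(T record); }

    static class RangeReader {
        private int current = -1, until = -1;

        // Called when the enumerator assigns a split [from, until).
        void addSplit(int from, int until) {
            this.current = from;
            this.until = until;
        }

        // Emit exactly one record per call so checkpointing is never
        // blocked by a long-running loop inside pollNext.
        InputStatus pollNext(ReaderOutput<Integer> output) {
            if (current < 0) {
                return InputStatus.NOTHING_AVAILABLE; // no split yet: request one
            }
            output.collect(current++);
            return current < until ? InputStatus.MORE_AVAILABLE
                                   : InputStatus.END_OF_INPUT;
        }
    }

    public static void main(String[] args) {
        RangeReader reader = new RangeReader();
        System.out.println(reader.pollNext(r -> { })); // NOTHING_AVAILABLE
        reader.addSplit(0, 3);
        StringBuilder seen = new StringBuilder();
        InputStatus status;
        do {
            status = reader.pollNext(seen::append);
        } while (status == InputStatus.MORE_AVAILABLE);
        System.out.println(seen); // 012
    }
}
```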

A Working Source With Issues

So far, I have tried to explain how the source works. You can find a working code example in my GitHub repo, written as described above. Although the source works, it does not have all the features you would expect from a modern, stable source.

Since we are generating sequential integers, errors are unlikely, so let’s throw an artificial error in pollNext when the currently produced value is 5. The current value state is held in the SourceReader.

In Figure-3, the reader requests a split from the enumerator and receives one telling it to produce integers from 0 until 1000.
The reader starts generating, and we throw an error once the current value reaches 5. Every number generated before the error has already been printed. What would the next number be?

An error occurred in the reader (Figure-3)

When an error occurs in the reader, the Flink job terminates the current source reader, creates a new one, and returns the split to the enumerator by calling the “addSplitsBack” method with the split list and task id. If you do not handle the returned splits, the new reader will simply request and receive a brand-new split. (Figure-4)

The new reader is initialized (Figure-4)

Not having a strategy for returned splits would cause the numbers between 5 and 1000 to be lost. Since the enumerator already keeps state, we can store returned splits there as dead splits and hand out dead splits first when a new split is requested. The source reader holds the current value, and that value is lost when the reader is terminated on error; because splits are returned to the enumerator on failure, we can keep the current value inside the split so it is reassigned along with it. There is still a problem, though: Flink restores the split from the latest checkpoint, which means the reader will keep reprinting the same values from 0 to 5 until a checkpoint completes with a successful snapshot. After a checkpoint, the saved split reflects whatever the current value was at snapshot time, which is not necessarily 5; it could be 1 or 2, so the failure is not fully overcome. (Figure-5)
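The dead-split idea can be sketched as follows. Again, the types and method names other than addSplitsBack are illustrative stand-ins; the split carries its own progress so it survives a reader failure, and the enumerator serves returned splits before creating fresh ones:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class DeadSplitSketch {
    // Illustrative split carrying its own progress so it survives reader failures.
    record RangeSplit(int current, int until) { }

    static class RangeEnumerator {
        private static final int SPLIT_SIZE = 1000;
        private int next = 0;
        private final Deque<RangeSplit> deadSplits = new ArrayDeque<>();

        // Flink calls back with the failed reader's unfinished splits.
        void addSplitsBack(List<RangeSplit> splits) {
            deadSplits.addAll(splits);
        }

        // Hand out returned ("dead") splits first so no range is lost.
        RangeSplit handleSplitRequest() {
            if (!deadSplits.isEmpty()) {
                return deadSplits.poll();
            }
            RangeSplit split = new RangeSplit(next, next + SPLIT_SIZE);
            next += SPLIT_SIZE;
            return split;
        }
    }

    public static void main(String[] args) {
        RangeEnumerator enumerator = new RangeEnumerator();
        RangeSplit first = enumerator.handleSplitRequest(); // [0, 1000)
        // The reader fails after producing 0..4; its split comes back with progress = 5.
        enumerator.addSplitsBack(List.of(new RangeSplit(5, first.until())));
        System.out.println(enumerator.handleSplitRequest()); // RangeSplit[current=5, until=1000]
        System.out.println(enumerator.handleSplitRequest()); // RangeSplit[current=1000, until=2000]
    }
}
```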

Checkpointing happens just after emitting “one” (Figure-5)

A retry strategy in the reader would make sense if you don’t need strict real-time behavior and believe the failure will go away on its own. Sometimes, though, silencing the failure, increasing an error metric, and logging the current state and data makes more sense.

Full Source Codes

You can find the complete source code in SelimAbidin/CustomFlinkSource: Flink Source Example (github.com).

If you have a question, you can leave a comment here or reach me on Twitter (@SelimAbidin).
