Apache Flink Getting Started — Stream Processing

M Haseeb Asif
Big Data Processing
6 min read · May 4, 2022

This is the third part of the Apache Flink Getting Started series, where we will familiarize ourselves with stream processing. Earlier, we had an overview of Apache Flink and wrote some jobs using batch processing.

There are two types of data sources: bounded and unbounded. Bounded data sources have boundaries, and we know when the data will be completely available. Batch processing is used to process bounded data sources.

In contrast, unbounded datasets don’t have boundaries, and one cannot say when the data will be completely available. It’s like a stream of water that is continuously running. Batch processing cannot handle such data sources. Hence, stream processing comes into the picture and introduces different semantics for producing analytics from a continuously running stream of data.

Fig. 1. Bounded vs unbounded stream

An example is IoT devices, where sensors continuously send data. We need to monitor and analyze the behavior of the devices to see if all the functionalities are working fine. Other examples are fraud detection on bank transactions and stock price analysis. In all these cases we cannot wait until all the data is available; we need to perform the analysis and produce results in real time.

The application structure will be similar to what we have seen in batch processing, but the sources will be unbounded. They will continuously stream the data, such as Kafka, Kinesis, or Twitter. Each program generally follows the flow below (a minimal sketch follows the list).

  1. Get the execution environment
  2. Source — Read the data from the data source, file, messaging system, database
  3. Operators — Apply transformations on the data such as map, filter, flat map, KeyBy, etc.
  4. Sink — Store (or print) the results of the computations
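
A minimal sketch of this flow, omitting the surrounding main method and imports as in the other snippets in this series; the socket source and the "ERROR" filter are just placeholders for illustration:

// 1. Get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. Source: read lines from a socket
DataStream<String> lines = env.socketTextStream("localhost", 9999);

// 3. Operators: keep only the lines we are interested in
DataStream<String> errors = lines.filter(line -> line.contains("ERROR"));

// 4. Sink: print the results to the console
errors.print();

// A streaming job only starts once execute() is called; it then runs until cancelled
env.execute("Streaming job skeleton");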

We will start our application by getting a reference to the stream execution environment, StreamExecutionEnvironment. It is different from the batch execution environment, ExecutionEnvironment. We will use the same project created in the first article, or you can follow the examples from GitHub.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

We can do different configurations using the env variable, but it is not recommended to do the configuration in the runtime code. Instead, you should set it using the command line when submitting the application or through the config files. Keeping the application code configuration-free allows for more flexibility, as the same application can be executed anywhere, in any of the deployment modes.

Streaming applications use the DataStream API, which combines both stream and batch capabilities. It is similar to a Java collection in usage, but it is distributed under the hood. It is immutable, and each transformation (operation) creates a new data stream. A data stream can be created by reading data from different sources.

Source

Once we have the stream execution environment and all the configurations are done, we read the data using different sources offered by Apache Flink. There are broadly four categories — File-based, Socket-based, Collection-based, and custom sources. Some of them are as follows.

  • readTextFile: Reads text files line by line and returns them as strings
  • socketTextStream: Reads from a socket; each input is treated as a separate element
  • fromCollection/fromElements: Creates a data stream from the given elements or a Java collection
  • addSource: Attaches a source function to read from another system such as Apache Kafka. Flink offers connectors to read from different systems.

Here are examples of reading data from different sources:

DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5, 6);

DataStream<String> socketTextStream = env.socketTextStream("localhost", 9999);

// Read the Twitter keys from a config file, environment variable or ...
Properties twitterCredentials = getTwitterCredentials();
DataStream<String> tweetStream = env.addSource(new TwitterSource(twitterCredentials));

Transformations

Once we have read the data into a DataStream, it is time to do the data processing and apply the business logic. Flink offers most of the well-known transformations. Some of the most commonly used ones are as follows (a short example follows the list).

  • map: Performs the desired operation on each element in the stream
  • filter: Returns only the elements that meet the criteria
  • flatMap: Returns zero or more elements after performing the desired operation
  • union: Combines elements from two streams
  • reduce: Combines the current element of the stream with the last reduced value and returns the new value
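
A small sketch that chains a few of these transformations on a hypothetical stream of sensor readings; the "sensorId,temperature" line format and the 30-degree threshold are assumptions made purely for illustration:

DataStream<String> lines = env.socketTextStream("localhost", 9999);

// map: parse each "sensorId,temperature" line into a (sensorId, temperature) tuple
DataStream<Tuple2<String, Double>> readings = lines
        .map(line -> {
            String[] parts = line.split(",");
            return Tuple2.of(parts[0], Double.parseDouble(parts[1]));
        })
        .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));

// filter: keep only readings above 30 degrees
DataStream<Tuple2<String, Double>> hot = readings.filter(r -> r.f1 > 30.0);

// keyBy + reduce: keep the highest temperature seen so far per sensor
DataStream<Tuple2<String, Double>> maxPerSensor = hot
        .keyBy(r -> r.f0)
        .reduce((a, b) -> a.f1 >= b.f1 ? a : b);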

Sink

It is essential to discuss where to store the output or the results. Apache Flink supports various sinks, which consume a DataStream and forward it to files, sockets, or external systems, or print it. Some of the supported sinks are as follows (a short example follows the list).

  • print: Prints each element to the console. If the parallelism is higher than 1, the output is prepended with the task identifier.
  • writeAsText: Creates one output file per task, depending on the configured parallelism.
  • writeAsCsv: Writes tuples as comma-separated values. Row and field delimiters are configurable.
  • addSink: Calls a custom sink function or one of the connectors provided by Flink, such as Apache Kafka.
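
A few of these in code, continuing from the hypothetical maxPerSensor stream in the previous sketch; the output path, broker address, and topic name are made up, and FlinkKafkaProducer requires the Kafka connector dependency:

// print: write each element to stdout, prefixed with the subtask id when parallelism > 1
maxPerSensor.print();

// writeAsText: one output file (or a directory of files) per parallel task
maxPerSensor.writeAsText("/tmp/max-temperatures");

// addSink: forward results to an external system such as Apache Kafka
maxPerSensor
        .map(t -> t.f0 + "," + t.f1)
        .addSink(new FlinkKafkaProducer<>("broker:9092", "sensor-output", new SimpleStringSchema()));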

Twitter Streaming Application

Using a real-life case study while learning a new skill is constructive. So, we will take Twitter data as the streaming data and perform different streaming operations. We already have a post about it that you can follow to start and develop a working Twitter application.

Windowing

Flink allows splitting the infinite data stream into finite blocks, called windows, to compute results at regular intervals. But why do we need windows? Most functions can be executed against individual elements, but aggregation functions need to be executed on a bounded set of data. For example, one cannot calculate the minimum temperature of an IoT sensor over an entire stream because the data never stops coming. So we take the stream, cut it into finite blocks (windows) of one minute, and compute the aggregate function (minimum temperature) against each one-minute window.
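
For example, computing the minimum temperature per sensor over one-minute windows could look roughly like this, reusing the hypothetical readings stream from the transformations sketch:

// Tumbling one-minute processing-time windows per sensor, minimum of the temperature field (position 1)
DataStream<Tuple2<String, Double>> minPerMinute = readings
        .keyBy(r -> r.f0)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .min(1);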

We have a detailed article about windowing and its types.

Time Notions

Flink processes unbounded data in real time, so it is essential to understand the different time notions it uses for data processing: event time, ingestion time, and processing time. Furthermore, since it is a distributed system and messages travel through different mediums with different network latencies, messages can arrive out of order. Therefore, whenever we perform an operation such as windowing, Flink needs to consider the time of the events to determine which events belong to which group of elements.

  • Processing time: The time of the particular machine that processes the specific element. It can differ from the actual event time, when the event was generated, and the order of events can change because of network failures and delays.
  • Event time: The time when an event is generated at the source. It is the actual time of the event.
  • Ingestion time: The time when Flink receives an event for processing. It can be more reliable than processing time since all the operators see the same timestamp for an individual event.

The default is processing time, but you can change it using the env variable.

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // or ProcessingTime, IngestionTime
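
When using event time, each element also has to carry its own timestamp, and Flink needs to be told where to find it. A rough sketch, assuming a hypothetical readingsWithTimestamps stream of (sensorId, temperature, eventTimestampMillis) tuples and allowing events to arrive up to five seconds out of order:

DataStream<Tuple3<String, Double, Long>> withWatermarks = readingsWithTimestamps
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Double, Long>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, Double, Long> reading) {
                        return reading.f2; // the event-time timestamp carried in the element
                    }
                });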

State Management

A streaming application can be stateful or stateless. In most cases, stream elements can be processed independently of each other, but some cases require managing state, which is referred to as stateful stream processing. For example, if we monitor the average running temperature of an IoT sensor, we need to store some values in state. State is also required to support Flink’s fault-tolerance behavior.

There are two types of state: operator state and keyed state. Operator state is bound to a single operator instance, while keyed state is maintained per key on a keyed stream. Keyed state supports different data structures to store state values: ValueState, ListState, MapState, and ReducingState.

State can be used with any of the transformations, but we have to use the Rich version of the function, such as RichFlatMapFunction, because it provides additional methods for setting up the state, as shown in the sketch below.
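
A sketch of a stateful operator, assuming the same (sensorId, temperature) tuples as in the earlier examples: a RichFlatMapFunction that keeps a per-sensor count and sum in ValueState and emits the running average temperature with every new reading.

public class RunningAverage extends RichFlatMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {

    // Keyed state holding (count, sum) for the current key (the sensor id)
    private transient ValueState<Tuple2<Long, Double>> countAndSum;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Tuple2<Long, Double>> descriptor =
                new ValueStateDescriptor<>("count-and-sum", Types.TUPLE(Types.LONG, Types.DOUBLE));
        countAndSum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Double> reading, Collector<Tuple2<String, Double>> out) throws Exception {
        Tuple2<Long, Double> current = countAndSum.value();
        if (current == null) {
            current = Tuple2.of(0L, 0.0);
        }
        current.f0 += 1;
        current.f1 += reading.f1;
        countAndSum.update(current);

        // Emit the sensor id together with its running average temperature
        out.collect(Tuple2.of(reading.f0, current.f1 / current.f0));
    }
}

// The state is keyed, so the function has to run on a keyed stream
DataStream<Tuple2<String, Double>> averages = readings
        .keyBy(r -> r.f0)
        .flatMap(new RunningAverage());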

Finally, it is good to read about the concepts, but it is even more important to implement them hands-on to understand them better. So please check out a couple of example projects on my GitHub and try to implement some use cases based on these projects or on your own.
