Apache Flink Getting Started — Batch Processing

M Haseeb Asif
Big Data Processing
6 min read · Apr 23, 2022

This is the second article in the Getting Started with Apache Flink series. We will explore batch processing first because it has a lot in common with database development and is easy to get started with. Once we know the basic semantics of batch processing, we will move on to the third part of the series, which covers stream processing.

Learning a skill tends to stick better with a real-life case study and some hands-on work. So we will use a real-life dataset of movie ratings, run some aggregations, and query it. Along the way, we will see how to read the data, apply different transformations, use some of Flink's features, and store or output the results.

The following is the usual architecture of an application that deals with bounded datasets, where all the input data is available before the job runs. Once a job is finished, all the outputs are available to users.

Image 1. Input -> processing -> output

In earlier versions of Flink, batch and streaming had different APIs: the DataSet API was used for batch applications, while the DataStream API was used for streaming applications. Starting with Flink 1.12, the DataSet API has been soft deprecated because Apache Flink has unified the batch and streaming APIs, and the DataStream API can be used to develop both kinds of applications.

You can configure the execution mode of your application as batch, streaming, or automatic. Streaming, the default mode, is used for the classic streaming use case, while batch mode is used for batch-style execution on the DataStream API. Automatic mode lets the system decide based on the boundedness of the data sources.
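As a rough illustration of the execution mode setting, here is a minimal sketch. It assumes Flink 1.12 or later with the flink-streaming-java and flink-clients dependencies on the classpath; the class name and job name are made up for this example. The mode is set in code here only to keep the sketch in one file. Passing it on the command line, as shown in the comment, keeps the application code configuration-free.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical class name; a sketch, not code from the article's repository.
public class BatchModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Options are STREAMING (the default), BATCH and AUTOMATIC.
        // Command-line alternative that leaves the code untouched:
        //   bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // fromElements is a bounded source, so BATCH mode is allowed here.
        env.fromElements("a", "b", "c").print();

        env.execute("batch-mode-sketch");
    }
}
```

With an unbounded source, BATCH mode would not be a valid choice, which is the case AUTOMATIC is meant to handle.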

We will still write our examples with the DataSet API. Some production applications continue to use it, so knowing it will help developers who are starting now and need to maintain those legacy applications.

A DataSet is a high-level abstraction over a collection of data distributed across various compute nodes. It contains elements of the same type, and it is immutable, so every transformation returns a new DataSet instead of changing the existing one.

Every Flink program needs an execution environment. We use it to read the data, set configurations, and start the application execution. When we run the program from an IDE, getExecutionEnvironment() creates a local environment that executes the application inside the same JVM; when the program is submitted to a cluster, the same call uses that cluster's resources. You can follow along with the article using the project from the first article or the code example from GitHub.

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

We can set different configurations through the env variable, but it is not recommended to configure the application in the runtime code. Instead, set the configuration on the command line when submitting the application, or through the config files. Keeping the application code configuration-free allows for more flexibility, as the same application can be executed in any execution mode.

Source

Once we have the execution environment and all the configurations are completed, we will read the data. The DataSet API has the following options to read the data:

  • readCsvFile: Read a file in CSV format
  • readTextFile: Read string data from a text file
  • readFileOfPrimitives: Parses files of primitive values (such as String or Integer) separated by a delimiter, which can be any character sequence
  • fromElements: Creates a data set from the given sequence of objects of the same type
  • fromCollection: Creates a data set from a java.util.Collection
  • readFile(inputFormat, file): Read file using a custom format type
  • createInput(inputFormat): Read data from a custom data source

Transformations

Once we have read the data into a DataSet, it is time to process it or apply the business logic. Flink offers most of the well-known transformations. Some of the most commonly used ones are as follows:

  • Map: applies an operation to every element and returns exactly one output per element
  • Filter: keeps only the elements that satisfy a given condition and drops the rest
  • FlatMap: similar to the map operator, but can return zero or more elements per input element
  • Min/Max: as the names suggest, return the minimum or maximum from the dataset
  • Distinct: returns the distinct elements of the dataset
  • Join: combines two datasets, similar to a SQL join
  • GroupBy: groups the dataset by a key so that it can be aggregated
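To make the difference between map, filter, flatMap and distinct concrete, the sketch below uses java.util.stream on an in-memory list as a stand-in. This is deliberately not the Flink API: it only illustrates how many outputs each kind of transformation produces per input element. The class name and sample strings are made up for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java stand-in (java.util.stream), NOT the Flink API.
class TransformationSemantics {

    // Map: exactly one output per input element.
    static List<Integer> lengths(List<String> lines) {
        return lines.stream().map(String::length).collect(Collectors.toList());
    }

    // Filter: keeps only the elements that satisfy the predicate.
    static List<String> nonEmpty(List<String> lines) {
        return lines.stream().filter(l -> !l.isEmpty()).collect(Collectors.toList());
    }

    // FlatMap: zero or more outputs per input element (here, one per word).
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(l -> Arrays.stream(l.split(" ")).filter(w -> !w.isEmpty()))
                .collect(Collectors.toList());
    }

    // Distinct: removes duplicate elements.
    static List<String> distinctWords(List<String> lines) {
        return words(lines).stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("toy story", "", "toy soldiers");
        System.out.println(lengths(lines));       // [9, 0, 12]
        System.out.println(nonEmpty(lines));      // [toy story, toy soldiers]
        System.out.println(words(lines));         // [toy, story, toy, soldiers]
        System.out.println(distinctWords(lines)); // [toy, story, soldiers]
    }
}
```

The Flink operators of the same names follow the same one-to-one, keep-or-drop and one-to-many contracts, but run distributed across the parallel tasks of a job.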

Sink

It is essential to talk about where to store the output, or sink the results, before we get into the details of how we will do the processing. Apache Flink supports various sinks:

  • print: prints each element to the console. If the parallelism is higher than 1, the output is also prefixed with the identifier of the task that produced it.
  • writeAsText: writes the elements as text. With a configured parallelism higher than 1, each parallel task creates its own output file.
  • writeAsCsv: writes tuples as comma-separated values. Row and field delimiters are configurable.
  • addSink: used in the DataStream API to attach a custom sink function or one of the connectors provided by Flink, such as Apache Kafka. The DataSet API counterpart for custom outputs is output(outputFormat).

Movies Data processing

Our movies dataset contains multiple files, but we will use two CSV files: movies and ratings. The movies file contains movieId, title, and genres. The ratings file has userId, movieId, rating, and timestamp.

First, we will read the movies CSV file using readCsvFile, then parse each line into a POJO before applying any transformations. You can find the Movie POJO on GitHub with attributes for the id, the name, and a list of genres.
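For readers following along without the repository, here is a hypothetical sketch of what such a Movie POJO might look like. The field names and constructor order are assumptions inferred from how Movie is used in this article, and the version on GitHub may differ. Flink treats a class as a POJO when it is public, has a public no-argument constructor, and its fields are either public or reachable through getters and setters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the Movie POJO; the author's version may differ.
class MoviePojoSketch {

    // Nested only to keep the sketch in one file; in a project this would
    // normally be a top-level public class in its own Movie.java.
    public static class Movie {
        public Long id;
        public String name;
        public List<String> genres = new ArrayList<>();

        // Flink's POJO rules call for a public no-argument constructor.
        public Movie() {
        }

        public Movie(Long id, String name, List<String> genres) {
            this.id = id;
            this.name = name;
            this.genres = genres;
        }

        @Override
        public String toString() {
            return "Movie{id=" + id + ", name='" + name + "', genres=" + genres + "}";
        }
    }

    public static void main(String[] args) {
        // Made-up values, only to show the shape of the object.
        Movie movie = new Movie(1L, "Some Title", Arrays.asList("Comedy", "Drama"));
        System.out.println(movie);
    }
}
```

A readable toString is worth having here because the print sink writes each element using it.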

// read the data from the source
DataSet<Tuple3<Long, String, String>> datafile =
        env.readCsvFile("src/main/resources/movies.csv")
                .ignoreFirstLine()      // skip the header row
                .ignoreInvalidLines()   // drop rows that fail to parse
                .types(Long.class, String.class, String.class);

// parse the data into a POJO; "|" is escaped because split() takes a regular expression
DataSet<Movie> movies =
        datafile.map(row -> new Movie(row.f0,
                row.f1,
                Arrays.asList(row.f2.split("\\|"))));
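A detail that is easy to get wrong when parsing the pipe-delimited genres column: String.split takes a regular expression, and an unescaped "|" means "empty alternation", which matches between every character. The small plain-Java sketch below shows the difference; the class name, helper name and sample value are made up for the illustration.

```java
import java.util.Arrays;
import java.util.List;

class GenreSplit {

    // Splits a pipe-delimited genres column such as "Comedy|Drama".
    static List<String> parseGenres(String column) {
        // "\\|" escapes the pipe so it is treated as a literal character.
        return Arrays.asList(column.split("\\|"));
    }

    public static void main(String[] args) {
        String column = "Comedy|Drama";

        // Unescaped: the string is cut into single characters.
        System.out.println(Arrays.asList(column.split("|")));

        // Escaped: [Comedy, Drama]
        System.out.println(parseGenres(column));
    }
}
```

Only the escaped form yields whole genre names, which is what a check such as genres.contains("Drama") relies on. Pattern.quote("|") is an equivalent way to build the delimiter.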

Now we have the list of movies, and we can apply transformations or business logic to it. Let's say we want to see all the movies in the drama genre. We will use a filter to find the drama movies and then sink the results to the console (the console is used only for demo purposes).

// filter the movies with a specific genre
DataSet<Movie> dramaMovies =
        movies.filter(movie -> movie.genres.contains("Drama"));
dramaMovies.print();

Once you run the application, you should see all the drama movies in the console.
Congratulations on writing your first Flink application. Now that we have a running application, we will use different transformations for more complex data processing scenarios.

Let's take a more complex example where we want to find out which movie genre has the best average rating. We need to join the movies and ratings datasets and then aggregate the ratings to see which genre is the most highly rated.

We read the ratings dataset in the same way, ignoring the timestamp because it is not relevant to our examples. The following code joins the two datasets and then groups the result.

// Note: the join function expects tuples, so movies here has to be the
// Tuple3 dataset (movieId, title, genres), not the DataSet<Movie> of POJOs;
// ratings holds (userId, movieId, rating).
movies.join(ratings)
        .where(0)      // movieId in the movies tuple
        .equalTo(1)    // movieId in the ratings tuple
        .with(new JoinFunction<Tuple3<Long, String, String>,
                Tuple3<Long, Long, Double>,
                Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> join(Tuple3<Long, String, String> movie,
                                                       Tuple3<Long, Long, Double> rating) throws Exception {
                // output: (title, first listed genre, rating)
                return new Tuple3<>(movie.f1, movie.f2.split("\\|")[0], rating.f2);
            }
        })
        //.print();
        .groupBy(1)    // group by genre
        .reduceGroup(new GroupReduceFunction<Tuple3<String, String, Double>, Tuple2<String, Double>>() {
            @Override
            public void reduce(Iterable<Tuple3<String, String, Double>> iterable,
                               Collector<Tuple2<String, Double>> collector) throws Exception {
                double totalRating = 0.0;
                int totalCount = 0;
                String genre = "";
                for (Tuple3<String, String, Double> rating : iterable) {
                    totalRating += rating.f2;
                    totalCount += 1;
                    genre = rating.f1;
                }
                // emit (genre, average rating) for the whole group
                collector.collect(new Tuple2<>(genre, totalRating / totalCount));
            }
        })
        .print();

We start by joining the movies data with the ratings data on the movie id. There are various ways to define a key; here we use tuple field positions, which are field 0 of the movies tuple and field 1 of the ratings tuple. Then, in the with operator, we supply a join function that defines the output tuple. We could print the results at this stage, since each tuple now holds a movie's title, one of its genres (the first one listed), and a rating.

Next, we group the joined tuples by genre and use a group reduce function to aggregate each whole group into the average rating for that genre.
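To check what the job should produce, the same join-then-average logic can be sketched in plain Java on a handful of in-memory rows. This is not Flink code: the class name and sample rows are made up, and each row is a String array standing in for one CSV line. It mirrors the steps described above, which are an inner join on the movie id, keeping the first listed genre, and averaging the ratings per genre.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the join + group-average logic, NOT the Flink API.
class GenreAverageSketch {

    // movies rows: {movieId, title, genres}; ratings rows: {userId, movieId, rating}.
    static Map<String, Double> averageByFirstGenre(List<String[]> movies, List<String[]> ratings) {
        // Index movies by id (the join key), keeping only the first listed genre.
        Map<String, String> genreByMovieId = new HashMap<>();
        for (String[] movie : movies) {
            genreByMovieId.put(movie[0], movie[2].split("\\|")[0]);
        }

        // Inner join on movieId, accumulating {sum, count} per genre.
        Map<String, double[]> sumAndCount = new TreeMap<>();
        for (String[] rating : ratings) {
            String genre = genreByMovieId.get(rating[1]);
            if (genre == null) {
                continue; // no matching movie: dropped, as in an inner join
            }
            double[] acc = sumAndCount.computeIfAbsent(genre, g -> new double[2]);
            acc[0] += Double.parseDouble(rating[2]);
            acc[1] += 1;
        }

        // Turn each {sum, count} pair into an average.
        Map<String, Double> averages = new TreeMap<>();
        sumAndCount.forEach((genre, acc) -> averages.put(genre, acc[0] / acc[1]));
        return averages;
    }

    public static void main(String[] args) {
        List<String[]> movies = Arrays.asList(
                new String[]{"1", "Movie A", "Comedy|Drama"},
                new String[]{"2", "Movie B", "Drama"},
                new String[]{"3", "Movie C", "Action"});
        List<String[]> ratings = Arrays.asList(
                new String[]{"10", "1", "4.0"},
                new String[]{"11", "1", "3.0"},
                new String[]{"10", "2", "5.0"},
                new String[]{"12", "2", "4.0"},
                new String[]{"12", "99", "1.0"});

        System.out.println(averageByFirstGenre(movies, ratings)); // {Comedy=3.5, Drama=4.5}
    }
}
```

Two consequences of the design show up in the sample: a movie with several genres counts only toward its first one, and genres without any rated movie (Action here) do not appear in the result.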

Now you know how to write an Apache Flink program. I encourage you to play with this dataset and write different Flink jobs, such as the top 10 movies by rating, overall or within a specific genre.

This article introduced batch processing with Apache Flink as part of our Getting Started with Apache Flink series. The next article is going to be about stream processing in Apache Flink.
