Apache Flink: Introduction to the DataStream API

Benjamin Barrett
5 min read · Jul 8, 2024


I recently attended the Kafka Summit in London, and right from the get-go, it was obvious that Flink was a big deal. Confluent is pushing for Flink SQL to replace KSQL, and through the grapevine, there was a lot of talk of Flink competing with KStreams. Since I consider myself a KStreams developer, I had to figure out what this was all about with a use case.

Flink APIs

Apache Flink offers several APIs that cater to different use cases and levels of abstraction, enabling users to build and deploy stream processing and batch processing applications. Here are a few of the APIs provided by Flink:

DataStream

DataStream applications are programs that implement transformations on data streams. These streams can be created from various sources, such as message queues, socket streams, files, and of course Kafka.

It is intended for processing continuous, potentially unbounded data. However, keep in mind that you’ll have to manage state carefully or you might end up with memory issues.
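To illustrate what careful state management can look like, here is a minimal sketch (not from this post) of giving keyed state a time-to-live, so old entries are cleaned up instead of accumulating forever. The state name and TTL value are arbitrary, and this would live inside a rich function’s open() method on a keyed stream:

var ttlConfig = StateTtlConfig.newBuilder(Time.hours(24)) // expire entries 24 hours after they were written
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

var lastSeenDescriptor = new ValueStateDescriptor<>("last-seen", Long.class);
lastSeenDescriptor.enableTimeToLive(ttlConfig); // state entries for this descriptor now expire automatically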

Tables

The Table API bridges the gap between relational databases and data processing pipelines, allowing users to perform complex operations on datasets with minimal code. It supports both static data (for example, an Excel file) and dynamic data (like a Kafka topic).
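As a rough sketch (not covered further in this post), a Table API pipeline might look like the following; the table name, schema, and connector options are placeholders rather than anything from the project:

var settings = EnvironmentSettings.newInstance().inStreamingMode().build();
var tableEnv = TableEnvironment.create(settings);

// Register a table backed by a Kafka topic (placeholder names and options).
tableEnv.executeSql(
        "CREATE TABLE people (name STRING, age INT) WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'people'," +
        " 'properties.bootstrap.servers' = 'localhost:9092'," +
        " 'scan.startup.mode' = 'earliest-offset'," +
        " 'format' = 'json')");

// Relational-style operations with minimal code.
// $ comes from a static import of org.apache.flink.table.api.Expressions.$.
var adults = tableEnv.from("people")
        .filter($("age").isGreaterOrEqual(18))
        .select($("name"), $("age"));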

SQL

The SQL API is similar to the Table API, but its statements are expressed in the SQL language. This is currently the only API that Confluent supplies.
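For comparison, here is a minimal sketch of the same kind of query expressed through the SQL API, assuming the placeholder people table from the previous sketch has already been registered:

// sqlQuery returns a Table that can be further transformed or written to a sink.
var adultsViaSql = tableEnv.sqlQuery("SELECT name, age FROM people WHERE age >= 18");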

Use case: Person data

In this post, I’ll cover a use case where we have two topics of data representing people: one from an analytics system, and the other from a CMS (content management system). The goal of the job is to standardize the two inputs into a single output Kafka topic.

Since I approached this use case through the lens of a KStreams developer, I have chosen to tackle this scenario using the DataStream API.

Project code

The entire project can be viewed in this GitHub repository. However, I will only be covering code from inside the PersonImporterJob class.

StreamExecutionEnvironment

This component serves as the main entry point for defining and executing stream processing applications. It’s where you will set up your program, define your data sources, and configure the execution parameters of the resulting Flink job.

var env = StreamExecutionEnvironment.getExecutionEnvironment();
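The snippet above sticks to the defaults, but this is also where execution parameters would be configured. As a sketch, the values below are arbitrary:

env.setParallelism(2); // number of parallel subtasks for each operator
env.enableCheckpointing(60_000); // snapshot the job's state every 60 seconds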

DataStream

DataStream is the abstraction that wraps a source and represents an unbounded stream of data. Sources can be almost anything: files, databases, and so on. All that is required is to include the relevant connector dependency, and a connection can be made.

For this use case, we will use a KafkaSource.

var cmsSource = KafkaSource.<CmsPerson>builder()
        .setProperties(PropertiesLoader.loadKafkaProperties())
        .setTopics(Topics.TOPIC_CMS_PERSON)
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new JsonDeserializationSchema<>(CmsPerson.class))
        .build();

var cmsStream = env.fromSource(
        cmsSource,
        WatermarkStrategy.noWatermarks(),
        "cms_person_source");

For anyone familiar with KStreams, we can see similar requirements here. The properties include the configuration for the consumer, and we need to declare the source topics, the starting offset, and the value deserializer.

Once we have determined the source, we can use the StreamExecutionEnvironment to create a DataStream.

Transformations

Now that we have defined our sources, all that remains is to join the data and apply any transformations that are required.

var personStream = cmsStream.connect(analyticsStream)
        .map(new CoMapFunction<CmsPerson, AnalyticsPerson, Person>() {

            @Override
            public Person map1(CmsPerson value) throws Exception {
                return value.toPerson();
            }

            @Override
            public Person map2(AnalyticsPerson value) throws Exception {
                return value.toPerson();
            }
        })
        .filter(PersonImporterJob::isAnAdult);

In this case, the two input DataStreams are connected and mapped to the same output type. As shown in the code snippet above, the DataStream interface supports a filter operation. Other functions that are available in KStreams, such as map, flatMap, and reduce, are also available on DataStream.
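For example, a keyed reduce on the resulting stream could count people per country. This is only a sketch, not part of the project: it assumes Person exposes a getCountry() getter, which is my invention for illustration:

var personsPerCountry = personStream
        .map(person -> Tuple2.of(person.getCountry(), 1)) // pair each person with a count of 1
        .returns(Types.TUPLE(Types.STRING, Types.INT)) // declare the type, since lambdas lose generics
        .keyBy(countryCount -> countryCount.f0, Types.STRING) // partition by country
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)); // running count per country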

For a more comprehensive list of features, see this overview.

Sink

In a similar fashion to sources, sinks can be connected to all kinds of technologies, but for this use case we will use a KafkaSink.

var serializer = KafkaRecordSerializationSchema.<Person>builder()
        .setTopic(Topics.TOPIC_PERSON)
        .setValueSerializationSchema(new JsonSerializationSchema<>())
        .build();

var sink = KafkaSink.<Person>builder()
        .setKafkaProducerConfig(PropertiesLoader.loadKafkaProperties())
        .setRecordSerializer(serializer)
        .build();

personStream.sinkTo(sink);

Again, we need to define all the properties required to connect to Kafka, and once the sink is configured, data from the personStream can be sent to it.

All that’s left now is executing the job.

env.execute("person_importer"); 

Running the application

The PersonImporterJob main method can be run from the local machine just fine. Make sure that the environment variables defined in the kafka.properties file are filled in, and the application will execute the job and connect to the configured Kafka cluster.

If you want to generate input data for the PersonImporterJob, the DataGenJob can be executed, and sample data will be pumped into Kafka. Once both jobs are up and running, the standardized person data will be posted to the person topic.

I suggest experimenting with various streaming operations and seeing what Apache Flink has to offer.

What’s next?

We now have our Flink application, but how do we deploy it? To fully benefit from the elasticity and high availability that Apache Flink supports, we will need to set up a Flink cluster with job/task managers to execute our jobs. But that’s a topic for another day.

Should we use the DataStream API over KStreams?

Why should we use Flink’s DataStream API if KStreams can do this as well? I’ve only recently explored the DataStream API, so I don’t want to jump the gun and make grand statements about exclusively using one technology or the other.

I do have a small list of pros and cons for both technologies, but these findings are not set in stone, and I’m very curious to see how I will view these statements in the future.

KStreams

The main pro for KStreams, in my opinion, is the deployment stack. If we have our application and want to add another processing instance, all we need to do is increase the number of replicas; we don’t have to worry about anything other than making sure CPU and memory resources are available.

The biggest con for KStreams is that a topology can get really messy, really fast. The number of internal topics required for state and co-partitioning can skyrocket for even simple streaming applications, resulting in increased cost and complexity. Even for seasoned developers, complex topologies aren’t easy to grasp, let alone maintain and update.

Apache Flink/DataStream API

As stated before, I’m very new to Apache Flink. At first glance, it seems to me that the DataStream API supports more streaming operations than KStreams.

Apache Flink manages state through snapshots (checkpoints), which are stored on a configured volume. When using Kafka, that means we won’t be cluttering our cluster with internal topics, which is a nice pro.
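As a rough illustration (not from the project), pointing those snapshots at a specific storage location is a one-liner on the environment; the interval and path below are placeholders:

env.enableCheckpointing(60_000); // snapshot state every 60 seconds
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints"); // placeholder path, could also be s3:// or hdfs://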

A con for Apache Flink is that we need a Flink cluster to deploy our jobs. However, there are different deployment modes available (session and application), and depending on which one you go for, some of the difficulties can be minimized.

Seriously though, what should we use?

Personally, I will keep exploring what the DataStream API has to offer, as it does seem like a very powerful tool for streaming operations.

In the meantime, I will use whatever technology an employer requires me to use, before advocating for either of the two technologies.

Want to join our team of event-streaming aficionados? Check out our jobs!
