Building Event-Driven Applications with Apache Flink, Apache Kafka and Amazon EMR — Part 1

Using Apache Flink and Apache Kafka to build responsive data streaming pipelines — Architecture

Zain Haider
Retailo Tech
11 min read · Jun 24, 2022


Retailo Technologies is one of the fastest-growing startups in the MENAP region, serving thousands of retailers in Pakistan, Saudi Arabia and the UAE. The Retailo App allows retailers to browse thousands of products, compare prices instantly and have their orders delivered the next day. Since the company’s inception in 2020, we’ve seen tremendous month-on-month growth.

Here at Retailo, we are scaling a real-time event processing architecture to extract business value from high-velocity, high-volume data sources such as our delivery agents’ geo-coordinates. This blog post covers the architecture and the components used to implement an end-to-end streaming pipeline employing Apache Flink and Apache Kafka. To read about its implementation, check out Part 2 of this series here.

Retailo — simplifying retail through tech

Getting Started with Apache Kafka on MSK

Apache Kafka is a distributed data store optimized for ingesting and processing streaming data in real-time. Streaming data is data that is continuously generated by thousands of data sources. A streaming platform needs to handle this constant influx of data, and process the data sequentially and incrementally.

Kafka architecture is made up of topics, producers, consumers, consumer groups, clusters, brokers, partitions, replicas, leaders, and followers. The following diagram offers a simplified look at the architecture of a Kafka cluster.

Source: Kafka Architecture
  • A Kafka cluster consists of one or more brokers, and each replica is stored on a single broker.
  • Data is written to a topic by a producer; each topic can have one or more partitions, and each partition has one or more replicas.
  • A topic can have zero or more consumers, each belonging to a consumer group identified by a group ID.

Kafka provides two main functions to its users:

  • Publish and subscribe to streams of records.
  • Effectively store streams of records in the order in which records were generated.

Kafka is primarily used to build real-time streaming data pipelines and applications that adapt to the data streams.

Kafka Broker:

A Kafka broker is a server running in a Kafka cluster. Typically, multiple brokers work in concert to form the cluster, providing load balancing, redundancy, and failover (see Topic Replication Factor below).

Bootstrap Servers:

‘Bootstrap servers’ is a comma-separated list of host:port pairs pointing to brokers in the Kafka cluster. Clients use this list to make their initial connection, after which they discover the rest of the brokers in the cluster.
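
As an illustration, here is a minimal Java producer sketch; the broker addresses and the topic name are placeholders, and the kafka-clients library is assumed to be on the classpath:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Comma-separated host:port pairs used for the initial connection to the cluster.
        // The addresses below are placeholders; substitute your own brokers.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Publish a single record to a hypothetical topic.
            producer.send(new ProducerRecord<>("example-topic", "key-1", "hello"));
        }
    }
}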

Kafka Topic:

A Kafka topic defines a channel through which data is streamed. Producers publish messages to topics, and consumers read messages from the topics they subscribe to. Topics organize and order messages, with particular types of messages published to particular topics. Topics are identified by unique names within a Kafka cluster, and there is no hard limit on the number of topics that can be created. Keep in mind, however, that cluster resources are finite: as the number of topics and consumers grows, you may start to see resource bottlenecks.

Kafka Partitions:

Within the Kafka cluster, topics are divided into partitions, and the partitions are replicated across brokers. Partitioning is what lets multiple consumers read from a topic in parallel, each consuming from a different subset of the partitions.

Topic Replication Factor:

Topic replication is essential to designing resilient and highly available Kafka deployments. When a broker goes down, topic replicas on other brokers remain available, so data stays accessible and the deployment avoids downtime.

A Kafka cluster with Replication Factor 2
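
Both the partition count and the replication factor are fixed when a topic is created. As a sketch (topic name, counts, and broker address are placeholders), Kafka's AdminClient can create a topic matching the replication-factor-2 layout in the diagram above:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions spread across the brokers, each with 2 replicas.
            NewTopic topic = new NewTopic("example-topic", 3, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}

Note that the replication factor cannot exceed the number of brokers in the cluster.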

Consumer Groups:

A Kafka consumer group includes related consumers that share a common task. Kafka delivers messages from the partitions of a topic to the consumers in the group, and each partition is read by only one consumer within the group at any given time. A consumer group has a unique group ID and can run multiple processes or instances at once.

A simple example of a producer sending a message to a topic, and a consumer that is subscribed to that topic reading the message
This diagram demonstrates how producers can send messages to individual topics
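
Continuing the sketch above, a minimal consumer might look like this; the broker address, group ID, and topic name are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder broker address
        props.put("group.id", "example-consumer-group");  // consumers sharing this ID split the partitions
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("example-topic"));
            // Poll forever; a real application would add a shutdown path.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}

Running several copies of this process with the same group.id makes Kafka split the topic's partitions among them; if there are more consumers in the group than partitions, the extra consumers sit idle, which is why partition count effectively caps a group's parallelism.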

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that handles control-plane operations, such as creating, updating, and deleting clusters, while letting you use standard Apache Kafka data-plane operations, such as producing and consuming data. It runs open-source versions of Apache Kafka.

Components:

  • Broker types — When you create an Amazon MSK cluster, you specify the type of brokers that you want it to have. Amazon MSK supports the kafka.t3.x and kafka.m5.x broker types, where x denotes the instance size (for example, kafka.t3.small or kafka.m5.large).

  • Broker nodes — When creating an Amazon MSK cluster, you specify how many broker nodes you want Amazon MSK to create in each Availability Zone. Each Availability Zone has its own virtual private cloud (VPC) subnet.
  • Producers, consumers, and topic creators — Amazon MSK enables you to use Apache Kafka data-plane operations to create topics and to produce and consume data.
  • Cluster operations — You can use the AWS Management Console, the AWS Command Line Interface (AWS CLI), or the APIs in the SDK to perform control-plane operations.
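
For data-plane access from a client application, the main things you need from MSK are the bootstrap broker string and matching security settings. Below is a minimal sketch assuming a TLS listener; the endpoint is a placeholder, and the real value comes from the MSK console, CLI, or API for your cluster:

import java.util.Properties;

public class MskClientConfigSketch {
    // Builds client properties for an MSK cluster. The bootstrap string below is a
    // placeholder; substitute the bootstrap brokers reported for your own cluster.
    static Properties mskProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "b-1.example.kafka.us-east-1.amazonaws.com:9094,"
              + "b-2.example.kafka.us-east-1.amazonaws.com:9094");
        // For a TLS listener; the exact security settings depend on how the
        // cluster's client authentication is configured.
        props.put("security.protocol", "SSL");
        return props;
    }
}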

Read more about Amazon MSK here

Getting Started with Apache Flink on EMR

Apache Flink is a stream processing engine that treats batch as a special case of streaming, giving it unified stream and batch processing capabilities.

The main features of Flink:

  • High throughput and low latency (milliseconds), which make it ideal for stream processing.
  • Provides job scheduling, resource assignments, parallel processing, and reliability.
  • It is distributed and can scale horizontally.

Components:

As a software stack, Flink is a layered system. The different layers of the stack build on top of each other and raise the abstraction level of the program representations they accept:

  • The runtime layer receives a program in the form of a JobGraph. A JobGraph is a generic parallel data flow with arbitrary tasks that consume and produce data streams.
  • Both the DataStream API and the DataSet API generate JobGraphs through separate compilation processes. The DataSet API uses an optimizer to determine the optimal plan for the program, while the DataStream API uses a stream builder.
  • The JobGraph is executed according to a variety of deployment options available in Flink (e.g., local, remote, YARN, etc).
  • JobManager: The JobManager has a number of responsibilities related to coordinating the distributed execution of Flink Applications: it decides when to schedule the next task (or set of tasks), reacts to finished tasks or execution failures, coordinates checkpoints, and coordinates recovery on failures, among others.
  • ResourceManager: The ResourceManager is responsible for resource de-/allocation and provisioning in a Flink cluster — it manages task slots, which are the unit of resource scheduling in a Flink cluster (see TaskManagers). Flink implements multiple ResourceManagers for different environments and resource providers such as YARN, Kubernetes and standalone deployments.
  • Task Managers: The TaskManagers (also called workers) execute the tasks of a dataflow, and buffer and exchange the data streams. There must always be at least one TaskManager. The smallest unit of resource scheduling in a TaskManager is a task slot. The number of task slots in a TaskManager indicates the number of concurrent processing tasks.
  • DataStream API: A DataStream is similar to a regular Java Collection in terms of usage but is quite different in some key ways. DataStreams are immutable, meaning that once they are created you cannot add or remove elements. You also cannot simply inspect the elements inside; you can only work on them using DataStream API operations, which are also called transformations. You can create an initial DataStream by adding a source in a Flink program.
  • Table API: Apache Flink features two relational APIs — the Table API and SQL — for unified stream and batch processing. The Table API is a language-integrated query API for Java, Scala, and Python that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way. The Table API and SQL interfaces integrate seamlessly with each other and Flink’s DataStream API. You can easily switch between all APIs and libraries which build upon them.
  • Transformations: Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.
  • Map: DataStream → DataStream

Takes one element and produces one element.

  • FlatMap: DataStream → DataStream

Takes one element and produces zero, one, or more elements.

Check out more about Flink operators here
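
As a small illustration of these two transformations, the sketch below splits lines into words with flatMap and upper-cases them with map; the input elements are made up for the example:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A tiny in-memory source, just to have something to transform.
        DataStream<String> lines = env.fromElements("a b", "c d e");

        // FlatMap: each input line produces zero or more words.
        DataStream<String> words = lines
                .flatMap((String line, Collector<String> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(word);
                    }
                })
                .returns(Types.STRING);  // lambdas lose generic type info, so declare it explicitly

        // Map: one word in, exactly one upper-cased word out.
        DataStream<String> upper = words
                .map(word -> word.toUpperCase())
                .returns(Types.STRING);

        upper.print();
        env.execute("map-flatmap-sketch");
    }
}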

  • Anatomy of a Flink Program:

Flink programs look like regular programs that transform DataStreams. Each program consists of the same basic parts:

  1. Obtain an execution environment,
  2. Load/create the initial data,
  3. Specify transformations on this data,
  4. Specify where to put the results of your computations,
  5. Trigger the program execution
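
Put together, a skeleton that follows these five steps might look like the sketch below; the socket source and the printed sink are stand-ins for whatever sources and sinks a real pipeline would use:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AnatomySketch {
    public static void main(String[] args) throws Exception {
        // 1. Obtain an execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Load/create the initial data (a socket source here, purely as a placeholder).
        DataStream<String> source = env.socketTextStream("localhost", 9999);

        // 3. Specify transformations on this data.
        DataStream<String> cleaned = source
                .filter(line -> !line.isEmpty())  // drop empty lines
                .map(line -> line.trim())         // normalize whitespace
                .returns(Types.STRING);

        // 4. Specify where to put the results of your computations (stdout here).
        cleaned.print();

        // 5. Trigger the program execution.
        env.execute("anatomy-sketch");
    }
}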

Read More about Flink here

Leveraging EMR

Here’s the description provided by Amazon for the EMR service:

Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto. For short-running jobs, you can spin up and spin down clusters and pay per second for the instances used. For long-running workloads, you can create highly available clusters that automatically scale to meet demand. If you have existing on-premises deployments of open source tools such as Apache Spark and Apache Hive, you can also run EMR clusters on AWS Outposts.

Official Amazon documentation on creating an EMR cluster with Apache Flink can be found here

Understanding clusters and nodes:

The central component of Amazon EMR is the cluster. A cluster is a collection of Amazon Elastic Compute Cloud (Amazon EC2) instances. Each instance in the cluster is called a node. Each node has a role within the cluster, referred to as the node type. Amazon EMR also installs different software components on each node type, giving each node a role in a distributed application like Apache Hadoop.

The node types in Amazon EMR are as follows:

  • Master node: A node that manages the cluster by running software components to coordinate the distribution of data and tasks among other nodes for processing. The master node tracks the status of tasks and monitors the health of the cluster. Every cluster has a master node, and it’s possible to create a single-node cluster with only the master node.
  • Core node: A node with software components that run tasks and store data in the Hadoop Distributed File System (HDFS) on your cluster. Multi-node clusters have at least one core node.
  • Task node: A node with software components that only runs tasks and does not store data in HDFS. Task nodes are optional.

Submitting work to a cluster:

When you run a cluster on Amazon EMR, you have several options as to how you specify the work that needs to be done.

  • Provide the entire definition of the work to be done in functions that you specify as steps when you create a cluster.
  • Create a long-running cluster and use the Amazon EMR console, the Amazon EMR API, or the AWS CLI to submit steps, which may contain one or more jobs. For more information, see Submit work to a cluster.
  • Create a cluster, connect to the master node and other nodes as required using SSH, and use the interfaces that the installed applications provide to perform tasks and submit queries, either scripted or interactively. For more information, see the Amazon EMR Release Guide.

Processing data:

When you launch your cluster, you choose the frameworks and applications to install for your data processing needs. To process data in your Amazon EMR cluster, you can submit jobs or queries directly to installed applications, or you can run steps in the cluster.

Submitting jobs directly to applications:

You can submit jobs and interact directly with the software that is installed in your Amazon EMR cluster. To do this, you typically connect to the master node over a secure connection and access the interfaces and tools that are available for the software that runs directly on your cluster.

Running steps to process data:

You can submit one or more ordered steps to an Amazon EMR cluster. Each step is a unit of work that contains instructions to manipulate data for processing by software installed on the cluster.

Steps are run in the following sequence:

  1. A request is submitted to begin processing steps.
  2. The state of all steps is set to PENDING.
  3. When the first step in the sequence starts, state changes to RUNNING. The other steps remain in the PENDING state.
  4. After the first step completes, its state changes to COMPLETED.
  5. The next step in the sequence starts, and its state changes to RUNNING. When it completes, its state changes to COMPLETED.
  6. This pattern repeats for each step until they all complete and processing ends.

Cluster Lifecycle:

An Amazon EMR cluster moves through several states over its lifetime: it begins in STARTING, runs any bootstrap actions in BOOTSTRAPPING, then moves to RUNNING while it processes steps (or WAITING when idle), and finally passes through TERMINATING to TERMINATED (or TERMINATED_WITH_ERRORS if something went wrong).

Running a job on Flink cluster — EMR:

Flink can be deployed through different Resource Provider Frameworks, such as Kubernetes or YARN. These frameworks manage containers and resource allocation.

Flink supports a set of deployment resource providers; you can check them out here.

Some of them are:

  • Standalone
  • Native Kubernetes
  • YARN
  • Mesos (deprecated)

Flink also supports several deployment modes, which can be found here:

  • Application Mode (For production environments)
  • Per-job mode
  • Session Mode
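
To tie the EMR and Flink pieces together, one common pattern is to submit the Flink job as an EMR step that invokes flink run against YARN through command-runner.jar. The sketch below uses the AWS SDK for Java (v1) EMR client; the cluster ID, the job jar path, and the step name are placeholders, and the exact flink run arguments depend on the deployment mode you choose:

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

public class SubmitFlinkStepSketch {
    public static void main(String[] args) {
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.defaultClient();

        // command-runner.jar lets an EMR step run an arbitrary command on the master node;
        // here it launches a Flink job against the cluster's YARN resource manager.
        // The jar path below is a placeholder.
        HadoopJarStepConfig flinkRun = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs("bash", "-c",
                        "flink run -m yarn-cluster /home/hadoop/streaming-job.jar");

        StepConfig step = new StepConfig()
                .withName("Run Flink streaming job")
                .withActionOnFailure("CONTINUE")
                .withHadoopJarStep(flinkRun);

        AddJobFlowStepsResult result = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
                .withJobFlowId("j-XXXXXXXXXXXXX")  // placeholder EMR cluster ID
                .withSteps(step));

        System.out.println("Submitted step(s): " + result.getStepIds());
    }
}

Once submitted, the step appears in the EMR console and moves through the PENDING, RUNNING, and COMPLETED states described earlier.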
