Building Event-Driven Applications with Apache Flink, Apache Kafka and Amazon EMR — Part 2

Using Apache Flink and Apache Kafka to build responsive data streaming pipelines — Implementation

Zain Haider
Retailo Tech
8 min read · Jun 24, 2022

This blog post covers the implementation of an end-to-end streaming pipeline employing Apache Flink, Apache Kafka and Amazon Kinesis. To read about its architecture and technology stack in detail, check out part 1 of this series here.

This article assumes you have working knowledge of Kinesis Data Streams, Kinesis Firehose and Amazon DMS.

Setting up an MSK Cluster

Amazon MSK makes it easy to ingest and process streaming data in real time with fully managed Apache Kafka.
  • Sign in to the AWS Management Console, and open the Amazon MSK console.
  • Choose Create cluster.
  • For Cluster name, enter MSKTutorialCluster.
  • From the table under All cluster settings, copy the values of the following settings and save them because you need them later in this tutorial:

a) VPC

b) Subnets

c) Security groups associated with VPC

For the purposes of this tutorial, we will use Plaintext as the authentication type. These settings are not configurable once the cluster has been created, so configure them as per your organization’s requirements. For further clarity around the creation of an MSK cluster, check here.

Setting up a Client Machine

  • Open the Amazon EC2 console and launch an instance of type Amazon Linux 2 AMI (HVM).
  • Configure the VPC of your client machine to be the same as the one you set up for your MSK Cluster
  • Choose Review and Launch, and then choose Launch.
  • Create a new key pair.
  • Choose View Instances. Then, in the Security Groups column, choose the security group that is associated with this instance.
  • Open the Amazon VPC console.
  • In the navigation pane, choose Security Groups. Find the security group of your MSK Cluster.
  • In the Inbound Rules tab, choose Edit inbound rules.
  • Add a new rule and choose All traffic in the Type column. In the second field in the Source column, select the security group of the client machine, then save the rules.

For more clarity around setting up a client instance, check here.

To create a topic on the client machine

  • SSH into the client instance you created in the previous step using the key pair.
  • Install Java on the client machine by running the following command:

sudo yum install java-1.8.0

  • Download Apache Kafka (for this tutorial we will be using Kafka version 2.6.2); the download command is included in the sketch after this list.
  • Run the following command in the directory where you downloaded the TAR file in the previous step.

tar -xzf kafka_<kafka_version>.tgz

  • Go to the kafka_<version_of_kafka> directory.
  • Go to your Amazon MSK console and copy the bootstrap server connection string listed under View Client Information.
  • Create the topic using that connection string (the command is sketched after this list). If the command succeeds, you see the following message: Created topic <Topic Name>.
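
A sketch of these commands, assuming the Scala 2.12 build of Kafka 2.6.2, with the bootstrap-server string and topic name as placeholders (a replication factor of 3 assumes a three-broker cluster):

# Download Apache Kafka 2.6.2 (Scala 2.12 build assumed)
wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz

# From inside the extracted Kafka directory, create the topic
bin/kafka-topics.sh --create \
  --bootstrap-server <your-bootstrap-server-string> \
  --replication-factor 3 \
  --partitions 1 \
  --topic <Topic Name>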

Bonus Tip!

If you see connectivity issues between the client machine and the MSK cluster, re-check the security group settings for both. Add the client’s IP range to the inbound rules of the MSK cluster’s security group and vice versa.

Writing to the Kafka topic

  • We will be feeding data into the Kafka topic using a DMS task that loads data from a DocumentDB database.
  • Set up a source DMS endpoint pointing to the DocumentDB database.
  • Set up a target DMS endpoint pointing to your MSK cluster and Kafka topic.
  • Add in the bootstrap connection string and topic name.
  • In the endpoint settings section, add an authentication mechanism if you have configured one; since we are using plaintext authentication, we will skip this step.
  • Create the DMS replication task and make sure the replication instance is within the same VPC as your MSK cluster.
  • Structure of data in the source DocumentDB:

Bonus Tip!

You can configure the size of a single message as per your requirements here. In order to accommodate documents of roughly 20 MB, we will set the ‘MessageMaxBytes’ endpoint setting to 20 MB (the default is 1 MB); some other configurations can be found here.

If you have configured a custom maximum message size, you will also need to run the following command on your client machine to set the maximum message size on the Kafka topic.
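
For example (a sketch; the topic name is a placeholder and 20971520 bytes corresponds to the ~20 MB limit configured on the DMS endpoint):

bin/kafka-configs.sh --bootstrap-server <your-bootstrap-server-string> \
  --alter --entity-type topics --entity-name <Topic Name> \
  --add-config max.message.bytes=20971520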

In conjunction with that, you will also need to set up a cluster configuration. To do that, go to your MSK console, select the cluster configuration and edit the following fields:

replica.fetch.max.bytes
replica.fetch.response.max.bytes

Creating an EMR Cluster with Apache Flink

  • Open the Amazon EMR console.
  • Choose Create cluster, then choose Go to advanced options.
  • Choose EMR Release emr-5.1.0 or later.
  • Choose Flink as an application, along with any others to install.
  • Select other options as necessary and choose Create cluster.

Some other frameworks that can be configured are:

  • Hive
  • Spark
  • Pig
  • Zeppelin

Details can be found here.

Official Amazon documentation to create EMR cluster with Apache Flink can be found here.

Configuring your EMR Cluster

  • Access the EMR master node through an SSH tunnel.
  • Locate the Flink installation path (containing the executable bash scripts).
  • Set up Maven.
  • Navigate to /usr/lib/flink/conf/flink-conf.yaml.

You can modify configurations for your Flink cluster in this file as per your production needs.

Some of the important Flink Configurations for production readiness are:

  • jobmanager.memory.process.size: this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
  • taskmanager.memory.process.size: this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
  • taskmanager.numberOfTaskSlots: The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

For production jobs, make sure to allocate sufficient memory to job manager and task manager processes. You can read more about these and other configuration metrics here.
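
As a rough sketch, the relevant entries in flink-conf.yaml might look like the following; the values are illustrative placeholders, not recommendations, so size them to your workload and instance types.

jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 2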

Flink is all set up. You are good to go!

Writing a Flink streaming job to process data from Kafka

For the scope of this tutorial, we will be working with the DataStream API.

  • Clone the Flink tutorial archetype from the Maven repository to set up the structure of your Maven project.
  • Navigate to the StreamingJob class in the cloned Maven skeleton and let’s get started with writing the streaming processor!
  • In order to fetch data from Kafka, Flink has a set of connectors which you can check out here. We will be adding the Apache Kafka and Kinesis connectors to our pom.xml file. Connectors can be found in the Maven central repository here.

If you intend to use any other external libraries, they can be added to the pom.xml file of your project beforehand to ensure a seamless development experience.
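
For illustration, the connector dependencies in pom.xml could look like the following; the Scala 2.11 suffix and the ${flink.version} property are assumptions, so match them to the Flink release running on your EMR cluster.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>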

  • Coming to the streaming processor, the first step is to import the required libraries.
  • Jumping into the main function of our class, we initialize an object with the stream execution environment.
  • Next, we initialize a Properties object with the Kafka configuration parameters.
  • Finally, we add the configured Kafka source to the stream execution environment.

To test out the raw data coming in, you can print the stream and run your Flink job. A condensed sketch of the steps above is given below.
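
This sketch assumes the Flink 1.x DataStream API and the universal Kafka connector added earlier; the topic name, bootstrap-server string and consumer group id are placeholders.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // Initialize the stream execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration (placeholder values)
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "<your-bootstrap-server-string>");
        kafkaProps.setProperty("group.id", "flink-consumer-group");

        // Add the configured Kafka source to the stream execution environment
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("<Topic Name>", new SimpleStringSchema(), kafkaProps));

        // Print the raw records to verify data is coming in
        stream.print();

        env.execute("kafka-source-test");
    }
}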

Transformations:

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

Map

DataStream → DataStream

Takes one element and produces one element.

FlatMap

DataStream → DataStream

Takes one element and produces zero, one, or more elements.

Check out more about Flink operators here

  • We will be using a FlatMap function to extract elements from our source DocumentDB documents and concatenate them into a comma-separated string.
  • We will be using a Kinesis data stream as a sink, connected to a Kinesis Firehose delivery stream, to complete the end-to-end integration with Redshift. Other sink connectors can be found here.
  • For the purpose of this tutorial we will be providing AWS credentials directly using the AWSConfigConstants class.
  • Add the sink to our DataStream.
  • Finally, add the execute() call to the execution environment. A sketch of these steps is given below.
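
Continuing from the env and stream defined in the earlier sketch, the transformation and sink could look roughly like this; the document field names, region, stream name and the use of Jackson for JSON parsing are assumptions for illustration, and FlinkKinesisProducer comes from the Kinesis connector added to pom.xml.

import java.util.Properties;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Flatten each incoming document into a comma-separated string
// (the "data", "order_id" and "status" fields are hypothetical; adjust to your schema)
DataStream<String> flattened = stream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String record, Collector<String> out) throws Exception {
        JsonNode doc = new ObjectMapper().readTree(record);
        if (doc.has("data")) {
            JsonNode data = doc.get("data");
            out.collect(data.get("order_id").asText() + "," + data.get("status").asText());
        }
    }
});

// Kinesis producer configuration; credentials provided directly via AWSConfigConstants
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "<aws-region>");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "<access-key-id>");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "<secret-access-key>");

FlinkKinesisProducer<String> kinesisSink =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesisSink.setDefaultStream("<kinesis-data-stream-name>");
kinesisSink.setDefaultPartition("0");

// Add the sink to our DataStream and trigger execution
flattened.addSink(kinesisSink);
env.execute("documentdb-kafka-to-kinesis");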

Voila! Your first Flink streaming job is ready to roll!

Compilation

  • Go to the source directory containing pom.xml and run the Maven package command shown below.
  • You now have a compiled jar file in the target directory with your code bundled.
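
For example:

cd <path-to-your-project>   # directory containing pom.xml
mvn clean package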

Running a standalone cluster and submitting our Flink job in a session mode:

  • Running the job can be as easy as two steps: starting a standalone cluster and submitting the jar (see the commands below).
  • Check out the status of your job from the command line.
  • Check the job status on the Flink Dashboard.
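
A sketch of these commands on the EMR master node; the main class and jar name are placeholders, and /usr/lib/flink is the Flink installation path located earlier.

# Step 1: start a standalone Flink cluster
/usr/lib/flink/bin/start-cluster.sh

# Step 2: submit the jar built in the previous section
/usr/lib/flink/bin/flink run -c <your.package.StreamingJob> target/<your-job>.jar

# Check the status of running and scheduled jobs
/usr/lib/flink/bin/flink list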

Install Lynx if you have not already; Lynx is a terminal-based web browser available for Linux distributions. To view the Flink UI in your local browser, you can enable port forwarding:
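
For example (the key pair, master node DNS and port are placeholders; 8081 is the default port of the standalone cluster’s web UI):

# Browse the dashboard directly from the master node
lynx http://localhost:8081

# Or forward the port and open http://localhost:8081 in your local browser
ssh -i <your-key-pair.pem> -N -L 8081:localhost:8081 hadoop@<emr-master-public-dns>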

  • Stopping the cluster:
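
For example:

/usr/lib/flink/bin/stop-cluster.sh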

Running our streaming job in YARN application mode

  • Submit jobs in application mode.
  • Check if the application is up and running successfully.
  • Check the logs of an application.
  • Kill a YARN application.
  • List the jobs running within a YARN application.
  • Cancel a job without tearing down the application cluster. The corresponding commands are sketched below.
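
A sketch of the corresponding commands; the application id, job id, main class and jar name are placeholders, and the -t yarn-application target assumes Flink 1.11 or later.

# Submit the job in YARN application mode
/usr/lib/flink/bin/flink run-application -t yarn-application \
  -c <your.package.StreamingJob> target/<your-job>.jar

# Check that the YARN application is up and running
yarn application -list

# Check the logs of an application
yarn logs -applicationId <application_id>

# Kill a YARN application
yarn application -kill <application_id>

# List Flink jobs running within a YARN application
/usr/lib/flink/bin/flink list -t yarn-application -Dyarn.application.id=<application_id>

# Cancel a job without tearing down the application cluster
/usr/lib/flink/bin/flink cancel -t yarn-application -Dyarn.application.id=<application_id> <job_id>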

You can access the YARN ResourceManager UI from the Application user interfaces section on the EMR console home page and, from there, reach the Flink dashboard for each application cluster.

Common errors to look out for!

  • Flink/Scala version mismatch for external connectors being used.
  • Port collision issues — In case you forget to tear down a standalone cluster.
  • User/permission/folder lock issues within the Hadoop file system.
  • Resource bottlenecks in the capacity available to the Flink cluster.
  • Connectivity issues with external data sources and sinks.
  • Native java errors during execution of the processor.
