How to Set Up PyFlink on Amazon EMR

Nesreen Sada
7 min read · Jul 10, 2020


What is Flink?

Apache Flink is an open-source distributed stream-processing framework developed by the Apache Software Foundation (ASF). The core of Flink is built on Java and Scala. Flink has recently gained popularity as an alternative to other streaming frameworks such as Spark, Samza, and Storm.

The main features of Flink:

  • High throughput and low latency (milliseconds), which makes it ideal for stream processing.
  • Provides job scheduling, resource assignment, parallel processing, and reliability.
  • It is distributed and can scale horizontally.
  • In-memory computations.
  • Flink is a pure stream computing engine with unified stream and batch processing capabilities.
  • Provides data-source and sink connectors to systems such as Amazon Kinesis, Apache Kafka, Alluxio, HDFS, Apache Cassandra, and Elasticsearch.
  • According to ASF statistics, Flink was one of the most active Apache open-source projects in 2019.

What programming languages does Flink support?

Flink natively supports Java and Scala, but it also supports Python.

Why Is PyFlink Necessary?

PyFlink is simply a combination of Apache Flink with Python. This combination lets us use all the computing capabilities of Python on Flink, and all the features of Flink from Python. Additionally, PyFlink supports Python user-defined functions, which you can register and use in the Table API and SQL.

Why Flink and Python?

  • The number of Python users working on data analysis and machine learning applications has increased in the last few years.
  • Making Flink’s features available to Python users lets them run Python’s analysis and computing functions on Flink, improving Python’s ability to tackle big data problems.

Flink Installation

We have two options for installing Flink on AWS: either launch an Amazon EC2 instance and install Flink and its dependencies ourselves, or use Amazon EMR, which can provision a cluster with Flink already installed. The latter is the recommended approach to run Flink on AWS.

Why Use Amazon EMR?

Here’s the description provided by Amazon for the EMR service:

Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto. With EMR you can run Petabyte-scale analysis at less than half of the cost of traditional on-premises solutions and over 3x faster than standard Apache Spark. For short-running jobs, you can spin up and spin down clusters and pay per second for the instances used. For long-running workloads, you can create highly available clusters that automatically scale to meet demand. If you have existing on-premises deployments of open source tools such as Apache Spark and Apache Hive, you can also run EMR clusters on AWS Outposts.

So, what is Amazon EMR?

Amazon EMR is an AWS service; EMR stands for Elastic MapReduce. As the name implies, it is an elastic service that lets users run resizable Hadoop clusters, and it includes MapReduce components for computation.

So, it is a cloud solution hosted in Amazon’s data centers that supplies the computing power and on-demand capacity needed to tackle big data challenges. Moreover, Amazon EMR simplifies setting up and managing a cluster of Hadoop and MapReduce components.

Figure 4 shows an architectural view of the EMR cluster.

Flink setup

I will start by outlining the steps I followed to install Flink on my local machine (Ubuntu 18.04), and then cover the EMR setup.

Flink local setup

I used the local installation guide from Apache Flink, and the steps were as follows:

  • Check that a compatible Java version is installed. Flink requires Java 8 or 11.
$ java -version
  • Download the 1.10.1 release and untar it.
$ tar -xzf flink-1.10.1-bin-scala_2.11.tgz
$ cd flink-1.10.1/
  • Start a Cluster
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
  • Submit a job
$ ./bin/flink run examples/streaming/WordCount.jar
$ tail log/flink-*-taskexecutor-*.out
(to,1)
(be,1)
(or,1)
(not,1)
(to,2)
(be,2)
  • To run Python code on Flink, we need to ensure that the default Python version is 3.5 or higher.
$ python --version
Python 3.7.3

I faced an issue with the system path configuration for Python while running Flink. So make sure that python3 is on the PATH and that it does not conflict with Anaconda.
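To see which executables the PATH actually resolves, a small check like the following can help. It is only a sketch: the helper names are hypothetical, and treating any path that contains "conda" as an Anaconda install is a heuristic assumption, not something Flink itself does.

```python
import shutil


def resolve_interpreters(names=("python", "python3"), path=None):
    """Map each command name to the executable PATH resolves it to (or None)."""
    return {name: shutil.which(name, path=path) for name in names}


def looks_like_conda(executable):
    """Heuristic (assumption): a path containing 'conda' is an Anaconda/Miniconda install."""
    return executable is not None and "conda" in executable.lower()


if __name__ == "__main__":
    for name, exe in resolve_interpreters().items():
        note = " (possibly Anaconda/Miniconda)" if looks_like_conda(exe) else ""
        print(f"{name} -> {exe}{note}")
```

If `python` and `python3` resolve to different installations, that is a hint that the PATH order may need adjusting before running Flink.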

Flink EMR setup

  • From the AWS console, choose the EMR service.
  • Create an EMR cluster with Hadoop, ZooKeeper, and Flink installed.
  • Hardware: two m5.xlarge instances.
  • Ensure that the web UI port is accessible through an SSH tunnel.

The cluster will require some time to be ready for usage. Once it is ready, we can test Flink on EMR.

  • Access EMR through an SSH tunnel
ssh hadoop@******.compute.amazonaws.com -i flink-poc.pem -L 8081:localhost:8081
  • Locate Flink installation folder
$ whereis flink
flink: /usr/bin/flink /usr/lib/flink /etc/flink
$ cd /usr/lib/flink
  • Run the Flink cluster
[Figure: running the Flink cluster]

It is worth mentioning that I had to change the Flink configuration before the cluster would run, because the default configuration caused memory errors.

[Figure: errors with the default Flink configuration]
$ cd conf
$ vim flink-conf.yaml

The default configurations:

[Figure: flink-conf.yaml]

So, what was the issue with the configuration? The task manager and job manager heap size settings were commented out.

So I replaced them with the following settings, and then the cluster ran successfully.

# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1728m
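A quick way to confirm that the two memory settings are really active (not commented out) is to parse the file and look for the keys. The sketch below assumes flink-conf.yaml contains only flat `key: value` lines, which is how the default file is laid out; the function names are hypothetical.

```python
# The two keys set above; adjust if your configuration uses different ones.
REQUIRED_KEYS = ("jobmanager.heap.size", "taskmanager.memory.process.size")


def parse_flink_conf(text):
    """Parse flat 'key: value' lines, skipping blank lines and '#' comments."""
    settings = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition(":")
        if sep:
            settings[key.strip()] = value.strip()
    return settings


def missing_memory_settings(text, required=REQUIRED_KEYS):
    """Return the required keys that are absent or commented out."""
    settings = parse_flink_conf(text)
    return [key for key in required if key not in settings]


if __name__ == "__main__":
    sample = "# jobmanager.heap.size: 1024m\ntaskmanager.memory.process.size: 1728m\n"
    print(missing_memory_settings(sample))  # ['jobmanager.heap.size']
```

To check the real file, read its contents (for example `open("conf/flink-conf.yaml").read()`) and pass the text to `missing_memory_settings`.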

Settings required to enable the web interface:

# The address under which the web-based runtime monitor listens.
#
jobmanager.web.address: 0.0.0.0
# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.
rest.port: 8081
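Once the cluster is running and the SSH tunnel from earlier is open, port 8081 on the local machine should accept connections. A minimal sketch to verify this from Python follows; the function name is hypothetical, and it only checks that something is listening, not that it is the Flink web UI.

```python
import socket


def port_is_open(host="localhost", port=8081, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    state = "reachable" if port_is_open() else "not reachable"
    print(f"localhost:8081 is {state}")
```

If the port is not reachable, the usual suspects are the tunnel not being open or the cluster not having started.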

Running Word Count Example

Let’s test if Flink is working by using the examples provided by Flink:

Java Example

For this, I chose the batch word count example. This program returns the number of occurrences of each word in a given input file. To run a Flink job we use the following command:

./bin/flink run 

To run the wordcount example, we just provide the jar location to the Flink running command as follows:

./bin/flink run ./examples/batch/WordCount.jar

From the console we notice:

  1. The job has been submitted, along with its JobID.
  2. The job runtime is 1144 ms.
  3. A key-value pair for each word and its count.
[Figure: console output for word count]
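To make the (word, count) pairs concrete, here is the same computation in plain Python. This is not Flink code, only an illustration: it assumes words are lowercased and split on non-word characters, and the function names are hypothetical. The generator reproduces the running counts seen in the streaming output during the local setup, while the batch job reports one final total per word.

```python
import re
from collections import Counter


def tokenize(text):
    """Lowercase the text and split it on non-word characters, dropping empties."""
    return [word for word in re.split(r"\W+", text.lower()) if word]


def running_counts(text):
    """Yield (word, count) every time a word is seen, like the streaming output."""
    seen = Counter()
    for word in tokenize(text):
        seen[word] += 1
        yield word, seen[word]


def final_counts(text):
    """Return one total per word, like the batch result."""
    return Counter(tokenize(text))


if __name__ == "__main__":
    line = "To be, or not to be"
    print(list(running_counts(line)))
    # [('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 2), ('be', 2)]
    print(dict(final_counts(line)))
    # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```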

Next, let’s check the web service UI:

[Figure: completed jobs in the web UI]
[Figure: job details in the web UI]

The main page of the web service UI shows the submitted jobs, their number of tasks, and their status. Clicking on a job opens a detailed description of the executed job. The overview includes information about each task (duration, parallelism, records and bytes sent) and the DAG diagram.

Python Example

The Python example, word_count.py, stores its results in the /tmp/result file. To run a Python file with Flink, we just need to add the -py argument to the flink run command:

./bin/flink run -py ./examples/python/table/batch/word_count.py
[Figure: Python console output]

The Python script failed to run! But why?

The run fails because Flink is trying to use Python 2.7, which is incompatible with Flink; we need Python 3.5 or higher. Let’s check the Python version installed on EMR:

So even though the default Python is set to 3.7.6, Flink was running Python 2.7.
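When the shell and Flink disagree about the Python version, it helps to ask each candidate executable directly. The following sketch does that; the function names are hypothetical, and the (3, 5) minimum is the requirement stated above. Python 2 prints its version to stderr rather than stdout, so both streams are captured.

```python
import re
import subprocess
import sys


def parse_version(output):
    """Extract (major, minor) from text such as 'Python 3.7.6'."""
    match = re.search(r"Python (\d+)\.(\d+)", output)
    if match is None:
        raise ValueError(f"unrecognized version output: {output!r}")
    return int(match.group(1)), int(match.group(2))


def interpreter_version(executable):
    """Run '<executable> --version' and return its (major, minor) version."""
    result = subprocess.run(
        [executable, "--version"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # Python 2 writes the version to stderr
        universal_newlines=True,
        check=True,
    )
    return parse_version(result.stdout)


def meets_minimum(executable, minimum=(3, 5)):
    """Return True if the interpreter is at least the minimum version."""
    return interpreter_version(executable) >= minimum


if __name__ == "__main__":
    print(sys.executable, interpreter_version(sys.executable))
```

Calling `meets_minimum("python")` and `meets_minimum("python3")` shows which command name, if any, points at an interpreter that is too old.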

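So, how do we resolve this issue?

  • Create a Python virtual environment with PyFlink installed, using the setup-pyflink-virtual-env.sh script (the argument is the PyFlink version)
$ setup-pyflink-virtual-env.sh 1.10.0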
  • Activate the virtual environment
# activate the conda python virtual environment
$ source venv/bin/activate

Let’s run it!

./bin/flink run -py ./examples/python/table/batch/word_count.py
[Figure: word_count.py console output after the fix]

Now the code runs successfully. From the output above, we can see:

  1. The Job ID.
  2. The job runtime.
  3. The /tmp/result file, where we can see the count for each word.
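To inspect the result from Python rather than by eye, the file can be loaded into a dictionary. This sketch assumes the file contains comma-separated `word,count` lines, which is an assumption about the example's output format; the function name is hypothetical. Adjust the delimiter if your output differs.

```python
import csv


def read_word_counts(path="/tmp/result", delimiter=","):
    """Load 'word<delimiter>count' lines into a dict, skipping malformed rows."""
    counts = {}
    with open(path, newline="") as handle:
        for row in csv.reader(handle, delimiter=delimiter):
            if len(row) != 2:
                continue  # ignore blank or unexpected lines
            word, count = row
            counts[word] = int(count)
    return counts
```

For example, `read_word_counts()` returns a mapping such as `{"flink": 2, "pyflink": 1}` when the file holds those two lines.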

What about the web UI?

When you no longer need the Flink cluster, you can stop it with:

sudo ./bin/stop-cluster.sh
[Figure: stop cluster console output]
