Spark Streaming, Kinesis, and EMR: Overcoming Pain Points
Spark is a distributed MapReduce framework designed for large-scale batch and streaming operations. Over the past few months we’ve been exploring the use of Spark Streaming on Amazon’s Elastic MapReduce (EMR) as an alternative to existing custom AWS-based solutions for large-scale data processing of Kinesis streams. This blog post will go into detail on some of the challenges of running Spark Streaming + Kinesis workloads on EMR. Specifically, it will touch on:
- IAM permissions for S3
- Passing flags for spark-submit and setting environment variables in CloudFormation
- Accessing the Spark and YARN UIs
- Configuring executors with Kinesis in mind
Example Use Case + Architecture
Before diving into the topics listed above it’s useful to consider an example use case and architecture. The Spark Streaming documentation works with a simple word count example. Consider a version of this where logs from an application are enriched via a Lambda function before being written to a Kinesis stream. Once they’re in the stream they can be processed by a Spark Streaming job and the result set can be published to S3. An example architecture could look like this.
Publishing to S3 with EMRFS
Before EMR shipped with its own implementation of the Hadoop File System (HDFS), result sets were published to S3 by including the hadoop-aws library and using the s3a:// prefix when outputting text files at the end of a Spark job. The only additional requirement was having valid AWS credentials with the correct S3 permissions in the default credentials provider chain. Permissions could be as tightly scoped as needed — perhaps simple read/write permissions on the bucket and the objects the job required access to. It was a bit of a hassle, but it got the job done. With EMRFS things have gotten theoretically easier.
The EMR File System (EMRFS) is an implementation of HDFS that all Amazon EMR clusters use for reading and writing regular files from Amazon EMR directly to Amazon S3. EMRFS provides the convenience of storing persistent data in Amazon S3 for use with Hadoop while also providing features like consistent view and data encryption.
Now, by using EMR, you get EMRFS for free, making it easy to publish result sets to S3 without the hadoop-aws JAR. What about that old s3a prefix though? The AWS documentation for Working With Storage and File Systems for EMR does specify that when running on EMR you need to use the S3 URI scheme — s3:// — to start publishing results to the destination bucket. This is a great change that makes the intent much more explicit.
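To make that concrete, here is a minimal sketch of publishing a result set from a job. The bucket name, prefix, and sample data are purely illustrative; only the URI scheme changes between the old and new setups.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Illustrative only: "my-results-bucket" and the sample counts stand in for real values.
def publishResults(sc: SparkContext): Unit = {
  val counts: RDD[(String, Long)] = sc.parallelize(Seq(("spark", 3L), ("kinesis", 2L)))

  // Old approach, with the hadoop-aws JAR on the classpath:
  //   counts.saveAsTextFile("s3a://my-results-bucket/word-counts/")

  // On EMR with EMRFS, the same call simply uses the s3:// scheme:
  counts.saveAsTextFile("s3://my-results-bucket/word-counts/")
}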
Setting S3 permissions with EMRFS in mind is where this gets slightly complicated. Following the same sort of least-permissions strategy one would follow under the old hadoop-aws system doesn't quite get there. In the case where a job only needs to write objects, the IAM role should only need s3:ListBucket on the bucket itself and s3:Put* on objects within the bucket. When running a job like this on EMR with the hadoop-aws JAR, the result sets are written out to S3 without issue. However, dropping this dependency and leveraging EMRFS without updating permissions won't work — jobs will start failing with 401 Unauthorized errors. There are a few reasons for this.
EMRFS needs the IAM role to have s3:DeleteObject permissions. EMRFS writes some additional files and metadata into the specified key space before the final commit and needs to be able to clean up these temp files. Furthermore, EMRFS appears to interact with additional resources beyond the bucket specified in the S3 URI used for publishing result sets — s3:* on just the destination bucket and objects isn't enough to eliminate the 401s. Giving the EC2 instance profile role expansive read and write permissions on all S3 resources while limiting destructive permissions to just the destination bucket and objects is a good place to start.
- PolicyName: "emr-hdfs-s3-read-write-delete"
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action:
- "s3:Get*"
- "s3:List*"
- "s3:Put*"
Resource:
- "*"
- Effect: "Allow"
Action:
- "s3:*"
Resource:
- !Sub "arn:aws:s3:::${BucketName}/"
- !Sub "arn:aws:s3:::${BucketName}/*"
Passing Arguments to spark-submit
Running jobs with spark-submit on EMR may require passing additional arguments such as --master, --deploy-mode, and --class. While configuring a job with spark-submit via the EMR console is straightforward, CloudFormation is more difficult. In the documentation for the Args property on the AWS::EMR::Step HadoopJarStepConfig resource, AWS provides the following information:
A list of command line arguments passed to the JAR file’s main function when executed.
Unfortunately AWS doesn’t provide any examples within CloudFormation for how this should be written in the template, nor does the Internet have many third-party deep dives. The question is whether an element in this list should be a single string containing both the flag and its value, just the flag, or just the value.
It is worth noting here that many AWS resources that take arbitrary lists specify the elements as KeyValue objects, explicitly communicating to the developer how everything should be passed and structured in the template. The lack of structure with this Args list seemed to imply the flag-and-value approach, such that a single element in the list would look like --master yarn rather than --master or yarn.
After a bit of trial and error, along with some careful examination of the EMR dashboard in the AWS console, it became clear this was the wrong approach. To get these arguments correctly passed to the command-runner, each command, flag, flag value, and command parameter needs to be a discrete element in the list and appear in the order in which the elements should be concatenated. Here's an example of how this might look in CloudFormation.
MyStep:
  Type: AWS::EMR::Step
  Properties:
    ActionOnFailure: "CONTINUE"
    HadoopJarStep:
      # Each command, flag, and value is a discrete element, listed in the order it should be concatenated
      Args:
        - "spark-submit"
        - "--deploy-mode"
        - !Ref DeployMode
        - "--class"
        - !Ref MainClass
        - "--master"
        - "yarn"
        # The application JAR's S3 path, assembled from an exported bucket name
        - !Join
          - ""
          - - "s3://"
            - Fn::ImportValue:
                !Sub "${BucketName}-${AWS::Region}-${Env}"
            - "/jars/"
            - !Ref JarFile
      Jar: "command-runner.jar"
      MainClass: ""
    JobFlowId:
      Fn::ImportValue:
        !Sub "${MyClusterId}-${AWS::Region}-${Env}"
    Name: "streaming-job"
Caring About The Environment [Variables]
Adding custom environment variables into the Spark runtime environment on EMR isn’t as straightforward as it seems. Many might be familiar with the spark-env script or the spark-defaults configuration file for solving this kind of problem. The EMR documentation points toward this as well.
spark-env — Sets values in the spark-env.sh file. For more information, see Environment Variables in the Spark documentation.
We found this confusing... based on information gathered from an AWS forum, Amazon’s own documentation, and StackOverflow, EMR doesn’t actually distribute the spark-env configuration to the worker nodes. It does, however, distribute the yarn-env configuration to the executors, making this the right vector for getting custom environment variables to the worker nodes. Using CloudFormation this can be achieved by adding a Configuration object to the Configurations list parameter for AWS::EMR::Cluster.
Configurations:
  - Classification: "yarn-env"
    Configurations:
      - Classification: "export"
        ConfigurationProperties:
          {
            "APP_NAMESPACE": !Ref MyNamespace,
            "BATCH_DURATION_SECONDS": !Ref BatchDurationSeconds,
            "SOURCE_STREAM_NAME": !Ref SourceStreamName,
            "RESULTS_BUCKET_NAME": !Ref ResultsBucketName,
            "NUM_EXECUTORS": "16"
          }
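Once the cluster is configured this way, the job can read those exports off the environment on the driver and executors. Below is a minimal, hypothetical helper; the variable names match the configuration above, while the JobConfig name and default values are illustrative.

// Hypothetical helper: reads the yarn-env exports defined in the CloudFormation above.
// The defaults are illustrative fallbacks for local testing.
object JobConfig {
  val appNamespace: String      = sys.env.getOrElse("APP_NAMESPACE", "streaming-job")
  val batchDurationSeconds: Int = sys.env.getOrElse("BATCH_DURATION_SECONDS", "300").toInt
  val sourceStreamName: String  = sys.env.getOrElse("SOURCE_STREAM_NAME", "")
  val resultsBucketName: String = sys.env.getOrElse("RESULTS_BUCKET_NAME", "")
  val numExecutors: Int         = sys.env.getOrElse("NUM_EXECUTORS", "16").toInt
}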
Quick Access to the YARN Resource Manager & Spark UIs
Short feedback loops and simple resource management are key to keeping a high velocity when iterating on Spark jobs. Given the slowness of log flushing on EMR, it’s even more important to have access to both the YARN Resource Manager and the Spark UIs. If the cluster is in a public subnet, accessing these UIs via the public DNS name is easy. But what if the cluster is running in a private subnet? Are bastion hosts and proxies really necessary?
The short answer is no. While you may want to set these resources up regardless, it’s possible to view the Spark and YARN Resource Manager UIs by allowing ingress to the cluster on ports 8088, 18080, and 20888. Once ingress has been granted, to view either the Spark History UI (:18080) or the YARN Resource Manager UI (:8088), just grab the private IP address for the master node. This can be found under the Hardware tab in the EMR console.
While the Spark History UI is really useful for debugging resource and environment issues, it isn’t real time. This slight delay makes it more difficult to see what’s going on while a job is actually running — something that is important with streaming jobs that run with longer batch intervals. That said, it is possible to access the Spark UI (:20888) via the tracking URL provided for the application. Each of these UIs is like a tool in the toolbelt, helping the developer learn more about how the jobs run, making it easier to tune the cluster to optimize performance.
One Executor to Bind Them
Out of the box, EMR sets the Spark configuration option spark.dynamicAllocation.enabled to true — something Spark itself sets to false by default. This option should allow YARN to dynamically manage executors as tasks enter and exit the queue. Depending on cluster size and workload this might not work quite as expected. If the cluster size is small -- one master and two workers -- this may manifest as a single executor with no additional resources to consume should more tasks come in.¹ With larger clusters it may become difficult to understand how resources are being consumed at scale.
Disabling dynamic allocation and configuring executors based on available resources and the type of workflow can help developers address these issues. This does come with a caveat: if a given streaming workload would benefit from EMR autoscaling, it might be worthwhile to eventually explore all the configuration options with dynamic allocation enabled, otherwise it will be difficult to effectively leverage the benefits of autoscaling. That said, by taking responsibility for executor configuration it becomes much easier to understand how cluster resources are consumed and to further tune the sizing and performance of the Spark cluster. In the end this will save developers time and the company money, whether or not the end state is dynamic allocation with autoscaling.
Spark executors on EMR can be configured by adding another Configuration object to the Configurations property for an AWS::EMR::Cluster in CloudFormation.
- Classification: "spark-defaults"
ConfigurationProperties: {
"spark.dynamicAllocation.enabled": "false",
"spark.executor.cores": "2",
"spark.executor.memory": "3g",
"spark.executor.instances": "16"
}
1) Contrary to what one may expect, the Spark Driver runs on a worker node rather than the master node; the master node runs the YARN resource manager. With EMR reserving one worker node for the Spark Driver, a small cluster with dynamic allocation enabled might spin up a single executor by default and have no excess capacity to create additional executors as tasks join the queue, despite the UIs showing excess memory and compute resources.
Configuring Kinesis DStreams
Figuring out the right relationship between executors and Kinesis receiver processes is crucial to running an efficient operation from both a cost and a resource standpoint. Checking out the official Apache Spark + Kinesis Word Count example on GitHub, one can see that they create a Kinesis DStream per shard in the stream. Each Kinesis DStream runs an instance of the Kinesis Client Library, which in turn manages record processor threads that track a given shard.
// Create the Kinesis DStreams
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    .initialPosition(new Latest())
    .checkpointAppName(appName)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .build()
}
Despite the example code here (and to be fair to Spark, they do mention this in a comment in said code) it’s not mandatory to create a DStream per shard. Check out what the documentation from Spark has to say about this:
A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads.
This is really important. If the upstream Kinesis source for a Spark Streaming job has a small number of shards it’s not really a big deal to create a DStream per shard. The cluster, however small, should have plenty of capacity. But what if the stream has 100 shards? 500 shards? As the workload scales this approach becomes increasingly expensive and inefficient.
A more flexible approach is to base the number of receivers on the number of initial executors rather than the number of shards. For example, if each executor has at least 2 cores then a fairly simple, naive approach is to create one Kinesis receiver per executor. This allows one core to run the receiver DStream and manage the shard processor threads while leaving at least one core open for additional tasks.
def getKinesisDStreams(
  ssc: StreamingContext,
  appName: String,
  checkpointInterval: Duration,
  endpoint: String,
  region: String,
  streamName: String,
  numExecutors: Int
): Seq[DStream[Array[Byte]]] = {
  // One receiver DStream per executor rather than one per shard
  val kinesisStreams = (0 until numExecutors).map { _ =>
    KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName(streamName)
      .endpointUrl(endpoint)
      .regionName(region)
      .initialPosition(new Latest)
      .checkpointAppName(appName)
      .checkpointInterval(checkpointInterval)
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .build()
  }
  kinesisStreams
}
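To tie this back to the word count use case, here is a sketch of how these per-executor receivers might be wired together: union the DStreams into one, count words per batch, and publish the results to S3 via EMRFS. The Kinesis endpoint, the region, and the JobConfig helper sketched in the environment variables section above are assumptions for illustration.

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative wiring: endpoint and region are placeholders, and JobConfig is the
// hypothetical env-var helper sketched in the yarn-env section above.
val conf = new SparkConf().setAppName(JobConfig.appNamespace)
val ssc  = new StreamingContext(conf, Seconds(JobConfig.batchDurationSeconds))

val streams = getKinesisDStreams(
  ssc,
  JobConfig.appNamespace,
  Seconds(JobConfig.batchDurationSeconds),
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  JobConfig.sourceStreamName,
  JobConfig.numExecutors
)

// Union the per-executor receivers into a single DStream and run the word count
ssc.union(streams)
  .flatMap(bytes => new String(bytes, StandardCharsets.UTF_8).split("\\s+"))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)
  .foreachRDD { rdd =>
    rdd.saveAsTextFile(s"s3://${JobConfig.resultsBucketName}/word-counts/${System.currentTimeMillis()}")
  }

ssc.start()
ssc.awaitTermination()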
Taking a look at how both of these approaches play out in practice is quite illuminating. This example used an EMR cluster with 32 available cores across 4 workers, and a configuration of 16 executors with two cores each. A Kinesis stream with 36 shards was also used, ensuring there’d be more shards than available cores. Finally, the streaming batch duration used was 5 minutes.
Starting with the one DStream per executor approach, the Executors tab in the Spark History UI shows each executor has one core in use — the DStream receiver — and one core available for additional task processing.
This available capacity allows the cluster to schedule and run all map reduce tasks added by the YARN resource manager. Compare this to the approach taken in the Word Count example from Spark, where a DStream receiver per shard is created.
Unsurprisingly the Spark History UI shows that every core is busy with a task — all 32 of them. Checking the Jobs tab for this application shows four additional Kinesis Receiver tasks stuck in a pending state, as well as the actual map reduce job.
As this screenshot shows, the lack of available capacity has killed the cluster’s ability to process the remaining tasks. Since the Kinesis DStream receiver tasks are long lived there isn’t really a completed state, so freeing up resources to work on the remaining tasks isn’t an option. This job will be stuck in a kind of limbo until more cores/executors are brought online. Extrapolate these results to a much larger stream and it’s easy to see how this approach could cause costs to spiral out of control and lead to a dramatic over-provisioning of resources.
Wrapping Up
Running production MapReduce workloads is complicated. There are tons of moving pieces, and no matter how nice the APIs are — and Spark’s are quite nice — the devil is always in the details. Amazon’s EMR certainly reduces a lot of the barriers to entry when it comes to running batch or streaming jobs in the cloud. That said, it does come loaded with its own quirks and nuances, and learning how to effectively manage those is crucial to keeping a high velocity.