Operationalizing Spark Streaming (Part 1)

EG Tech
Expedia Group Technology
8 min readDec 29, 2016

By Brandon O’Brien

spark-streaming-2

For those looking to run Spark Streaming in production, this two-part article contains tips and best practices collected from the front lines during a recent exercise in taking Spark Streaming to production. For my use case, Spark Streaming serves as the core processing engine for a new real time Lodging Market Intelligence system used across the Lodging Shopping stack on Expedia.com, Hotels.com and other brands. The system integrates with Kafka, S3, Aurora and Redshift and processes 500 msg/sec average with spikes up to 2000 msg/sec. The topics discussed are:

  • Availability: Getting Spark running and keeping it running
  • Efficient resource utilization: Making sure Spark can process messages in the stream fast and the program is written so it can scale
  • Reliability: Having a reasonable guarantee Spark is processing data as expected
  • Operational Visibility: Having the monitoring to know when problems are happening, and having enough information to troubleshoot those problems when they occur

Sections in Part 1

  • Spark Overview and Standalone Clusters
  • Design Patterns for Performance
  • Spark Cluster and App Stability

Sections in Part 2 (coming soon)

  • Direct Kafka Integration
  • Guaranteed message processing (at-least-once semantics, strong and weak microbatch transactionality)
  • Operational monitoring

So, let’s begin with Part 1

Spark Overview and Standalone Clusters

If you already know the basics, skip down to Design Patterns for Performance

Apache Spark is a popular tool for running large batch jobs for analytics, data migration, ETL and data science. Spark Streaming is a scalable streaming processing framework built on top of Spark for processing data streams in the form of a sequence of “microbatches”, which comes with its own set of pros and cons compared to true streaming frameworks like Flink, Beam and Storm.

  • Spark Core Concepts
  • RDD: Resilient Distributed Dataset. This is an abstraction over a partitioned, distributed, logical collection of arbitrarily-typed objects, and is the primary way to manipulate data in Spark.
  • DStream: Discretized Stream. A sequence of RDDs, defined by batch interval which is the window of time you’d like reach RDD to span.
  • Code Execution
  • Lazy execution: In Spark, you define your data processing logic in the form of a sequence of transformations using which Spark internally builds a DAG as the execution plan to build the final RDD which your actions operate on. Transformations define the DAG and actions initiate the execution of the DAG.
  • Driver: The code used for initializing the program, defining the sequence of transformations and defining the actions is executed on the Driver. The idea is that this code organizes the program, but does not do the heavy number crunching itself.
  • Executors: Most of your code executes in the executor JVMs. This code is what will do the heavy lifting of operating on the elements in your RDD’s. The functions you pass into actions and transformations are executed on the executors.
  • Cluster Types:
  • Standalone (This article will only contain information about Standalone Clusters)
  • YARN
  • Mesos
  • Standalone Cluster:
  • Each Spark Node runs:
  • 1 Master JVM (one node runs the leader Master, others a Standby Master)
  • 1 Worker JVM
  • 1 Executor JVM
  • 1 node runs the Driver JVM (or the Driver JVM can actually be run outside the cluster).
  • The typical standalone Spark cluster can be visualized like this:
Spark Standalone Cluster Components
  • The standalone cluster needs access to a Zookeeper cluster, for things like electing a new master if the current master dies, or for storing Kafka offsets if you use Kafka Receivers.

Design Patterns for Performance

Best practices to utilize cluster resources efficiently and ensure your Spark app will scale linearly with the number of cores and executors available.

  • Avoid Driver bottlenecks: Make sure both the data input and data output for your program go through the Executors and not through the Driver. The role of the Driver is to manage the work of the Executors, not to actually process the data — running any significant amount of data through the Driver will create a bottleneck that will prevent the app from performing and scaling. A simple example:
  • DON’T: call RDD.collect() on the driver to collapse the RDD into an in-memory data structure and then bottleneck all IO through the driver when saving the objects to your database (this is bad from all perspectives: CPU, IO and Memory).
  • DO: call RDD.foreachPartition() and pass in a function which persists the data directly from the Spark Executors to your data store or downstream system.
  • Function closures: The functions that you pass into actions get executed in the Executor JVM’s on the Spark worker nodes. Any objects referenced within those function, Spark will attempt to serialize and send to the Executor nodes. Obviously some objects can’t be serialized, and others may be huge objects which you don’t want to send across the wire. In this case, the object can be created locally inside the Executor JVM by creating the object inside the function passed into the RDD.foreachPartition() function. If possible, employ the singleton pattern to achieve reuse of the object across multiple RDD partitions to avoid the creation of superfluous and/or expensive objects. Examples:
  • NOT SO AWESOME:
  • val myMassiveAndOrUnserializableObject = MassiveObjectFactory.createNew()
    myRDD.foreachPartition(partition => {
    myMassiveAndOrUnserializableObject.doWork(partition)
    })
  • A LITTLE MORE AWESOME:
  • myRDD.foreachPartition(partition => {
    val myMassiveAndOrUnserializableObject = MassiveObjectFactory.createNew()
    myMassiveAndOrUnserializableObject.doWork(partition)
    })
  • Avoid unnecessary shuffles: For certain operations, Spark needs to reshuffle data from every worker node to every other worker node. These operations include join, repartition, groupByKey, and others. This is an expensive operation, so make sure your Spark app is only doing this when it actually needs to. Simple example: Ensure RDD’s are partitioned correctly when they are created, so they don’t need to be repartitioned later in the transformation sequence.
  • Externalize stream joins, as alternative to Window operations: If your data flow requires joining two streams together, Spark provides a mechanism for directly joining two streams based on key, but this can only be done for RDD elements within the same batch interval or stream window. For my business use case, we needed to do a stream self-join across a 24-hour window, which is prohibitively expensive to do natively in Spark, as Spark would need to cache the RDD data on disk for 24 hours, which would be a massive amount of data. The solution I opted for was to externalize the join using sets of hashes in Redis based on a join key. As each message comes in, persist the message in Redis based on the join key, then query for other messages with the same key that have already been persisted. This allows for a shared external store between all Spark executors for space-efficiency, and has proven to be very fast. A TTL of 24 hours is set on each hash in Redis to clean up old data. A database like Cassandra could also be used for this type of streaming join, for improved scalability and availability of the external data store.
  • Reference data lookups: Many use cases in streaming processing call for enriching your data stream with reference data sets. If you want your stream processing to be super fast, you want the reference data to already be in memory by the time you want to join it to messages in the stream, but this is not always practical if the data set is too large to fit in memory or is frequently updated. Options here:
  • Spark RDD: Create an RDD for reference data in the Driver on app startup. You’ll need to manually recreate this if the external data set changes. Performance may be unpredictable depending on whether the entire RDD can fit into memory on the Executors.
  • Singleton hashmap: On each Executor — simple, but only works if you can fit your entire dataset into your JVM heap.
  • External Redis/Memcached/Cassandra: Data set can be shared between multiple executors in a single data store, but requires a network call to lookup the data.
  • Singleton Google Guava: Store a static LRU-flavored cache on each Executor JVM, set the max size, make a call to an external service when and only when the data is requested, amortize the cost of network calls across multiple items in your datastream to find the right balance between CPU/IO/Memory utilization. Can be combined with Redis approach.

Spark Cluster and App Stability

Tactics for ensuring your Spark cluster can recover from single node failures, ensuring your Driver app can restart if it crashes, and even avoiding crashes in the first place.

  • Standby masters: For a high-availability cluster, set up each Spark worker node to run a standby Master JVM process. Only one node runs the leading Master at a time. If the leading Master fails, the cluster will automatically promote one of the available standby Masters via a Zookeeper leader election. Master failover does not affect any currently running apps on the Spark cluster — it only prevents new apps from being submitted to the cluster while the new leader Master is being elected.
  • Supervised driver: For a high availability Spark application, run your Driver program in “supervise” mode so that the Spark cluster will restart your Driver program if it fails or if the Spark node it’s running on dies. To do this, add the “ — supervise” flag to the “spark-submit” command line program when submitting your job.
  • Spark worker memory headroom: On each Spark node, there’s a Worker JVM, an Executor JVM, a Master JVM, and on 1 node a Driver JVM. There needs to be enough physical memory available on the node for all JVMs to peacefully co-exist. Typically, the Executor JVM requires the most memory as this is where the data will actually be processed. One important note — the Spark UI advertises the total physical memory on the machine as though all of it were available for the Executor JVM, which is not true. The Executor JVM needs to leave available memory for the other JVMs, or else the Linux OOM Killer will show up and spoil all your fun (i.e. it will cause cluster instability.). In the example below, 30.4G of memory are available on the box, but I’ve only allocated 16.0G to the Executor JVM, to leave resources for the other running processes.
2016-12-27_18-03-08
  • To modify this setting, set the “spark.executor.memory” setting in the Spark config object when creating the Spark Streaming Context object. Example:
  • sparkConf.set("spark.executor.memory", "4g")
  • Cluster resource over-provisioning: If you have a strong requirement for stream processor availability, make sure that your Spark cluster has at least one more node than it needs to consume messages at the same rate as the messages are being produced, so if the cluster loses a node for whatever reason, the remaining nodes can handle all stream processing without falling behind.
  • Auto cleanup of temp files: Surprisingly, the default behavior of Spark 1.x and 2.x is that it does not clean up temp and work directories automatically, which leads to the hard disk inevitably filling up after a few days, which leads to cluster instability. Use this setting to force Spark to cleanup work directories:
  • sparkConf.set("spark.worker.cleanup.enabled", "true")

Check back later for Part 2, which will discuss:

  • Direct Kafka Integration (vs Kafka Receivers)
  • Guaranteed message processing (at-least-once semantics & microbatch transactionality)
  • Operational monitoring

For any questions on this, reach out to Brandon O’Brien (@hakczar)

--

--