Photo credit: Pexels

How we built a data pipeline with Lambda Architecture using Spark/Spark Streaming

Walmart Labs is a data-driven company. Many business and product decisions are based on insights derived from data analysis. I work on Expo, the A/B testing platform for Walmart. As part of the platform, we built a data ingestion and reporting pipeline that the experimentation team uses to see how experiments are trending. In this blog I would like to give a short primer on how we built that pipeline with the Lambda Architecture using Spark, which lets us reuse code between the streaming and batch layers, along with the key configurations for deployment and a few troubleshooting tips.

Use Case

The use case is to take the clickstream events, aggregate them by session id and generate metrics such as unique visitors, visits, orders, revenue, units, bounce rates, site error rates and performance metrics for both assigned and qualified experiments. Following is our architecture that accomplishes that:

Lambda Architecture

We have been running a Lambda architecture with Spark in production for more than 2 years. The Lambda architecture provides a robust system that is fault-tolerant against hardware failures and human mistakes, and it scales linearly by scaling out instead of scaling up. We needed a Lambda architecture because there is no guarantee that the data will arrive exactly once, the data can be noisy, and some data arrives late due to network or server instability. The Lambda architecture handles these issues by processing the data twice: once in the real-time streaming layer, to give a quick view of the generated metrics, and a second time in a scheduled batch process, to give a more reliable view of the same metrics. One key consideration was to make our writes to KairosDB idempotent, so that data can be reprocessed while the stored results still reflect exactly-once processing semantics.
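To make the idempotency point concrete, here is a minimal sketch of the idea, not the production code. It assumes the store identifies a data point by metric name, timestamp and tags, and overwrites a point that is written again with the same identity. The InMemoryTimeSeriesStore class and bucket_start helper are hypothetical stand-ins, not the KairosDB client API.

```python
from typing import Dict, Tuple

# Hypothetical in-memory stand-in for a time series store. A data point is
# identified by (metric name, timestamp, sorted tags), so writing the same
# point twice overwrites it instead of adding to it.
PointKey = Tuple[str, int, Tuple[Tuple[str, str], ...]]


class InMemoryTimeSeriesStore:
    def __init__(self) -> None:
        self.points: Dict[PointKey, float] = {}

    def put(self, metric: str, timestamp_ms: int,
            tags: Dict[str, str], value: float) -> None:
        key = (metric, timestamp_ms, tuple(sorted(tags.items())))
        self.points[key] = value  # overwrite, never increment


def bucket_start(timestamp_ms: int, bucket_ms: int = 60_000) -> int:
    """Align a timestamp to the start of its bucket so reruns hit the same key."""
    return timestamp_ms - (timestamp_ms % bucket_ms)
```

Because the batch layer writes a full recomputed value to the same key that the streaming layer used, a rerun replaces the earlier value rather than double counting it.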

Why Spark/Spark Streaming

Spark/Spark Streaming improves developer productivity because it provides a unified API for streaming, batch and interactive analytics. Overall, Spark has been a flexible, robust and scalable data processing engine for our use case. Spark Streaming provides efficient, fault-tolerant, stateful stream processing, with high throughput and good integration with systems such as Cassandra, Kafka and Hadoop.

Stats

Here are some statistics about our data pipeline:

  • ~70k events per second on average
  • ~250k events per second during peak hours

Details

We use the Spark Streaming Kafka direct stream, which gives us exactly-once processing semantics, with checkpointing enabled for recovery. Batch processing is built on top of core Spark. The metrics mentioned above are calculated and stored in KairosDB, a time series database. The user session information from batch processing is also stored in HDFS as Hive external tables. User state is maintained for 12 hours in streaming; in batch, it is maintained for as long as the user is active.

RDDs in the streaming and the batch processing are processed as follows:

Parsing and Validation: This is done in a series of map(), flatMap(), filter(), groupByKey(), reduceByKey() and leftOuterJoin() RDD transformations, which are reused across the streaming and batch jobs.
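The reuse works because the parsing and validation steps are plain functions that either job can pass to map() and filter(). A minimal sketch follows, assuming JSON-encoded beacons and a hypothetical set of required fields. Neither the field names nor the function names come from our code base.

```python
import json
from typing import Dict, Iterable, List, Optional

# Assumed schema, for illustration only.
REQUIRED_FIELDS = ("sessionId", "timestamp", "eventType")


def parse_beacon(raw: str) -> Optional[Dict]:
    """Parse one raw JSON line; return None for malformed input."""
    try:
        beacon = json.loads(raw)
    except ValueError:
        return None
    return beacon if isinstance(beacon, dict) else None


def is_valid(beacon: Optional[Dict]) -> bool:
    """A beacon is valid when every required field is present and non-empty."""
    return beacon is not None and all(
        beacon.get(field) not in (None, "") for field in REQUIRED_FIELDS
    )


def valid_beacons(raw_lines: Iterable[str]) -> List[Dict]:
    """Local equivalent of rdd.map(parse_beacon).filter(is_valid)."""
    return [b for b in map(parse_beacon, raw_lines) if is_valid(b)]
```

In a Spark job the same two functions would be handed to the RDD transformations directly, so the streaming and batch drivers differ only in where the input RDD comes from.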

Data Enrichment and State Maintenance: Data enrichment and state maintenance are done in updateStateByKey() in streaming and in mapPartitions() in batch processing. The business logic for merging sessions and enriching them with additional information is reused across both jobs. In streaming, the state is kept in memory and in the checkpoint, as provided by updateStateByKey(). In batch processing, the state is kept in Hive external tables in HDFS. We also calculate rolling 1-hour and 24-hour unique visitors in the streaming job using reduceByKeyAndWindow() with an inverse function.
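As an illustration of how one merge function can serve both layers, the sketch below wraps a shared merge_session() in a function with the (new values, previous state) shape that updateStateByKey() expects. The session fields (events, orders, last_seen) and the way expiry is checked are assumptions made for the example, not our actual session model.

```python
from typing import Callable, Dict, List, Optional

SESSION_TTL_MS = 12 * 60 * 60 * 1000  # 12 hours, as in the streaming job


def merge_session(state: Optional[Dict], beacons: List[Dict]) -> Optional[Dict]:
    """Shared business logic: fold new beacons into an existing session."""
    if state is None and not beacons:
        return None
    merged = dict(state) if state else {"events": 0, "orders": 0, "last_seen": 0}
    for beacon in sorted(beacons, key=lambda b: b["timestamp"]):
        merged["events"] += 1
        merged["orders"] += 1 if beacon.get("eventType") == "order" else 0
        merged["last_seen"] = max(merged["last_seen"], beacon["timestamp"])
    return merged


def make_update_func(now_ms: int) -> Callable[[List[Dict], Optional[Dict]], Optional[Dict]]:
    """Build a function with the (new_values, old_state) shape of updateStateByKey."""
    def update(new_beacons: List[Dict], old_state: Optional[Dict]) -> Optional[Dict]:
        state = merge_session(old_state, new_beacons)
        if state is None or now_ms - state["last_seen"] > SESSION_TTL_MS:
            return None  # returning None drops the key from the state
        return state
    return update
```

A batch job could call merge_session() from inside mapPartitions() with the previous state read from storage, which is the sense in which the merge logic is written once.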

Metric Generation and Save to KairosDB: Metrics are generated using flatMap() and reduceByKey() after sessionization and data enrichment, and are saved to KairosDB using a KairosDB client. The write to KairosDB happens in a foreachPartition() action. The save logic is reused in both streaming and batch processing.
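The three steps can be sketched locally as follows. The metric key layout, the session fields and the send_batch callback are hypothetical. In a real job, send_batch would be a call on a KairosDB client created inside the partition.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

MetricKey = Tuple[str, str, int]  # (metric name, experiment id, minute bucket)
MetricRow = Tuple[MetricKey, float]


def session_metrics(session: Dict) -> Iterator[MetricRow]:
    """flatMap step: one session emits several (key, value) pairs."""
    bucket = session["last_seen"] // 60_000 * 60_000
    for experiment in session["experiments"]:
        yield ("visits", experiment, bucket), 1.0
        yield ("orders", experiment, bucket), float(session["orders"])


def reduce_metrics(pairs: Iterable[MetricRow]) -> Dict[MetricKey, float]:
    """reduceByKey step, with addition as the reduce function."""
    totals: Dict[MetricKey, float] = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


def save_partition(rows: Iterable[MetricRow],
                   send_batch: Callable[[List[MetricRow]], None],
                   batch_size: int = 500) -> int:
    """Body for foreachPartition: one client call per batch, not per row."""
    sent, batch = 0, []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            send_batch(batch)
            sent += len(batch)
            batch = []
    if batch:  # flush the final partial batch
        send_batch(batch)
        sent += len(batch)
    return sent
```

Writing per partition rather than per record keeps the number of client connections and requests proportional to the number of partitions.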

In stream processing, the incoming beacons of each micro batch are grouped by sessionId and sorted by timestamp. Session state is updated by processing the sessions from the current micro batch and merging them with the session state from the previous micro batch. The metrics generated after sessionization are reduced and then stored in KairosDB. We store the Kafka offsets in Zookeeper so that we can restart the job from the last successfully processed offsets. For accuracy, the offsets are updated in the same transaction as the results. The following code sample gives an overview of the same.
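As a rough illustration of the grouping and sorting step only (this is not the production code, and the beacon field names are assumed), the per-micro-batch preparation looks like this when run locally on a list instead of an RDD:

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def group_and_sort(beacons: Iterable[Dict]) -> Dict[str, List[Dict]]:
    """Group beacons by sessionId, then order each session by timestamp."""
    grouped: Dict[str, List[Dict]] = defaultdict(list)
    for beacon in beacons:
        grouped[beacon["sessionId"]].append(beacon)
    return {
        session_id: sorted(events, key=lambda e: e["timestamp"])
        for session_id, events in grouped.items()
    }
```

Sorting inside each session matters because the session merge logic depends on event order, while the order in which beacons arrive in a micro batch is not guaranteed.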

The following code sample has the reusable functions that are called from both stream processing and batch processing. They filter and populate the valid beacons, populate the grouped and sorted sessions, process the stateful sessions, generate the metric values, reduce the metrics and store the generated metrics in KairosDB.

The following is a code sample of the batch processing, which does the stateful sessionization, metric generation and save to KairosDB. The functions above that are called from stream processing are called from batch processing as well, which gives us code reuse between the batch and stream processing.

Streaming Configuration and Deployment

We use a standalone cluster for Spark Streaming job deployment. For HA/DR, we run the streaming job in 2 different regions. At any point in time, only the job in one region writes to the KairosDB cluster; the job in the second region keeps running and maintaining state, but does not write to KairosDB. If there is an issue with the job in the first region, if we need to do maintenance on the cluster or upgrade the code, or if there is an issue with the Kafka topic in one region, we switch the KairosDB writes to the job running in the second region. This gives us higher availability for the streaming job: session state is maintained throughout and the data written to KairosDB is continuous.

Our streaming job runs with a micro batch of 60 seconds. The right micro batch size depends on the processing times. We started with a batch interval of 10 seconds, observed the processing times, and increased the interval incrementally until each batch completed within the micro batch period and the scheduling delay was either zero or increased only occasionally and recovered quickly. The ideal number of executors and cores depends on the application: factors such as the peak number of events per second, the maximum allowed lag and the buffering capabilities of the streaming source all matter, and the numbers are best arrived at by testing in a pre-production environment. We found the right amount of parallelism to be 2–3 times the number of cores, again arrived at iteratively by testing the job with various configurations. Another key consideration is setting spark.memory.fraction and spark.memory.storageFraction to values that suit the job. In our tests, spark.locality.wait=0s was good for job performance. We use Kryo serialization in our Spark jobs for better performance. When upgrading the application code, the application needs to be shut down gracefully, so that all received records are processed before it stops.
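The settings above can be collected in one place and turned into spark-submit arguments. The sketch below is illustrative. The two memory values are placeholders and not our tuned values, and using spark.streaming.stopGracefullyOnShutdown as the graceful shutdown mechanism is an assumption. The function names are hypothetical.

```python
from typing import Dict, List


def streaming_conf(total_cores: int, parallelism_factor: int = 2) -> Dict[str, str]:
    """Build Spark settings, with parallelism at 2-3x the number of cores."""
    if not 2 <= parallelism_factor <= 3:
        raise ValueError("parallelism_factor is expected to be 2 or 3")
    return {
        "spark.default.parallelism": str(total_cores * parallelism_factor),
        "spark.locality.wait": "0s",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        # Assumed mechanism for the graceful shutdown described in the text.
        "spark.streaming.stopGracefullyOnShutdown": "true",
        # Placeholder values; tune these per job.
        "spark.memory.fraction": "0.6",
        "spark.memory.storageFraction": "0.5",
    }


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render the settings as repeated --conf key=value arguments."""
    args: List[str] = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args
```

For example, to_submit_args(streaming_conf(total_cores=40, parallelism_factor=3)) yields a list that can be appended to a spark-submit command line, with spark.default.parallelism set to 120.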

Job Monitoring and Automatic Job Restarts for Streaming

We use a Python script that runs every 5 minutes to check whether the streaming job is up and running. We monitor the lag, the total delay and the number of failed jobs versus successful jobs, and take appropriate steps if any of them exceeds a certain threshold. We query these metrics from the Spark UI endpoints http://streamingUIURL:4040/metrics/json and http://streamingUIURL:4040/api/v1/applications/<appId>/jobs. If the job is down for some reason, if the number of failed jobs is greater than the number of successful jobs over the past 5 minutes, or if any task is stuck due to infrastructure issues, the job is restarted automatically and we receive an alert email.
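A simplified sketch of the restart decision is shown below; it is not our actual monitoring script. It assumes each entry returned by the jobs endpoint carries a status field with values such as SUCCEEDED and FAILED, and it leaves out the lag and stuck-task checks as well as the filtering of jobs to the last 5 minutes. The function names are hypothetical.

```python
import json
from collections import Counter
from typing import Dict, List, Optional
from urllib.request import urlopen


def fetch_jobs(base_url: str, app_id: str,
               timeout_s: float = 10.0) -> Optional[List[Dict]]:
    """Return the job list from the Spark REST API, or None if unreachable."""
    url = f"{base_url}/api/v1/applications/{app_id}/jobs"
    try:
        with urlopen(url, timeout=timeout_s) as response:
            return json.load(response)
    except (OSError, ValueError):  # connection errors or an invalid body
        return None


def should_restart(jobs: Optional[List[Dict]]) -> bool:
    """Restart when the job is unreachable or failures outnumber successes."""
    if jobs is None:
        return True
    counts = Counter(job.get("status") for job in jobs)
    return counts["FAILED"] > counts["SUCCEEDED"]
```

A scheduler such as cron would call fetch_jobs() every 5 minutes and trigger the restart and alert email when should_restart() returns True.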

Troubleshooting the streaming job

We use the Spark Streaming UI to troubleshoot any issues with the job. Following is a screenshot of the Streaming UI. Ideally the scheduling delay should be 0, or if it does increase, it should recover quickly within an acceptable time period. The processing time should be less than or equal to the micro batch period. With the Kafka direct stream, the read parallelism is equal to the number of Kafka partitions, so the number of partitions needs to be increased if there is not enough parallelism for the reads.

In case of increased processing times, individual batches can be drilled into to see which stage is taking time, as shown below. Shuffle reads/writes should be kept to a minimum and there should not be any data skew. If the shuffles are taking a lot of time, one way of fixing that is to increase the cores and the parallelism.

The DAG visualization, as shown below, is used to determine whether any pipelined operations are separated by shuffles. Performance is best when all the operations go through a single pipeline without any shuffles. The visualization also shows whether any of the RDDs are cached; cached RDDs are denoted by a green highlight. Caching an RDD means that future computations on it can be served from memory, which improves performance.

The storage section of the Streaming UI, as shown below, shows whether the cached RDDs are being cleared regularly. Ideally, cached RDDs should be cleared at regular intervals to avoid performance bottlenecks.

The executors tab of the Web UI, as shown below, also provides information about the number of active tasks, utilization of the cores, task GC time and shuffle reads/writes. If there is a large gap between the number of active tasks and the number of cores allocated, the cluster is underutilized and the core allocation needs to be adjusted accordingly. The thread dump is used to drill down into possible performance issues.

Batch Configuration and Deployment

We use Mesos as our cluster manager for batch jobs and use HDFS for state maintenance and intermediate file storage. The batch processing job runs every 30 minutes and processes any new data available in HDFS. The data is stored in Hive external tables. We configured the executor memory according to the amount of data processed in each batch. Because the batch job uses operations such as reduceByKey, groupByKey, repartition, coalesce, cogroup and join, configs like spark.executor.memory, spark.cores.max and spark.default.parallelism were adjusted iteratively to fit the job's requirements.

Troubleshooting the batch job

A few slow tasks can have a huge performance impact when one stage needs to finish before the next one can start. The detailed stage/tasks view in the Spark UI can be used to identify slow-running nodes, nodes with resource problems, skew in data partitioning (visible in the input size/records), or a small number of tasks taking significantly longer to execute than the others. By drilling down into the slower tasks, we can determine whether the slowness is in writing data, reading data or computation. If the processing is slow, it can be due to insufficient resources, in which case we need to look at how much memory and CPU has been allocated to the executors and at the total number of cores allocated to the job.
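The "small number of tasks taking significantly longer" check can be expressed as a comparison against the median task duration of a stage. The sketch below is a hypothetical helper, not part of Spark. The durations would be copied from the stage detail page, and the factor of 3 is an arbitrary starting point.

```python
from statistics import median
from typing import Dict, List


def find_stragglers(task_durations_ms: Dict[str, float],
                    factor: float = 3.0) -> List[str]:
    """Return the ids of tasks that ran longer than factor x the median."""
    if not task_durations_ms:
        return []
    typical = median(task_durations_ms.values())
    return sorted(
        task_id
        for task_id, duration in task_durations_ms.items()
        if duration > factor * typical
    )
```

The same comparison applied to per-task input records instead of durations gives a quick indication of data skew.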

Conclusion

To recap, here are some points covered in this article:

  • Building the data pipeline for A/B testing with the Lambda architecture using Spark gave us a quick view of the generated metrics from a streaming job and a reliable view from a scheduled batch process.
  • Using Spark/Spark Streaming let us write the business logic functions once and reuse them in both a batch ETL process and a streaming process. This lowered the risk of errors that come from duplicate code bases and helped developer productivity, since Spark provides a unified API for streaming, batch and interactive analytics.
  • We focused on having a stable stream/batch processing application first, before focusing on throughput. The performance of the applications was improved by tuning Spark’s serialization and memory parameters and by increasing the number of cores and the parallelism iteratively.
  • The Spark Streaming/Batch UI provides very good information on performance bottlenecks such as shuffles, data skew, slow-running tasks due to resource issues, task GC time, shuffle reads/writes, slow-running stages and storage, all of which helps in troubleshooting the jobs.