Apache Spark and RDDs: Distributed resilient in-memory abstraction for ML

Ameya · Published in Coinmonks · 9 min read · Aug 26, 2018

Introduction

Many applications these days rely on heavy data processing: they run iterative algorithms like gradient descent or interactive data mining queries. These techniques are widely used in machine learning and data analytics. One popular framework that provides this capability is Apache Spark. The foundational building block of Spark is a fault-tolerant, distributed memory abstraction called the RDD (Resilient Distributed Dataset). This abstraction, and the framework built on it, outperforms existing infrastructure like Hadoop by up to 20 times on ML and data mining workloads. Let's review the RDD paper in this post.

Design Goals

Many algorithms, like K-means clustering or gradient-descent-based regression, iteratively operate on the same dataset. The same applies to interactive data mining, where an end user keeps running different queries against the same subset of data. Earlier frameworks like MapReduce make parallel data processing possible, but don't provide an efficient mechanism for such data reuse. The common way to reuse data in these frameworks is to write it to disk and read it back, which gets expensive for large datasets.

The design goals for RDD are:

  1. Provide a general-purpose, efficient abstraction for distributed shared memory.
  2. Coarse-grained access to data, i.e. we are interested in operating on batches of data, not on a specific key-value pair in the dataset, as distributed databases or Distributed Shared Memory (DSM) systems do.
  3. Efficient fault tolerance. This follows from the previous bullet: in fine-grained DSM systems, fault tolerance is expensive and is achieved by logging individual key-value accesses or by replicating a lot of data, which is inefficient for large datasets.
  4. Control over data partitioning for efficient parallel data reuse and locality.

As you can see, these goals suit applications that benefit from batch-oriented usage, especially batched writes, and not applications that need fine-grained writes such as modifying a single value in a database.

RDD

The way these goals are achieved in Spark is by providing an abstraction called the RDD. At a high level, an RDD represents a batch of data that can be recreated by tracing its lineage. An RDD is created from an input dataset by parsing the input and then applying transformations such as map or filter; these steps form the RDD's lineage. If the RDD is lost, it can be recreated by re-running the map/filter transformations on the input data. Thus RDDs can only be created by running transformations on data from stable storage or on other RDDs, and these transformations link together to form the lineage of any given RDD. The key point is that an RDD which could not be recreated after a failure by tracing the lineage graph cannot be referenced in this system at all. In addition, for efficiency, users can control how the data is partitioned across machines.

Programming interface for RDDs: Programmers use an object-oriented interface in Scala to work with RDDs. They use transformations such as map, filter, and join to create RDD objects, and then call actions such as "count" or "save", which return or export values. All RDDs in Spark are lazily materialized, only when an action executes. Programmers can call the "persist" method, which keeps the data in RAM and spills it to disk if there isn't enough memory; specific data can also be prioritized for persistence.
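
To make the laziness and persist behavior concrete, here is a minimal sketch, assuming an existing SparkContext named sc and a made-up input file; it is an illustration, not code from the paper:

import org.apache.spark.storage.StorageLevel

val events = sc.textFile("events.log")              // nothing is read yet: RDDs are lazy
val warnings = events.filter(_.contains("WARN"))    // still lazy; only the transformation is recorded
warnings.persist(StorageLevel.MEMORY_AND_DISK)      // keep in RAM, spill to disk if memory runs short
val n = warnings.count()                            // an action: this triggers the actual computation
warnings.saveAsTextFile("warnings-out")             // reuses the persisted RDD instead of re-reading the file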

RDDs and Graph lineage

Let's look at how a simple Spark program gets converted into a graph that forms the RDD lineage. The following example from the paper returns the third field (the timestamp) from all log lines that contain HDFS-related errors.

val lines = spark.textFile(logfile)                                 // base RDD backed by the log file on stable storage
val errors = lines.filter(_.startsWith("ERROR"))                    // transformed RDD; nothing is computed yet
errors.persist()                                                    // keep the error lines in memory for reuse
errors.filter(_.contains("HDFS")).map(_.split("\t")(3)).collect()   // action: extract and collect the timestamp field

This is how the RDD lineage gets formed for this program:

The time_fields RDD is created from the input log lines: map and filter transformations extract fields from the log lines along the way, and finally a "collect" action gathers all the values into time_fields

Only when the time_fields values actually need to be collected, i.e. lazily, does Spark ship these transformations to nodes, where some of the RDDs might already be cached in memory. Hopefully this makes clear how fault tolerance works: if the "errors" RDD is lost, it can easily be recovered by recomputing the filter transformation on "lines". Likewise, if the program wants to access time_fields, it need not worry about the failure of the "errors" RDD as long as the HDFS-errors RDD is still available.

One key point to note is that RDDs are immutable, which makes recovery easier: the system doesn't need to maintain a log of operations tracking the individual changes within an RDD.

While RDDs are immutable and suited to batch-oriented writes, they by no means prevent efficient reads at a finer granularity. One can always use an RDD as a lookup table for reading.

Main components of an RDD

RDDs are typically represented using the following main components (a simplified interface sketch follows the list):

  1. Partitions: the atomic subsets of the dataset.
  2. PreferredLocations(partition): for efficiency, it helps to place partitions on specific locations, e.g. the nodes that hold the underlying data blocks.
  3. Dependencies: a set of dependencies on parent RDDs. There are two types: narrow and wide. Narrow dependencies map each parent partition to at most one child partition, e.g. a "map" transformation. Wide dependencies result from transformations such as "join", where a child partition may depend on many parent partitions because joining two datasets shuffles records across partitions.
  4. Iterator(partition, parent iterators): returns the elements of a partition by computing it from its parent dependencies.
  5. Partitioner: the function used to partition the RDD (e.g. hash or range partitioning).
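
A rough Scala sketch of this five-part interface might look as follows; the names are illustrative and deliberately simplified, not Spark's actual internal RDD class:

trait Partition { def index: Int }

trait SimpleRDD[T] {
  def partitions: Seq[Partition]                      // 1. the atomic subsets of the dataset
  def preferredLocations(p: Partition): Seq[String]   // 2. hosts where p can be computed cheaply
  def dependencies: Seq[SimpleRDD[_]]                 // 3. parent RDDs, narrow or wide
  def iterator(p: Partition): Iterator[T]             // 4. compute the elements of p from its parents
  def partitioner: Option[Any => Int]                 // 5. how keys map to partitions, if defined
}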

Some examples of RDD creation

Here are some examples of the different types of dependencies that arise between RDDs under various transformations.

Each blue box is an RDD, and the rectangles inside it are its partitions
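
A small, hedged sketch of how these dependency types arise in Spark's Scala API; the SparkContext setup is assumed, and local mode is used only to keep it runnable:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("deps").setMaster("local[*]"))

val nums  = sc.parallelize(1 to 1000, 4)             // base RDD with four partitions
val pairs = nums.map(n => (n % 10, n))               // narrow: each child partition reads one parent partition
val evens = nums.filter(_ % 2 == 0)                  // narrow: filter is also one-to-one
val grouped = pairs.groupByKey()                     // wide: records are shuffled across partitions
val joined  = pairs.join(pairs.mapValues(_ * 2))     // wide (unless both sides already share a partitioner)

println(grouped.toDebugString)                       // prints the lineage, including the shuffle boundary
sc.stop()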

RDD scheduling on clusters

At a high level, when a user launches a program, a driver is created. The driver launches workers and schedules tasks on different nodes that compute and use RDDs.

Workers read data blocks from stable storage and create RDD partitions that get persisted in memory
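
As a hedged illustration of the driver/worker split, a minimal driver program might look like this; the master URL and paths are placeholders, not values from the paper:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("rdd-demo")
  .setMaster("spark://master-host:7077")               // assumed standalone cluster address
val sc = new SparkContext(conf)                        // the driver: builds lineage and schedules tasks

val data = sc.textFile("hdfs://master-host:9000/logs") // blocks are read by the workers, not the driver
println(data.count())                                  // tasks run on the workers; only the count returns here
sc.stop()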

Going into more detail on scheduling: when an action like "count" or "collect" is called on an RDD, scheduling kicks in. The scheduler computes the lineage graph of the RDDs in the program, then builds stages such that each stage contains as many pipelined transformations with narrow dependencies as possible, for better locality. Any wide dependency marks the boundary of a new stage. The scheduler then launches tasks to compute the missing RDD partitions of each stage, and this continues until the RDDs required by the action are available.

Let's take an example. As shown in the diagram below, the blue and gray rectangles are the partitions that make up the corresponding RDDs. Gray partitions are already in RAM, so they need not be recreated. First, the narrow dependencies are grouped into stages 1 to 3. To perform an action on G, RDD B is needed, but B is already in RAM, so the scheduler can go straight to stage 2. It then computes the missing partitions to arrive at RDD F; one partition of E is already available, so it need not be recomputed. Once F is ready, the wide dependency (the join) that forms stage 3 is executed.

A scheduling example showing RDDs A-G, with the action to be run on RDD G
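
To make the stage boundaries concrete, here is a hedged sketch, assuming an existing SparkContext sc and a made-up path and log format: the narrow transformations are pipelined into one stage, and the shuffle introduced by reduceByKey starts the next one.

val fields = sc.textFile("hdfs://namenode/app.log")
  .filter(_.startsWith("ERROR"))                     // narrow: pipelined into the same stage
  .map(_.split("\t"))                                // narrow: still the same stage

val byCode = fields.map(f => (f(1), 1))              // narrow
  .reduceByKey(_ + _)                                // wide: the shuffle here starts a new stage

byCode.collect()                                     // the action that makes the scheduler build and run stages
println(byCode.toDebugString)                        // the printed lineage indents at shuffle boundaries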

Now comes the question of mapping tasks to machines. Generally, if a node already holds the needed data partitions of the given RDD, the task is sent there; otherwise, if the RDD specifies any "preferredLocations", the task is sent to one of those. For wide dependencies, the scheduler tries to place tasks near the parent RDDs. The detailed scheduling algorithm is covered by this paper. The high-level goals of this delay scheduling algorithm are:

  1. Maintain fairness of resources across jobs using max-min fair scheduling. The key issue here is: what happens when a new job comes in? How and when do we assign resources to it? If a cluster is occupied by two jobs that each use 50% of the resources and a new job arrives, ideally all three should get 1/3 of the resources each. But since the existing tasks are already running, should we kill them or wait for them to finish? Preempting, as kernel schedulers do, is expensive in large distributed systems, since saving task state is not trivial. The paper mentioned above shows that waiting a bounded time for running tasks to finish (relaxing fairness a little) works well, and also helps place tasks on the nodes that hold their data (see the sketch after this list).
  2. Use data locality, placing tasks where their data is, for efficient execution.
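
Here is a rough, hedged sketch of the delay-scheduling idea in Scala, not Spark's actual scheduler code; the Task type and the wait threshold are illustrative:

case class Task(id: Int, preferredHosts: Set[String])

// When a slot frees up on `host`, prefer a task whose data lives there; only after the
// job has waited past the threshold do we relax locality and run any pending task.
def pickTask(queue: Seq[Task], host: String, waitedMillis: Long, maxWaitMillis: Long = 3000): Option[Task] =
  queue.find(_.preferredHosts.contains(host))
    .orElse(if (waitedMillis >= maxWaitMillis) queue.headOption else None)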

Obviously, some tasks may fail or may get blocked because of earlier failures. In such cases, the problem reduces to resubmitting tasks to recompute the failed RDDs in the lineage.

Checkpointing

As mentioned earlier, RDDs are immutable and can be recovered from the lineage graph. For some applications that use the RDD paradigm, however, lineage graphs can grow quite long. Consider an iterative example like PageRank. In every iteration, the input file is parsed to extract links, each document sends contributions to the documents it links to, and a new rank is computed for each document based on the contributions it received. This is how the lineage graph for the PageRank algorithm can look in Spark:

PageRank's lineage graph grows with every iteration: since RDDs are immutable, new ranks and contribs RDDs get created each time

For such cases it can be beneficial to checkpoint specific RDDs for quicker recovery; for applications with short lineage graphs, checkpointing is not necessary. The immutability of RDDs makes checkpointing easy to implement as a background write: there is no need to worry about locking or about the RDD changing while the checkpoint is being taken. Potentially, the scheduler could even decide what to checkpoint based on its knowledge of how RDDs are computed.
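
Here is a hedged PageRank sketch in Spark's Scala API, loosely following the paper's example; the link-file format, paths, and iteration/checkpoint counts are assumptions, and sc is an existing SparkContext:

sc.setCheckpointDir("hdfs://namenode/checkpoints")

val links = sc.textFile("hdfs://namenode/links.txt")
  .map { line => val p = line.split("\\s+"); (p(0), p(1)) }   // (page, outgoing link)
  .groupByKey()
  .cache()                                                    // reused in every iteration, so keep it in memory

var ranks = links.mapValues(_ => 1.0)

for (i <- 1 to 20) {
  val contribs = links.join(ranks).flatMap {                  // each page sends contributions to its out-links
    case (_, (outLinks, rank)) => outLinks.map(dest => (dest, rank / outLinks.size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
  if (i % 5 == 0) {
    ranks.checkpoint()                                        // truncate the ever-growing lineage
    ranks.count()                                             // run an action so the checkpoint materializes
  }
}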

Expressing existing paradigms with RDDs

Let's look at how general-purpose the RDD abstraction is, and how it can express several existing paradigms.

MapReduce: flatMap and groupByKey/reduceByKey can express this paradigm, as in the word-count sketch below.
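
For example, the classic MapReduce word count can be written as a short RDD program; this is a hedged sketch where sc is an existing SparkContext and the paths are placeholders:

val counts = sc.textFile("hdfs://namenode/books")
  .flatMap(_.split("\\s+"))                          // the "map" phase: emit one record per word
  .map(word => (word, 1))
  .reduceByKey(_ + _)                                // the "reduce" phase: group by key and combine

counts.saveAsTextFile("hdfs://namenode/word-counts")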

DryadLINQ: I hadn't heard much about this framework before, but it lets developers write LINQ-style declarative queries over large datasets on a cluster. It mostly supports bulk operations over large data, which map naturally onto RDDs.

Pregel: Pregel is useful for graph-based algorithms such as PageRank and shortest paths. In each superstep, Pregel applies the same function to the data held at every vertex of the graph, so the per-vertex data can be represented using RDDs and the vertex function can be mapped to transformations.

Iterative MapReduce: these applications run multiple MapReduce iterations over the same data, which fits the RDD model very nicely. The Spark authors were able to implement HaLoop in 200 lines of code.

Batched Stream Processing: many applications use the state from the last few minutes to update the current state of a model, such as using the last window of clickstream data for real-time advertising decisions. These systems apply the same transformations to each batch of data and then store the results on disk; they could use the RDD model for faster processing by keeping the intermediate results in RDDs (a rough sketch follows).
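
A rough sketch of what that could look like with plain RDDs; this is an illustration rather than Spark Streaming, and sc, the paths, and the record format are assumptions:

var adClicks = sc.parallelize(Seq.empty[(String, Long)])        // running click counts per ad id

def processWindow(path: String): Unit = {
  val batch = sc.textFile(path)
    .map { line => val f = line.split(","); (f(0), 1L) }        // one (adId, 1) record per click
    .reduceByKey(_ + _)
  adClicks = adClicks.union(batch).reduceByKey(_ + _).cache()   // merge the window into the in-memory state
  adClicks.count()                                              // force materialization of the new state RDD
}

processWindow("hdfs://namenode/clicks/window-0001")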

Conclusions

While RDDs may look fundamentally limiting because of their immutability and reliance on bulk operations, these are exactly the primitives used in data-heavy, parallel cluster computing systems, so RDDs fit that setting nicely. In-memory computation is the obvious source of speedups, but the lineage-graph-based approach provides the fault tolerance that a system relying heavily on in-memory operations needs.
