Persist, Cache and Checkpoint in Apache Spark

Badwaik Ojas
Apr 10, 2023


Cache and Persist

What do Data Analysts and Data Scientists do? A Data Analyst pulls data, aggregates it, and generates reports. Their queries are highly repetitive: they run a query, change a parameter, run it again, and so on, with a lot of back and forth during the analysis phase. A Data Scientist classically works much like a Data Analyst, but with a more scientific, statistical approach, and is therefore more inclined to use heavy compute than someone casually exploring data.

The datasets that Data Analysts and Data Scientists work with are generally large. In Spark, every query goes through the full pipeline, from reading the file at the source to producing the result. Running that once is fine; running it repeatedly can be frustrating.

Consider ETL/ELT jobs: moving data around is expensive, both in time and money. In practice, your work will be connected to a central data store, and the queries you run will pull data from there. In general, each time you run a query, you go all the way back to the original store to fetch the data.

All of this overhead can be avoided using Cache or Persist.

One of the significant features of Apache Spark is the ability to store data in memory during a computation. This is a neat trick you can use to speed up access to commonly queried tables or pieces of data. It is also great for iterative algorithms that work over the same data again and again. Many see caching as a panacea for speed issues, but other techniques such as data partitioning, clustering, and bucketing can have a greater effect on job execution than caching. Remember, these are all tools in your toolkit.

Although Spark can compute results up to 100x faster than traditional MapReduce jobs, if you have not designed your jobs to reuse repeated computations, you will see performance degrade when dealing with billions or trillions of records. Hence, you may need to look at the stages and apply optimization techniques to improve performance.

In an application that reuses the same datasets over and over, one of the most useful optimizations is caching and persisting. Caching places a DataFrame or table in temporary storage across the executors in your cluster and makes subsequent reads faster.

Cache:

The cache() method is used to cache the intermediate result of a transformation so that further transformations running on top of the cached DataFrame perform faster. Caching the result of a transformation is one of the optimization tricks for improving the performance of long-running Spark applications/jobs.

import org.apache.spark.sql.functions.lit

var df = spark.read.schema(emp_schema).csv("/path/to/emp/emp.csv")
df.cache()                                 // mark the DataFrame for caching (lazy, nothing is read yet)
df = df.withColumn("increment", lit(1000))
df.show()                                  // action: triggers the read and populates the cache

What happens when we cache?

After calling cache(), nothing happens to the data; the Cache Manager simply updates the query plan by adding a new operator. The actual caching happens only when you call a Spark action.

This follows the concept of lazy evaluation: until we call an action, nothing is materialized. When we call an action, Spark starts by reading the file, sees that the DataFrame is marked for caching, caches the data in memory, then applies the remaining transformations and performs the action.

For the second action, Spark loads the data from the cache and does not go back to the source to fetch it, as the sketch below shows.
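A minimal sketch of this behaviour, continuing the hypothetical employee DataFrame from above: the first action fills the cache, the second is served from it.

df.cache()
df.count()   // first action: reads the CSV from the source and fills the cache
df.count()   // second action: served from the cached data, no source read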

Spark automatically monitors cache usage on each node and drops old data partitions in a least-recently-used (LRU) fashion, so the least recently used partitions are evicted from the cache first.

Both caching and persisting are used to save Spark RDDs, DataFrames, and Datasets. The difference is that cache() always uses the default storage level (MEMORY_ONLY for RDDs, MEMORY_AND_DISK for DataFrames and Datasets), whereas persist() lets you store the data at a user-defined storage level.
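A minimal sketch that prints the default levels, assuming Spark 2.x/3.x and the df from above:

val rdd = spark.sparkContext.parallelize(1 to 100)
rdd.cache()
println(rdd.getStorageLevel)   // StorageLevel(memory, deserialized, 1 replicas) -> MEMORY_ONLY

df.cache()
println(df.storageLevel)       // StorageLevel(disk, memory, deserialized, 1 replicas) -> MEMORY_AND_DISK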

Persist

Persist is similar to cache; the only difference is that it can take an optional argument. With no argument, persist() defaults to the MEMORY_AND_DISK storage level; the second signature takes a StorageLevel argument so the data can be stored at a different level.

Please refer to the Spark documentation for details of the different StorageLevel options.
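A minimal sketch of both signatures, using the hypothetical df from above. Note that a DataFrame's storage level cannot be changed while one is already assigned, hence the unpersist() in between.

import org.apache.spark.storage.StorageLevel

df.persist()                            // no argument: behaves like cache(), MEMORY_AND_DISK
df.count()                              // action materializes the persisted data
df.unpersist()                          // release it before assigning a different level

df.persist(StorageLevel.DISK_ONLY)      // explicit StorageLevel: keep the data on local disk only
df.count()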

To explicitly remove cached data from memory and disk, we use the unpersist() API.

df.unpersist()

With caching and persisting, the lineage is kept intact, which makes them fault tolerant: if any partition of a Dataset is lost, it is automatically recomputed using the original transformations that created it.

As you work with data in Spark, you will often want to reuse a certain dataset. It is important to be careful how you use caching, because it is an expensive operation in itself. If you use the dataset only once, for example, the cost of pulling and caching the data is greater than the cost of simply pulling it. Once the data is cached, the Catalyst Optimizer will only reach back to the location where the data was cached.

Outside of those use cases, you should not cache DataFrames, because you are likely to degrade the performance of your application. Caching consumes cluster resources that could otherwise be used for task execution, and it can prevent Spark from applying query optimizations.

Abusing the cache feature can sometimes lead to more performance problems. It gets in the way of the Catalyst Optimizer and cripples predicate pushdown, and users often underestimate the time it takes to populate the cache compared with reading directly from the source. The sketch below illustrates the pushdown concern.
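A minimal sketch of that concern, with hypothetical paths and column names: caching the full DataFrame means later filters scan the cache instead of being pushed down to the Parquet reader, while caching only the slice you actually re-query keeps the scan small.

import org.apache.spark.sql.functions.col

val full = spark.read.parquet("/path/to/events")
full.cache()
full.filter(col("event_date") === "2023-04-10").count()   // filter runs over the whole cache

val narrow = spark.read.parquet("/path/to/events")
  .filter(col("event_date") === "2023-04-10")              // pushed down to the source read
narrow.cache()
narrow.count()                                             // only the filtered slice is cached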

With great powers come great responsibilities :)

Checkpoint

Checkpointing is the act of saving an RDD to disk so that future references to this RDD point to those intermediate partitions on disk rather than recomputing the RDD from its original source. This is similar to caching except that it’s not stored in memory, only disk.

Consider the following code. Step 1 sets the checkpoint directory. Step 2 creates an employee DataFrame. Step 3 creates a department DataFrame. Step 4 joins the employee and department DataFrames. Step 5 checkpoints join_df, and Step 6 triggers the action.

spark.sparkContext.setCheckpointDir("/path/to/checkpoint/dir") // Step 1
var emp_df = spark.read.schema(emp_schema).csv("/path/to/emp/emp.csv") // Step 2
var dept_df = spark.read.schema(dept_schema)
  .csv("/path/to/dept/dept.csv") // Step 3
var join_df = emp_df.join(dept_df, Seq("deptid"), "left") // Step 4
join_df = join_df.checkpoint() // Step 5
join_df.show() // Step 6

Now let's look at the Spark UI to see the difference between running with and without a checkpoint.

Without Checkpoint: only one job is created. The logical plan is the complete plan required to reach the target result. The DAG shown below contains the complete flow: the data is loaded into two DataFrames and a join is made between them.

Without Checkpoint

With Checkpoint: a separate job is created when checkpoint() is called. This function materializes the RDD and stores it in the checkpoint directory we specified in the code. Once the DataFrame is checkpointed, subsequent reads come from the checkpoint location.

Notice that the logical plan here is truncated compared with the logical plan above.

The DAG is also different in this case: the starting point becomes the point where the DataFrame was checkpointed.

With Checkpoint

Checkpointing is used to truncate the logical plan of this Dataset, which is especially useful in iterative algorithms where the plan may grow exponentially.

When to checkpoint? Every time a computed partition needs to be cached, it is cached into memory right away. Checkpointing does not follow the same principle: instead, it waits until the end of a job and launches another job to finish the checkpoint. An RDD that needs to be checkpointed is therefore computed twice, so it is suggested to call rdd.cache() before rdd.checkpoint(). In that case, the second job does not recompute the RDD; it simply reads the cache.
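A minimal sketch of that recommendation, with a hypothetical input path; the checkpoint directory is assumed to have been set as in Step 1 above.

val rdd = spark.sparkContext.textFile("/path/to/emp/emp.csv").map(_.toUpperCase)
rdd.cache()        // keep the computed partitions around
rdd.checkpoint()   // mark the RDD for checkpointing, before any action runs on it
rdd.count()        // job 1 computes and caches; the follow-up checkpoint job reads the cache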

Persist/cache keeps the lineage intact, while checkpoint breaks it. Lineage is preserved even when data is fetched from the cache, which means the data can be recomputed from scratch if some cached partitions are lost. After a checkpoint, the lineage is completely lost and no longer carries the information required to rebuild the data.
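You can see this with RDD.toDebugString, a minimal sketch reusing the hypothetical rdd from the previous snippet:

// The count() above has already run the checkpoint job, so the lineage
// no longer reaches back to the source file.
println(rdd.toDebugString)          // rooted in a ReliableCheckpointRDD

// A cached-only RDD, by contrast, still prints its full lineage.
val cachedOnly = spark.sparkContext.textFile("/path/to/emp/emp.csv").map(_.toUpperCase)
cachedOnly.cache()
cachedOnly.count()
println(cachedOnly.toDebugString)   // MapPartitionsRDD traced back to the original textFile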

What is the difference between cache and checkpoint ?

Here is an answer from Tathagata Das:
There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory (and/or disk). But the lineage (computing chain) of RDD (that is, seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This allows long lineages to be truncated and the data to be saved reliably in HDFS, which is naturally fault tolerant by replication.

Furthermore, rdd.persist(StorageLevel.DISK_ONLY) is also different from checkpoint. Though the former can persist RDD partitions to disk, the partitions are managed by blockManager. Once the driver program finishes, which means the thread where CoarseGrainedExecutorBackend lies in stops, blockManager will stop, and the RDD cached to disk will be dropped (the local files used by blockManager will be deleted). But checkpoint persists the RDD to HDFS or a local directory. If not removed manually, the checkpoint files stay on disk, so they can be used by the next driver program.

A point to remember: when Hadoop MapReduce executes a job, it keeps persisting data (writing to HDFS) at the end of every task and every job. While executing a task, it keeps swapping data between memory and disk, back and forth. The problem in Hadoop is that a task needs to be re-executed if any error occurs; for example, a shuffle interrupted by errors will have only half of its data persisted on disk, and that persisted data is recomputed for the next run of the shuffle. Spark's advantage is that, when an error occurs, the next run reads data from the checkpoint; the downside is that checkpointing requires the job to be executed twice.

Caching is more useful than checkpointing when you have enough memory available to hold your RDDs or DataFrames, even large ones.

Caching keeps the result of your transformations so that they do not have to be recomputed when additional transformations are applied to the RDD or DataFrame. When you cache, Spark also stores the history of the transformations applied and can recompute them if memory is insufficient. When you checkpoint, Spark throws away all of your transformations and stores the final DataFrame in HDFS, where it stays until it is removed.

The main drawback of checkpointing is that it stores the data in HDFS, which is slower than caching, and you also need to set up a checkpoint location on HDFS. persist(StorageLevel.DISK_ONLY) does something similar, but it keeps the history of your transformations. Checkpointing is mainly used in stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on the RDDs of previous batches, which causes the length of the dependency chain to keep growing over time; checkpointing is used to avoid such unbounded increases in recovery time, as the sketch below illustrates.
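A minimal sketch of that stateful case using the RDD-based Spark Streaming API, with a hypothetical socket source and checkpoint path: updateStateByKey carries state from batch to batch, so without ssc.checkpoint() the lineage of the state RDDs would grow without bound.

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
ssc.checkpoint("/path/to/checkpoint/dir")             // required for stateful transformations

val counts = ssc.socketTextStream("localhost", 9999)  // hypothetical source
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .updateStateByKey[Int]((values: Seq[Int], state: Option[Int]) =>
    Some(values.sum + state.getOrElse(0)))            // state RDD depends on every previous batch

counts.print()
ssc.start()
ssc.awaitTermination()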

With this we come to the end of this blog. Choose well and use wisely.

Thanks for Reading!!

References:

  1. Spark Internals
  2. DataBricks Community
