How Cache Works in Apache Spark
cache() is a lazily evaluated Apache Spark operation that can be used on a DataFrame, Dataset, or RDD when you want to perform more than one action on the same data. cache() stores the specified DataFrame, Dataset, or RDD in the memory of your cluster’s workers. Because cache() is lazy, the actual caching takes place only when a Spark action (for example, count(), show(), take(), or write()) is subsequently called on that DataFrame, Dataset, or RDD.
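As a minimal sketch of this lifecycle (assuming a SparkSession named spark, as created in the example further below; the DataFrame name numbers and its values are purely illustrative):

import spark.implicits._

val numbers = (1 to 1000).toDF("n")            // a tiny illustrative DataFrame
val cached = numbers.filter($"n" > 10).cache() // lazy: nothing is computed or stored yet
cached.count()     // first action: runs the filter and fills the cache
cached.count()     // second action: answered from the cached data
cached.unpersist() // frees the cached blocks when you are done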
Scenarios where caching is an effective optimization:
Reusing Data: Caching is optimal when you need to perform multiple operations on the same dataset to avoid reading from storage repeatedly.
Frequent Subset Access: Useful for frequently accessing small subsets of a large dataset, reducing the need to load the entire dataset repeatedly.
Iterative Algorithms and Interactive Exploration: iterative algorithms (for example, machine learning training loops) and interactive data exploration revisit the same data many times, so keeping it in memory avoids recomputation (see the sketch after this list).
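As a rough illustration of the iterative case (a sketch only: it assumes the df DataFrame loaded in the example below, and the variable name working and the thresholds are made up):

import org.apache.spark.sql.functions.col

// Assume df is the AppleStore DataFrame loaded in the example below.
val working = df.select("SNo", "id", "track_name").cache()

for (threshold <- Seq(10, 100, 1000)) {
  // The first count materializes the cache; later passes are served from memory
  // instead of re-reading the CSV file.
  println(s"rows with SNo > $threshold: " + working.filter(col("SNo") > threshold).count())
}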
Let’s understand this with an example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local")
  .appName("HowCacheWork2")
  .getOrCreate()
val sparkContext = spark.sparkContext
sparkContext.setLogLevel("ERROR")
val df = spark.read.option("header", "true")
  .csv("D:\\SparkDoc\\CsvDoc\\Csv2\\AppleStore.csv")
  .repartition(8) // repartition returns a new DataFrame, so it is chained into the assignment
df.select("SNo","id","track_name").filter(col("SNo") > 10).cache() //Line 1
df.select("SNo","id","track_name").filter(col("SNo") > 10).count() //Line 2
df.select("SNo","id","track_name").filter(col("SNo") > 100).count() // Line 3
The data is loaded into the df DataFrame and repartitioned into 8 partitions.
At Line 1: since cache() is a lazy operation in Spark, the caching is not completed here; only a single partition shows up as stored, as shown in the figure below.
At Line 2: count() is an action, and because Spark needs to count every record in the DataFrame, it has to scan all partitions, which completes the caching.
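To see this from code (a sketch; the variable name line1 is just illustrative), you can keep a reference to the query from Line 1 and inspect its storage level; whether the partitions are actually materialized is easiest to confirm in the Spark UI Storage tab:

val line1 = df.select("SNo", "id", "track_name").filter(col("SNo") > 10).cache()
println(line1.storageLevel) // e.g. StorageLevel(disk, memory, deserialized, 1 replicas): marked for caching
line1.count()               // the action scans every partition and materializes them in the cache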
At Line 3, you might expect the cache to be used, since we are retrieving a subset of data that has already been cached. However, Spark does not use the cached data here, because the analyzed logical plan of this query does not match the plan that was cached. The figure below shows the logical plan for Line 3.
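If you want to reproduce and compare these plans yourself, one way (a sketch, not part of the original walkthrough) is to print the query plans directly:

// Prints the parsed, analyzed, optimized, and physical plans for the Line 2 and Line 3 queries.
df.select("SNo", "id", "track_name").filter(col("SNo") > 10).explain(true)
df.select("SNo", "id", "track_name").filter(col("SNo") > 100).explain(true)

// Or print only the analyzed logical plan, which is what the cache lookup is compared against.
println(df.select("SNo", "id", "track_name").filter(col("SNo") > 100).queryExecution.analyzed)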
Now let’s dig a little deeper.
df.select("SNo","id","track_name").filter(col("SNo") > 100).cache() //Line 4
df.select("SNo","id","track_name").filter(col("SNo") > 100).count() //Line 5
At Line 5: this query hits the cache, because the analyzed logical plan is the same for Line 4 and Line 5.
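One way to confirm the cache hit (again only a sketch) is to look at the physical plan: when the cached data is used, the plan contains an InMemoryTableScan over an InMemoryRelation instead of a fresh scan of the CSV file.

// If the cache from Line 4 is used, the printed physical plan shows
// InMemoryTableScan / InMemoryRelation rather than a FileScan of the CSV.
df.select("SNo", "id", "track_name").filter(col("SNo") > 100).explain()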
The conclusion from the approach above is:
When a DataFrame is cached and a subsequent operation is performed on the cached DataFrame, Spark first compares the analyzed logical plan of the operation to the one used when the DataFrame was cached. If the plans match, Spark retrieves the data from the cache; otherwise, it does not access the cached data.
Let’s take another approach:
val cdf = df.select("SNo","id","track_name").filter(col("SNo") > 10).cache() //Line A
cdf.select("SNo","id","track_name").filter(col("SNo") > 10).count() //Line B
cdf.select("SNo","id","track_name").filter(col("SNo") > 100).count() //Line C
At Line B and Line C, the cache will be used because we store the cached DataFrame in a variable and reuse that variable for the subsequent queries. This approach also helps us avoid redundant calls to cache().
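When the cached DataFrame is no longer needed, it is good practice to release the memory explicitly (a small sketch of the cleanup step):

// Drops the cached blocks from executor memory and disk once the reuse phase is over.
cdf.unpersist()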
You can connect with me on LinkedIn: https://www.linkedin.com/in/charchit-patidar/
For any assistance, feel free to connect on Topmate.
Happy Learning !!