Managing Memory and Disk Resources in PySpark with Cache and Persist

An overview of PySpark’s cache and persist methods and how to optimize performance and scalability in PySpark applications

Ahmed Uz Zaman
ILLUMINATION
6 min read · Feb 21, 2023



Intro

In PySpark, cache() and persist() are methods used to improve the performance of Spark jobs by storing intermediate results in memory or on disk. Here's a brief description of each:

  1. cache(): This method stores an RDD (Resilient Distributed Dataset) or DataFrame in memory. When the data is cached, it is kept in memory so that it can be accessed quickly the next time it is needed. This can greatly improve the performance of Spark jobs by reducing the time spent reading data from disk or recomputing it.

For example, consider the following code:

rdd = sc.parallelize(range(1000000))
rdd.cache()                               # mark the RDD to be kept in memory
result = rdd.reduce(lambda x, y: x + y)   # the first action materializes the cache

(OR)

df = spark.range(1000000)
df.cache()
df_filtered = df.filter("id % 2 == 0")
df_sum = df_filtered.selectExpr("sum(id)").collect()

In this code, the RDD is marked for caching before the reduce operation. Note that cache() is lazy: the data is only stored in memory when the first action (here, reduce()) runs. Any subsequent operations on the same RDD or DataFrame can then read it from memory instead of recomputing it, which should improve performance.

Note that caching a DataFrame can be especially useful if you plan to reuse it multiple times in your PySpark application. However, it’s important to use caching judiciously, as it can consume a lot of memory if not used correctly. In some cases, persisting a DataFrame with a more suitable storage level (e.g. disk storage) may be a better option.
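If you want to confirm whether a DataFrame has been marked for caching and at which level, the DataFrame exposes an is_cached flag and a storageLevel property. A minimal sketch, continuing the toy DataFrame example above (the exact string printed for the storage level may vary by Spark version):

df = spark.range(1000000)
df.cache()                 # lazily marks the DataFrame for caching
df.count()                 # an action materializes the cached data

print(df.is_cached)        # True once the DataFrame is marked for caching
print(df.storageLevel)     # the level used by cache(), memory and disk by default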

2. persist(): This method is similar to cache(), but it allows you to specify where the data should be stored (in memory, on disk, or both) and whether it should be replicated. By default, persist() stores an RDD in memory only (MEMORY_ONLY), while a DataFrame defaults to MEMORY_AND_DISK; you can pass a StorageLevel to choose a different level.

For example, consider the following code:

from pyspark import StorageLevel

rdd = sc.parallelize(range(1000000))
rdd.persist(storageLevel=StorageLevel.DISK_ONLY)   # store the RDD's partitions on disk only
result = rdd.reduce(lambda x, y: x + y)            # the first action writes the partitions to disk

(OR)

from pyspark import StorageLevel

df = spark.range(1000000)
df.persist(StorageLevel.DISK_ONLY)         # persist() takes a StorageLevel object, not a string
df_filtered = df.filter("id % 2 == 0")
df_sum = df_filtered.selectExpr("sum(id)").collect()

In this code, the RDD is persisted on disk instead of being cached in memory. The data is written to local disk the first time it is computed and can be read back from there during later operations, which should improve performance compared to recomputing the RDD from its original source, though access will be slower than reading it from memory.

Also note that, like caching, persisting a DataFrame can be useful if you plan to reuse it multiple times in your PySpark application. However, it’s important to use persistence judiciously, as it can consume a lot of disk space if not used correctly.

Note that for Datasets and DataFrames, cache() is an alias for persist(StorageLevel.MEMORY_AND_DISK), while for RDDs, cache() is an alias for persist(StorageLevel.MEMORY_ONLY).
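A quick way to see this difference is to inspect the storage level after calling cache() on an RDD versus a DataFrame. A minimal sketch (the exact output format may vary by Spark version):

rdd = sc.parallelize(range(10))
rdd.cache()
print(rdd.getStorageLevel())   # RDD cache() defaults to memory only

df = spark.range(10)
df.cache()
print(df.storageLevel)         # DataFrame cache() defaults to memory and disk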

Advantages of using Cache and Persist in PySpark

There are several advantages to using cache() and persist() in PySpark:

  1. Faster Execution: By caching or persisting an RDD or DataFrame, subsequent computations that use the same RDD or DataFrame can avoid the overhead of reading the data from a disk. This can result in much faster execution times, especially for iterative or interactive workloads.
  2. Reduced Data Movement: When an RDD or DataFrame is cached or persisted, it stays on the nodes where it was computed, which can reduce data movement across the network. This can be particularly beneficial in distributed environments where network bandwidth is limited.
  3. Improved Resource Utilization: Caching or persisting an RDD or DataFrame can reduce the need for recomputing the same data multiple times, which can improve the utilization of compute resources. This can be particularly useful when working with large datasets or complex computations.
  4. Improved Debugging: Caching or persisting an RDD or DataFrame can help with debugging by allowing you to examine the data that is stored in memory or on a disk. This can be particularly useful when working with complex or iterative algorithms.
  5. Custom Storage Levels: persist() allows you to specify custom storage levels for an RDD or DataFrame, which can be useful when working with different types of data or hardware configurations. For example, you might want to store some data in memory but persist other data on disk (a short sketch follows this list).
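As a sketch of point 5, different datasets in the same application can be persisted at different levels depending on their size and how often they are reused. The table names below are hypothetical:

from pyspark import StorageLevel

# Small, frequently reused lookup data: keep it in memory only
lookup_df = spark.table("dim_products")            # hypothetical table
lookup_df.persist(StorageLevel.MEMORY_ONLY)

# Large intermediate result reused a few times: allow spilling to disk
events_df = spark.table("fact_events")             # hypothetical table
events_df.persist(StorageLevel.MEMORY_AND_DISK)

# Very large dataset that will not fit in memory: keep it on disk only
archive_df = spark.table("fact_events_archive")    # hypothetical table
archive_df.persist(StorageLevel.DISK_ONLY)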

Overall, using cache() and persist() can help improve the performance, scalability, and usability of Spark applications. However, it's important to use these methods judiciously, as caching or persisting too much data can lead to memory issues or inefficient use of resources.

Different Levels of Caching and Persistence in PySpark

PySpark provides different levels of caching and persistence for RDDs and DataFrames, which determine where the data is stored (memory, disk, or both), whether it is serialized, and whether it is replicated across the cluster. Here are the storage levels that can be used with the cache() and persist() methods:

  1. MEMORY_ONLY: This level stores the RDD in memory as deserialized Java objects. It is the default level for RDD cache() and persist() (DataFrames default to MEMORY_AND_DISK instead). It provides fast access to the data, but partitions that do not fit in memory are recomputed from the original data source each time they are needed.
  2. MEMORY_ONLY_SER: This level stores the RDD or DataFrame in memory as serialized Java objects. This can reduce memory usage compared to MEMORY_ONLY, but accessing the data requires deserialization, which can be slower than using deserialized objects.
  3. MEMORY_AND_DISK: This level stores the RDD or DataFrame in memory as deserialized Java objects, but if the RDD or DataFrame does not fit entirely in memory, it spills the excess data to disk. This provides better performance than recomputing the data, but accessing data from disk can be slower than accessing it from memory.
  4. MEMORY_AND_DISK_SER: This level stores the RDD or DataFrame in memory as serialized Java objects, and spills excess data to disk if needed. This can be useful when memory usage is a concern, but accessing the data requires deserialization, which can be slower than using deserialized objects.
  5. DISK_ONLY: This level stores the RDD or DataFrame on disk only, and not in memory. This can be useful when memory usage is a concern and the data does not fit entirely in memory, but accessing the data from disk can be slower than accessing it from memory.

In addition to these basic storage levels, Spark also provides replicated variants such as MEMORY_ONLY_2 and MEMORY_AND_DISK_2, which store each partition on two nodes for fault tolerance. Note that in PySpark the cached data is always kept in serialized (pickled) form on the Python side, so the distinction between the plain and _SER levels matters less than it does in Scala or Java.
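For reference, the replicated levels are available as constants on pyspark.StorageLevel, and you can also construct a custom level directly. A minimal sketch (the constructor arguments follow the order useDisk, useMemory, useOffHeap, deserialized, replication):

from pyspark import StorageLevel

rdd = sc.parallelize(range(1000000))

# Built-in replicated level: each partition is stored on two nodes
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
print(rdd.getStorageLevel())

# Custom level: use disk, use memory, no off-heap, serialized, 2 replicas
custom_level = StorageLevel(True, True, False, False, 2)
other_rdd = sc.parallelize(range(1000))
other_rdd.persist(custom_level)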

What is unpersist() in PySpark

In PySpark, unpersist() is the method used to release an RDD or DataFrame from memory or disk after it has been cached or persisted with cache() or persist(). (There is no separate uncache() method on RDDs or DataFrames; for tables and views cached in the catalog, spark.catalog.uncacheTable() plays a similar role.) Here's how it works in both cases:

  1. Releasing a cached RDD or DataFrame: calling unpersist() on an RDD or DataFrame that was cached with cache() removes its blocks from memory. Once it has been unpersisted, the data is no longer stored in memory and must be recomputed from its original source if it is needed again.

For example, consider the following code:

rdd = sc.parallelize(range(1000000))
rdd.cache()
result = rdd.reduce(lambda x, y: x + y)
rdd.unpersist()

(OR)

df = spark.range(1000000)
df.cache()
df_filtered = df.filter("id % 2 == 0")
df_sum = df_filtered.selectExpr("sum(id)").collect()
df.unpersist()

In this code, the RDD is unpersisted after the reduce operation has completed. This frees the memory used by the cached RDD, which can be beneficial in cases where memory usage is a concern.

2. Releasing a persisted RDD or DataFrame: calling unpersist() on an RDD or DataFrame that was persisted with persist() removes its data from whichever storage level was used (memory, disk, or both). Once it has been unpersisted, the data must be recomputed from its original source if it is needed again.

For example, consider the following code:

from pyspark import StorageLevel

rdd = sc.parallelize(range(1000000))
rdd.persist(storageLevel=StorageLevel.DISK_ONLY)
result = rdd.reduce(lambda x, y: x + y)
rdd.unpersist()                            # remove the persisted data from disk

(OR)

from pyspark import StorageLevel

df = spark.range(1000000)
df.persist(StorageLevel.DISK_ONLY)         # persist() takes a StorageLevel object, not a string
df_filtered = df.filter("id % 2 == 0")
df_sum = df_filtered.selectExpr("sum(id)").collect()
df.unpersist()                             # remove the persisted data from disk

In this code, the RDD is unpersisted after the reduce operation has completed. This frees the disk space used by the persisted RDD, which can be beneficial in cases where disk usage is a concern.

Note that it's important to use unpersist() carefully to avoid unnecessary recomputation or data movement. It's generally a good practice to release RDDs and DataFrames once they are no longer needed, especially if they are large or if memory or disk resources are limited.
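As a final illustration, unpersist() accepts a blocking flag if you want to wait until the cached blocks are actually removed, and spark.catalog.clearCache() clears Spark's in-memory cache in one call. A minimal sketch (the DataFrame here is just a toy example):

df = spark.range(1000000).cache()
df.count()                      # action that materializes the cache

df.unpersist(blocking=True)     # wait until all cached blocks are removed

spark.catalog.clearCache()      # drop everything tracked by the in-memory cache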

Conclusion

In conclusion, the unpersist() method in PySpark removes RDDs and DataFrames from memory or disk after they have been cached or persisted with cache() or persist(). Using it is important for managing memory and disk resources efficiently and for avoiding unnecessary recomputation or data movement. When using it, carefully consider when an RDD or DataFrame is no longer needed and release it accordingly. By doing so, we can optimize performance and scalability in PySpark applications.

Check out my PySpark 101 series of articles for more spark related content. Enjoy Reading..!!
