Delta Lake Database/Schema Maintenance Part-1

Viral Patel
8 min read · Feb 12, 2024

In this blog post, I am going to walk you through the journey of Delta table maintenance. I have implemented this process as a POC, and it has been tested successfully. We are going to explore the OPTIMIZE and VACUUM commands.

We already know Databricks is working hard to make Delta Lake better every day, and some of the newer Delta Lake features may remove the need for this maintenance process in the future. The pace at which Databricks is moving forward is amazing. Let’s talk about the implementation.

What is the OPTIMIZE Command?

The “OPTIMIZE” command is used to improve the performance and efficiency of tables in Apache Spark SQL and Delta Lake. It is specifically used for Delta Lake tables.

The “OPTIMIZE” command performs the following actions on a Delta Lake table:

  1. Compact files: It merges multiple small files into larger files, reducing the number of files and improving read and write performance.
  2. Reorganize files: It reorganizes the physical arrangement of data files, allowing for better data locality and reducing disk I/O operations.
  3. Update table statistics: It updates metadata statistics about the table, such as file size, row count, and column statistics. This helps optimize query execution plans.
  4. Optionally, it can also include Z-Ordering: Z-Ordering is a technique to sort and organize data within each data file based on specific columns. It improves data skipping and query performance for certain types of queries.

It is important to note that while the “OPTIMIZE” command improves the performance of tables, it is an expensive operation that involves rewriting the data. Therefore, it should be used judiciously and at an appropriate time based on the data update patterns and workload requirements.

https://learn.microsoft.com/en-gb/azure/databricks/sql/language-manual/delta-optimize
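
As a quick illustration (the table name and Z-Order column below are placeholders, not from the original post), OPTIMIZE can be run from a notebook through spark.sql, optionally combined with Z-Ordering:

# Minimal sketch: compact a single Delta table; "sales.orders" and "order_date"
# are hypothetical names, replace them with your own table and column.
result = spark.sql("OPTIMIZE sales.orders")

# Z-Ordering co-locates related rows in the same files to improve data skipping
result_zorder = spark.sql("OPTIMIZE sales.orders ZORDER BY (order_date)")

# Both calls return a DataFrame with the table path and file-level metrics
display(result_zorder)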

Optimization can also be enabled at write time. You can find more details here: https://learn.microsoft.com/en-us/azure/databricks/delta/tune-file-size. Depending on the size of your Delta table and how frequently you write to it, this can be an expensive operation.
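
If you want this behaviour at write time, a minimal sketch (using a placeholder table name, and assuming your runtime supports these documented Delta table properties) is to enable optimized writes and auto compaction on the table:

# Sketch only: turn on optimized writes and auto compaction so that small files
# are handled during writes instead of by a scheduled OPTIMIZE run.
# "sales.orders" is a placeholder table name.
spark.sql("""
    ALTER TABLE sales.orders SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")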

What is the VACUUM Command?

VACUUM removes all files from the table directory that are not managed by Delta, as well as data files that are no longer in the latest state of the transaction log for the table and are older than a retention threshold. VACUUM skips all directories that begin with an underscore (_), which includes the _delta_log directory. Partitioning your table on a column that begins with an underscore is an exception to this rule; VACUUM scans all valid partitions included in the target Delta table. Delta table data files are deleted according to the time they were logically removed from Delta’s transaction log plus the retention hours, not their modification timestamps on the storage system. The default threshold is 7 days. Please follow the link below for more details about the VACUUM command.

https://learn.microsoft.com/en-gb/azure/databricks/sql/language-manual/delta-vacuum
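
As a quick example (again with a placeholder table name), VACUUM supports a DRY RUN clause so you can preview which files would be deleted before running it for real:

# Preview the files VACUUM would delete, without removing anything
display(spark.sql("VACUUM sales.orders RETAIN 168 HOURS DRY RUN"))

# Actually remove files that left the latest table state more than 7 days (168 hours) ago
spark.sql("VACUUM sales.orders RETAIN 168 HOURS")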

Now we know what the OPTIMIZE and VACUUM commands do to a Delta table. The question is how frequently you should run this process, and how you execute it when you have a large number of tables in your lakehouse. It all depends on your requirements; I will come back to these two points at the end of the blog post. Let me talk about the implementation.

1. optimize_and_vacuum_tables
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType

def optimize_and_vacuum_tables(table_name, parameters):
    """
    This function optimizes and vacuums a given table in the database.

    Parameters:
    table_name (str): The name of the table to optimize and vacuum.
    parameters (tuple): (vacuum_table, retain_hours) used for the vacuuming process.

    Returns:
    DataFrame: The result of the optimization and vacuuming process.
    """
    print(table_name)
    # Unpack the additional parameters passed in by the caller
    vacuum_table, retain_hours = parameters
    try:
        emptyRDD = spark.sparkContext.emptyRDD()

        # Create empty schema
        columns = StructType([])

        # Create an empty DataFrame with the empty schema
        vacuum_rtn = spark.createDataFrame(data=emptyRDD, schema=columns)

        # Optimize Table
        optimize = (
            spark.sql(f"OPTIMIZE {table_name}")
            .withColumn("OperationType", lit("OPTIMIZE"))
            .withColumn("TableName", lit(table_name))
            .withColumn("RetainHours", lit(retain_hours))
        )

        if vacuum_table:

            if retain_hours < 168:
                # Retention below the 7-day default requires disabling the safety check
                print("Setting spark.databricks.delta.retentionDurationCheck.enabled = false")
                spark.sql("set spark.databricks.delta.retentionDurationCheck.enabled = false")
                vacuum_rtn = (
                    spark.sql(f"VACUUM {table_name} RETAIN {retain_hours} HOURS")
                    .withColumn("OperationType", lit("VACUUM"))
                    .withColumn("TableName", lit(table_name))
                    .withColumn("RetainHours", lit(retain_hours))
                )
            else:
                print("Setting spark.databricks.delta.retentionDurationCheck.enabled = true")
                spark.sql("set spark.databricks.delta.retentionDurationCheck.enabled = true")
                vacuum_rtn = (
                    spark.sql(f"VACUUM {table_name}")
                    .withColumn("OperationType", lit("VACUUM"))
                    .withColumn("TableName", lit(table_name))
                    .withColumn("RetainHours", lit(retain_hours))
                )

            # Combine the OPTIMIZE and VACUUM results into a single DataFrame
            optimize_rtn = optimize.unionByName(vacuum_rtn, allowMissingColumns=True)
            return optimize_rtn
        else:
            return optimize

    except Exception as e:
        print(f"please check table {table_name}: {e}")

The above function, “optimize_and_vacuum_tables”, takes two parameters: “table_name” (the name of the table to optimize and vacuum) and “parameters” (a tuple carrying the vacuum_table flag and the retain_hours value).

The function starts by printing the table_name and unpacking the parameters tuple. Then, it creates an empty RDD and an empty schema using Spark.

Next, it executes a SQL query to optimize the table using the “OPTIMIZE” command. The resulting DataFrame is assigned to the variable “optimize” and columns “OperationType”, “TableName”, and “RetainHours” are added to it.

If the parameter “vacuum_table” is set to True, the function proceeds with vacuuming the table. If the “retain_hours” parameter is less than 168, it disables the retention duration check by setting the configuration “spark.databricks.delta.retentionDurationCheck.enabled” to false. It then executes a SQL query to vacuum the table with the specified retention hours and adds the same columns (“OperationType”, “TableName”, and “RetainHours”) to the resulting DataFrame.

If the “retain_hours” is greater than or equal to 168, it enables the retention duration check by setting the configuration to true and executes a SQL query to vacuum the table without specifying the retention hours.

Finally, the function combines the “optimize” and “vacuum_rtn” DataFrames using the “unionByName” method to create a DataFrame named “optimize_rtn”. If “vacuum_table” is False, the function returns only the “optimize” DataFrame.

If any exception occurs during the execution, the function prints a message indicating that the table should be checked.
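
Before wiring the function into the parallel runner, it can be sanity-checked on a single table. The table name below is hypothetical, and the tuple mirrors the (vacuum_table, retain_hours) parameters used later in the post:

# Hypothetical single-table call: vacuum enabled, default 7-day (168 hours) retention
single_result = optimize_and_vacuum_tables("sales.orders", (True, 168))
display(single_result)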

2. run_optimize_in_parallel

import concurrent.futures
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType,
    DoubleType, BooleanType, IntegerType
)

# Schema matching the DataFrame returned by optimize_and_vacuum_tables
schema = StructType([
    StructField('path', StringType(), True),
    StructField('metrics', StructType([
        StructField('numFilesAdded', LongType(), False),
        StructField('numFilesRemoved', LongType(), False),
        StructField('filesAdded', StructType([
            StructField('min', LongType(), True),
            StructField('max', LongType(), True),
            StructField('avg', DoubleType(), False),
            StructField('totalFiles', LongType(), False),
            StructField('totalSize', LongType(), False)
        ]), True),
        StructField('filesRemoved', StructType([
            StructField('min', LongType(), True),
            StructField('max', LongType(), True),
            StructField('avg', DoubleType(), False),
            StructField('totalFiles', LongType(), False),
            StructField('totalSize', LongType(), False)
        ]), True),
        StructField('partitionsOptimized', LongType(), False),
        StructField('zOrderStats', StructType([
            StructField('strategyName', StringType(), True),
            StructField('inputCubeFiles', StructType([
                StructField('num', LongType(), False),
                StructField('size', LongType(), False)
            ]), True),
            StructField('inputOtherFiles', StructType([
                StructField('num', LongType(), False),
                StructField('size', LongType(), False)
            ]), True),
            StructField('inputNumCubes', LongType(), False),
            StructField('mergedFiles', StructType([
                StructField('num', LongType(), False),
                StructField('size', LongType(), False)
            ]), True),
            StructField('numOutputCubes', LongType(), False),
            StructField('mergedNumCubes', LongType(), True)
        ]), True),
        StructField('numBatches', LongType(), False),
        StructField('totalConsideredFiles', LongType(), False),
        StructField('totalFilesSkipped', LongType(), False),
        StructField('preserveInsertionOrder', BooleanType(), False),
        StructField('numFilesSkippedToReduceWriteAmplification', LongType(), False),
        StructField('numBytesSkippedToReduceWriteAmplification', LongType(), False),
        StructField('startTimeMs', LongType(), False),
        StructField('endTimeMs', LongType(), False),
        StructField('totalClusterParallelism', LongType(), False),
        StructField('totalScheduledTasks', LongType(), False),
        StructField('autoCompactParallelismStats', StructType([
            StructField('maxClusterActiveParallelism', LongType(), True),
            StructField('minClusterActiveParallelism', LongType(), True),
            StructField('maxSessionActiveParallelism', LongType(), True),
            StructField('minSessionActiveParallelism', LongType(), True)
        ]), True)
    ]), True),
    StructField('OperationType', StringType(), False),
    StructField('TableName', StringType(), False),
    StructField('RetainHours', IntegerType(), True)
])

def run_optimize_in_parallel(function_name, parameters, lists):
    final_results = spark.createDataFrame([], schema)
    # Up to 121 tables are maintained concurrently; tune this to your cluster size
    with concurrent.futures.ThreadPoolExecutor(121) as executor:
        futures = [executor.submit(function_name, i, parameters) for i in lists]
        # wait for all futures to finish
        for future in concurrent.futures.as_completed(futures):
            try:
                # get the result of each future
                results = future.result()
                # skip tables that failed inside optimize_and_vacuum_tables (returned None)
                if results is not None:
                    # align columns by name to tolerate small schema differences
                    final_results = final_results.unionByName(results, allowMissingColumns=True)
            except Exception as e:
                # handle any exceptions that occurred while processing the future
                print(e)
    return final_results

The above code defines a schema for the Spark DataFrame that will store the optimization and vacuuming results. The schema has multiple nested fields representing various metrics and statistics.

The main function in the code is run_optimize_in_parallel, which takes a function reference, a tuple of parameters, and a list of table names as input. It creates an empty DataFrame using the defined schema.

Then, it uses a ThreadPoolExecutor to concurrently execute the supplied function for each table in the list, calling it with the table name and the parameters. The result of each successful execution is appended to the final_results DataFrame using unionByName; calls that returned no result are skipped.

After all the executions are completed, the function returns the final_results DataFrame.
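
As a small usage sketch (hypothetical table names), the runner can also be pointed at a hand-picked list of tables rather than a whole database:

# Maintain a specific list of tables in parallel; the names are placeholders
tables = ["sales.orders", "sales.customers", "finance.invoices"]
df_results = run_optimize_in_parallel(optimize_and_vacuum_tables, (True, 168), tables)
display(df_results)

Note that the 121 passed to ThreadPoolExecutor is the maximum number of worker threads, which means up to 121 tables are maintained concurrently; you may want to tune this number to your cluster size.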

3. optimize_and_vacuum

def optimize_and_vacuum(list_of_databases, vacuum_table, retain_hours):
    """
    Optimizes and vacuums tables in the given list of databases.

    Parameters:
    list_of_databases (list): List of databases to optimize and vacuum.
    vacuum_table (bool): Whether to vacuum the tables or not.
    retain_hours (int): Number of hours of data to retain after vacuuming.

    Returns:
    DataFrame: A DataFrame containing the optimized and vacuumed tables.
    """
    # Create an empty DataFrame
    emptyRDD = spark.sparkContext.emptyRDD()
    columns = StructType([])
    dfx_rtn = spark.createDataFrame(data=emptyRDD, schema=columns)

    # Iterate through each database
    for db in list_of_databases:
        # Get list of tables in the database
        df = spark.sql(f"SHOW TABLES IN {db}")
        list_of_tables = df.collect()
        lists = [f"{i['database']}.{i['tableName']}" for i in list_of_tables]

        # Call the function to optimize and vacuum tables in parallel
        function_name = optimize_and_vacuum_tables
        parameters = (vacuum_table, retain_hours)
        dfx = run_optimize_in_parallel(function_name, parameters, lists)
        dfx_rtn = dfx_rtn.unionByName(dfx, allowMissingColumns=True)

    return dfx_rtn

The code above defines a function called optimize_and_vacuum that takes three parameters: list_of_databases, vacuum_table, and retain_hours. It performs the optimization and vacuuming operations on the tables in multiple databases.

The function starts by creating an empty RDD (Resilient Distributed Dataset) and an empty schema. Then, it iterates over each database in the list_of_databases parameter.

For each database, the function retrieves a list of tables using a SHOW TABLES SQL query. It then creates a list of fully qualified table names by combining the database name and the table name.

Next, it calls a function called run_optimize_in_parallel to run optimization and vacuuming operations on the tables in parallel. The function name optimize_and_vacuum_tables and the parameters vacuum_table and retain_hours are passed to run_optimize_in_parallel along with the list of table names.

The result of the optimization and vacuuming operation is a dataframe, which is stored in the variable dfx. The dfx dataframe is then added to the dfx_rtn dataframe using the unionByName method to append the results from each database.

Finally, the function returns the dfx_rtn dataframe, which contains the results of the optimization and vacuuming operations on all tables in all databases.

4. Run optimize_and_vacuum

The run below retrieves the list of databases from Spark SQL and stores it in the variable databases. Then, it extracts the database names and stores them in the list list_of_databases. The code then sets vacuum_table to True and retain_hours to 168.

It then calls the function optimize_and_vacuum, passing list_of_databases, vacuum_table, and retain_hours as arguments, and stores the result in the variable df_opt. Finally, it displays the df_opt DataFrame.

# Collect the list of databases
databases = spark.sql("SHOW DATABASES").collect()

# Extract the names of the databases
list_of_databases = [f"{i['databaseName']}" for i in databases]

# Print the list of databases
print(list_of_databases)

# Set the vacuum_table parameter to True
vacuum_table = True

# Set the retain_hours parameter to 168
retain_hours = 168

# Call the optimize_and_vacuum function to optimize and vacuum the tables
df_opt = optimize_and_vacuum(list_of_databases, vacuum_table, retain_hours)

# Display the resulting DataFrame
display(df_opt)

Finally, you can combine all the blocks of code into a Databricks notebook and schedule it using a Databricks job or an ADF trigger, based on your requirements. This process will improve the performance of your Delta tables, and it will also save storage cost by vacuuming unnecessary Delta table history.

You can write the df_opt DataFrame to a Delta table and build a report on top of it to see how many files have been compacted, deleted, and so on. It will give you insights into your Delta tables' behaviour, which may be useful for future improvements to your tables and design patterns.
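
As a sketch (the audit table name is a placeholder), each run's results can simply be appended to a Delta table:

# Append the maintenance results to a hypothetical audit table for reporting
from pyspark.sql.functions import current_timestamp

(
    df_opt
    .withColumn("RunTimestamp", current_timestamp())
    .write
    .format("delta")
    .mode("append")
    .saveAsTable("maintenance.optimize_vacuum_audit")
)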

How often should I run OPTIMIZE?

When you choose how often to run OPTIMIZE, there is a trade-off between performance and cost. For better end-user query performance, run OPTIMIZE more often. This will incur a higher cost because of the increased resource usage. To optimize cost, run it less often.

Databricks recommends that you start by running OPTIMIZE on a daily basis, and then adjust the frequency to balance cost and performance trade-offs.

Note: This process may not be required in the future if you have fully migrated to Unity Catalog. The auto optimization feature is part of liquid clustering, which is currently in public preview, and it is supported for managed tables only. You can find more details about liquid clustering here:

https://learn.microsoft.com/en-us/azure/databricks/delta/clustering
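
As a rough sketch (placeholder table and column names; check the linked page for the exact syntax supported by your runtime), liquid clustering is declared with CLUSTER BY instead of partitioning or Z-Ordering:

# Hypothetical example: create a managed table with liquid clustering on order_date
spark.sql("""
    CREATE TABLE sales.orders_clustered (
        order_id BIGINT,
        order_date DATE,
        amount DOUBLE
    )
    CLUSTER BY (order_date)
""")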

Thanks

Viral Patel

Azure Modern Data Platform, Senior Solution Architect, Databricks Champion