Recursive CTE on Databricks

Ryan Chynoweth
3 min read · Apr 20, 2022


Introduction

SQL is one of the most popular languages on Databricks for data modeling, data acquisition, and reporting. A somewhat common question we are asked is whether we support recursive common table expressions (CTEs). A recursive CTE is a query that repeatedly executes against its own previous output, unioning each new subset of rows into the result, until the recursion returns no more rows.

Here is an example of a TSQL Recursive CTE using the Adventure Works database:

CREATE PROCEDURE [dbo].[uspGetBillOfMaterials]
    @StartProductID [int],
    @CheckDate [datetime]
AS
BEGIN
    SET NOCOUNT ON;

    -- Use recursive query to generate a multi-level Bill of Material (i.e. all level 1
    -- components of a level 0 assembly, all level 2 components of a level 1 assembly)
    -- The CheckDate eliminates any components that are no longer used in the product on this date.
    WITH [BOM_cte]([ProductAssemblyID], [ComponentID], [ComponentDesc], [PerAssemblyQty], [StandardCost], [ListPrice], [BOMLevel], [RecursionLevel]) -- CTE name and columns
    AS (
        -- Anchor member: get the initial list of components for the bike assembly
        SELECT b.[ProductAssemblyID], b.[ComponentID], p.[Name], b.[PerAssemblyQty], p.[StandardCost], p.[ListPrice], b.[BOMLevel], 0
        FROM [Production].[BillOfMaterials] b
        INNER JOIN [Production].[Product] p
            ON b.[ComponentID] = p.[ProductID]
        WHERE b.[ProductAssemblyID] = @StartProductID
            AND @CheckDate >= b.[StartDate]
            AND @CheckDate <= ISNULL(b.[EndDate], @CheckDate)
        UNION ALL
        -- Recursive member: join back to the anchor
        SELECT b.[ProductAssemblyID], b.[ComponentID], p.[Name], b.[PerAssemblyQty], p.[StandardCost], p.[ListPrice], b.[BOMLevel], [RecursionLevel] + 1
        FROM [BOM_cte] cte
        INNER JOIN [Production].[BillOfMaterials] b
            ON b.[ProductAssemblyID] = cte.[ComponentID]
        INNER JOIN [Production].[Product] p
            ON b.[ComponentID] = p.[ProductID]
        WHERE @CheckDate >= b.[StartDate]
            AND @CheckDate <= ISNULL(b.[EndDate], @CheckDate)
    )
    -- Outer select from the CTE
    SELECT b.[ProductAssemblyID], b.[ComponentID], b.[ComponentDesc], SUM(b.[PerAssemblyQty]) AS [TotalQuantity], b.[StandardCost], b.[ListPrice], b.[BOMLevel], b.[RecursionLevel]
    FROM [BOM_cte] b
    GROUP BY b.[ComponentID], b.[ComponentDesc], b.[ProductAssemblyID], b.[BOMLevel], b.[RecursionLevel], b.[StandardCost], b.[ListPrice]
    ORDER BY b.[BOMLevel], b.[ProductAssemblyID], b.[ComponentID]
    OPTION (MAXRECURSION 25)
END;
GO

Recursive CTEs are most commonly used to model hierarchical data. In the case above, we are looking to get all the parts associated with a specific assembly item. Another common use case is organizational structures.
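To make the hierarchical pattern concrete, the organizational-structure case can be sketched in plain Python. The employee table and names below are hypothetical, but the loop performs the same level-by-level expansion a recursive CTE would: start from one manager (the anchor), then repeatedly find everyone who reports to the previous level until no one is left.

```python
# Hypothetical flat employee table: (employee, manager)
employees = [("alice", None), ("bob", "alice"), ("carol", "alice"), ("dan", "bob")]

def reports_under(manager):
    """All direct and indirect reports of `manager`, with their depth in the
    hierarchy -- what a recursive CTE over (employee, manager) would return."""
    level, out, depth = [manager], [], 0
    while level:  # stop once a level produces no new reports
        depth += 1
        level = [e for e, m in employees if m in level]
        out += [(e, depth) for e in level]
    return out

print(reports_under("alice"))  # [('bob', 1), ('carol', 1), ('dan', 2)]
```

The bill-of-materials query above has the same shape: replace "manager" with "product assembly" and each level becomes the components of the previous level.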

Recursive Common Table Expression on Databricks

Unfortunately, Spark SQL does not natively support recursion as shown above. Luckily, Databricks users are not restricted to SQL alone: using PySpark, we can reconstruct the query above with a simple Python loop that unions DataFrames.

In the TSQL example, you will notice that we are working with two different tables from the Adventure Works demo database: BillOfMaterials and Product.

Using PySpark the SQL code translates to the following:

i = 1
check_date = '2010-12-23'
start_product_id = 972  # provide a specific id

# df corresponds to the anchor member of the "BOM_cte" clause in the query above
df = spark.sql("""
SELECT b.ProductAssemblyID, b.ComponentID, p.Name, b.PerAssemblyQty, p.StandardCost, p.ListPrice, b.BOMLevel, 0 as RecursionLevel
FROM BillOfMaterials b
INNER JOIN Product p ON b.ComponentID = p.ProductID
WHERE b.ProductAssemblyID = {} AND '{}' >= b.StartDate AND '{}' <= IFNULL(b.EndDate, '{}')
""".format(start_product_id, check_date, check_date, check_date))

# this view is our 'CTE' that we reference with each pass
df.createOrReplaceTempView('recursion_df')

while True:
    # select data for this recursion level by joining back to the previous one
    bill_df = spark.sql("""
    SELECT b.ProductAssemblyID, b.ComponentID, p.Name, b.PerAssemblyQty, p.StandardCost, p.ListPrice, b.BOMLevel, {} as RecursionLevel
    FROM recursion_df cte
    INNER JOIN BillOfMaterials b ON b.ProductAssemblyID = cte.ComponentID
    INNER JOIN Product p ON b.ComponentID = p.ProductID
    WHERE '{}' >= b.StartDate AND '{}' <= IFNULL(b.EndDate, '{}')
    """.format(i, check_date, check_date, check_date))

    # replace the 'CTE' view so the next pass joins against this level only
    bill_df.createOrReplaceTempView('recursion_df')
    # add the results to the main output dataframe
    df = df.union(bill_df)
    # if there are no results at this recursion level then break
    if bill_df.count() == 0:
        df.createOrReplaceTempView("final_df")
        break
    else:
        i += 1

This may seem overly complex for many users, and maybe it is. However, notice that we are able to reuse much of the SQL from the original T-SQL example via the spark.sql function, and the logic remains largely the same with only small conversions to Python syntax.
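The loop above is an instance of a general "union until empty" pattern, and it can be factored out. Below is a plain-Python sketch of that pattern, with a hypothetical `step` function standing in for the recursive spark.sql query and a cap analogous to T-SQL's MAXRECURSION 25; the bill-of-materials data is invented for illustration.

```python
def recursive_union(anchor, step, max_recursion=25):
    """Repeatedly apply `step` to the previous level and union the results,
    stopping when a level comes back empty -- the same control flow as the
    PySpark while-loop above, with a MAXRECURSION-style safety cap."""
    result = list(anchor)
    frontier = list(anchor)
    for level in range(1, max_recursion + 1):
        frontier = step(frontier, level)
        if not frontier:          # no rows at this recursion level: done
            return result
        result += frontier        # UNION ALL with prior levels
    raise RecursionError("exceeded max_recursion={}".format(max_recursion))

# Hypothetical bill-of-materials: assembly -> list of (component, qty)
bom = {"bike": [("frame", 1), ("wheel", 2)], "wheel": [("spoke", 32)]}

def step(frontier, level):
    # expand each part in the previous level into its components
    return [(child, level) for part, _ in frontier
            for child, _qty in bom.get(part, [])]

print(recursive_union([("bike", 0)], step))
# [('bike', 0), ('frame', 1), ('wheel', 1), ('spoke', 2)]
```

In the PySpark version, `step` would be the spark.sql call and the union would be DataFrame.union, but the termination logic is identical.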

Conclusion

While the syntax and language conversion for recursive CTEs is not ideal for SQL-only users, it is important to point out that it is possible on Databricks. You do not lose functionality when moving to a Lakehouse; the implementation may change, and in the end it can provide even more possibilities than a cloud data warehouse.

Edit 10.03.22 — check out this blog with a similar idea but with list comprehensions instead!

Disclaimer: these are my own thoughts and opinions and not a reflection of my employer


Ryan Chynoweth

Senior Solutions Architect Databricks — anything shared is my own thoughts and opinions