How to implement Recursive Queries in Spark

Akash Chaurasia
Globant
Published in
5 min readJul 15, 2022

To identify the top-level hierarchy of one column with the use of another column we use Recursive Common Table Expressions, commonly termed as Recursive CTE in relational databases. Recursive CTE is one of the important features that many traditional relational databases such as SQL Server, Oracle, Teradata, Snowflake, etc. Spark SQL does not support recursive CTE when using Dataframe operations. In this article, we will check how to achieve Spark SQL Recursive Dataframe using PySpark.

Before implementing this solution, I researched many options and SparkGraphX API had the possibility to achieve this. However, I could not find any sustainable solution which could fulfill the project demands, and I was trying to implement a solution that is more of the SQL-like solution and PySpark compatible. Hence I came up with the solution to Implement Recursion in PySpark using List Comprehension and Iterative Map functions.

Preface

In recursive queries, there is a child element, or we can say the seed element, which is at the lowest level of the hierarchy. Query with the seed element is the first query that generates the result set. In the next step whatever result set is generated by the seed element is joined with another column to generate the result set. This step continues until the top-level hierarchy. Once no new row is retrieved, iteration ends.

I have tried to replicate the same steps in PySpark using Dataframe, List Comprehension, and Iterative map functions to achieve the same result. Let’s start with a real-time implementation, before jumping into the PySpark Dataframe operations let us check the recursive query in a relational database.

Relational Database Recursive Query Example

To create a dataset locally, you can use the commands below.

CREATE TABLE employee_record (employee_number INT ,manager_employee_number INT); 
insert into employee_record values( 404,NULL);
insert into employee_record values( 1014,1019);
insert into employee_record values( 1011,1019);
insert into employee_record values( 1010,1003);
insert into employee_record values( 1001,1003);
insert into employee_record values( 1004,1003);
insert into employee_record values( 1012,1004);
insert into employee_record values( 1002,1004);
insert into employee_record values( 1015,1004);
insert into employee_record values( 1003,404);
insert into employee_record values( 1019,404);
insert into employee_record values( 1016,404);
insert into employee_record values( 1008,1019);
insert into employee_record values( 1006,1019);

Below is the screenshot of the result set :

This table represents the relationship between an employee and its manager, In simple words for a particular organization who is the manager of an employee and manager of a manager. This means this table contains a hierarchy of employee-manager data.

Let's understand this more. Take a look at the following figure containing employees that looks like hierarchy.

This is our SQL Recursive Query which retrieves the employee number of all employees who directly or indirectly report to the manager with employee_number = 404:

WITH CTE_Recursive AS 
(
select er.employee_number from employee_record er
where er.manager_employee_number = 404
UNION ALL
select emprec.employee_number from CTE_Recursive crec
inner join employee_record emprec
on crec.employee_number = emprec.manager_employee_number
)
select * from CTE_Recursive order by employee_number

The output of the above query is as follows:

In the above query, before UNION ALL is the direct employee under manager with employee number 404, and after union all acts as an ‘iterator’ statement. Within CTE we used the same CTE, and it will run until it will get direct and indirect employees under the manager with employee number 404.

PySpark Code to Identify Hierarchies of Data

In PySpark, I am going to use Dataframe operations, List comprehension, and the iterative map function using Lambda expression to identify the hierarchies of data and get the output in the form of a List.

First, let's create the schema.

# Employee DF
schema = 'EMPLOYEE_NUMBER int, MANAGER_EMPLOYEE_NUMBER int'
employees_df = spark.createDataFrame(
[[404,None],
[1016,404],
[1003,404],
[1019,404],
[1010,1003],
[1004,1003],
[1001,1003],
[1012,1004],
[1002,1004],
[1015,1004],
[1008,1019],
[1006,1019],
[1014,1019],
[1011,1019]], schema=schema)

I have created a user-defined function (UDF) that will take a List as input, and return a complete set of List when iteration is completed.

def get_emp_recursive_udf(x) :
emp_list = []
direct_emp_list = list(employees_df
.select('EMPLOYEE_NUMBER')
.filter(employees_df.MANAGER_EMPLOYEE_NUMBER == x)
.toPandas()['EMPLOYEE_NUMBER'])
driterative_list = list(map(lambda y :
get_emp_recursive_udf(y), direct_emp_list))
final_lst = direct_emp_list + driterative_list
emp_list.append(final_lst)
return emp_list

Now, let's use the UDF. Once we get the output from the function then we will convert it into a well-formed two-dimensional List

#Enter the Input Employee List
input_emp_list = [404]
employee_list = list(map(lambda y : get_emp_recursive_udf(y),input_emp_list))
employee_list_str = str(employee_list)
employee_list_str = employee_list_str.replace("[","").replace("]","").replace("'","").replace(" ","").replace("None","").replace(',,',',')
employee_list_str_formatted = employee_list_str.replace(",,",",")
employee_list_str_formatted = employee_list_str_formatted.split(",")
list_of_employee = list(dict.fromkeys(employee_list_str_formatted))
final_list_of_employee = list(filter(None, list_of_employee))
final_list_of_employee

After running the complete PySpark code, below is the result set we get — a complete replica of the output we got in SQL CTE recursion query.

Great! Code is working fine as expected. Awesome!

Conclusion

In this blog, we were able to show how to convert simple Recursive CTE queries into equivalent PySpark code. With the help of this approach, PySpark users can also find the recursive elements just like the Recursive CTE approach in traditional relational databases. PySpark users can find the recursive elements from a Spark SQL Dataframe with a fine and easy-to-implement solution in an optimized time performance manner.

--

--

Akash Chaurasia
Globant
Writer for

Data Engineer, Big Data Enthusiast, Gadgets Freak and Tech Lover