Solving for concurrency in data engineering with Snowpark for Python

Problem Statement

Snowpark for Python, an incredibly price-performant open-source Python library (pip | conda) built by Snowflake, enables Snowflake customers to adopt architecturally simple data engineering pipelines on Snowflake’s managed cloud. Snowpark for Python removes the need to manage, optimize, or troubleshoot clusters when processing vast amounts of data, allowing customers to focus on the data pipelines that drive business insights.

Concurrency is a big part of how customers scale their data engineering pipelines. If 100 data engineering jobs arrive at the same time, the processing engine should be smart enough to autoscale to multiple clusters and then spin down immediately when not in use, with no idle time. This is what Snowflake’s multi-cluster warehouses solve for.

However, if 100 concurrent jobs arrive at the same time and one of the transformations requires each of those jobs to retrieve data from a billion-row table, can we optimize the code itself to scale the transformations efficiently within these 100 concurrent Python jobs, and rely on Snowflake’s multi-cluster warehouse functionality only when needed?

Yes, absolutely. This blog presents such a use case with Snowpark for Python. In this use case, all 100 concurrent jobs initially perform a read from a billion-row table, let’s call it table A. If each of the 100 concurrent Python jobs reads independently from table A, the overall data engineering process takes more time and more credits. Instead, if these 100 concurrent jobs share some identifying pattern, we can let one of them perform the read from table A while all other concurrent jobs with the same pattern wait for that first job to finish the work and then benefit from it, making the overall process not only faster but also much cheaper. This blog demonstrates how locking amongst concurrent Python jobs, together with graceful handling of locking errors, can be used to scale concurrent Python jobs efficiently with Snowpark for Python.

Starting point: The initial pipeline

Figure 1 shows a representative example of what an initial pipeline could look like for all N concurrent Python jobs. In Figure 1, N concurrent Python jobs arrive at the same time, all asking to process data from the billion-row table A along with transformations on other tables. In other words, reading the data from table A is just one part of the pipeline for these concurrent jobs. The common pattern amongst them is that the read from table A is based only on a timestamp (timestamp chosen just as an example), even though the jobs also perform transformations on other tables driven by a different field (let’s say contract type). The challenge is that the time slices (i.e., timestamp ranges) used to read data from table A can be arbitrary. So what can be done to let all these concurrent Python jobs efficiently read data from the billion-row table A at different time slices, without depending on Snowflake’s multi-cluster warehouse scaling?

Figure 1: Starting point: The initial pipeline

The solution: Enter the mutually exclusive lock (mutex) and graceful error handling with Snowpark for Python

We could optimize the design in Figure 1 by tracking whether the data has already been read from the billion-row table A for a particular timestamp range. The redesigned process routes all incoming concurrent Snowpark for Python jobs through a tracking table: if the data has already been read from table A for a given timestamp range, the job simply returns the results from the corresponding intermediate table. Since storage is cheaper than compute, this is a cost-efficient and performant design. The redesigned implementation is shown in Figure 2.

Figure 2: Redesigned Snowpark for Python implementation design

In order to implement the design proposed in Figure 2, there are two challenges:

  1. How do we lock concurrent Python jobs so that only one job reads data from table A for a particular timestamp range, allowing the other concurrent jobs requesting the same timestamp range to benefit from the first job that acquired the lock?
  2. If N becomes large, how do we gracefully handle the “number of waiters exceeding the X statement limit” error in Snowflake and retry acquiring the lock without surfacing errors to the job?

Solution to Challenge #1: Locking concurrent Snowpark for Python jobs by using a table as a serializable mutex

Thanks to our Performance Solutions team, the solution is to use a dummy table to emulate the behavior of a mutually exclusive (mutex) lock, where the lock is guaranteed to be owned by exactly one session at a time. With N concurrent Python jobs arriving at the same time, the N jobs create N separate Snowpark for Python sessions. Through this mutex locking technique, we allow just one of the N concurrent sessions to begin a transaction, lock the dummy table acting as a serializable mutex, and then perform the read from the huge billion-row table A. Once that first Python job is done reading from table A, the lock on the dummy table acting as a serializable mutex is released by closing the transaction.

The dummy table holds no data and could have the structure shown in Figure 3.

Figure 3: Dummy table to emulate the behavior of mutex
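For readers following along in text form, a minimal sketch of such a dummy table is shown below, assuming an existing Snowpark session object named session and an illustrative table name MUTEX_DUMMY (the actual definition lives in the linked setup script):

    # Minimal sketch (table and column names are illustrative): an empty dummy
    # table whose only purpose is to be the target of a locking DML statement.
    # Assumes `session` is an existing snowflake.snowpark.Session.
    session.sql("""
        CREATE TABLE IF NOT EXISTS MUTEX_DUMMY (
            DUMMY NUMBER  -- never populated; exists only so DML can lock the table
        )
    """).collect()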

Additionally, a tracker table could be used to track the intermediate tables needed for a timestamp range from the billion row table A, as shown in Figure 4.

Figure 4: Tracker table
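Again as a hedged sketch, a tracker table along these lines could look as follows; the column names are assumptions for illustration, and the real definition is in the figure and the linked setup script:

    # Minimal sketch (column names are assumptions): one row per timestamp range
    # of table A that has already been materialized into an intermediate table.
    # Assumes `session` is an existing snowflake.snowpark.Session.
    session.sql("""
        CREATE TABLE IF NOT EXISTS TABLE_A_TRACKER (
            START_TS                TIMESTAMP_NTZ,
            END_TS                  TIMESTAMP_NTZ,
            INTERMEDIATE_TABLE_NAME VARCHAR
        )
    """).collect()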

Using the dummy table shown in Figure 3 and a smart WHERE clause, "iff(dummy is NULL, FALSE, TRUE)", to trick the Snowflake optimizer, a Python session acquires a mutually exclusive lock as shown in Figure 5.

Figure 5: Locking logic
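A minimal sketch of this locking step is shown below, reusing the illustrative MUTEX_DUMMY table from above. The UPDATE changes no rows (the table is empty), but the non-constant predicate keeps the optimizer from skipping the statement, so the session still opens a transaction and takes the lock:

    # Sketch of acquiring the mutex (table name is illustrative).
    # Assumes `session` is an existing snowflake.snowpark.Session.
    def acquire_mutex(session):
        # Open an explicit transaction so the lock is held until COMMIT/ROLLBACK.
        session.sql("BEGIN").collect()
        # The table is empty, so no rows are updated, but the non-constant
        # predicate keeps the optimizer from skipping the statement and the
        # session acquires the lock on MUTEX_DUMMY for this transaction.
        session.sql(
            "UPDATE MUTEX_DUMMY SET DUMMY = 1 "
            "WHERE iff(DUMMY IS NULL, FALSE, TRUE)"
        ).collect()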

This mutex functionality is based on an honor system in which only the session that holds the mutex applies DML to the dummy table. When other Python sessions request the mutex, they are blocked because session #1 has not yet closed its transaction via COMMIT or ROLLBACK. Once session #1 closes the transaction, another Python session can acquire the lock and continue. Please note that once a transaction is opened through a DML statement, it is critical to release the lock when an exception or error is encountered, so that the system does not become unavailable due to an indefinitely open “zombie” transaction.
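Putting the pieces together, the sketch below shows how a job could hold the mutex only for the duration of the expensive read and guarantee that the lock is released even on failure. Table names (TABLE_A, TABLE_A_TRACKER, MUTEX_DUMMY), the EVENT_TS column, and the helper names are illustrative assumptions; the repository linked below contains the actual code:

    # Sketch of the critical section guarded by the mutex. Table names, the
    # EVENT_TS column, and helper names are illustrative assumptions.
    # Assumes `session` is an existing snowflake.snowpark.Session and that
    # acquire_mutex() from the previous sketch is defined.
    from snowflake.snowpark.functions import col, lit

    def read_table_a_once(session, start_ts, end_ts, intermediate_table):
        # Create the (empty) intermediate table outside the transaction:
        # DDL in Snowflake implicitly commits and would release the lock.
        session.sql(
            f"CREATE TABLE IF NOT EXISTS {intermediate_table} LIKE TABLE_A"
        ).collect()

        acquire_mutex(session)  # opens the transaction and locks MUTEX_DUMMY
        try:
            # Re-check the tracker while holding the lock: another job may have
            # materialized this timestamp range while we were waiting.
            already_done = (
                session.table("TABLE_A_TRACKER")
                .filter((col("START_TS") == lit(start_ts)) & (col("END_TS") == lit(end_ts)))
                .count()
            )
            if already_done == 0:
                # Expensive read from the billion-row table A, done exactly once.
                session.sql(
                    f"INSERT INTO {intermediate_table} SELECT * FROM TABLE_A "
                    f"WHERE EVENT_TS BETWEEN '{start_ts}' AND '{end_ts}'"
                ).collect()
                session.sql(
                    f"INSERT INTO TABLE_A_TRACKER VALUES "
                    f"('{start_ts}', '{end_ts}', '{intermediate_table}')"
                ).collect()
            session.sql("COMMIT").collect()    # releases the mutex
        except Exception:
            session.sql("ROLLBACK").collect()  # avoid an open "zombie" transaction
            raise
        return session.table(intermediate_table)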

The complete code demonstrating how the dummy table is used as a mutex through Python is available in the section Locking and retry logic. The full source code to try this out is available in github here.

Solution to Challenge #2: Gracefully handling the “number of waiters exceeding X statement limit” error

The Tenacity Python library can be used to retry acquiring the mutex lock on the dummy table after a fixed interval. If a concurrent Snowpark for Python session tries to acquire the lock and fails because the number of waiters exceeds the X statement limit, a SnowparkSQLException is raised and the retry logic kicks in to attempt capturing the lock again after the predefined fixed interval. This is shown in Figure 6.

Figure 6: Retrying logic
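A minimal sketch of such a retry wrapper with Tenacity is shown below; the wait interval and attempt count are illustrative, and acquire_mutex refers to the earlier locking sketch:

    # Sketch: retry acquiring the mutex at a fixed interval when the lock
    # request fails, e.g. because the number of waiters exceeded the limit.
    from snowflake.snowpark.exceptions import SnowparkSQLException
    from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

    @retry(
        retry=retry_if_exception_type(SnowparkSQLException),  # retry only on Snowpark SQL errors
        wait=wait_fixed(10),                                   # fixed wait between attempts (illustrative)
        stop=stop_after_attempt(30),                           # eventually give up instead of waiting forever
        reraise=True,                                          # surface the original error if all retries fail
    )
    def acquire_mutex_with_retry(session):
        acquire_mutex(session)  # from the earlier locking sketch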

As with Challenge #1, the complete code is available in the section Locking and retry logic, and the full source code to try this out is available in github here.

Conclusion

This blog shows an architectural pattern for solving concurrency in data engineering workloads with the price-performance of Snowpark for Python. It allows Snowflake customers to get even better value for their money on concurrency use cases by combining Snowflake’s multi-cluster warehouses with Snowpark for Python.

Appendix: Complete code for redesigned implementation

This section contains the complete code for the redesigned implementation. The full source code to try this out is available in github here.

Configuration

The config for all N jobs was driven through a csv file with the template as follows:

"<CONTRACT TYPE>","<STARTING TIMESTAMP>","<ENDING TIMESTAMP>"

For example:

"A", "2022-11-22 19:00:00", "2022-11-22 20:00:00"

The template config file is available here in github.
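Purely for illustration, a small sketch of reading rows in this format from Python is shown below (the actual pipeline feeds these rows to job.py through GNU parallel, as described in the next sections); the file name config.csv is an assumption:

    # Illustrative sketch: read the config rows (contract type, start, end).
    # The file name "config.csv" is an assumption.
    import csv

    with open("config.csv", newline="") as f:
        for contract_type, start_ts, end_ts in csv.reader(f, skipinitialspace=True):
            print(contract_type, start_ts, end_ts)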

Python script for creating the dummy table as a serializable mutex and the tracker table

This Python script was responsible for initializing the environment by creating the dummy table that emulates the behavior of a serializable mutex, along with the tracker table referenced by all concurrent Snowpark for Python jobs.

The template python script is available here in github.

Shell script for parallelizing Python jobs

The GNU parallel shell utility was used to launch the Python jobs: for each config row, it called the Python file job.py (each invocation representing a Python job that creates its own Snowpark for Python session) with the contract type, starting timestamp, and ending timestamp as arguments.

The shell script is available here in github.

Python Job

The Python file accepted the contract type, starting timestamp, and ending timestamp as arguments and executed the business functionality through Snowpark for Python.

The template python file is available here in github.
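As a hedged sketch of what such a job's entry point could look like (argument names and the run_pipeline placeholder are illustrative, not the repository's actual code):

    # Illustrative sketch of job.py's entry point; function and argument names
    # are assumptions, not the repository's actual code.
    import argparse

    def run_pipeline(contract_type, start_ts, end_ts):
        # Placeholder for the business functionality: create a Snowpark session,
        # consult the tracker, acquire the mutex if needed, and run the remaining
        # transformations for this contract type and timestamp range.
        ...

    def main():
        parser = argparse.ArgumentParser(description="One concurrent Snowpark for Python job")
        parser.add_argument("contract_type")
        parser.add_argument("start_ts", help="starting timestamp, e.g. 2022-11-22 19:00:00")
        parser.add_argument("end_ts", help="ending timestamp, e.g. 2022-11-22 20:00:00")
        args = parser.parse_args()
        run_pipeline(args.contract_type, args.start_ts, args.end_ts)

    if __name__ == "__main__":
        main()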

Locking and retry logic

This Python file contains the main code for locking the dummy table as a serializable mutex and for retrying if a “max waiters” limit error is encountered.

The template python file is available here in github.
