Enabling Scalable Machine Learning in Snowflake using SQL, Python and Bodo

There is no doubt that SQL is the language of data engineering, while Python is the language of data science. For this reason, most database engines have begun to supplement their SQL capabilities with Python query support, allowing their more data-science-inclined users to embed advanced statistics or machine learning code into query pipelines or data visualization tools like Tableau. Snowflake has been bucking that trend, until now.

Image by Simon from Pixabay

At the time of writing, Snowflake supports only SQL and JavaScript functions, with no Python UDFs in sight. Therefore, if we want to leverage the agility and capabilities of Python, we’ll have to resort to Snowflake’s External Functions feature, which moves code execution out of Snowflake warehouses and onto a remote server. This post describes how we can leverage external web services that host functions as elastic lambdas and execute arbitrary code, including Python, on external servers.

External Function architecture: the client program calls an external function that is routed by Snowflake to an HTTPS proxy service. The proxy service calls a function on a remote service in a language native thereto. The response is then returned, via the HTTPS proxy service, to the Snowflake data warehouse. Image courtesy of Snowflake, Inc.

In this post, I will walk through, step by step, how to set up a Python ML execution pipeline in Snowflake using TabPy. I will then show how Bodo’s parallelized optimizing Python compiler makes the pipeline scalable and drastically improves performance, and finally how to call parallelized Python code from reporting tools such as Tableau.

Python code execution in Snowflake

To execute Python code from Snowflake, we will need three key components:

  • StarSnow, our own HTTP request library to call external web services,
  • TabPy, a generic Python evaluation service, built by Tableau Software,
  • Some Snowflake UDFs that hide the complexity of calling StarSnow/TabPy endpoints.

The execution flow is as follows.

  1. Users invoke their Python code from their SQL clients using the EXEC_PYTHON stored function.
  2. The function is parsed by Snowflake, which calls the external TabPy server using the StarSnow External Function.
  3. The StarSnow External Function ingests the arguments and calls the corresponding StarSnow function running as an elastic lambda in AWS or Azure.
  4. The elastic lambda passes its arguments to TabPy, which evaluates the Python code and returns the results.

The following figure illustrates the execution flow from calling the SQL on the client-side Snowflake data warehouse to actual code execution in TabPy.

Client program executes a stored external function with the code to be executed. The evaluation happens in an isolated TabPy Server. Image by the author.

Deployment

The first step in deploying this solution is to install StarSnow. This component is responsible for facilitating the web requests between Snowflake and TabPy. The installation instructions are laid out in our recent article on the Starschema blog.

Next, we need to install TabPy on a server that is accessible to the StarSnow lambda function. Installing and running our TabPy code is as simple as pip install:

pip install tabpy
tabpy

The TabPy server creates an endpoint called /evaluate that executes a code block, replacing named parameters with their provided values. The API expects a POST body containing a JSON dictionary with two elements:

  • script: contains one or more lines of code or instructions for TabPy to execute.
  • data: contains the parameter values passed to the code. These values are key-value pairs, following a specific convention for key names (_arg1, _arg2, etc.). These can be referenced from the script as values.

Therefore, invoking /evaluate with the following request:

POST /evaluate HTTP/1.1
Host: localhost:9004
Accept: application/json

{"data": {"_arg1": 1, "_arg2": 2}, "script": "return _arg1+_arg2"}

will yield the return value 3.
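
The same request can be assembled from Python. What follows is a minimal sketch using only the standard library; the helper name build_evaluate_request is my own for illustration and is not part of TabPy or StarSnow, and the default URL simply mirrors the localhost:9004 host used above.

```python
import json
import urllib.request


def build_evaluate_request(script, *args, base_url="http://localhost:9004"):
    """Build (but do not send) a POST request for TabPy's /evaluate endpoint.

    Hypothetical helper for illustration; not part of TabPy or StarSnow.
    """
    # Positional arguments become _arg1, _arg2, ... as the endpoint expects.
    data = {f"_arg{i}": value for i, value in enumerate(args, start=1)}
    body = json.dumps({"data": data, "script": script}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/evaluate",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


request = build_evaluate_request("return _arg1+_arg2", 1, 2)
payload = json.loads(request.data)
print(payload)

# To actually send it (requires a running TabPy server):
# with urllib.request.urlopen(request) as response:
#     print(json.loads(response.read()))
```

Building the request separately from sending it makes it easy to inspect the JSON body before pointing it at a real server.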

Now, the only missing piece is the Snowflake UDF that translates query parameters to a TabPy call:

Make sure you replace <your_tabpy_server> with the URL of your TabPy server set up in the previous step. If all is well, we ought to be able to call our first Python program from Snowflake:

tfoldi#WH@TEST.PUBLIC>select EXEC_PYTHON('return sum(_arg1)', array_construct(1,2,3) ):data ret;
+-----+
| RET |
|-----|
| 6   |
+-----+
1 Row(s) produced. Time Elapsed: 1.413s
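
If you are wondering why a bare return statement works in the script, it helps to think of the script as the body of a generated function whose parameters are the _argN values. The snippet below is a local emulation of that idea, written for illustration only: it is not TabPy's actual implementation, the name evaluate_locally is hypothetical, and it leaves out the tabpy object that real scripts can use for tabpy.query().

```python
def evaluate_locally(script, data):
    """Emulate an /evaluate call locally: wrap the script in a function and call it.

    Illustrative only; this is not TabPy's code.
    """
    # Sort parameter names numerically so that _arg10 follows _arg9.
    names = sorted(data, key=lambda name: int(name[len("_arg"):]))
    # Indent each script line so it becomes the body of a generated function;
    # this is why a script can use a top-level `return`.
    body = "\n".join("    " + line for line in script.splitlines())
    source = f"def _user_script({', '.join(names)}):\n{body}\n"
    namespace = {}
    exec(source, namespace)  # never do this with untrusted input
    return namespace["_user_script"](*(data[name] for name in names))


print(evaluate_locally("return sum(_arg1)", {"_arg1": [1, 2, 3]}))
print(evaluate_locally("return _arg1+_arg2", {"_arg1": 1, "_arg2": 2}))
```

The two calls mirror the examples above and print 6 and 3.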

Congratulations — you have just unlocked the power of Python inside your Snowflake database! We can try more complex examples, such as sentiment analysis using TabPy’s built-in NLTK-based sentiment analyzer:

create table sentiment_test (text string);
insert into sentiment_test values ('I just like the stock');
insert into sentiment_test values ('ICOs are just scam');
select text, EXEC_PYTHON('return tabpy.query("Sentiment Analysis", _arg1)["response"]', text):data sentiment from sentiment_test;
Positive texts yield a positive sentiment score, while negative texts yield a negative one.

Previously, you had to use external services and ETL tools to bring machine learning or advanced analytics into your Snowflake data pipelines. Well, not anymore!

Of course, this opens new and exciting possibilities for supercharging our Snowflake data pipelines. Let’s see a real world use case where a popular machine learning algorithm is implemented and accelerated using Bodo.

Real-world use case: logistic regression in Snowflake using Bodo

Bodo includes a highly efficient optimizing compiler that supports massive parallelism via MPI. Bodo allows you to easily parallelize your Python code on multiple nodes: write your code, decorate your functions with the @bodo.jit decorator, and Bodo handles the rest for you. Bodo Community Edition, which supports up to 4 cores, is available via Anaconda (see installation instructions here). For parallelizable computational tasks, Bodo gives developers a significant edge and can reduce execution time by 2–3 orders of magnitude (and sometimes even more).

Execution flow for invoking parallel code in Bodo from Snowflake via TabPy: the client Python-in-SQL script is passed on to the StarSnow lambda, which in turn passes it to the TabPy server. The TabPy server then hands it to the IPyParallel cluster, linked by MPI, for parallel execution of the logistic regression function. Results are returned the same way. Computation occurs in the MPI/IPyParallel cluster, but the results seamlessly integrate with the source data in Snowflake.

Comparing the performance of Pure Python versus Python with Bodo

To see for ourselves that Bodo is indeed faster, let’s do some basic benchmarks. I built a simple Docker image for running Bodo Community Edition, preloaded with typical sample use cases (TPC-H queries, Monte Carlo simulation, linear and logistic regression, etc.).

To generate test data for our logistic regression use case, start the container and execute logistic_regression_datagen.py:

$ docker run -ti tfoldi/bodo-ce
# cd /Bodo-examples/data/
# conda activate Bodo
# python ./logistic_regression_datagen.py

To test the logistic regression algorithm with 10 variables and 20,000,000 rows with and without Bodo, execute the following commands:

cd /Bodo-examples/examples/miscellaneous
NUMBA_DISABLE_JIT=1 python ./logistic_regression.py \
--file ../../data/lr.hdf5
python ./logistic_regression.py \
--file ../../data/lr.hdf5
mpiexec -n 4 python ./logistic_regression.py \
--file ../../data/lr.hdf5

The results are pretty impressive: simply switching to Bodo from Python (without changing a single line of code) reduced the execution time by 30 percent. By adding more processes, the performance gains scaled linearly. This is the true power behind the Bodo engine: you can easily scale up NumPy and Pandas workloads to thousands of CPUs, without adopting new APIs or extensively rewriting code.
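
For readers who want to see what kind of computation is being benchmarked, here is a small logistic regression fitted by batch gradient descent. It is a sketch in plain Python with no NumPy or Bodo, written for readability rather than speed. The script shipped in the container may well differ, and the synthetic data, learning rate and iteration count below are my own assumptions.

```python
import math
import random


def logistic_regression(points, responses, iterations=200, learning_rate=0.1):
    """Fit weights by batch gradient descent; responses must be -1 or +1."""
    n_features = len(points[0])
    weights = [0.0] * n_features
    for _ in range(iterations):
        gradient = [0.0] * n_features
        for x, y in zip(points, responses):
            margin = y * sum(w * xi for w, xi in zip(weights, x))
            # Derivative of log(1 + exp(-margin)) with respect to the weights.
            scale = (1.0 / (1.0 + math.exp(-margin)) - 1.0) * y
            for j, xi in enumerate(x):
                gradient[j] += scale * xi
        # Step against the average gradient.
        weights = [w - learning_rate * g / len(points)
                   for w, g in zip(weights, gradient)]
    return weights


# Synthetic data (assumption): labels follow the sign of 2*x0 - 1*x1.
random.seed(0)
points = [[random.uniform(-1, 1), random.uniform(-1, 1)] for _ in range(200)]
responses = [1 if 2.0 * x0 - 1.0 * x1 > 0 else -1 for x0, x1 in points]

weights = logistic_regression(points, responses)
print(weights)
```

The nested loops over rows and features are the part that a compiler such as Bodo, working on the NumPy equivalent, can vectorize and distribute across processes.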

Python Parallel Execution in a Service: IPyParallel

Now that we know we’d like to use the Bodo engine instead of pure Python for better performance and scalability, let’s see how to integrate it with Snowflake and TabPy.

The first building block is IPyParallel, the standard parallel computation engine for Python. Since our computations will be handled by an entire cluster (potentially on multiple nodes), we need to separate the control flow (such as handling incoming web requests) from the computations themselves. IPyParallel allows this by creating ipclusters: essentially, Python servers executed as MPI processes.

Separating computation and control flow with IPyParallel

To split our logistic regression into a Python control program and an IPyParallel cluster that handles the computation, we will first need an IPyParallel cluster, of course:

conda install ipyparallel -c conda-forge
ipcluster start -n 4 # optionally add --log-level=DEBUG

Then, we can execute our program against the cluster (using the default profile):

The dview.execute() calls invoke our MPI Python cluster and execute our Bodo functions in parallel, on multiple processes, and potentially on multiple hosts. Note that the simple @bodo.jit decorator that we added is enough: we don’t have to rewrite the Python code ourselves or have any knowledge of HPC or distributed systems to parallelize it (though having that knowledge couldn’t hurt either).
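
The control/compute split described above can be sketched roughly as follows. This is an illustrative outline, not the program used in this post: ENGINE_SOURCE and run_on_cluster are hypothetical names, and the engine code is a plain-Python placeholder where a @bodo.jit-decorated function would normally be defined. Calling run_on_cluster() requires ipyparallel and a running ipcluster.

```python
# Code sent to every engine. In the setup described above, this is where a
# @bodo.jit-decorated function would be defined and called; a plain-Python
# placeholder is used here so the snippet has no Bodo dependency.
ENGINE_SOURCE = """
def partial_sum(n):
    return sum(i * i for i in range(n))

result = partial_sum(1000)
"""


def run_on_cluster(source=ENGINE_SOURCE, profile="default"):
    """Execute `source` on all engines of a running ipcluster and collect `result`."""
    import ipyparallel as ipp  # imported lazily: only needed when a cluster exists

    client = ipp.Client(profile=profile)     # connect to the running ipcluster
    dview = client[:]                        # DirectView over all engines
    dview.execute(source, block=True)        # run the code on every engine
    return dview.pull("result", block=True)  # one value per engine


# Local check of the engine code itself, without a cluster:
namespace = {}
exec(ENGINE_SOURCE, namespace)
print(namespace["result"])
```

The control program only ships source code and pulls back results, so the web-facing process stays light while the engines do the heavy lifting.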

Deploying our code on TabPy Server

TabPy server also has a neat feature: you can deploy your functions directly to a running TabPy service and call them through a REST API. The way this works is similar to Spark’s approach: when invoking tabpy_client.deploy(), our code is serialized with cloudpickle, uploaded to the TabPy server and registered as a Python callable function.

The code above will upload our lr_snowflake code, accelerated with Bodo and complete with all dependencies, to the TabPy server, then make it available as the native Python function tabpy.query("lr_snowflake").
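
As a rough illustration of the deployment mechanism, here is a sketch that uses the client bundled with the tabpy package (tabpy.tabpy_tools.client.Client). The add function is a hypothetical stand-in for lr_snowflake, deploy_to_tabpy is my own wrapper name, and the endpoint assumes a TabPy server on localhost:9004.

```python
def add(x, y):
    """Hypothetical stand-in for lr_snowflake: element-wise sum of two lists."""
    return [a + b for a, b in zip(x, y)]


def deploy_to_tabpy(name, func, description, endpoint="http://localhost:9004/"):
    """Upload `func` to a running TabPy server under `name`."""
    from tabpy.tabpy_tools.client import Client  # lazy import: needs tabpy installed

    client = Client(endpoint)
    # override=True replaces an existing endpoint of the same name.
    client.deploy(name, func, description, override=True)


print(add([1, 2], [3, 4]))

# With a TabPy server running:
# deploy_to_tabpy("add", add, "Adds two lists element-wise")
```

Once deployed, the function can be called from an /evaluate script with tabpy.query("add", ...) or over REST at /query/add.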

It’s finally time to put everything together. The full architecture will look something like this:

Snowflake calls TabPy using StarSnow, which executes logistic regression in an IPyParallel Cluster. The Logistic Regression algorithm reads directly from Snowflake to avoid unnecessary serialization on web layers.

I built a Docker container to illustrate how this entire architecture is packaged together. You can find the Docker container here. To test the whole concept, start the container:

export SNOWFLAKE_URL=snowflake://u:p@instance/db/schema
docker run -e SNOWFLAKE_URL -p 8080:8080 -ti tfoldi/bodo-ipp-tabpy

Then test it with the following curl command:

$ curl http://localhost:8080/query/lr_snowflake -X POST --data '{"data": {"args":["lr_points","response"]}}'
{"response": [-0.30935179667370605, -7.46968667252113, 5.878480070480977, 3.200441375279214, -0.05744343889602743, 8.712203640829966, 5.859306090215424, 8.109669918388082, -16.137486681565726, 17.979411726558645], "version": 1, "model": "lr_snowflake", "uuid": "a4acc81c-9705-4cf3-97a7-932460a5b0c9"}

It works like a charm! Now let’s invoke it from Snowflake:

select EXEC_PYTHON('return tabpy.query("lr_snowflake",_arg1)["response"]', array_construct('lr_points','response') );
Call EXEC_PYTHON from Snowflake to call our deployed Bodo function on TabPy server. The result is the impact of each variable on the odds ratio of the observed event of interest.
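
To make the odds-ratio reading concrete, the coefficients can be exponentiated. The snippet below is a small worked example that assumes the returned values are ordinary log-odds coefficients, as in standard logistic regression. The numbers are the ones from the curl response above, rounded to four decimals.

```python
import math

# Coefficients copied from the curl response above, rounded to four decimals.
coefficients = [-0.3094, -7.4697, 5.8785, 3.2004, -0.0574,
                8.7122, 5.8593, 8.1097, -16.1375, 17.9794]

# exp(coefficient) is the multiplicative change in the odds for a one-unit
# increase in that variable, holding the others fixed.
odds_ratios = [math.exp(c) for c in coefficients]

for index, (c, ratio) in enumerate(zip(coefficients, odds_ratios), start=1):
    direction = "raises" if c > 0 else "lowers"
    print(f"variable {index}: coefficient {c:+.4f} {direction} the odds (x{ratio:.4g})")
```

A positive coefficient gives an odds ratio above 1 and a negative one gives a ratio below 1, so the sign alone already tells you the direction of each variable's effect.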

Here is the result of our efforts: we have executed network- and I/O-enabled Python code from Snowflake SQL, in the cloud or on-premises, to add missing data science and machine learning capabilities to Snowflake.

Integration with BI/data discovery tools

One of the strengths of SQL is its seamless compatibility with BI tools, without any extra integration work. To visualize the input attributes and the coefficients from the logistic regression, we can use Tableau, Power BI, Looker, or any other visualization tool. While some of these tools have Python integration, their capabilities are often limited (Tableau, for example, can only execute Python code that returns a single measure, as a window calculation). Also, moving data back and forth between visualization tools, databases and Python interpreters comes with a performance penalty. Keeping everything at the lowest level and executing functions as close to the data as possible is the way to go.

Execute Python code dynamically with Tableau live Snowflake connection

By using Custom SQL or views, we can easily expose Python calculations: visualize our inputs and results, or recalculate our models in real time (what-if scenarios).

Snowflake tables and Python models are executed in a single, live connection

Looking ahead

While it might seem overwhelming at first read, executing simple Python scripts as external functions is quite easy in practice. The Python data science ecosystem is incredibly rich: it covers nearly any algorithm you can think of, from simple regressions through cross-validation to cutting-edge machine learning based on neural networks in TensorFlow or PyTorch. With a high-performance computing (HPC) framework like Bodo, numerical operations can be executed significantly more efficiently, without having to provision clusters or rewrite code manually. Through StarSnow and TabPy, you can seamlessly integrate all these powerful features without leaving Snowflake.

Source code for the Docker containers and for the logistic regression example can be found here: https://github.com/tfoldi/bodo-ce.

Tamas is co-founder and CTO of data services firm Starschema, where he leads the technical team to deliver results for the most innovative enterprises.