Enabling Scalable Machine Learning in Snowflake using SQL, Python and Bodo
There is no doubt that SQL is the language of data engineering, while Python is the language of data science. For this reason, most database engines have begun to supplement their SQL capabilities by offering Python query support to allow their more data science inclined users to embed advanced statistics or machine learning code into query pipelines or data visualization tools like Tableau. Snowflake has been bucking that trend — until now.
At the time of writing, Snowflake only supports SQL and JavaScript functions, with Python UDFs nowhere in sight. Therefore, if we want to leverage the agility and capabilities of Python, we have to resort to Snowflake's External Functions feature, which moves code execution out of the Snowflake warehouse and onto a remote server. This post describes how we can leverage external web services that host functions as elastic lambdas to execute arbitrary code, including Python, on external servers.
In this post, I will walk through, step by step, how to set up a Python ML execution pipeline in Snowflake using TabPy, then show how Bodo's parallelizing, optimizing Python compiler makes it scalable and drastically improves performance, and finally how to call the parallelized Python code from reporting tools such as Tableau.
Python code execution in Snowflake
To execute Python code from Snowflake, we will need three key components:
- StarSnow, our own HTTP request library to call external web services,
- TabPy, a generic Python evaluation service, built by Tableau Software,
- Some Snowflake UDFs that hide the complexity of calling StarSnow/TabPy endpoints.
The execution flow is as follows.
- Users invoke their Python code from their SQL clients using the EXEC_PYTHON stored function.
- The function is parsed by Snowflake, which calls the external TabPy server using the StarSnow External Function.
- The StarSnow External Function ingests the arguments and calls the corresponding StarSnow function running as an elastic lambda in AWS or Azure.
- The elastic lambda passes its arguments to TabPy, which evaluates the Python code and returns the results.
The following figure illustrates the execution flow, from the SQL call on the client side, through the Snowflake data warehouse, to the actual code execution in TabPy.
Deployment
The first step in deploying this solution is to install StarSnow. This component is responsible for facilitating the web requests between Snowflake and TabPy. The installation instructions are laid out in our recent article on the Starschema blog.
Next, we need to install TabPy on a server that is accessible to the StarSnow lambda function. Installing and running TabPy is as simple as a pip install:
pip install tabpy
tabpy
The TabPy server exposes an endpoint called /evaluate that executes a block of code, replacing named parameters with their provided values. The API expects a POST body as a JSON dictionary with two elements:
- script: contains one or more lines of code or instructions for TabPy to execute.
- data: contains the parameter values passed to the code. These are key-value pairs whose keys follow a specific naming convention (_arg1, _arg2, etc.) and can be referenced from the script by those names.
Therefore, invoking /evaluate with the following request:
POST /evaluate HTTP/1.1
Host: localhost:9004
Accept: application/json

{"data": {"_arg1": 1, "_arg2": 2}, "script": "return _arg1+_arg2"}
will yield the return value 3.
Now, the only missing piece is the Snowflake UDF that translates query parameters to a TabPy call:
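The exact definition depends on how you deployed StarSnow; assuming its external function is exposed as STARSNOW_REQUEST(method, url, params) with requests-style options (an assumption based on the StarSnow setup, not code from this article), a minimal sketch could look like this:
-- sketch only: forward the script and its arguments to TabPy's /evaluate
-- endpoint through the StarSnow external function; adjust the params object
-- to whatever convention your StarSnow deployment expects
create or replace function EXEC_PYTHON(script string, data variant)
returns variant
as
$$
  STARSNOW_REQUEST(
    'POST',
    'http://<your_tabpy_server>:9004/evaluate',
    object_construct('json', object_construct('script', script, 'data', data))
  )
$$;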
Make sure you replace <your_tabpy_server> with the URL of the TabPy server set up in the previous step. If all is well, we ought to be able to call our first Python program from Snowflake:
tfoldi#WH@TEST.PUBLIC>select EXEC_PYTHON('return sum(_arg1)', array_construct(1,2,3) ):data ret;
+-----+
| RET |
|-----|
| 6 |
+-----+
1 Row(s) produced. Time Elapsed: 1.413s
Congratulations — you have just unlocked the power of Python inside your Snowflake database! We can try more complex examples, such as sentiment analysis using TabPy’s built-in NLTK-based sentiment analyzer:
create table sentiment_test (text string);
insert into sentiment_test values ('I just like the stock');
insert into sentiment_test values ('ICOs are just scam');

select text,
       EXEC_PYTHON('return tabpy.query("Sentiment Analysis", _arg1)["response"]', text):data sentiment
from sentiment_test;
Previously, you had to use external services and ETL tools to bring machine learning or advanced analytics into your Snowflake data pipelines. Well, not anymore!
Of course, this opens up new and exciting possibilities for supercharging our Snowflake data pipelines. Let's look at a real-world use case where a popular machine learning algorithm is implemented and accelerated using Bodo.
Real-world use case: logistic regression in Snowflake using Bodo
Bodo includes a highly efficient optimizing compiler that supports massive parallelism via MPI. Bodo allows you to easily parallelize your Python code on multiple nodes: write your code, decorate your functions with bodo.jit(), and Bodo handles the rest for you. Bodo Community Edition, which supports up to 4 cores, is available via Anaconda (see installation instructions here). For parallelizable computational tasks, Bodo gives developers a significant edge, reducing execution time by 2–3 orders of magnitude (and sometimes even more).
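To get a feel for the programming model, here is a minimal sketch (modeled on the Monte Carlo example that ships with Bodo-examples, not code from this article's pipeline) that estimates π in parallel:
import bodo
import numpy as np

@bodo.jit
def calc_pi(n):
    # draw n random points in the unit square; Bodo parallelizes the array
    # operations and the reduction across MPI processes
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    return 4 * np.sum(x ** 2 + y ** 2 < 1) / n

print(calc_pi(200_000_000))
Running the script with mpiexec -n 4 spreads the work across the four cores available in the Community Edition.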
Comparing the performance of pure Python versus Python with Bodo
To see for ourselves that Bodo is indeed faster, let's do some basic benchmarks. I built a simple Docker image for running Bodo Community Edition, preloaded with typical sample use cases (TPC-H queries, Monte Carlo simulation, linear and logistic regression, etc.).
To generate test data for our logistic regression use case, start the container and execute logistic_regression_datagen.py:
$ docker run -ti tfoldi/bodo-ce
# cd /Bodo-examples/data/
# conda activate Bodo
# python ./logistic_regression_datagen.py
To test the logistic regression algorithm with 10 variables and 20,000,000 rows with and without Bodo, execute the following commands:
cd /Bodo-examples/examples/miscellaneous
NUMBA_DISABLE_JIT=1 python ./logistic_regression.py \
  --file ../../data/lr.hdf5
python ./logistic_regression.py \
  --file ../../data/lr.hdf5
mpiexec -n 4 python ./logistic_regression.py \
  --file ../../data/lr.hdf5
The results are pretty impressive: simply switching to Bodo from Python (without changing a single line of code) reduced the execution time by 30 percent. By adding more processes, the performance gains scaled linearly. This is the true power behind the Bodo engine: you can easily scale up NumPy and Pandas workloads to thousands of CPUs, without adopting new APIs or extensively rewriting code.
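For reference, the heart of the benchmarked script is a small gradient-descent loop along these lines (a simplified sketch of the Bodo-examples code; the real script also loads the lr.hdf5 file and prints timings):
import bodo
import numpy as np

@bodo.jit
def logistic_regression(X, Y, iterations):
    # plain gradient-descent logistic regression on +/-1 labels; the bodo.jit
    # decorator compiles the loop and parallelizes the NumPy operations
    w = np.zeros(X.shape[1])
    for _ in range(iterations):
        w -= np.dot(((1.0 / (1.0 + np.exp(-Y * np.dot(X, w))) - 1.0) * Y), X)
    return w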
Python Parallel Execution in a Service: IPyParallel
Now that we know we'd like to use the Bodo engine instead of pure Python to achieve better performance and scalability, let's see how to integrate it with Snowflake and TabPy.
The first building block is IPyParallel, the standard parallel computation engine for Python. Since our computations will be handled by an entire cluster (potentially spanning multiple nodes), we need to separate the control flow (such as handling incoming web requests) from the computations themselves. IPyParallel allows this by creating ipclusters: essentially, Python servers executed as MPI processes.
To split our logistic regression into a controlling Python program and an IPyParallel cluster that performs the computation, we first need a running IPyParallel cluster:
conda install ipyparallel -c conda-forge
ipcluster start -n 4 # optionally add --log-level=DEBUG
Then, we can execute our program against the cluster (using the default profile):
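The control program itself ships with the Docker image described below; a minimal sketch of the pattern, assuming the jitted logistic_regression function above lives in a module named lr_bodo (a name used here purely for illustration), looks like this:
import ipyparallel as ipp

# connect to the ipcluster started above (default profile) and address all engines
rc = ipp.Client()
dview = rc[:]

# every engine imports the Bodo-jitted function, generates some sample data
# and runs the training step in parallel
dview.execute("import numpy as np", block=True)
dview.execute("from lr_bodo import logistic_regression", block=True)
dview.execute("X = np.random.ranf((1_000_000, 10))", block=True)
dview.execute("Y = np.where(np.random.ranf(1_000_000) < 0.5, -1.0, 1.0)", block=True)
dview.execute("w = logistic_regression(X, Y, 50)", block=True)

print(dview["w"][0])  # fetch the coefficients computed on the first engine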
The dview.execute() calls invoke our MPI Python cluster and execute our Bodo functions in parallel, on multiple processes and potentially on multiple hosts. Note that the simple @bodo.jit decorator we added is enough; we don't have to rewrite the Python code ourselves or have any knowledge of HPC or distributed systems to parallelize it (but having that knowledge couldn't hurt either).
Deploying our code on TabPy Server
The TabPy server also has a neat feature: you can deploy your functions directly to a running TabPy service and call them through a REST API. The way this works is similar to Spark's approach: when invoking tabpy_client.deploy(), our code is serialized with cloudpickle, uploaded to the TabPy server, and registered as a Python callable function.
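In our case the deployment step looks roughly like this (a sketch only: the real lr_snowflake function, which pulls its input from Snowflake and runs the Bodo/IPyParallel pipeline above, ships with the Docker image described below):
from tabpy.tabpy_tools.client import Client

tabpy_client = Client("http://localhost:9004/")

def lr_snowflake(args):
    # placeholder body: the real implementation reads the table and response
    # column named in args (e.g. ["lr_points", "response"]) from Snowflake,
    # trains the Bodo logistic regression on the IPyParallel cluster and
    # returns the fitted coefficients
    ...

tabpy_client.deploy("lr_snowflake", lr_snowflake,
                    "Bodo-accelerated logistic regression", override=True)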
The code above will upload our lr_snowflake code, accelerated with Bodo and complete with all dependencies, to the TabPy server, and make it available as the native Python function tabpy.query("lr_snowflake").
It’s finally time to put everything together. The full architecture will look something like this:
I built a Docker container to illustrate how this entire architecture is packaged together. You can find the Docker container here. To test the whole concept, start the container:
export SNOWFLAKE_URL=snowflake://u:p@instance/db/schema
docker run -e SNOWFLAKE_URL -p 8080:8080 -ti tfoldi/bodo-ipp-tabpy
Then test it with the following curl command:
$ curl http://localhost:8080/query/lr_snowflake -X POST --data '{"data": {"args":["lr_points","response"]}}'
{"response": [-0.30935179667370605, -7.46968667252113, 5.878480070480977, 3.200441375279214, -0.05744343889602743, 8.712203640829966, 5.859306090215424, 8.109669918388082, -16.137486681565726, 17.979411726558645], "version": 1, "model": "lr_snowflake", "uuid": "a4acc81c-9705-4cf3-97a7-932460a5b0c9"}%
It works like a charm! Now let’s invoke it from Snowflake:
select EXEC_PYTHON('return tabpy.query("lr_snowflake",_arg1)["response"]', array_construct('lr_points','response') );
There is the result of our efforts: we have executed network- and I/O-enabled Python code from Snowflake SQL, in the cloud or on premises, adding the missing data science and machine learning capabilities to Snowflake.
Integration with BI/data discovery tools
One of the strengths of SQL is its seamless compatibility with BI tools, without any extra integration work. To visualize the input attributes and the coefficients from the logistic regression, we can use Tableau, Power BI, Looker, or any ordinary visualization tool. While some of these tools have Python integration, their capabilities are often limited (Tableau, for instance, can only execute Python code as a window calculation that returns a single measure). Also, moving data back and forth between visualization tools, databases, and Python interpreters comes with a performance penalty. Keeping everything in the database and executing functions as close to the data as possible is the way to go.
By using custom SQL or views, we can easily expose Python calculations: visualize our inputs and results, or recalculate our models in real time (what-if scenarios).
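For example, the coefficients computed by the query shown earlier can be wrapped in a view that any BI tool can read like a plain table (a sketch; the view name is arbitrary, while EXEC_PYTHON and lr_snowflake are the functions defined above):
create or replace view lr_coefficients as
select f.index as coefficient_index,
       f.value::float as coefficient
from (select EXEC_PYTHON(
        'return tabpy.query("lr_snowflake", _arg1)["response"]',
        array_construct('lr_points', 'response')):data as coeffs) r,
     lateral flatten(input => r.coeffs) f;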
Looking ahead
While it might seem overwhelming at first read, executing simple Python scripts as external functions is quite easy in practice. The Python data science ecosystem is incredibly rich: any algorithm you can think of is available, from simple regressions through cross-validation to cutting-edge machine learning based on neural networks in TensorFlow or PyTorch. With a high-performance computing (HPC) framework like Bodo, the execution of numerical operations can be made significantly more efficient, without having to provision clusters or rewrite code manually. Through StarSnow and TabPy, you can seamlessly integrate all these powerful features without leaving Snowflake.
Source code for the Docker containers and for the logistic regression example can be found here: https://github.com/tfoldi/bodo-ce.