How to Integrate PySpark, Snowflake, Azure, and Jupyter: Part 3

Doug Eisenstein
4 min readJun 5, 2020

--

Connect PySpark to Snowflake ❄️

Background

In part three of this three-part series, in Part 1 we learned about PySpark, Snowflake, Azure, and Jupyter Notebook, then in Part 2 we launched a PySpark cluster in Azure on HDInsight. In part three, we’ll learn how to connect to a Snowflake instance from PySpark an run a few exploratory queries to get you acclimated.

Step 1: Get me some data!

Snowflake provides a TPC-H dataset that we’ll use for some exploration. Let’s copy it into a separate database by running a CTAS:

create table TPCH_SF10000_ORDERS as (select * from "SNOWFLAKE_SAMPLE_DATA"."TPCH_SF10000"."ORDERS");

Step 2: Connect PySpark to Snowflake

It’s wicked easy to connect from PySpark to Snowflake. There is one ❗️warning, and it’s that the versions must be 100% compatible. Please use the versions I’ve used here as a starting point.

Fire open Jupyter Notebook. Let’s import the Snowflake Connector:

%%configure -f
{ "conf": {
"spark.jars.packages": "net.snowflake:spark-snowflake_2.11:2.5.1-spark_2.3"}
}

Create a Spark Session:

from pyspark.sql import SparkSessionspark = (
SparkSession.builder
.appName('quickstart')
.getOrCreate()
)

Enable Snowflake pushdown, my 💛:

spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

Now let’s enter our connection details. I’ve used environment variables, with defaults.

import osSNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"SNOWFLAKE_OPTIONS = {
'sfURL': os.environ.get("SNOWFLAKE_URL", "xz88991.east-us-2.azure.snowflakecomputing.com"),
'sfAccount': os.environ.get("SNOWFLAKE_ACCOUNT", "xz88991"),
'sfUser': os.environ.get("SNOWFLAKE_USER", "dougeisenstein"),
'sfPassword': os.environ.get("SNOWFLAKE_PASSWORD", "eA@!$W7%Z"),
'sfDatabase': os.environ.get("SNOWFLAKE_DATABASE", "QUICKSTART"),
'sfSchema': os.environ.get("SNOWFLAKE_SCHEMA", "TPCH_SF100"),
'sfWarehouse': os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH_LARGE"),
'sfRole': os.environ.get("SNOWFLAKE_ROLE", "accountadmin")
}

🥁 drum roll please, let’s get some data into a Spark frame:

data_frame = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**SNOWFLAKE_OPTIONS) \
.option('dbtable', "ORDERS") \
.load()

Let’s show the first 5 rows:

data_frame.show(5)

You’ll see a beautiful data set like this:

Step 3: Let’s explore the push-down

Many years ago, in a galaxy, far, far, away, when MPP systems were coming to market, I worked on Netezza and Greenplum, anyone remember those? These companies, and Snowflake, understand that performing the analytics, closest to the data, gives you the best performance, and that’s what I love about the analytical function push-down.

Let’s first take that that feature out for a spin. We’re going to run a group by and count function. The query below gets the count of orders across all customers by order date and then orders the result set in descending order.

data_frame = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**SNOWFLAKE_OPTIONS) \
.option('query', "select o_orderdate, count(*) from TPCH_SF10000_ORDERS group by o_orderdate order by o_orderdate desc") \
.option('partition_size_in_mb', 60) \
.option('use_cached_result', 'true') \
.option('use_copy_unload', 'false') \
.option('parallelism', '30') \
.option('sfCompress', 'on') \
.load()

The query above returns this result set:

+-----------+----------+
|O_ORDERDATE|"COUNT(*)"|
+-----------+----------+
| 1998-01-27| 6234107|
| 1998-01-26| 6234355|
| 1998-01-25| 6234326|
| 1998-01-24| 6234384|
| 1998-01-23| 6234281|
+-----------+----------+

How do we know what the query looks like when executed inside of Snowflake? Simple. Execute this query to find that out.

select * from snowflake.account_usage.query_history order by start_time desc;

We can see, Snowflake, was smart, it indeed did evaluate the query, and determined that it could execute it as straight SQL, by wrapping it in an outer select statement, and returning a 1=0, so it know that the query can be evaluated correctly.

select * from (select o_orderdate, count(*) from TPCH_SF10000_ORDERS group by o_orderdate order by o_orderdate desc) where 1 = 0

You can also run getLastSelect which will do the same for you, but I like to see every single command that is executed, you learn more that way.

spark._jvm.net.snowflake.spark.snowflake.Utils.getLastSelect()

Conclusion

You successfully ✔️were able to launch a PySpark cluster, customize your Python packages, connect to Snowflake and issue a table and query requests into PySpark pandas functions. The next steps for you to explore on your own is Pandas and PySpark Data Frame integration, in particular using Pandas UDF’s. This will allow you to run the same functions on Pandas that you can on PySpark Data Frames, sweet!

About

Have any questions? Reach out to me on LinkedIn.

--

--

Doug Eisenstein

Doug is a entrepreneur, tech leader, writer, and innovator🔥.