Migrating from PySpark to Snowpark Python Series — Part 1

Dated: June 2022

Snowpark enables developers, data scientists, ML engineers, and others to build data processing pipelines in code. Many teams looking to gain the benefits of Snowpark have existing pipelines built with Spark, a popular choice for big data processing today, and the PySpark DataFrame API exposes those Spark workloads to Python and Python libraries. This document covers considerations and best practices when moving from PySpark to Snowpark.

In November 2021, Snowflake announced native support for Python, allowing data engineers, data scientists, and data developers to use their language of choice and execute ML workflows and data apps faster and more securely, in a single platform, without having to first move data out of Snowflake.

Snowpark for Python is a developer framework for Snowflake that provides the Snowpark DataFrame API, whose constructs are similar to those of the PySpark DataFrame API and Pandas DataFrame queries. The Snowpark DataFrame API efficiently distributes computation using Snowflake's intelligent, elastic performance engine, so you are never bound to the scale of a single node.

This article discusses in detail why you would migrate and the value of using Snowflake over Spark.

In Part 1 of this article, you will learn how easily you can get started migrating code written with the PySpark DataFrame API to Snowpark Python. The Snowpark DataFrame API provides many of the data transformation functions developers already use when coding in PySpark, and customers can use any IDE of their choice to write Snowpark for Python code for building big data processing workflows.

Below are a few high-level steps performed in most data engineering workflows written in PySpark:

  • Session Initialization
  • Creating a DataFrame
  • Listing Files to be ingested
  • Reading source files (CSV, Parquet, JSON)
  • Performing data transformations (GroupBy, window functions, adding columns)
  • Writing to destination (Parquet, table)

All of the above steps can be performed using the Snowpark DataFrame API with minimal changes to your PySpark code. You can find the Snowpark for Python converted code for multiple Spark-based notebooks here.

I want to thank Eda Johnson and Venkat Sekar for their contributions to this blog and the content.

Let’s get started…

Session Initialization

Snowpark session initialization is similar to Spark's, but much simpler.

In Spark, once a Spark cluster is created, a SparkSession instance is the entry point that encapsulates the different contexts used in Spark, including SQLContext, HiveContext, and StreamingContext. When creating a SparkSession, a SparkConf can be set to define Spark runtime config options. There are two types of Spark config options: 1) deployment configuration, such as "spark.driver.memory" and "spark.executor.instances", and 2) runtime configuration. Developers need to specify runtime Spark config options in code, while deployment configuration changes require restarting the Spark cluster. There are also several ways to specify runtime Spark config options (a SparkConf object passed to SparkSession, configuration files, spark-submit command parameters, config(), etc.). Properties set directly on the SparkConf take highest precedence, then flags passed to spark-submit or spark-shell, then options in the spark-defaults.conf file. This often makes troubleshooting harder. A full list of Spark configuration options is available here. Furthermore, the SparkSession implementation in managed Spark platforms is not 100% identical to the implementation in open-source Spark products.

In Snowpark Python, session creation is simple! We create a dict containing the same parameters we would pass to connect() in the Snowflake Python Connector and use it to initialize the session. All Snowflake authentication mechanisms (SSO, federated authentication, OAuth) can be leveraged to connect to your Snowflake account with the Snowpark API. The snippet below shows the difference in session creation between PySpark and Snowpark Python.

Image by Author — Session Parameters
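For reference, here is a minimal sketch of the kind of code the snippet compares. The connection parameter values are placeholders, and the PySpark config option shown is only an illustrative example.

    # PySpark: build a SparkSession and set config options up front
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("my_pipeline")
        .config("spark.sql.shuffle.partitions", "200")   # example runtime config
        .getOrCreate()
    )

    # Snowpark Python: pass the same parameters you would give the
    # Snowflake Python Connector's connect() as a dict
    from snowflake.snowpark import Session

    connection_parameters = {
        "account":   "<account_identifier>",   # placeholder values
        "user":      "<user>",
        "password":  "<password>",             # or SSO / OAuth parameters
        "role":      "<role>",
        "warehouse": "<warehouse>",
        "database":  "<database>",
        "schema":    "<schema>",
    }
    session = Session.builder.configs(connection_parameters).create()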

Creating a DataFrame

Similar to the Spark session in PySpark, we create a Snowpark session to perform any kind of operation in Snowpark Python. The snippet below shows that the only change required is the Snowpark session variable. In fact, you can even name the session variable spark and reuse your existing code.

Image by Author — Create DataFrame
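A minimal sketch of the equivalent calls; the sample values and the SNOWFLAKE_SAMPLE_DATA table are illustrative only.

    # PySpark
    df_spark = spark.createDataFrame([(1, "SMITH"), (2, "JONES")], ["ID", "NAME"])

    # Snowpark Python: same pattern, only the session variable changes
    df_sf = session.create_dataframe([(1, "SMITH"), (2, "JONES")], schema=["ID", "NAME"])

    # Snowpark DataFrames can also come from a table or a SQL query
    customers = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER")
    top_10 = session.sql(
        "select C_CUSTKEY, C_NAME from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER limit 10"
    )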

Listing Files to be ingested

Data engineers often list all, or a subset, of the files they plan to ingest. In managed Spark offerings, the most obvious choice is dbutils. In Snowpark, we can use the session.sql() function to list the files in an external stage, as shown in the snippet below.

Image by Author — Listing Files
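A sketch of the idea, assuming a stage named MY_EXTERNAL_STAGE and an illustrative path:

    # Databricks / managed Spark would typically use dbutils:
    # files = dbutils.fs.ls("/mnt/raw/customers/")

    # Snowpark: run LIST against the stage through session.sql()
    files_df = session.sql("LIST @MY_EXTERNAL_STAGE/raw/customers/")
    files_df.show()

    # Or pull the rows into Python for filtering (the first column is the file name)
    csv_files = [row[0] for row in files_df.collect() if str(row[0]).endswith(".csv")]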

Reading source files (CSV, Parquet, JSON)

Reading data from CSV and Parquet files in Snowpark Python is very similar to PySpark. Snowflake supports automatically detecting the schema of a set of staged semi-structured data files and retrieving the column definitions. This feature is currently limited to Apache Parquet, Apache Avro, and ORC files; schema inference for other file formats such as CSV and JSON is expected soon.

https://docs.snowflake.com/en/user-guide/data-load-overview.html#detection-of-column-definitions-in-staged-semi-structured-data-files

Reading CSV File

You can see from the snippet below that there are only minor changes to the parameter names used in the options. You can chain multiple options (e.g. session.read.schema().option().option()) or pass all the option values in a dictionary, as shown below. In Snowpark, the schema has to be specified when reading CSV files from a stage.

Image by Author — Reading CSV Files
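A hedged sketch of both styles; the stage path, file name, schema, and option values are assumptions to replace with your own.

    # PySpark
    from pyspark.sql import types as PT

    spark_schema = PT.StructType([
        PT.StructField("C_CUSTKEY", PT.IntegerType()),
        PT.StructField("C_NAME", PT.StringType()),
    ])
    df_spark = (
        spark.read.schema(spark_schema)
        .option("header", "true")
        .option("delimiter", ",")
        .csv("/mnt/raw/customers.csv")
    )

    # Snowpark Python: same shape, but Snowflake file-format option names
    from snowflake.snowpark import types as T

    sf_schema = T.StructType([
        T.StructField("C_CUSTKEY", T.IntegerType()),
        T.StructField("C_NAME", T.StringType()),
    ])
    df_sf = (
        session.read.schema(sf_schema)
        .options({"field_delimiter": ",", "skip_header": 1})   # or chain .option() calls
        .csv("@MY_EXTERNAL_STAGE/raw/customers.csv")
    )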

Reading Parquet Files

The syntax for reading Parquet files in Snowpark is very similar to PySpark. Snowpark infers the schema, so you do not have to specify it explicitly.

Image by Author — Reading Parquet Files
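A minimal sketch, again with placeholder paths:

    # PySpark
    df_spark = spark.read.parquet("/mnt/raw/customers.parquet")

    # Snowpark Python: the schema is inferred from the staged Parquet file
    df_sf = session.read.parquet("@MY_EXTERNAL_STAGE/raw/customers.parquet")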

Reading JSON Files

In Snowpark, the data in JSON files is loaded into a single column of type VARIANT. We can use the JSON parsing functions to read simple JSON data as well as complex nested data, as shown in the sketch below.
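Here is a sketch of what that can look like, assuming an illustrative file and attribute names; the bracket notation pulls fields out of the VARIANT column (named $1) and casts them to typed columns.

    from snowflake.snowpark.functions import col
    from snowflake.snowpark.types import IntegerType, StringType

    # Each JSON document lands in a single VARIANT column named "$1"
    raw = session.read.json("@MY_EXTERNAL_STAGE/raw/customers.json")

    # Extract simple and nested attributes and cast them to regular columns
    flattened = raw.select(
        col("$1")["c_custkey"].cast(IntegerType()).alias("C_CUSTKEY"),
        col("$1")["c_name"].cast(StringType()).alias("C_NAME"),
        col("$1")["address"]["city"].cast(StringType()).alias("CITY"),   # nested field
    )
    flattened.show()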

Performing Data Transformations

The Snowpark DataFrame API provides many data transformation functions with syntax similar to PySpark. In this section you will see various types of transformations, such as window functions and aggregate functions, that can be used in Snowpark.

Window Functions

A window function operates on a group ("window") of related rows. No code change is required for the majority of window functions. You can see from the snippets below that the code works as-is between PySpark and Snowpark Python for ROW_NUMBER and for the other functions.

ROW_NUMBER

Image by Author — Windows Row Number Function
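A sketch in Snowpark Python, reusing the customers DataFrame from the earlier example; the column names come from the TPC-H CUSTOMER table and are illustrative.

    from snowflake.snowpark import Window
    from snowflake.snowpark.functions import col, row_number

    # PySpark equivalent: pyspark.sql.Window.partitionBy(...).orderBy(...)
    window_spec = Window.partition_by(col("C_MKTSEGMENT")).order_by(col("C_ACCTBAL").desc())

    ranked = customers.with_column("ROW_NUM", row_number().over(window_spec))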

RANK and DENSE_RANK

Image by Author — Windows Rank and Dense Rank Function
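The same window specification works for RANK and DENSE_RANK; a sketch:

    from snowflake.snowpark.functions import dense_rank, rank

    ranked = (
        customers
        .with_column("RANK", rank().over(window_spec))
        .with_column("DENSE_RANK", dense_rank().over(window_spec))
    )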

ROWS BETWEEN

Image by Author — Windows Functions for getting all previous rows from current row
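A sketch of a running total that sums everything from the first row of the window up to the current row; the grouping and ordering columns are illustrative.

    from snowflake.snowpark.functions import sum as sum_

    running_window = (
        Window.partition_by(col("C_MKTSEGMENT"))
        .order_by(col("C_CUSTKEY"))
        .rows_between(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW)
    )
    running = customers.with_column(
        "RUNNING_ACCTBAL", sum_(col("C_ACCTBAL")).over(running_window)
    )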

Lead and Lag

Image by Author — Lead and Lag Functions
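LEAD and LAG follow the same pattern over an ordered window; a sketch:

    from snowflake.snowpark.functions import lag, lead

    with_neighbors = (
        customers
        .with_column("NEXT_ACCTBAL", lead(col("C_ACCTBAL"), 1).over(window_spec))
        .with_column("PREV_ACCTBAL", lag(col("C_ACCTBAL"), 1).over(window_spec))
    )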

GroupBy and Aggregation

In Snowpark, if we are using multiple columns in groupBy or aggregation functions, we should enclose them in a list, as shown in the snippet below.

Image by Author — GroupBy and Aggregation Function
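A sketch of grouping by two columns and computing two aggregates, both passed as lists; the column names are illustrative.

    from snowflake.snowpark.functions import avg, sum as sum_

    summary = (
        customers
        .group_by(["C_MKTSEGMENT", "C_NATIONKEY"])            # grouping columns in a list
        .agg([
            sum_(col("C_ACCTBAL")).alias("TOTAL_ACCTBAL"),    # aggregates in a list
            avg(col("C_ACCTBAL")).alias("AVG_ACCTBAL"),
        ])
    )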

Writing to Destination (Parquet, table)

In Snowpark, we use the copy_into_location function, which takes the destination location along with a file format (CSV, JSON, Parquet) to write the output to a specific location in the required format. You can also specify a partition expression if the data needs to be partitioned by column values.

Writing DataFrame to Parquet without partitions

Here we do not specify any partition option, so the Parquet files are not written into separate folders.

Image by Author — Writing to Parquet Format
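A sketch, assuming the summary DataFrame from the previous section and a placeholder stage path; header=True keeps the original column names in the unloaded Parquet files, and overwrite is passed through as a COPY option.

    summary.write.copy_into_location(
        "@MY_EXTERNAL_STAGE/output/customer_summary/",
        file_format_type="parquet",
        header=True,
        overwrite=True,
    )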

Writing DataFrame to Parquet with partitions

Here we partition the data while writing to Parquet, based on the two columns C_MKTSEGMENT and C_NATIONKEY.

Image by Author — Writing to Partitioned Parquet Files
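A sketch of the partitioned variant; partition_by takes an expression, so we build a folder path from the two columns (the exact layout shown is an assumption, not a requirement).

    from snowflake.snowpark.functions import col, concat, lit
    from snowflake.snowpark.types import StringType

    summary.write.copy_into_location(
        "@MY_EXTERNAL_STAGE/output/customer_summary_partitioned/",
        partition_by=concat(
            lit("C_MKTSEGMENT="), col("C_MKTSEGMENT"),
            lit("/C_NATIONKEY="), col("C_NATIONKEY").cast(StringType()),
        ),
        file_format_type="parquet",
        header=True,
    )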

Writing DataFrame to Table

Snowpark provides a DataFrameWriter object with methods to write a DataFrame directly to Snowflake tables. The snippet below shows the minor differences between writing to Delta tables in Spark and writing to Snowflake tables. We don't have to provide any partitions while writing to Snowflake, as Snowflake stores data in micro-partitions by default.

Image by Author — Writing to Tables
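A sketch of both sides; the table names are placeholders.

    # PySpark writing a Delta table
    # summary_spark.write.format("delta").mode("overwrite").saveAsTable("customer_summary")

    # Snowpark Python: write straight to a Snowflake table, no partitioning needed
    summary.write.mode("overwrite").save_as_table("CUSTOMER_SUMMARY")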

Snowpark and PySpark Function Parity

This section covers function parity between the PySpark DataFrame API and the Snowpark for Python DataFrame API. As this is a multi-part series, in this first part we list the Session class function parity between PySpark and Snowpark for Python.

Session Class Functions:

Notable Session level functions that have parity are listed in the table below:

Image by Author — Session Level Functions Parity List

Notable Session level functions that are in Snowpark only are listed below:

Image by Author — Session level Functions only in Snowpark

See the documentation for all Session functions in Snowpark.

In Part 2 of this article series, we will cover DataFrame API function parity between PySpark and Snowpark Python, go over how user-defined functions differ between Spark and Snowpark, and share recommendations on how to migrate them to Snowpark.

PS: Snowpark Python will be in Public Preview in June 2022, and we highly recommend trying it out with your existing Spark workloads.

Learn more about migrating from Apache Spark at the upcoming Snowflake Summit. Below are the session details for the topic.

Modernize Data Engineering with Snowpark — Migrating from Apache Spark

Conclusion

In this part of the series, we learned how to use various functions in the Snowpark Python DataFrame API. We also covered function parity for Session functions between the PySpark and Snowpark Python APIs.
