Getting Started with Snowpark Scala with IntelliJ: Guide for Spark Scala Developers

Date : 16-Aug-2023


Introduction

The preferred Integrated Development Environment (IDE) within the Scala developer community, particularly among those working with Spark and Snowpark, tends to be IntelliJ. Many Spark Scala developers who are actively migrating their Spark workloads to Snowpark opt for IntelliJ as their go-to IDE. This preference leads to numerous engaging discussions with developers regarding the adaptation of certain tasks from Spark’s Scala to Snowpark’s environment within IntelliJ.

One of these engaging discussions revolves around using IntelliJ for Snowpark Scala application development. Once the development phase is complete, a key question arises: how can the code be executed on Snowflake, particularly in an orchestrated manner?

In this blog, we will take a deep dive into the step-by-step process of running Snowpark Scala code developed in the IntelliJ IDE. Our approach covers creating a JAR file for the project and subsequently creating a Scala Stored Procedure that imports that JAR file so the code can be executed on Snowflake. This blog post serves as a valuable complement to the existing quickstart guide.

Please note: as of writing this blog post, the Snowpark API is supported with Scala 2.12 (specifically, version 2.12.9 and later 2.12.x versions). Please refer to the Snowpark Scala prerequisites in the Snowflake documentation for details.

Setting up a Snowpark Scala Project in IntelliJ

Once you’ve initialized a new Scala project, make sure to include the following library dependencies in the build.sbt file.

libraryDependencies += "com.snowflake" % "snowpark" % "1.7.0"

If you have any other dependencies, you can include them as well. For example, I am adding another library named Snowpark Extensions and including that dependency in the build file, as shown in the sketch below.
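As a rough sketch, the relevant part of build.sbt might look like the following; the Scala version shown is just one 2.12.x release, and the exact coordinates and version of any extra library (such as Snowpark Extensions) should be taken from Maven Central rather than from this example.

// build.sbt (sketch) - Snowpark requires Scala 2.12
ThisBuild / scalaVersion := "2.12.17"

libraryDependencies += "com.snowflake" % "snowpark" % "1.7.0"
// Add any other libraries you need, e.g. the Snowpark Extensions artifact;
// look up its exact groupId, artifactId and latest version on Maven Central:
// libraryDependencies += "<groupId>" % "<artifactId>" % "<version>"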

You should always check the Maven Central repository to identify the latest available version of Snowpark. As of writing this blog post, the latest version is 1.8.0.

https://central.sonatype.com/search?q=snowpark

To build a small application using Snowpark Scala, I've created a scenario wherein we generate DeviceIOT data and subsequently leverage this data to execute a range of transformations, including the use of UDFs. The complete data transformation logic lives in a separate Scala class, residing in the file named DataProcessing.scala. Below is an overview of the project's structure. To execute the program, the entry point is RunSession.scala, which houses the main method.

Snowpark Scala Application Project structure

Below is the code for each of the Scala classes created as part of the project. When you run RunSession.scala, it invokes the run function, which lives in another Scala object (IOTDataAnalysisBase).

// Code for RunSession.scala. It simply calls IOTDataAnalysisBase, whose run
// function is the same entry point used when running from a Snowflake Scala Stored Procedure.

package com.iot.snowpark

import com.snowflake.snowpark.Session

object RunSession extends App {

  val session = Session.builder.configFile("src/main/profile_properties.txt").create

  // This will generate, transform and load the data into a Snowflake table
  private val result = IOTDataAnalysisBase.run(session)
  println(result)

}
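The configFile call above expects a properties file containing your Snowflake connection parameters. If you prefer to keep the configuration in code, the session builder also accepts a map with the same keys; the object name and all values below are placeholders, not real account details.

package com.iot.snowpark

import com.snowflake.snowpark.Session

object RunSessionFromMap {
  def main(args: Array[String]): Unit = {
    // Placeholder connection properties - replace with your own account details
    val configs = Map(
      "URL" -> "https://<account_identifier>.snowflakecomputing.com",
      "USER" -> "<user>",
      "PASSWORD" -> "<password>",
      "ROLE" -> "<role>",
      "WAREHOUSE" -> "<warehouse>",
      "DB" -> "<database>",
      "SCHEMA" -> "<schema>"
    )
    val session = Session.builder.configs(configs).create
    println(IOTDataAnalysisBase.run(session))
  }
}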

The IOTDataAnalysisBase.scala file invokes the methods of the DataProcessing class to generate data, perform transformations and load the transformed data into Snowflake tables. The run function instantiates an object of the class DataProcessing and performs the various data transformation tasks.

// Code for IOTDataAnalysisBase.scala

package com.iot.snowpark

import com.snowflake.snowpark._

object IOTDataAnalysisBase {

  def run(session: Session): String = {

    // Create the DataProcessing object
    val dp = new DataProcessing(session)

    // Create the DataFrame with the required fields
    val iot_df = dp.loadData(session)

    // Transform the data
    val transform_df: DataFrame = dp.processData(iot_df)

    // Write the transformed DataFrame to a Snowflake table
    PersistData.writeTable(transform_df, "iot_transformed_table")

    "Loaded"
  }

}

The first argument of run must be a Session: Snowflake passes a Session object as the first argument when the code is executed on the server side as a stored procedure handler. In the following section, we'll delve into the process of using the JAR file and invoking the run function of the IOTDataAnalysisBase object from a Snowflake Scala Stored Procedure.

// Code for DataProcessing.scala

package com.iot.snowpark

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.types._

import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
import scala.util.Random

// Helper object that persists a DataFrame to a Snowflake table
object PersistData {
  def writeTable(df: DataFrame, tblname: String): Unit =
    df.write.mode("overwrite").saveAsTable(tblname)
}


class DataProcessing(val session: Session) {

  // Transforms the generated IoT data
  def processData(iot_df: DataFrame): DataFrame = {

    val iot_df_ts = iot_df.withColumn("newTS", col("timestamp").cast(TimestampType))

    // Filter records where humidity is greater than 70
    val filteredDF = iot_df_ts.filter(col("humidity").cast(IntegerType) > 70)

    // Add derived columns and aggregate on device id, year, month and day
    val iot_transformed_df = iot_df_ts
      .filter(col("deviceid").in(Seq("board_1", "board_11", "board_12", "light_2", "light_21", "light_22", "light_23", "light_blue_33", "board_dark_4", "board_45", "device_5")))
      .filter(col("temperature").gt(lit(3)))
      .filter(col("timestamp").is_not_null)
      .withColumn("date", to_date(col("newTS")))
      .withColumn("year", year(col("date")))
      .withColumn("month", month(col("date")))
      .withColumn("day", dayofmonth(col("date")))
      .groupBy("deviceid", "year", "month", "day")
      .agg(avg(col("temperature")).alias("avg_temp"), max(col("temperature")).alias("max_temp"),
        avg(col("humidity")).alias("avg_humidity"), max(col("humidity")).alias("max_humidity"))
      .select(col("deviceid"), col("month"), col("day"), col("avg_temp"), col("max_temp"))

    // Add a few more columns to identify the device type, name and number
    val final_df: DataFrame = iot_transformed_df
      .withColumn("devicenumber", callBuiltin("split_part", col("deviceid"), lit("_"), -1).cast(IntegerType))
      .withColumn("devicename", callBuiltin("split_part", col("deviceid"), lit("_"), 0))
      .withColumn("devicetype", when(col("deviceid").like(lit("%board_1%")), lit("Primary"))
        .when(col("deviceid").like(lit("%light_2%")), lit("Secondary"))
        .when(col("deviceid").like(lit("%board%")) && col("devicenumber") === lit(4), lit("Secondary_Backup"))
        .otherwise(lit("Unknown")))

    final_df
  }

  // Anonymous UDF: flags readings above a threshold and returns them as pipe-delimited strings
  val getAlertMaxSpeed = udf((sensorID: String, month: Int, readings: Array[String]) => {
    val threshold = 17.0
    val alerts = new ListBuffer[(String, Int, Double)]()
    for (reading <- readings) {
      if (reading.toDouble > threshold) {
        alerts += ((sensorID, month, reading.toDouble))
        // alerts += s"Alert: Sensor $sensorID detected high reading of $reading in the month $month"
      }
    }
    // Store the ListBuffer of tuples as an array of pipe-delimited strings
    val resultArray = alerts.map { case (s, m, r) => s"$s|$m|$r" }.toArray
    resultArray.distinct
  })

  def getSensorUnexpectedTempReading(session: Session): Unit = {
    // val df_data = session.sql("select listagg(distinct max_temp,',') as temp_monthly_readings, deviceid, month from iot_transformed_table group by deviceid, month")
    val df_data = session.sql("select max_temp, deviceid, month from iot_transformed_table")

    val df_trans = df_data.groupBy(col("deviceid"), col("month"))
      .agg(array_agg(col("max_temp")).withinGroup(col("deviceid"), col("month")).alias("TEMP_MONTHLY_READINGS"))

    // Create the permanent UDF used in the transformations below
    AllUDFs.createUDF(session)

    // Use the permanent UDF (udf_getAlertOnMaxSpeedTemp) created in the AllUDFs Scala object
    val df_readings = df_trans.withColumn("SpeedAnomalyDetection",
      callUDF("udf_getAlertOnMaxSpeedTemp", col("deviceid"), col("month"), col("temp_monthly_readings")))

    // Use the anonymous UDF (getAlertMaxSpeed) defined in this class
    val df_readings_array = df_readings.withColumn("SpeedAnomalyDetectionArray",
      getAlertMaxSpeed(col("deviceid"), col("month"), col("temp_monthly_readings")))

    df_readings_array.write.mode("overwrite").saveAsTable("Sensor_Anomaly_Details")
  }


  def loadData(session: Session): DataFrame = {
    /**
     * Here we mock the IoT data and return a Snowpark DataFrame as the output.
     * This can be replaced by your own logic for reading from external or internal storage.
     */

    val deviceids = Array("board_1", "board_11", "board_12", "light_2", "light_21", "light_22", "light_23", "light_blue_33", "board_dark_4", "board_45", "device_5")

    val rows = 24 * 10 * 5

    // Generate the IoT data and store it in a sequence of tuples
    val iot_data = (1 to rows).map { i =>
      val deviceid = deviceids(i % 5)
      // Note the Long literal: without it the millisecond offset overflows Int
      val timestamp = (System.currentTimeMillis - (rows - i) * 60 * 60 * 1000L) / 1000L
      val temperature = Random.nextInt(40) - 20
      val humidity = Random.nextInt(100)
      (deviceid, timestamp, temperature, humidity)
    }

    val iotSchema = StructType(Seq(
      StructField("deviceid", StringType, true),
      StructField("timestamp", LongType, true),
      StructField("temperature", IntegerType, true),
      StructField("humidity", IntegerType, true)
    ))

    // Convert the tuples to Rows and build the DataFrame
    val iot_row: Seq[Row] = iot_data.map(t => Row(t._1, t._2, t._3, t._4))
    val iot_df = session.createDataFrame(iot_row, schema = iotSchema)

    iot_df
  }

}
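The loadData method above mocks the IoT feed. If your raw data already lives in a Snowflake table or in a staged CSV file, a hedged sketch of an alternative (the table name, staged file name and file-format option below are placeholders, not part of the original project) could look like this, using the same imports as DataProcessing.scala:

// Hypothetical alternatives to the mocked loadData; names are placeholders
def loadDataFromTable(session: Session): DataFrame =
  session.table("IOT_RAW_EVENTS")

def loadDataFromStage(session: Session): DataFrame =
  session.read
    .schema(StructType(Seq(
      StructField("deviceid", StringType),
      StructField("timestamp", LongType),
      StructField("temperature", IntegerType),
      StructField("humidity", IntegerType)
    )))
    .option("FIELD_DELIMITER", ",")
    .csv("@scalajarstest/iot_raw.csv")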

Below is the Scala object that contains the required UDF. We call its createUDF function as part of the Snowpark DataFrame transformations shown above.

// Code for AllUDFs.scala

package com.iot.snowpark

import com.snowflake.snowpark._
import com.snowflake.snowpark.types.Variant
import scala.language.implicitConversions

import scala.collection.mutable.ListBuffer

object AllUDFs {

  private val getAlertDeviceMaxSpeed = (sensorID: String, month: Int, readings: Array[String]) => {
    val threshold = 17.0
    val alerts = new ListBuffer[String]()
    for (reading <- readings) {
      if (reading.toDouble > threshold) {
        alerts += s"Alert: Sensor $sensorID detected high reading of $reading in the month $month"
      }
    }
    // Convert the ListBuffer to an Array, as ListBuffer is not a supported return type in Snowpark Scala
    alerts.toArray.distinct
    /**
     * You can also convert the ListBuffer to a Variant and use that as the return type for the function,
     * e.g. val fabulist = new Variant(alerts)
     */
  }

  def createUDF(session: Session): Unit = {
    session.sql("drop function if exists udf_getAlertOnMaxSpeedTemp(varchar,number,array)").collect()
    session.udf.registerPermanent("udf_getAlertOnMaxSpeedTemp", getAlertDeviceMaxSpeed, "scalajarstest")
  }

}

A few things to consider while working with Snowpark Scala

  1. If you are creating and using user-defined functions, make sure the argument and return types are confined to the supported list below.

Data Types support for arguments and return values

https://docs.snowflake.com/en/developer-guide/snowpark/scala/creating-udfs

2. When your object extends the App trait, ensure that your UDFs do not reference fields declared in the object body. Snowpark serializes the UDF definition and uploads it to Snowflake, and because an App object's body runs after construction, fields declared there are not yet initialized at serialization time and consequently resolve to null. To avoid this problem, implement a main method instead, as shown below:

def main(args: Array[String]): Unit = {
// Your code …
}
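Applied to the RunSession object shown earlier, that pattern would look roughly like this:

package com.iot.snowpark

import com.snowflake.snowpark.Session

object RunSession {
  def main(args: Array[String]): Unit = {
    val session = Session.builder.configFile("src/main/profile_properties.txt").create
    // Generate, transform and load the data into a Snowflake table
    val result = IOTDataAnalysisBase.run(session)
    println(result)
  }
}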

Create a JAR File for the Project with All Dependencies

To execute the code within Snowflake, one approach is to generate a JAR file and import it when creating a Snowflake Scala Stored Procedure. We will use the sbt-assembly plugin to compile and bundle the code into an assembly JAR. This JAR file will contain all the dependencies (libraries, and input files if any) and the entire code. Below are the steps to create the JAR file.

  1. Create a file named plugins.sbt in the project directory of your sbt build (i.e. project/plugins.sbt) in IntelliJ.
Adding a new file plugins.sbt

2. Add the following line in the plugins.sbt file.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.1.0")

3. Once the plugins.sbt file is created, ensure you are not bundling the Snowpark library itself, as it is made available on the server side where the Scala code gets executed. To exclude the Snowpark library, append % "provided" to its libraryDependencies entry in your build.sbt file, as shown below.

Updating dependencies
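For example, the Snowpark dependency added earlier would change to the line below; with "provided", sbt still compiles against the library but leaves it out of the assembly JAR, since Snowflake supplies Snowpark at runtime through the PACKAGES clause of the stored procedure.

libraryDependencies += "com.snowflake" % "snowpark" % "1.7.0" % "provided"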

4. Once the above steps are completed, open a terminal, go to the project home folder and run sbt assembly; the JAR file is created for your project and its path is logged. You can download sbt from https://www.scala-sbt.org/download.html.

sbt assembly
Output of running sbt assembly

The JAR file is generated within the "target" directory, inside a subfolder that corresponds to the Scala version being used (2.12 in our scenario), i.e. /target/scala-2.12/filename.jar.

Path of the JAR file

Running the Snowpark Scala Code on Snowflake

After successfully generating the JAR file, upload it to a Snowflake stage. You can use SnowSQL, Snowpark file operations, or a manual upload from the Snowsight UI (there are restrictions on file size when uploading from the UI). If the file is over 50 MB, use either SnowSQL or Snowpark file operations to upload the JAR file to the internal stage.
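As a hedged sketch, uploading the assembly JAR with Snowpark file operations could look like the snippet below; the helper object is hypothetical, the local path and stage name follow the examples in this post, and AUTO_COMPRESS is disabled so the JAR is stored as-is rather than gzipped.

package com.iot.snowpark

import com.snowflake.snowpark.Session

object UploadJar {
  def main(args: Array[String]): Unit = {
    val session = Session.builder.configFile("src/main/profile_properties.txt").create
    // PUT the assembly JAR into the internal stage without compressing it
    session.file.put(
      "target/scala-2.12/SnowparkScalaStarter-assembly-0.1.0-SNAPSHOT.jar",
      "@scalajarstest",
      Map("AUTO_COMPRESS" -> "FALSE"))
  }
}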

While creating the Scala Stored Procedure that imports the JAR file you have created, you need to specify the name and path of the JAR, plus the handler, which is the name of the function in your Scala object; this tells Snowflake which Scala object and function to call. I have created an internal stage named scalajarstest and uploaded the JAR file to it. Below is the SQL to create a Scala Stored Procedure that imports the JAR file.

The handler should reference the function you want to call, in the form packagename.objectname.functionname. This is similar to how you pass the JAR file path and class name to the spark-submit command.

CREATE OR REPLACE PROCEDURE usp_LoadDeviceIotData()
RETURNS STRING
LANGUAGE SCALA
RUNTIME_VERSION = '2.12'
PACKAGES = ('com.snowflake:snowpark:latest')
IMPORTS = ('@scalajarstest/SnowparkScalaStarter-assembly-0.1.0-SNAPSHOT.jar')
HANDLER = 'com.iot.snowpark.IOTDataAnalysisBase.run';

In our example, the Scala object is IOTDataAnalysisBase, which is under the package com.iot.snowpark, and the function we are calling is run.
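Once the procedure exists, it can be called from a SQL worksheet, from a Snowflake task for orchestration, or even from a Snowpark session, for example:

// Given an existing Snowpark session, invoke the stored procedure
session.sql("CALL usp_LoadDeviceIotData()").show()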

Using UDFs in Snowpark Scala

Customers tend to use UDFs in Spark to extend the built-in functionality and perform operations tailored to their specific use case. One of the most discussed topics with developers who are moving their Spark Scala workloads to Snowpark Scala is how to create and use UDFs in Snowpark Scala.

The Snowpark API provides methods that you can use to create a user-defined function from a lambda or a function in Scala, and you can call these UDFs to perform specific operations on your Snowpark DataFrame.

We will be creating two UDFs: the first as an anonymous UDF and the second as a permanent UDF.

Anonymous UDF

Below is the code for the anonymous UDF, which we will use in the transformation task. We have created the UDF in the same Scala class that has the various transformations defined. Since ListBuffer is not a supported return type, we convert it to an array of strings by concatenating the elements of each tuple in the ListBuffer.

  val getAlertMaxSpeed = udf((sensorID: String, month: Int, readings: Array[String]) => {
    val threshold = 17.0
    val alerts = new ListBuffer[(String, Int, Double)]()
    for (reading <- readings) {
      if (reading.toDouble > threshold) {
        alerts += ((sensorID, month, reading.toDouble))
      }
    }
    // Store the ListBuffer of tuples as an array of pipe-delimited strings
    val resultArray = alerts.map { case (s, m, r) => s"$s|$m|$r" }.toArray
    resultArray.distinct
  })

We are using this UDF as part of the transformations shown in the below code snippet:

val df_readings_array = df_readings.withColumn("SpeedAnomalyDetectionArray",
getAlertMaxSpeed(col("deviceid"),col("month"),col("temp_monthly_readings")))

Permanent UDF

Below is the code for the permanent UDF. You would use this approach if you want to create UDFs that will be used by code outside the ETL/ELT pipeline you are building. You have to specify the stage to which your UDF will be serialized and uploaded.

  private val getAlertDeviceMaxSpeed = (sensorID: String, month: Int, readings: Array[String]) => {
    val threshold = 17.0
    val alerts = new ListBuffer[String]()
    for (reading <- readings) {
      if (reading.toDouble > threshold) {
        alerts += s"Alert: Sensor $sensorID detected high reading of $reading in the month $month"
      }
    }
    // Convert the ListBuffer to an Array, as ListBuffer is not a supported return type in Snowpark Scala
    alerts.toArray.distinct
    /**
     * You can also convert the ListBuffer to a Variant and use that as the return type for the function,
     * e.g. val fabulist = new Variant(alerts)
     */
  }

// Creating permanent UDF for the logic defined in the above code block
session.udf.registerPermanent("udf_getAlertOnMaxSpeedTemp",
getAlertDeviceMaxSpeed,"scalajarstest")

If we return the ListBuffer type as the output return type, we see the error below while running the code, hence we convert the return type to an array of strings.

Exception in thread “main” java.lang.UnsupportedOperationException: Unsupported type scala.collection.mutable.ListBuffer[String] for Scala UDFs. Supported collection types are Array[Byte], Array[String] and mutable.Map[String, String]

We are using the permanent UDF as part of the transformations shown in the code snippet below. To call a UDF registered by name, use the callUDF function, which accepts the UDF name and the parameters to the UDF as its arguments.

val df_readings = df_trans.withColumn("SpeedAnomalyDetection",
callUDF("udf_getAlertOnMaxSpeedTemp",col("deviceid"),col("month")
,col("temp_monthly_readings")))

Conclusion

In a nutshell, the familiarity of building Snowpark Scala applications with IntelliJ allows you to explore Snowpark's capabilities with the tools you know best. As you embark on this journey, you're not just adapting, you're extending your expertise to new horizons. So, use your favorite IntelliJ IDE and unlock the potential of Snowpark Scala to build data pipeline applications within the ecosystem you're already accustomed to.

References

  1. Scala to SQL Data Types mapping

https://docs.snowflake.com/en/developer-guide/snowpark/scala/sql-to-snowpark

2. Snowpark Scala API Reference

3. Snowpark Scala developer guide
