Spark Scala job with Dataproc Serverless

Mazlum Tosun
Google Cloud - Community
18 min readAug 10, 2023

1. Explanation of the use case presented in this article

This article shows a complete example and real world use case with a Spark Scala job launched with Dataproc Serverless.

With this Serverless approach, developers only need to be focused on the Spark code and logic and no need to manage a cluster and an infrastructure.

Spark with Scala was chosen for this first article but a second article will show the same use case with Spark Python.

For the deployment of the Spark job, we can use a Docker container containing the expected environment with a fat jar or directly generate a fat jar and pass it in the command line that launches the job with Dataproc Serverless.

The second approach was chosen with the generation of a fat jar copied in a Cloud Storage bucket.

The CI CD part will be managed by Cloud Build :

  • Deploy the Spark job : generate the fat jar with the sbt build tool and copy it in a Cloud Storage bucket
  • Run the Spark job : a gcloud command will launch the Spark job with Dataproc Serverless

Here you can see the diagram of this use case :

I also created a video on this topic in my GCP Youtube channel, please subscribe to the channel to support my work for the Google Cloud community :

English version

French version

An example of raw data in a Json format :

{
"teamName": "PSG",
"teamScore": 30,
"scorers": [
{
"scorerFirstName": "Kylian",
"scorerLastName": "Mbappe",
"goals": 15,
"goalAssists": 6,
"games": 13
},
{
"scorerFirstName": "Da Silva",
"scorerLastName": "Neymar",
"goals": 11,
"goalAssists": 7,
"games": 12
},
{
"scorerFirstName": "Angel",
"scorerLastName": "Di Maria",
"goals": 7,
"goalAssists": 8,
"games": 13
},
{
"scorerFirstName": "Lionel",
"scorerLastName": "Messi",
"goals": 12,
"goalAssists": 8,
"games": 13
},
{
"scorerFirstName": "Marco",
"scorerLastName": "Verrati",
"goals": 3,
"goalAssists": 10,
"games": 13
}
]
}

The corresponding computed domain data :

{
"teamName": "PSG",
"teamScore": 30,
"teamTotalGoals": 48,
"teamSlogan": "Paris est magique",
"topScorerStats": {
"firstName": "Kylian",
"lastName": "Mbappe",
"goals": 15,
"games": 13
},
"bestPasserStats": {
"firstName": "Marco",
"lastName": "Verrati",
"goalAssists": 10,
"games": 13
}
}

The goal is to calculate :

  • The total goals per team
  • The top scorer node
  • The best passer node
  • Set the slogan per team

2. Enable private access subnet

Dataproc Serverless requires Google Private Access to be enabled in the regional subnet where you run your Spark workloads since Spark drivers and executors require private IP addresses.

Enable Google Private Access on the default subnet in your selected region.

gcloud compute networks subnets \
update default \
--region=europe-west1 \
--enable-private-ip-google-access

Verify that Google Private Access is enabled. The output should be True.

gcloud compute networks subnets \
describe default \
--region=europe-west1 \
--format="get(privateIpGoogleAccess)"

3. Create a Persistent History Server

The Spark UI provides insights into Spark batch workloads. You can create a single-node Dataproc persistent history server that hosts the Spark UI and provides access to the history of completed Dataproc Serverless workloads.

Set a name for your persistent history server.

PHS_CLUSTER_NAME=gb-spark-job-history

gcloud dataproc clusters create \
${PHS_CLUSTER_NAME} \
--region=europe-west1 \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory='gs://mazlum_dev/phs/*/spark-job-history'

4. Structure of the project

4.1 Environment variables

Set the following environment variables :

# Common
export PROJECT_ID={{project_id}}
export LOCATION={{location}}

# Deploy Spark job
export LOCAL_JAR_PATH=target/scala-2.13/teams-league-spark-scala-assembly-0.1.0-SNAPSHOT.jar
export GCS_JARS_PATH=gs://mazlum_dev/spark/jars

# Run Spark job
export SERVICE_ACCOUNT={{your_sa_email}}
export JAR="gs://mazlum_dev/spark/jars/teams-league-spark-scala-assembly-0.1.0-SNAPSHOT.jar"
export MAIN_CLASS=fr.groupbees.application.TeamLeagueApp
export HISTORY_SERVER_CLUSTER=projects/gb-poc-373711/regions/europe-west1/clusters/gb-spark-job-history
export INPUT_TEAM_STATS_FILE_PATH="gs://mazlum_dev/hot/etl/spark/input_teams_stats_raw.json"
export INPUT_TEAM_SLOGANS_FILE_PATH="gs://mazlum_dev/hot/etl/spark/input_team_slogans.json"
export OUTPUT_TEAM_LEAGUE_DATASET="mazlum_test"
export OUTPUT_TEAM_STATS_TABLE="team_stat"

4.2 Code structure and logic

4.2.1 Build tool

In this use case, sbt was chosen as Scala build tool.

Maven could also have been used instead but sbt is popular in the Scala ecosystem and most often used in Scala projects.

val scala2Version = "2.13.11"
val sparkVersion = "3.3.2"

lazy val root = project
.in(file("."))
.settings(
name := "teams-league-spark-scala",
version := "0.1.0-SNAPSHOT",

scalaVersion := scala2Version,
libraryDependencies += "org.scalactic" %% "scalactic" % "3.2.16",
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.16" % "test",
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.32.0"
)

Dataproc Serverless has Spark runtime version

For this use case, we chose the 2.0 runtime version containing Spark 3.3.2 , that’s why we used this version in the sbt build file.

Spark 3 is compatible with Scala 2.13 and currently not compatible with Scala 3. For these reasons we chose the Scala 2.13.11 version.

The settings bloc contains :

  • The artifact name : team-league-spark-scala
  • The artifact version : 0.1.0-SNAPSHOT

The dependencies are :

Two dependencies to write tests with Scala Test

libraryDependencies += "org.scalactic" %% "scalactic" % "3.2.16",
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.16" % "test",

The Spark Core dependency to use the general libraries around Spark. This library is marked as provided because it’s given by the Dataproc Serverless runtime environment. The build.sbt file does not specify a Cloud Storage dependency because the connector implements the standard HDFS interface. When a Spark job accesses Cloud Storage cluster files (files with URIs that start with gs://), the system automatically uses the Cloud Storage connector to access the files in Cloud Storage

libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided",

The Spark SQL dependency to use the Dataframe and Dataset structures and the Spark SQL libraries. This library is marked as provided for the same reason as Spark Core

libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",

The Spark BigQuery connector to read and write to BigQuery

libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.32.0"

4.2.2 Java version and JVM option

The Java version used in the project is Java 17.

According to the documentation, Spark runs on Java 8/11/17, Scala 2.12/2.13, but there was an issue where we run the Spark job with Java 17 version :

Caused by: java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ (in unnamed module @0x59d016c9) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module @0x59d016c9

There is a thread about this issue on Stackoverflow.

To solve this issue, we added a JVM option at compile time with sbt. There are different ways to add options with sbt but the most interesting for us was to create a .jvmopts file at the root of the project. sbt detects this file automatically and applies the configured JVM options.

--add-exports=java.base/sun.nio.ch=ALL-UNNAMED

This option resolves the compilation issue with sbt, Spark 3 and Java 17.

4.2.3 Fat jar and Assembly plugin

As explained in the introduction part, we need to generate a fat jar for the project and then copy it in a Cloud Storage bucket.

To generate a fat jar with sbt, we added the assembly plugin with the creation of a assembly.sbt file in the project folder :

The content of the assembly.sbt file is :

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")

sbt will load this plugin and we can then use it to generate the fat jar.

4.2.4 The Spark code structure and explanations

There are two main folders :

  • application : contains the class with the main method, that corresponds to the entrypoint of the Spark Scala application
  • domain : contains all the typed objects with raw and domain objects

A class at the root package called TeamStatsDatasetMapper has the responsibilty to build a Dataset of the domain data from the raw Dataset.

The TeamLeagueApp class :

package fr.groupbees.application

import fr.groupbees.TeamStatsDatasetMapper
import fr.groupbees.domain.{TeamStats, TeamStatsRaw}
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.{Dataset, SparkSession}

object TeamLeagueApp {

def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("Team League Application")
.getOrCreate()

val inputTeamStatsFilePath = args(0)
val inputTeamSlogansFilePath = args(1)
val outputTeamLeagueDataset = args(2)
val outputTeamStatsTable = args(3)

val teamStatsRawEncoder = org.apache.spark.sql.Encoders.product[TeamStatsRaw]
val teamStatsRawDataset: Dataset[TeamStatsRaw] = spark.read.json(inputTeamStatsFilePath).as(teamStatsRawEncoder)

val teamSlogansDataframe = spark
.read
.json(inputTeamSlogansFilePath)

val teamStatsDataset: Dataset[TeamStats] = TeamStatsDatasetMapper.toTeamStatsDataset(
teamStatsRawDataset = teamStatsRawDataset,
teamSlogansDataframe = teamSlogansDataframe
)

teamStatsDataset
.withColumn("ingestionDate", current_timestamp())
.write
.format("bigquery")
.option("writeMethod", "direct")
.option("createDisposition", "CREATE_NEVER")
.mode("append")
.save(s"$outputTeamLeagueDataset.$outputTeamStatsTable")

spark.stop()
}
}

1 — A Spark session is created

2 — Some arguments are passed when executing the command line to run the Job, and these arguments are retrieved in the job

3 — The Json raw file is read as a Dataset with the Spark SQL library. Dataset is like a table of data with rows and columns, we preferred to use it because this structure benefits an optimized engine with Spark compared to the standard RDD. This structure is immutable and offers a very concise/expressive code to apply pure functions and transformations on columns (UDF). Dataset is a typed structure compared to Dataframe.

4 — A side input is retrieved containing the slogans per team as a Dataframe. A Dataframe is similar to Dataset but it’s not a typed structure. In this case no need to type the slogan data.

5 — We call a mapper, from the raw Dataset and the slogan Dataframe to build a Dataset of Domain data

6 — We write the Dataset of domain data to a BigQuery table, via the BigQuery connector with Spark. The ingestionDate technical field is added to the structure and other options are applied :

  • writeMethod=direct : allows to use the BigQuery Storage Write API
  • createDisposition=CREATE_NEVER : prevents the creation of the BigQuery table by the Spark job, because we prefer to delegate this responsibility in a Terraform code
  • mode("append") : allows to append the data in the table

An important detail here, the schema of the result Dataset needs to match exactly the schema of the BigQuery output table.

The TeamStatsDatasetMapper class :

package fr.groupbees

import fr.groupbees.domain._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.{DataFrame, Dataset}

object TeamStatsDatasetMapper {

private val rawColumnScorersName = "scorers"
private val rawColumnTeamName = "teamName"
private val rawColumnTeamSlogan = "slogan"
private val domainColumnTotalGoalsName = "teamTotalGoals"
private val domainColumnSloganName = "teamSlogan"
private val domainColumnTopScorerName = "topScorerStats"
private val domainColumnBestPasserName = "bestPasserStats"

def toTeamStatsDataset(teamStatsRawDataset: Dataset[TeamStatsRaw],
teamSlogansDataframe: DataFrame): Dataset[TeamStats] = {
val teamStatsEncoder = org.apache.spark.sql.Encoders.product[TeamStats]

val totalGoalsUdf = udf(calculateTeamTotalGoals _)
val topScorerUdf = udf(getTeamTopScorer _)
val bestPasserUdf = udf(getTeamBestPasser _)

teamStatsRawDataset
.join(teamSlogansDataframe, rawColumnTeamName)
.withColumnRenamed(rawColumnTeamSlogan, domainColumnSloganName)
.withColumn(domainColumnTotalGoalsName, totalGoalsUdf(col(rawColumnScorersName)))
.withColumn(domainColumnTopScorerName, topScorerUdf(col(rawColumnScorersName)))
.withColumn(domainColumnBestPasserName, bestPasserUdf(col(rawColumnScorersName)))
.drop(rawColumnScorersName)
.as(teamStatsEncoder)
}

private def calculateTeamTotalGoals(scorers: List[TeamScorerRaw]): Long = {
scorers.map(_.goals).sum
}

private def getTeamTopScorer(scorers: List[TeamScorerRaw]): TeamTopScorerStats = {
val topScorerRaw: TeamScorerRaw = scorers.maxBy(_.goals)

TeamTopScorerStats(
firstName = topScorerRaw.scorerFirstName,
lastName = topScorerRaw.scorerLastName,
goals = topScorerRaw.goals,
games = topScorerRaw.games
)
}

private def getTeamBestPasser(scorers: List[TeamScorerRaw]): TeamBestPasserStats = {
val bestPasserRaw: TeamScorerRaw = scorers.maxBy(_.goalAssists)

TeamBestPasserStats(
firstName = bestPasserRaw.scorerFirstName,
lastName = bestPasserRaw.scorerLastName,
goalAssists = bestPasserRaw.goalAssists,
games = bestPasserRaw.games
)
}
}

1 — We firstly join the raw stats data with the slogan on the teamName field, like in SQL queries, Dataframe and Dataset offer this functionality to cross and join data.

2 — The slogan field from the side input has the slogan name. We want to rename this field to teamSlogan for the domain data, that’s why we used the withColumnRenamed method.

3 — The three following transformations use pure function and UDF with Spark SQL to calculate the total goals per team, the top scorer and the best passer. All these transformations are calculated from the raw scorers field.

4 — We drop the scorers raw data from the Dataframe, because this field is not needed in the result domain data.

5 — An encoder is used to type the result Dataframe in order to have a typed Dataset of domain data.

5. Autoscaling for Dataproc Serverless jobs

When you submit your Spark workload, Dataproc Serverless for Spark can dynamically scale workload resources, such as the number of executors, to run your workload efficiently. Dataproc Serverless autoscaling is the default behavior, and uses Spark dynamic resource allocation to determine whether, how, and when to scale your workload.

We can override Spark properties when we submit a batch workload to control autoscaling. Check the Spark dynamic allocation properties section.

To have more details on how to set properties, you can check this link :

gcloud dataproc batches submit \
--properties=spark.sql.catalogImplementation=hive,spark.hive.metastore.uris=METASTORE_URI,spark.hive.metastore.warehouse.dir=" tabindex="0" style="color: unset; font-family: var(--devsite-code-font-family); font-feature-settings: unset; font-kerning: unset; font-optical-sizing: unset; font-palette: unset; font-size: unset; font-stretch: unset; font-style: unset; font-synthesis: unset; font-variant: unset; font-variation-settings: unset; font-weight: unset; forced-color-adjust: unset; text-orientation: unset; text-rendering: unset; -webkit-font-smoothing: unset; -webkit-locale: unset; -webkit-text-orientation: unset; -webkit-writing-mode: unset; writing-mode: unset; zoom: unset; accent-color: unset; place-content: unset; place-items: unset; place-self: unset; alignment-baseline: unset; animation-composition: unset; animation: unset; app-region: unset; appearance: unset; aspect-ratio: unset; backdrop-filter: unset; backface-visibility: unset; background: unset; background-blend-mode: unset; baseline-shift: unset; baseline-source: unset; block-size: unset; border-block: unset; border: unset; border-radius: unset; border-collapse: unset; border-end-end-radius: unset; border-end-start-radius: unset; border-inline: unset; border-start-end-radius: unset; border-start-start-radius: unset; inset: unset; box-shadow: unset; box-sizing: unset; break-after: unset; break-before: unset; break-inside: unset; buffered-rendering: unset; caption-side: unset; caret-color: unset; clear: unset; clip: unset; clip-path: unset; clip-rule: unset; color-interpolation: unset; color-interpolation-filters: unset; color-rendering: unset; color-scheme: unset; columns: unset; column-fill: unset; gap: unset; column-rule: unset; column-span: unset; contain: unset; contain-intrinsic-block-size: unset; contain-intrinsic-size: unset; contain-intrinsic-inline-size: unset; container: unset; content: unset; content-visibility: unset; counter-increment: unset; counter-reset: unset; counter-set: unset; cursor: unset; cx: unset; cy: unset; d: unset; display: unset; dominant-baseline: unset; empty-cells: unset; fill: unset; fill-opacity: unset; fill-rule: unset; filter: unset; flex: unset; flex-flow: unset; float: unset; flood-color: unset; flood-opacity: unset; grid: unset; grid-area: unset; height: unset; hyphenate-character: unset; hyphenate-limit-chars: unset; hyphens: unset; image-orientation: unset; image-rendering: unset; initial-letter: unset; inline-size: unset; inset-block: unset; inset-inline: unset; isolation: unset; letter-spacing: unset; lighting-color: unset; line-break: unset; line-height: unset; list-style: unset; margin-block: unset; margin: unset; margin-inline: unset; marker: unset; mask: unset; mask-type: unset; math-depth: unset; math-shift: unset; math-style: unset; max-block-size: unset; max-height: unset; max-inline-size: unset; max-width: unset; min-block-size: unset; min-height: unset; min-inline-size: unset; min-width: unset; mix-blend-mode: unset; object-fit: unset; object-position: unset; object-view-box: unset; offset: unset; opacity: unset; order: unset; orphans: unset; outline: unset; outline-offset: unset; overflow-anchor: unset; overflow-clip-margin: unset; overflow-wrap: unset; overflow: unset; overscroll-behavior-block: unset; overscroll-behavior-inline: unset; overscroll-behavior: unset; padding-block: unset; padding: unset; padding-inline: unset; page: unset; page-orientation: unset; paint-order: unset; perspective: unset; perspective-origin: unset; pointer-events: unset; position: unset; quotes: unset; r: unset; resize: unset; rotate: unset; ruby-position: unset; rx: unset; ry: unset; scale: unset; scroll-behavior: unset; scroll-margin-block: unset; scroll-margin: unset; scroll-margin-inline: unset; scroll-padding-block: unset; scroll-padding: unset; scroll-padding-inline: unset; scroll-snap-align: unset; scroll-snap-stop: unset; scroll-snap-type: unset; scroll-timeline: unset; scrollbar-gutter: unset; shape-image-threshold: unset; shape-margin: unset; shape-outside: unset; shape-rendering: unset; size: unset; speak: unset; stop-color: unset; stop-opacity: unset; stroke: unset; stroke-dasharray: unset; stroke-dashoffset: unset; stroke-linecap: unset; stroke-linejoin: unset; stroke-miterlimit: unset; stroke-opacity: unset; stroke-width: unset; tab-size: unset; table-layout: unset; text-align: unset; text-align-last: unset; text-anchor: unset; text-combine-upright: unset; text-decoration: unset; text-decoration-skip-ink: unset; text-emphasis: unset; text-emphasis-position: unset; text-indent: unset; text-overflow: unset; text-shadow: unset; text-size-adjust: unset; text-transform: unset; text-underline-offset: unset; text-underline-position: unset; white-space: unset; touch-action: unset; transform: unset; transform-box: unset; transform-origin: unset; transform-style: unset; transition: unset; translate: unset; user-select: unset; vector-effect: unset; vertical-align: unset; view-timeline: unset; view-timeline-inset: unset; view-transition-name: unset; visibility: unset; border-spacing: unset; -webkit-box-align: unset; -webkit-box-decoration-break: unset; -webkit-box-direction: unset; -webkit-box-flex: unset; -webkit-box-ordinal-group: unset; -webkit-box-orient: unset; -webkit-box-pack: unset; -webkit-box-reflect: unset; -webkit-highlight: unset; -webkit-line-break: unset; -webkit-line-clamp: unset; -webkit-mask-box-image: unset; -webkit-mask: unset; -webkit-mask-composite: unset; -webkit-print-color-adjust: unset; -webkit-rtl-ordering: unset; -webkit-ruby-position: unset; -webkit-tap-highlight-color: unset; -webkit-text-combine: unset; -webkit-text-decorations-in-effect: unset; -webkit-text-fill-color: unset; -webkit-text-security: unset; -webkit-text-stroke: unset; -webkit-user-drag: unset; -webkit-user-modify: unset; widows: unset; width: unset; will-change: unset; word-break: unset; word-spacing: unset; x: unset; y: unset; z-index: unset;">" style="color: unset; font-family: var(--devsite-code-font-family); font-feature-settings: unset; font-kerning: unset; font-optical-sizing: unset; font-palette: unset; font-size: unset; font-stretch: unset; font-style: unset; font-synthesis: unset; font-variant: unset; font-variation-settings: unset; font-weight: unset; forced-color-adjust: unset; text-orientation: unset; text-rendering: unset; -webkit-font-smoothing: unset; -webkit-locale: unset; -webkit-text-orientation: unset; -webkit-writing-mode: unset; writing-mode: unset; zoom: unset; accent-color: unset; place-content: unset; place-items: unset; place-self: unset; alignment-baseline: unset; animation-composition: unset; animation: unset; app-region: unset; appearance: unset; aspect-ratio: unset; backdrop-filter: unset; backface-visibility: unset; background: unset; background-blend-mode: unset; baseline-shift: unset; baseline-source: unset; block-size: unset; border-block: unset; border-bottom: 1px dotted var(--devsite-var-color); border-radius: unset; border-collapse: unset; border-end-end-radius: unset; border-end-start-radius: unset; border-image: unset; border-inline: unset; border-left: unset; border-right: unset; border-start-end-radius: unset; border-start-start-radius: unset; border-top: unset; inset: unset; box-shadow: unset; box-sizing: unset; break-after: unset; break-before: unset; break-inside: unset; buffered-rendering: unset; caption-side: unset; caret-color: unset; clear: unset; clip: unset; clip-path: unset; clip-rule: unset; color-interpolation: unset; color-interpolation-filters: unset; color-rendering: unset; color-scheme: unset; columns: unset; column-fill: unset; gap: unset; column-rule: unset; column-span: unset; contain: unset; contain-intrinsic-block-size: unset; contain-intrinsic-size: unset; contain-intrinsic-inline-size: unset; container: unset; content: unset; content-visibility: unset; counter-increment: unset; counter-reset: unset; counter-set: unset; cursor: pointer; cx: unset; cy: unset; d: unset; display: unset; dominant-baseline: unset; empty-cells: unset; fill: unset; fill-opacity: unset; fill-rule: unset; filter: unset; flex: unset; flex-flow: unset; float: unset; flood-color: unset; flood-opacity: unset; grid: unset; grid-area: unset; height: unset; hyphenate-character: unset; hyphenate-limit-chars: unset; hyphens: unset; image-orientation: unset; image-rendering: unset; initial-letter: unset; inline-size: unset; inset-block: unset; inset-inline: unset; isolation: unset; letter-spacing: unset; lighting-color: unset; line-break: unset; line-height: unset; list-style: unset; margin-block: unset; margin: unset; margin-inline: unset; marker: unset; mask: unset; mask-type: unset; math-depth: unset; math-shift: unset; math-style: unset; max-block-size: unset; max-height: unset; max-inline-size: unset; max-width: unset; min-block-size: unset; min-height: unset; min-inline-size: unset; min-width: unset; mix-blend-mode: unset; object-fit: unset; object-position: unset; object-view-box: unset; offset: unset; opacity: unset; order: unset; orphans: unset; outline: unset; outline-offset: unset; overflow-anchor: unset; overflow-clip-margin: unset; overflow-wrap: unset; overflow: unset; overscroll-behavior-block: unset; overscroll-behavior-inline: unset; overscroll-behavior: unset; padding-block: unset; padding: unset; padding-inline: unset; page: unset; page-orientation: unset; paint-order: unset; perspective: unset; perspective-origin: unset; pointer-events: unset; position: unset; quotes: unset; r: unset; resize: unset; rotate: unset; ruby-position: unset; rx: unset; ry: unset; scale: unset; scroll-behavior: unset; scroll-margin-block: unset; scroll-margin: unset; scroll-margin-inline: unset; scroll-padding-block: unset; scroll-padding: unset; scroll-padding-inline: unset; scroll-snap-align: unset; scroll-snap-stop: unset; scroll-snap-type: unset; scroll-timeline: unset; scrollbar-gutter: unset; shape-image-threshold: unset; shape-margin: unset; shape-outside: unset; shape-rendering: unset; size: unset; speak: unset; stop-color: unset; stop-opacity: unset; stroke: unset; stroke-dasharray: unset; stroke-dashoffset: unset; stroke-linecap: unset; stroke-linejoin: unset; stroke-miterlimit: unset; stroke-opacity: unset; stroke-width: unset; tab-size: unset; table-layout: unset; text-align: unset; text-align-last: unset; text-anchor: unset; text-combine-upright: unset; text-decoration: unset; text-decoration-skip-ink: unset; text-emphasis: unset; text-emphasis-position: unset; text-indent: unset; text-overflow: unset; text-shadow: unset; text-size-adjust: unset; text-transform: unset; text-underline-offset: unset; text-underline-position: unset; white-space: pre; touch-action: unset; transform: unset; transform-box: unset; transform-origin: unset; transform-style: unset; transition: unset; translate: unset; user-select: unset; vector-effect: unset; vertical-align: unset; view-timeline: unset; view-timeline-inset: unset; view-transition-name: unset; visibility: unset; border-spacing: unset; -webkit-box-align: unset; -webkit-box-decoration-break: unset; -webkit-box-direction: unset; -webkit-box-flex: unset; -webkit-box-ordinal-group: unset; -webkit-box-orient: unset; -webkit-box-pack: unset; -webkit-box-reflect: unset; -webkit-highlight: unset; -webkit-line-break: unset; -webkit-line-clamp: unset; -webkit-mask-box-image: unset; -webkit-mask: unset; -webkit-mask-composite: unset; -webkit-print-color-adjust: unset; -webkit-rtl-ordering: unset; -webkit-ruby-position: unset; -webkit-tap-highlight-color: unset; -webkit-text-combine: unset; -webkit-text-decorations-in-effect: unset; -webkit-text-fill-color: unset; -webkit-text-security: unset; -webkit-text-stroke: unset; -webkit-user-drag: unset; -webkit-user-modify: unset; widows: unset; width: unset; will-change: unset; word-break: unset; word-spacing: unset; x: unset; y: unset; z-index: unset;">WAREHOUSE_DIR> \
other args ...

6. The CI CD part to deploy and run the job

The CI CD part is managed by Cloud Build :

  • deploy-spark-job.yaml : deploy the Spark job and the fat jar to Cloud Storage
  • run-spark-job.yaml : run the Spark with Dataproc Serverless

The deploy-spark-job.yaml file :

steps:
- name: sbtscala/scala-sbt:eclipse-temurin-17.0.4_1.7.1_3.2.0
script: |
sbt assembly
- name: google/cloud-sdk:420.0.0-slim
script: |
gcloud alpha storage cp $LOCAL_JAR_PATH $GCS_JARS_PATH
env:
- 'LOCAL_JAR_PATH=$_LOCAL_JAR_PATH'
- 'GCS_JARS_PATH=$_GCS_JARS_PATH'

1 — A first step from the official sbt Docker image is retrieved with Java 17 version, then a command line is executed with sbt to generate the fat jar for the application : sbt assembly

2 — A second step from the Google Cloud SDK brings the gcloud cli , then a command line is executed to copy the fat jar built previously to the specified Cloud Storage bucket :

gcloud alpha storage cp $LOCAL_JAR_PATH $GCS_JARS_PATH

We could have used the gsutil cp command instead but we used an equivalent proposed recently with more efficiency, however this command is still in alpha version.

Variables substitutions are proposed with Cloud Build and set to environment variables.

The run-spark-job.yaml file :

steps:
- name: google/cloud-sdk:420.0.0-slim
args: [ './scripts/run_spark_job.sh' ]
env:
- 'PROJECT_ID=$PROJECT_ID'
- 'LOCATION=$LOCATION'
- 'SERVICE_ACCOUNT=$_SERVICE_ACCOUNT'
- 'JAR=$_JAR'
- 'MAIN_CLASS=$_MAIN_CLASS'
- 'HISTORY_SERVER_CLUSTER=$_HISTORY_SERVER_CLUSTER'
- 'INPUT_TEAM_STATS_FILE_PATH=$_INPUT_TEAM_STATS_FILE_PATH'
- 'INPUT_TEAM_SLOGANS_FILE_PATH=$_INPUT_TEAM_SLOGANS_FILE_PATH'
- 'OUTPUT_TEAM_LEAGUE_DATASET=$_OUTPUT_TEAM_LEAGUE_DATASET'
- 'OUTPUT_TEAM_STATS_TABLE=$_OUTPUT_TEAM_STATS_TABLE'

1 — A step from the gcloud cli launches a separate bash script to run the Spark Scala job with Dataproc Serverless :

#!/usr/bin/env bash

set -e
set -o pipefail
set -u

echo "#######Running the Spark job with Dataproc Serverless"

gcloud dataproc batches submit spark \
--project="$PROJECT_ID" \
--region="$LOCATION" \
--service-account="$SERVICE_ACCOUNT" \
--jars="$JAR" \
--class="$MAIN_CLASS" \
--history-server-cluster="$HISTORY_SERVER_CLUSTER" \
-- "$INPUT_TEAM_STATS_FILE_PATH" "$INPUT_TEAM_SLOGANS_FILE_PATH" "$OUTPUT_TEAM_LEAGUE_DATASET" "$OUTPUT_TEAM_STATS_TABLE"

2 — A service account with the needed privileges is passed to the command line : service-account

We used predefined roles for simplicity but it’s better in real projects to have more fine grained permissions with custom roles and the least privilege principle. You can check this article to have a full example on this security principle.

3 — jars : the path of the fat jar from the Cloud Storage bucket

4 — class : the Scala main class fr.groupbees.application.TeamLeagueApp

5 — history-server-cluster : By default Dataproc Serverless proposes metrics for jobs, but we can also have access to the Spark UI for our serverless jobs. We give the path to the cluster dedicated to the jobs history. This gives more functionalities to monitor Spark jobs with the UI and the history server.

6 — The rest of the parameters corresponds to the job arguments like the input raw file paths and the BigQuery output table

7. Spark job detail, metric and observability

In the Dataproc and Clusters menu in Google Cloud, we have the history cluster created previously :

In the Batches menu, we have all the Spark Serverless jobs :

We click on a successful job and we can access to different elements :

Monitoring : monitoring for the job like Spark executors

Details : elapsed time and parameters used for the job…

We can access to the logs in Cloud Logging, the job output and the metrics from the History Server :

Example of capture from the History Server :

Conclusion

This article showed a full use case with a Spark Scala job launched with Dataproc Serverless.

This feature is really interesting compared to the traditional Dataproc cluster approach, developers can be focused on the code and the logic, without worrying about managing the infrastructure.

Moreover, Dataproc Serverless proposes autoscaling by default and the flexibility to override properties.

Batch metrics, job details and the possibility to check the Serverless jobs in a History Server gives many indicators to monitor Spark jobs in detail.

--

--

Mazlum Tosun
Google Cloud - Community

GDE Cloud | Head of Data & Cloud GroupBees | Data | Serverless | IAC | Devops | FP