How to Write Spark ETL Processes

Matthew Powers
Jan 5, 2018 · 3 min read

A Spark ETL process breaks down into three steps: extract a DataFrame from a data store, transform it with composable functions, and load (or report) the result. Wiring those steps into a reusable EtlDefinition keeps the pipeline modular and testable.

Extract

The extract step reads raw data from the data lake and filters it down to the slice this job cares about:

import org.apache.spark.sql.functions.col

val dataLakeDF = spark.read.parquet("s3a://some-bucket/foo")
val extractDF = dataLakeDF
  .where(col("mood") === "happy")
  .repartition(10000)
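
It can pay to fail fast when the extract doesn't have the shape the transforms expect. A minimal sketch of a column check, assuming the downstream code needs a mood and a user_id column (user_id is an invented name for illustration):

val requiredColumns = Seq("mood", "user_id") // assumed column names
val missingColumns = requiredColumns.diff(extractDF.columns)
require(missingColumns.isEmpty, s"extract is missing columns: ${missingColumns.mkString(", ")}")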

Transform

Transformations are plain functions that take a DataFrame and return a DataFrame, so they chain cleanly with Spark's transform method:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// appends a constant greeting column
def withGreeting()(df: DataFrame): DataFrame = {
  df.withColumn("greeting", lit("hello world"))
}

// appends a constant farewell column
def withFarewell()(df: DataFrame): DataFrame = {
  df.withColumn("farewell", lit("goodbye"))
}

// composes the individual transformations into the full model
def model()(df: DataFrame): DataFrame = {
  df
    .transform(withGreeting())
    .transform(withFarewell())
}
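
Because model() is an ordinary function, it's easy to exercise on a tiny DataFrame before pointing it at production data. A quick sanity check (the sample rows are made up):

import spark.implicits._ // brings toDF into scope (spark is the active SparkSession)

val testDF = Seq("happy", "grumpy").toDF("mood")
val resultDF = testDF.transform(model())
// resultDF keeps the mood column and gains the greeting and farewell columns
resultDF.show()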

Load (or Report)

The load step is another function, one that takes a DataFrame and writes it to the destination:

import org.apache.spark.sql.SaveMode

def exampleWriter()(df: DataFrame): Unit = {
  val path = "s3a://some-bucket/extracts/bar"
  df.write.mode(SaveMode.Overwrite).parquet(path)
}
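
Because the writer is just a DataFrame => Unit function, sinks can be swapped without touching the extract or transform code. A sketch of an alternative writer that partitions the output (the partition column and path are assumptions):

def partitionedWriter()(df: DataFrame): Unit = {
  df.write
    .mode(SaveMode.Overwrite)
    .partitionBy("mood") // assumed partition column
    .parquet("s3a://some-bucket/extracts/bar-partitioned") // hypothetical path
}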

EtlDefinition

The three pieces can now be wired together in a single EtlDefinition object and run with one method call:

val etl = new EtlDefinition(
  sourceDF = extractDF,
  transform = model(),
  write = exampleWriter()
)

etl.process()

The EtlDefinition case class bundles a source DataFrame with a transform function and a write function, and process() runs the whole pipeline:

case class EtlDefinition(
    sourceDF: DataFrame,
    transform: (DataFrame => DataFrame),
    write: (DataFrame => Unit),
    metadata: scala.collection.mutable.Map[String, Any] = scala.collection.mutable.Map[String, Any]()
) {

  def process(): Unit = {
    write(sourceDF.transform(transform))
  }

}
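
The metadata map isn't used by process() itself; it's a free-form slot for bookkeeping about a pipeline. A sketch of how it might be used (the keys are invented for illustration):

etl.metadata += ("owner" -> "data-eng") // hypothetical key
etl.metadata += ("lastRunAt" -> System.currentTimeMillis()) // hypothetical key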

Multiple EtlDefinitions

Several pipelines can be registered in a map and run by name:

val etls = scala.collection.mutable.Map[String, EtlDefinition]()

etls += ("bar" -> etl)
etls += ("baz" -> etl2) // etl2 is another EtlDefinition, defined elsewhere

etls("bar").process()

Next Steps

Wrapping each pipeline's extract, transform, and load logic in an EtlDefinition keeps Spark jobs modular: transformations can be unit tested in isolation, writers can be swapped per environment, and whole pipelines can be registered and run by name.