ETL Pipeline using Spark SQL

achraf el gdaouni
Published in Analytics Vidhya · 5 min read · Nov 11, 2019

In this tutorial we will create an ETL pipeline that reads data from a CSV file, transforms it, and then loads it into a relational database (PostgreSQL in our case) and also into JSON files. We will show you how simple it is to read and write data from different sources using Spark's DataFrameReader and DataFrameWriter.

Here are the steps that we are going to follow during this tutorial :

  • Load the dataset (CSV) into Apache Spark
  • Analyze the data with Spark SQL
  • Transform the data into JSON format and save it to a database
  • Query and load the data back into Spark

[Figure: Spark ETL Pipeline]

Dataset description :

Open Payments is a federal program that, since 2013, has collected information about the payments drug and device companies make to physicians and teaching hospitals for things like travel, research, gifts, speaking fees, and meals.

File : OP_DTL_GNRL_PGYR2018_P06282019.csv:

This file contains the data set of General Payments reported for the 2018 program year. General Payments are defined as payments or other transfers of value made to a covered recipient (physician or teaching hospital) that are not made in connection with a research agreement or research protocol.

File header :

$ head -1 OP_DTL_GNRL_PGYR2018_P06282019.csv
Change_Type,Covered_Recipient_Type,Teaching_Hospital_CCN,Teaching_Hospital_ID,Teaching_Hospital_Name,Physician_Profile_ID,Physician_First_Name,Physician_Middle_Name,Physician_Last_Name,,
{..}
,Indicate_Drug_or_Biological_or_Device_or_Medical_Supply_5,Product_Category_or_Therapeutic_Area_5,Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_5,Associated_Drug_or_Biological_NDC_5,Program_Year,Payment_Publication_Date

First Line :

"NEW","Covered Recipient Physician",,,,"258145","HOWARD",,"SADINSKY",,"321 BOSTON POST RD",,"MILFORD","CT","06460-2574","United States",,,"Doctor of Osteopathy","Allopathic & Osteopathic Physicians|Pediatrics","CT",,,,,"Mission Pharmacal Company","100000000186","Mission Pharmacal Company","TX","United States",13.78,"04/13/2018","1","In-kind items and services","Food and Beverage",,,,"No","No Third Party Payment",,"No",,,"No","521226951","No","Yes","Covered","Drug","Adrenocorticosteroid","Prednisolone 25","0178-0582-08",,,,,,,,,,,,,,,,,,,,,"2018","06/28/2019"

In our example we will use only the following fields :

{"physician_id":"258145","date_payment":"04/13/2018 ","record_id":"521226951","payer":"Mission Pharmacal Company","amount":13.78,"Physician_Specialty":"Allopathic & Osteopathic Physicians|Pediatrics ","Nature_of_payment":"Food and Beverage"}

Read The data from a csv file into DataFrame :

First we create a SparkSession object with SparkSession.builder, then we call read on the session to get a DataFrameReader. In the source code at the end of this post, we use the csv function to load the CSV file instead of calling format("csv").load(paths: _*), because csv does exactly that for us. You can verify this in the source of the function :

@scala.annotation.varargs
def csv(paths: String*): DataFrame = format("csv").load(paths : _*)

We also create a UDF, toDouble, to ensure that the amount column contains only double values, so that we can next convert the DataFrame to a Dataset of Payment objects.
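The parsing rule behind such a UDF can be sketched in plain Scala, without Spark. The object name AmountParsing, the function parseAmount and the constant InvalidAmount are hypothetical names chosen for this illustration; the -1 fallback mirrors the value used in the source code at the end of this post. Unlike a catch on NumberFormatException alone, this version also covers a null input, which is an assumption about what empty CSV cells may produce.

```scala
import scala.util.Try

object AmountParsing {
  // Hypothetical sentinel for values that cannot be parsed (the article's code also uses -1)
  val InvalidAmount: Double = -1.0

  // Returns the parsed amount, or InvalidAmount when the input is null or not a number
  def parseAmount(raw: String): Double =
    Option(raw)                                      // null becomes None
      .flatMap(s => Try(s.trim.toDouble).toOption)   // a parse failure becomes None
      .getOrElse(InvalidAmount)
}
```

A function like this could be wrapped with udf in the same way as strToDouble in the source code.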

Transform into a Dataset of payment object :

Here we use the spark.sql function to execute a SQL query on the payments view (we could also run the same query directly on the DataFrame df2), then we convert the result to a Dataset of Payment with as[Payment]. Note that Payment has to be a case class, and that in order to use this syntax we have to import the Spark implicits :

import spark.implicits._

Finally we cache the newly created dataset.
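As a minimal sketch of this step, the snippet below converts the result of a SQL query into a typed Dataset and caches it. It uses a tiny in-memory table instead of the Open Payments file; the case class PayerAmount, the object DatasetSketch, the view name payments_demo and the second sample row are hypothetical and only serve the illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical, reduced record type used only for this illustration
case class PayerAmount(payer: String, amount: Double)

object DatasetSketch {
  // Runs a SQL query on a temp view and converts the untyped result to a typed, cached Dataset
  def toTyped(spark: SparkSession): Dataset[PayerAmount] = {
    import spark.implicits._ // provides toDF and the Encoder required by as[PayerAmount]

    // Tiny in-memory stand-in for the CSV data (second row is made up)
    Seq(("Mission Pharmacal Company", 13.78), ("Hypothetical Payer", 250.0))
      .toDF("payer", "amount")
      .createOrReplaceTempView("payments_demo")

    spark.sql("select payer, amount from payments_demo where amount > 0")
      .as[PayerAmount]
      .cache()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("dataset-sketch").master("local[*]").getOrCreate()
    toTyped(spark).show()
    spark.stop()
  }
}
```

The case class is declared at the top level so that Spark can derive an encoder for it.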

Explore and query the Open Payment data with spark Dataset :
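The queries for this step are listed in section III of the source code at the end of this post. As a rough sketch of the idea, the same aggregation can be written either with the Dataset API or as a SQL query on a temporary view. The object ExploreSketch, the view name payments_by_nature and the sample rows are hypothetical and not taken from the real file.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{desc, sum}

object ExploreSketch {
  // Total amount per nature of payment, computed with the DataFrame/Dataset API
  def totalsWithApi(payments: DataFrame): DataFrame =
    payments.groupBy("nature_of_payment")
      .agg(sum("amount").alias("total"))
      .orderBy(desc("total"))

  // The same aggregation expressed as a SQL query on a temporary view
  def totalsWithSql(spark: SparkSession, payments: DataFrame): DataFrame = {
    payments.createOrReplaceTempView("payments_by_nature")
    spark.sql(
      """select nature_of_payment, sum(amount) as total
        |from payments_by_nature
        |group by nature_of_payment
        |order by total desc""".stripMargin)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("explore-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Made-up sample rows, for illustration only
    val sample = Seq(
      ("Food and Beverage", 13.78),
      ("Food and Beverage", 20.0),
      ("Travel and Lodging", 500.0)
    ).toDF("nature_of_payment", "amount")

    totalsWithApi(sample).show(false)
    totalsWithSql(spark, sample).show(false)
    spark.stop()
  }
}
```

Both functions should return the same rows; which form to use is mostly a matter of taste.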

Save datasets to json file :

Exporting the dataset to an external file is as simple as reading it. This time, instead of the read method, we call the write method to get a DataFrameWriter. We specify the write mode (here overwrite, to replace the output if it already exists), then we call the json method with the output path. Again, json is the same as calling :

format("json").save(path)
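A small round trip can illustrate this: write a DataFrame with the explicit format("json").save(path) form, then load it back with spark.read.json. This is only a sketch; the object JsonRoundTrip and the temporary output directory are hypothetical. Note that Spark writes a directory of part files at the given path rather than a single JSON file.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonRoundTrip {
  // Writes df as JSON using the generic format/save form, then reads the output back
  def roundTrip(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
    df.write.mode("overwrite").format("json").save(path) // same effect as .json(path)
    spark.read.json(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("json-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical output location: a fresh temporary directory
    val out = Files.createTempDirectory("payments_json").toString
    val sample = Seq(("521226951", 13.78)).toDF("record_id", "amount")

    roundTrip(spark, sample, out).show()
    spark.stop()
  }
}
```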

Save dataset to the database :
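The database write is shown in section V of the source code at the end of this post, where the jdbc method of DataFrameWriter is used. As with csv and json, the same write can be sketched with the generic format/save form. The object JdbcSketch and the function appendToPostgres are hypothetical names; the sketch assumes a reachable PostgreSQL instance and the PostgreSQL JDBC driver on the classpath, and it has not been run against the author's setup.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object JdbcSketch {
  // Appends df to a PostgreSQL table through Spark's JDBC data source.
  // Connection details are supplied by the caller; nothing runs until this is called.
  def appendToPostgres(df: DataFrame, url: String, table: String,
                       user: String, password: String): Unit =
    df.write
      .format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .option("user", user)
      .option("password", password)
      .option("driver", "org.postgresql.Driver")
      .mode(SaveMode.Append) // add rows to the existing table instead of replacing it
      .save()
}
```

With the values used in this tutorial, the call would look like JdbcSketch.appendToPostgres(ds.toDF(), "jdbc:postgresql://localhost:5432/postgres", "payment", "spark", "spark").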

Source Code :

import java.sql.DriverManager
import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{desc, sum, udf}


object ETL extends App {

  // Silence Spark's verbose logging
  Logger.getLogger("org").setLevel(Level.OFF)
  val spark = SparkSession.builder.appName("ETL pipeline").master("local[*]").getOrCreate()
  import spark.implicits._

  /*
  I - Read data from the csv file and create a temp view
  */

  // Parse a string as a Double; return -1 when the value is null or not a number
  def strToDouble(str: String): Double = {
    try {
      str.toDouble
    } catch {
      case _: NumberFormatException | _: NullPointerException =>
        println("Can't cast " + str)
        -1.0
    }
  }
  val toDouble = udf[Double, String](strToDouble(_))
  val df = spark.read.option("header", "true").csv("/home/achraf/Desktop/labo/SparkLab/datasets/open_payments_csv/OP_DTL_GNRL_PGYR2018_P06282019.csv")
  // Add a numeric amount column derived from the raw string column
  val df2 = df.withColumn("amount", toDouble(df("Total_Amount_of_Payment_USDollars")))
  df2.createOrReplaceTempView("payments")


  /*
  II - Transform into a dataset of payment object
  */

  /*
  This class represents one row of our dataset; note that we selected only the fields that interest us
  */

  case class Payment(physician_id: String, date_payment: String, record_id: String, payer: String, amount: Double, physician_specialty: String, nature_of_payment: String)

  /*
  Now we select the fields that interest us from the temp view
  */
  val ds: Dataset[Payment] = spark.sql(
    """select physician_profile_id as physician_id,
      | Date_of_payment as date_payment,
      | record_id as record_id,
      | Applicable_manufacturer_or_applicable_gpo_making_payment_name as payer,
      | amount,
      | physician_specialty,
      | nature_of_payment_or_transfer_of_value as nature_of_payment
      | from payments
      | where physician_profile_id is not null""".stripMargin).as[Payment]
  ds.cache()

  /*
  III - Explore and query the Open Payment data with Spark Dataset
  */

  // print the first 5 payments
  ds.show(5)

  // print the schema of ds
  ds.printSchema()

  // What are the natures of payment with amounts greater than $1000, ordered by count?
  ds.filter($"amount" > 1000).groupBy("nature_of_payment")
    .count().orderBy(desc("count")).show(5)

  // What are the top 5 natures of payment by count?
  ds.groupBy("nature_of_payment").count()
    .orderBy(desc("count")).show(5)

  // What are the top 10 natures of payment by total amount?
  ds.groupBy("nature_of_payment").agg(sum("amount").alias("total"))
    .orderBy(desc("total")).show(10)

  // What are the top 5 physician specialties by total amount?
  ds.groupBy("physician_specialty").agg(sum("amount").alias("total"))
    .orderBy(desc("total")).show(5, false)

  /*
  IV - Saving JSON Document
  */

  // Write the dataset as JSON documents (Spark creates a directory of part files at this path)
  ds.write.mode("overwrite").json("/home/achraf/Desktop/labo/SparkLab/datasets/open_payments_csv/OP_DTL_GNRL_PGYR2018_P06282019_demo.json")

  /*
  V - Saving data to database
  */
  val url = "jdbc:postgresql://localhost:5432/postgres"
  val connectionProperties = new Properties()
  connectionProperties.setProperty("Driver", "org.postgresql.Driver")
  connectionProperties.setProperty("user", "spark")
  connectionProperties.setProperty("password", "spark")

  // Quick connectivity check: prints false when the connection was opened successfully
  val connection = DriverManager.getConnection(url, connectionProperties)
  println(connection.isClosed)
  connection.close()

  // Show the current content of the payment table (this read fails if the table does not exist yet)
  val dbDataFrame = spark.read.jdbc(url, "payment", connectionProperties)
  dbDataFrame.show()

  // Append the dataset to the table, then query the table again to see the new rows
  ds.write.mode("append").jdbc(url, "payment", connectionProperties)
  dbDataFrame.show()

}

Summary :

In this blog post, you’ve learned how to extract Open Payments data from a CSV file, explore it with Spark SQL, and load it into JSON files and a relational database using Spark Datasets.

About the author : achraf el gdaouni, Spark and Scala Developer and a Data Science enthusiast