Read and write to S3 with Spark

Shad Amez
Published in Gigahex
Feb 12, 2022

We will develop a sample Spark application in Scala that reads a JSON file from S3, performs a basic calculation, and writes the result back to S3 in CSV format.

About S3

S3 is an AWS-managed object storage service that suits a wide range of use cases, such as video storage, static file hosting, and data warehouse storage.

Configure dependencies

Before we start writing the program, we declare the dependencies the application needs. Here is the list of dependencies to add.

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.2.0",
  "org.apache.spark" %% "spark-sql" % "3.2.0",
  "org.apache.hadoop" % "hadoop-common" % "3.3.1",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.1"
)
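For context, here is a minimal sketch of how a complete build.sbt might look around that list. The project name and the Scala version are assumptions that the article does not state. The `%%` operator appends the Scala binary version to the artifact name, so the build needs a `scalaVersion` that Spark 3.2.0 is published for.

```scala
// build.sbt (sketch): the name and scalaVersion values are assumptions
name := "spark-s3-read-write"
scalaVersion := "2.12.15"

val sparkVersion  = "3.2.0"
val hadoopVersion = "3.3.1" // hadoop-aws should match the hadoop-common version

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"    % sparkVersion,
  "org.apache.spark"  %% "spark-sql"     % sparkVersion,
  "org.apache.hadoop" %  "hadoop-common" % hadoopVersion,
  "org.apache.hadoop" %  "hadoop-aws"    % hadoopVersion
)
```

Keeping the versions in named values makes it harder for the two Hadoop artifacts to drift apart during an upgrade.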

Program description

We will create a basic Spark program that reads a JSON file containing flight schedule data and uses the Spark DataFrame API to count the flights departing from each city. The result is saved in CSV format.

Here is a sample record from the dataset in JSON format, which is read using the spark.read.json API.

{
  "id": 1,
  "source": "New York",
  "destination": "Dallas",
  "departureTime": "2021-05-02 21:00:00",
  "arrivalTime": "2021-05-02 24:00:00"
}
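One detail worth knowing about this record: by default spark.read.json expects one JSON object per line, so a record spread over several lines like the one above needs the `multiLine` option. The sketch below also declares the schema explicitly instead of relying on inference. The object name `FlightSchema` and the helper `readFlights` are hypothetical, and the column types are assumptions based on the sample record.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object FlightSchema {
  // Column types are assumed from the sample record; timestamps are kept as strings.
  val schema: StructType = StructType(Seq(
    StructField("id", LongType),
    StructField("source", StringType),
    StructField("destination", StringType),
    StructField("departureTime", StringType),
    StructField("arrivalTime", StringType)
  ))

  // Hypothetical helper: set multiLine = true when records span several lines.
  def readFlights(spark: SparkSession, path: String, multiLine: Boolean = false): DataFrame =
    spark.read
      .schema(schema)
      .option("multiLine", multiLine)
      .json(path)
}
```

Supplying the schema up front also saves Spark the extra pass over the data that schema inference would otherwise require.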

We start by initializing the Spark session and injecting the AWS credentials from JVM system properties.

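import org.apache.spark.sql.SparkSession

// Credentials come from JVM system properties: -Daws.key=... -Daws.secret=...
val spark = SparkSession.builder()
  .master("local") // run Spark locally with a single worker thread
  .config("spark.hadoop.fs.s3a.access.key", System.getProperty("aws.key"))
  .config("spark.hadoop.fs.s3a.secret.key", System.getProperty("aws.secret"))
  .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .appName("spark-s3-read-write")
  .getOrCreate()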
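System.getProperty returns null when a property is not set, which makes a missing credential hard to diagnose. A minimal sketch of a stricter lookup follows. The `Credentials` object and its `requiredSetting` method are hypothetical helpers, and the fallback to the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY is an assumption, not part of the original program.

```scala
object Credentials {
  /** Reads a JVM system property first, then an environment variable (assumed fallback). */
  def requiredSetting(prop: String, env: String): String =
    sys.props.get(prop)
      .orElse(sys.env.get(env))
      .map(_.trim)
      .filter(_.nonEmpty)
      .getOrElse(throw new IllegalArgumentException(
        s"Missing credential: pass -D$prop=... or set the $env environment variable"))
}
```

It could then replace the direct call in the builder, for example `Credentials.requiredSetting("aws.key", "AWS_ACCESS_KEY_ID")`, so the program fails early with a readable message instead of failing inside Spark.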

Accept the program parameters: the input path to read from and the output path where the result will be stored.

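// Both flags must be present and each must be followed by a value
val fileFlag = args.indexOf("-f")
val outFlag = args.indexOf("-o")
if (fileFlag < 0 || outFlag < 0 || fileFlag + 1 >= args.length || outFlag + 1 >= args.length) {
  System.err.println("Usage: -f <file-path> -o <output-path>")
  sys.exit(1)
}
val fileArg = fileFlag + 1
val outArg = outFlag + 1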
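As an alternative, the argument handling can be pulled into a small pure function that is easy to test without starting Spark. This is only a sketch: `CliArgs`, `Paths`, and `parse` are hypothetical names, and it assumes the arguments always arrive as flag/value pairs.

```scala
object CliArgs {
  final case class Paths(input: String, output: String)

  /** Returns the parsed paths, or an error message when a flag is missing. */
  def parse(args: Array[String]): Either[String, Paths] = {
    // Pair each flag with the value that follows it; a trailing unpaired item is ignored.
    val opts = args.sliding(2, 2).collect { case Array(flag, value) => flag -> value }.toMap
    for {
      in  <- opts.get("-f").toRight("Missing -f <file-path>")
      out <- opts.get("-o").toRight("Missing -o <output-path>")
    } yield Paths(in, out)
  }
}
```

The caller can pattern match on the result, printing the usage message and exiting on `Left`, and passing the two paths to the pipeline on `Right`.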

Implement the data processing pipeline using the DataFrame API, as shown below.

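import org.apache.spark.sql.functions.count

spark.read.json(args(fileArg))         // load the flight records
  .groupBy("source")                   // one group per departure city
  .agg(count("id") as "flights_count") // number of flights in each group
  .select("source", "flights_count")
  .withColumnRenamed("source", "city")
  .write.csv(args(outArg))             // fails if the output path already exists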
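To check the aggregation without touching S3, the transformation can be separated from the read and write steps and run against a few in-memory rows. The sketch below assumes that split. `FlightCounts`, `flightsPerCity`, and `writeCsv` are hypothetical names, and the header option and overwrite mode are choices the original program does not make.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.count

object FlightCounts {
  /** Counts flights per departure city; expects "id" and "source" columns. */
  def flightsPerCity(flights: DataFrame): DataFrame =
    flights
      .groupBy("source")
      .agg(count("id").as("flights_count"))
      .withColumnRenamed("source", "city")

  /** Assumed variant of the write step: adds a header row and replaces existing output. */
  def writeCsv(result: DataFrame, path: String): Unit =
    result.write.mode("overwrite").option("header", "true").csv(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("flight-counts-check")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows for a local check, not taken from the real dataset
    val sample = Seq(
      (1L, "New York", "Dallas"),
      (2L, "New York", "Boston"),
      (3L, "Dallas", "Boston")
    ).toDF("id", "source", "destination")

    flightsPerCity(sample).show()
    spark.stop()
  }
}
```

Because `flightsPerCity` takes and returns a DataFrame, the same function works on data loaded from S3 and on the small sample used here.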

Run the program

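You can run the program from IntelliJ in local mode by configuring the run options: pass the credentials as VM options (-Daws.key=... and -Daws.secret=...) and the paths as program arguments (-f <file-path> -o <output-path>). Because the session configures the S3A connector, S3 paths use the s3a:// scheme.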

IntelliJ screenshot

Source code

Below is the entire code that we just developed. To get the entire project, head over to GitHub.
