Apache Spark AWS S3 Datasource
Hello. In this post, we will integrate Apache Spark with AWS S3: we will learn how to write a Spark DataFrame to S3 and how to read data back from S3 with Spark, all on our local machine. If you ask what the point of writing about something so simple is, I'd say: "It's a bit of a tedious job; I suffered so you don't have to." There are a few tricks to be aware of, and once you get them right it becomes easy. Our example will be to read a simple CSV file from the local disk and write it to S3, then read what we have just written back from S3 and print it in the notebook/console.
Environment Information
Spark: 3.0.0 (Hadoop 3.2)
Java: 1.8.0
Python: 3.6.8
Operating System: CentOS7
IDE: Jupyter Notebook
The information above describes the environment I had at the time of writing. It is not mandatory to use exactly the same versions to code along. However, I recommend using Spark's Hadoop 3.2 build, because the trick in this S3 job is to select the appropriate versions of the jar files/Maven coordinates from the Maven repo. Also, if you use MinIO (an S3-compatible object storage) instead of S3, the Hadoop 2.7 build doesn't work well with it, as far as I remember.
Assumptions
Your AWS S3 bucket has been created, and you have an accessKeyId and a secretAccessKey.
Coding
Open up a Jupyter notebook and put the following into the first paragraph to locate your Spark installation with findspark.
import findspark
# /opt/manual/spark is the SPARK_HOME path
findspark.init("/opt/manual/spark")
Import the standard PySpark libraries:
from pyspark.sql import SparkSession, functions as F
from pyspark import SparkConf, SparkContext
Libraries (Jars) for S3 Connection
Now we will download the additional libraries (jar files) to be used for the S3 connection and put them next to Spark's other jar files. Of course, there are other ways to do this, but here I prefer this one.
When downloading files from the Maven repo, pay close attention to your Spark/Hadoop version (you decided this when downloading and installing Spark, choosing among the various builds; my example is Spark 3.0.0 with Hadoop 3.2). To prevent any mistakes, please pay attention to the following figures and explanations.
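As an aside, one of those other ways is to let Spark resolve the jars from Maven itself through the `spark.jars.packages` option; a minimal sketch under the versions used in this post (Spark downloads the artifacts at session startup, so this needs internet access):

```python
from pyspark.sql import SparkSession

# Maven coordinates matching the versions used in this post;
# Spark resolves and downloads them when the session starts.
packages = ",".join([
    "org.apache.hadoop:hadoop-aws:3.2.0",
    "com.amazonaws:aws-java-sdk-bundle:1.11.375",
])

spark = (SparkSession.builder
         .config("spark.jars.packages", packages)
         .getOrCreate())
```

With this approach, the manual wget/mv steps below are unnecessary; I stick with manual copying here because it keeps the jars available offline.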
Searching hadoop-aws-3.2.0.jar in maven:
Choosing the right version:
Copy the jar link:
We will do two things here (Figure-3). First, copy the hadoop-aws 3.2.0 link (the first pair of red rectangles). Second, on the same page, find the compatible aws-java-sdk-bundle version (the red rectangle in the bottom-right corner) and click its link.
Downloading and moving jar files:
The Spark installation already contains other jar files; we will put these two into the `SPARK_HOME/jars` directory alongside them.
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar
mv aws-java-sdk-bundle-1.11.375.jar hadoop-aws-3.2.0.jar /opt/manual/spark/jars/
Spark Session
spark = SparkSession.builder.getOrCreate()
spark.version
Output:
'3.0.0'
Configuration
Let's first assign the access keys to variables. You could also put them in environment variables if you prefer. Either way, never push these keys to repositories such as GitHub and never share them with anyone else; if in doubt, revoke the keys and generate new ones.
accessKeyId = 'your_access_key'
secretAccessKey = 'your_secret_key'
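If you went the environment-variable route instead, a minimal sketch (the variable names here follow the common AWS convention, but any names you export work):

```python
import os

# Read the keys from environment variables instead of hardcoding them.
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are the conventional names;
# the empty-string default just avoids a KeyError if they are unset.
accessKeyId = os.environ.get("AWS_ACCESS_KEY_ID", "")
secretAccessKey = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
```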
Now let's add the S3 configurations to the SparkContext:
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', accessKeyId)
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secretAccessKey)
sc._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')
sc._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3.amazonaws.com')
Let’s read the dataset. You can access it here.
df = spark.read \
.option("inferSchema", True) \
.option("header", True) \
.csv("file:///home/train/datasets/simple_data.csv")
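Note that `inferSchema` makes Spark scan the data an extra time to guess the column types; when the layout is known, you can declare the schema up front instead. A sketch, using the column names from the sample output below (the chosen types are my assumption about this dataset):

```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema for simple_data.csv; skips the extra pass
# that inferSchema would otherwise trigger.
schema = StructType([
    StructField("sirano", IntegerType()),
    StructField("isim", StringType()),
    StructField("yas", IntegerType()),
    StructField("meslek", StringType()),
    StructField("sehir", StringType()),
    StructField("aylik_gelir", IntegerType()),
])

df = spark.read \
    .option("header", True) \
    .schema(schema) \
    .csv("file:///home/train/datasets/simple_data.csv")
```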
Let’s take a look at the dataset:
df.show(3)
+------+-----+---+--------+--------+-----------+
|sirano| isim|yas|  meslek|   sehir|aylik_gelir|
+------+-----+---+--------+--------+-----------+
|     1|Cemal| 35|    Isci|  Ankara|       3500|
|     2|Ceyda| 42|   Memur| Kayseri|       4200|
|     3|Timur| 30|Müzisyen|Istanbul|       9000|
+------+-----+---+--------+--------+-----------+
Writing Spark Dataframe to AWS S3
After downloading the libraries in the appropriate versions and configuring Spark as above, writing to S3 is no different from writing to the local disk.
df.write.format('csv').option('header', 'true') \
.save('s3a://<your_bucket_name_here>/<your_folder_here>', mode='overwrite')
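One thing worth knowing before checking the result: Spark writes one part file per partition, so the target "folder" on S3 will contain several part-*.csv files rather than a single CSV. For a small dataset like this one, you can coalesce to a single partition first if one file is preferable; a sketch (avoid this for large data, since it forces everything through one task):

```python
# Collapse the DataFrame to a single partition so S3 receives one
# part file; fine for small data, wasteful for big data.
df.coalesce(1).write.format('csv').option('header', 'true') \
    .save('s3a://<your_bucket_name_here>/<your_folder_here>', mode='overwrite')
```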
Let’s check the result on AWS.
Reading Data from AWS S3 with Spark
Now let’s read this data again with Spark.
df_s3 = spark.read.format('csv').option('header', 'true') \
.load('s3a://<your_bucket_name_here>/<your_folder_here>')
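As a quick sanity check, the round trip should preserve the row count, since the `header` option was used on both the write and the read:

```python
# The re-read DataFrame should contain the same number of rows
# as the one we wrote out.
assert df_s3.count() == df.count()
df_s3.show(3)
```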
Here we have come to the end of our post. Three apples fell from the sky: one landed on S3, another on Cassandra, and the other on HDFS's head. We didn't call HDFS "the other" for nothing 🙂