Adding GCS support to EMR

Sagar Gangurde
Data Engineering
Published in
1 min readMar 12, 2023

Ever wondered how can we read from or write to Google Cloud Storage(GCS) from AWS EMR!

To let EMR spark application talk to GCS, we need to add cloud storage connector in application jar.

The Cloud Storage connector is an open source Java library that lets you run Apache Spark jobs directly on data in GCS. Spark application needs cloud storage connector to talk to GCS.

We can bundle the cloud storage connector in spark application jar by downloading the connector jar and adding it in ‘lib’ folder at application root level.

We can get the google cloud storage connector from: https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar

The spark application would also need GCP service account key to talk to GCP services. So we need to make sure that GCP key is available on the cluster.

Lets assume we already have GCP key uploaded to S3 here: s3://some-bucket/gcp_key.json. We can put the GCP key on our EMR cluster using a simple bootstrap action script that copies the key from an s3 location to a local path say ‘/mnt/gcp_key.json’ on the cluster.

Sample `bootstrap.sh`

aws s3 cp s3://some-bucket/gcp_key.json /mnt/gcp_key.json

Here is the spark word count application which reads the input from GCS and writes the output back to GCS.

SparkConf sparkConf = new SparkConf();

sparkConf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true");
sparkConf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/mnt/gcp_key.json");

SparkSession spark = SparkSession.builder()
.appName("WordCount")
.config(sparkConf)
.getOrCreate();

spark.sparkContext().hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
spark.sparkContext().hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");

JavaRDD<String> textFile = sc.textFile("gs://some-bucket/input.txt");
JavaPairRDD<String, Integer> counts = textFile
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
counts.saveAsTextFile("gs://some-bucket/output/");

spark.close()

Hope this helps!

--

--