Federated Query using Spark

Manoj Gosavi
Published in Globant
Jul 5, 2023

In today’s landscape, enterprises often adopt multiple cloud providers to reduce unplanned downtime or outages and to avoid vendor lock-in. Large enterprises with different business units may prefer different cloud platforms, such as AWS and Azure, based on their architecture, business needs, performance, and feature capabilities. We often encounter scenarios where we must analyze datasets stored in both AWS S3 and ADLS Gen2.

There are two ways we can analyze these datasets:

  1. Copy one of the datasets into the environment where we will compute the results.
  2. Use Spark’s query federation feature to query the datasets in place, without copying them.

In this blog, we will use Apache Spark’s query federation feature to combine and analyze datasets stored in AWS S3 and ADLS Gen2. We will connect to ADLS Gen2 from AWS EMR and run a Spark job that joins the datasets stored in an AWS S3 bucket and in ADLS Gen2; the results will be written back to another S3 bucket.

Setup & Configuration in AWS & Azure

First, we will understand the setup requirements and configuration steps involved in enabling federated data integration between AWS and Azure.

An EMR cluster with a custom application bundle of Hadoop 3.3.3, Hive 3.1.3, Hue 4.10.0, and Spark 3.3.1 is used. The cluster consists of one master node, one core node, and one task node, each of EC2 instance type c4.2xlarge. Also set up an S3 bucket that the cluster can use to access data and store results.
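If you prefer the AWS CLI to the console, a cluster with this layout can be created roughly as shown below. The release label (emr-6.10.0, assumed here because it bundles these application versions), the cluster name, and the key pair name are assumptions; adjust them to your account.

# Sketch: launch the EMR cluster from the AWS CLI (names and release label are assumptions).
aws emr create-cluster \
  --name "qf-poc-cluster" \
  --release-label emr-6.10.0 \
  --applications Name=Hadoop Name=Hive Name=Hue Name=Spark \
  --instance-groups \
    InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c4.2xlarge \
    InstanceGroupType=CORE,InstanceCount=1,InstanceType=c4.2xlarge \
    InstanceGroupType=TASK,InstanceCount=1,InstanceType=c4.2xlarge \
  --ec2-attributes KeyName=qf-poc-cluster \
  --use-default-roles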

Launch Custom EMR Cluster

Once the cluster is up and running, SSH into the primary node of the cluster using the Amazon EC2 key pair created when the cluster was launched. Run the command below in the terminal.

ssh -i <ssh key> <edge node url>
ssh -i ~/pem/qf-poc-cluster.pem hadoop@ec2-3-239-240-15.compute-1.amazonaws.com
EMR CLI

For the Azure setup, create an Azure Storage account under your subscription. Create a container and load the datasets. Ensure the access level of the storage container is set to public access so the connection between ADLS Gen2 and AWS EMR can be established. Access can instead be restricted using AWS VPC and Azure VNet configuration, which is beyond the scope of this blog.
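For reference, the equivalent setup can be scripted with the Azure CLI. The sketch below assumes a resource group named qf-poc-rg and the eastus region; the storage account (adobepocstorage) and container (poc-data) names match the examples used later in this blog.

# Create an ADLS Gen2 account (hierarchical namespace enabled) and a public container.
az storage account create \
  --name adobepocstorage \
  --resource-group qf-poc-rg \
  --location eastus \
  --sku Standard_LRS \
  --kind StorageV2 \
  --enable-hierarchical-namespace true

az storage container create \
  --name poc-data \
  --account-name adobepocstorage \
  --public-access container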

Azure ADLS Gen2 Storage
Storage container

Set up the access keys that will be used to access the ADLS Gen2 location from the EMR cluster in the Spark code.

Storage Keys

The ABFS (Azure Blob File System) connector is a component that allows you to access and interact with data stored in Azure Blob Storage and ADLS Gen2 using Apache Hadoop-compatible APIs; abfss:// is its TLS-secured URI scheme. Follow the steps below to access the ADLS location from EMR.

1. Install the Azure connector jars for Hadoop. We need to download the jars into /usr/lib/hadoop/lib/ so that they are accessible to Hadoop.
sudo wget -P /usr/lib/hadoop/lib/ https://repo1.maven.org/maven2/com/microsoft/azure/azure-data-lake-store-sdk/2.3.10/azure-data-lake-store-sdk-2.3.10.jar
sudo wget -P /usr/lib/hadoop/lib/ https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure-datalake/3.3.1/hadoop-azure-datalake-3.3.1.jar
sudo wget -P /usr/lib/hadoop/lib/ https://repo1.maven.org/maven2/com/microsoft/azure/azure-storage/3.1.0/azure-storage-3.1.0.jar
sudo wget -P /usr/lib/hadoop/lib/ https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.1/hadoop-azure-3.3.1.jar

2. Add the jars to the Hadoop classpath in the /etc/hadoop/conf/hadoop-env.sh file. Open hadoop-env.sh and add the entries below.

export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:/usr/lib/hadoop/lib/azure-data-lake-store-sdk-2.3.10.jar"
export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:/usr/lib/hadoop/lib/hadoop-azure-datalake-3.3.1.jar"
export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:/usr/lib/hadoop/lib/azure-storage-3.1.0.jar"
export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:/usr/lib/hadoop/lib/hadoop-azure-3.3.1.jar"

3. Azure Storage account key generation. The shared key for the Azure Storage account can be obtained from Access keys under Security + networking in the storage account pane.

Access Keys for Storage
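The same key can also be fetched non-interactively with the Azure CLI (the account and resource group names are assumptions):

az storage account keys list \
  --account-name adobepocstorage \
  --resource-group qf-poc-rg \
  --query "[0].value" \
  --output tsv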

4. Add the configuration below to the /etc/hadoop/conf/core-site.xml file.

<property>
  <name>fs.azure.account.auth.type</name>
  <value>SharedKey</value>
</property>
<property>
  <name>fs.azure.account.key.adobepocstorage.dfs.core.windows.net</name>
  <value>9y73utBXLwX/iASFeKaEHXOMQrwfre02nAF5n/HS+mTeDC2a8bt9yNqFQQ4+</value>
</property>
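As an alternative to editing core-site.xml cluster-wide, the same settings can be passed per job through Spark’s spark.hadoop.* configuration prefix; a sketch (my_job.py is a placeholder for your script):

spark-submit \
  --conf spark.hadoop.fs.azure.account.auth.type=SharedKey \
  --conf "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net=<storage-account-key>" \
  my_job.py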

5. Restart the dependent YARN services.

sudo systemctl stop hadoop-yarn-resourcemanager
sudo systemctl start hadoop-yarn-resourcemanager
sudo systemctl stop hadoop-yarn-nodemanager
sudo systemctl start hadoop-yarn-nodemanager

6. Validate that the ADLS location is accessible using HDFS commands.

hdfs dfs -ls abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/
ADLS Gen 2 Storage Data

7. Add the same jars for Spark by downloading them into /usr/lib/spark/jars/.

sudo wget -P /usr/lib/spark/jars/ https://repo1.maven.org/maven2/com/microsoft/azure/azure-data-lake-store-sdk/2.3.10/azure-data-lake-store-sdk-2.3.10.jar
sudo wget -P /usr/lib/spark/jars/ https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure-datalake/3.3.1/hadoop-azure-datalake-3.3.1.jar
sudo wget -P /usr/lib/spark/jars/ https://repo1.maven.org/maven2/com/microsoft/azure/azure-storage/3.1.0/azure-storage-3.1.0.jar
sudo wget -P /usr/lib/spark/jars/ https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.1/hadoop-azure-3.3.1.jar

Once the above configuration is done, we will create external tables in Hive pointing to the ADLS and AWS S3 locations that contain the datasets.

For this, I have selected a dataset from Kaggle: https://www.kaggle.com/datasets/andrezaza/clapper-massive-rotten-tomatoes-movies-and-reviews. We store the movies data in ADLS and the movie reviews in AWS S3.
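One way to stage the files is shown below; the local CSV file names are assumptions based on the Kaggle download, and the Azure upload additionally needs credentials (for example, an --account-key or an authenticated az login session).

# Movies CSV goes to the ADLS Gen2 container, reviews CSV to S3 (file names are assumptions).
az storage blob upload \
  --account-name adobepocstorage \
  --container-name poc-data \
  --name movies/rotten_tomatoes_movies.csv \
  --file rotten_tomatoes_movies.csv

aws s3 cp rotten_tomatoes_movie_reviews.csv \
  s3://query-federation-poc/data/moviereviews/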

CREATE EXTERNAL TABLE `movies` (
`id` STRING,
`title` STRING,
`audienceScore` STRING,
`tomatoMeter` STRING,
`rating` STRING,
`releaseDateTheaters` STRING,
`releaseDateStreaming` STRING,
`runtimeMinutes` STRING,
`genre` STRING,
`director` STRING,
`boxOffice` STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'abfss://poc-data@adobepocstorage.dfs.core.windows.net/movies/'
TBLPROPERTIES (
"skip.header.line.count"="1"
);


CREATE EXTERNAL TABLE `moviereviews` (
`id` STRING,
`reviewId` STRING,
`creationDate` STRING,
`criticName` STRING,
`isTopCritic` STRING,
`originalScore` STRING,
`reviewState` STRING,
`publicationName` STRING,
`reviewText` STRING,
`scoreSentiment` STRING,
`reviewUrl` STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://query-federation-poc/data/moviereviews/'
TBLPROPERTIES (
"skip.header.line.count"="1"
);

Run the queries below to check whether you can access both tables.

select * from movies limit 5;
select * from moviereviews limit 5;

If all the configurations are set up correctly, you should be able to see the data as shown below in Hive or PySpark.

Hive Query
PySpark Query
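On the PySpark side, a minimal sketch of the same check (run via pyspark or spark-submit) looks like this:

from pyspark.sql import SparkSession

# Hive support is needed so Spark can resolve the external tables defined above.
spark = SparkSession.builder \
    .appName("query-federation-check") \
    .enableHiveSupport() \
    .getOrCreate()

# One table is backed by ADLS Gen2 (abfss://), the other by S3; both are read
# through the Hive metastore and the configured filesystem connectors.
spark.sql("SELECT * FROM movies LIMIT 5").show(truncate=False)
spark.sql("SELECT * FROM moviereviews LIMIT 5").show(truncate=False)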

In addition to Select queries, we ran Join and Insert Overwrite operations on the datasets, storing the results in AWS and Azure for different dataset sizes, roughly as sketched below.
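As an illustration, the join and write-back can look like the HiveQL below; the results table name and its S3 location are assumptions, not the exact queries used in the benchmark.

-- Hypothetical results table in S3; adjust the location to your own bucket.
CREATE EXTERNAL TABLE `movie_review_summary` (
`title` STRING,
`criticName` STRING,
`reviewText` STRING,
`scoreSentiment` STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://query-federation-poc/data/results/';

-- Join the ADLS-backed movies table with the S3-backed reviews table
-- and overwrite the results table in S3.
INSERT OVERWRITE TABLE movie_review_summary
SELECT m.title, r.criticName, r.reviewText, r.scoreSentiment
FROM movies m
JOIN moviereviews r ON m.id = r.id;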

We benchmarked a query joining the two datasets stored in AWS and Azure, writing the results once to Azure and once to AWS. Below are the benchmarking results for different data sizes.

Benchmarking Results
Benchmarking Trends

Conclusion

In conclusion, the adoption of Apache Spark for cross-cloud data processing empowers businesses to enrich their data pipelines with enhanced context and processing capabilities. Our benchmarking tests validate the efficiency and scalability of Spark’s Select, Join, and Insert Overwrite operations across datasets of different sizes. Armed with these insights, businesses can make informed decisions to improve their data processing workflows and leverage the full potential of their cross-cloud datasets.
