Apache Hudi : EMR on EKS

Syeda Marium Faheem
Bazaar Engineering
Published in
6 min readMar 8, 2022

This article is in continuation of my article Big Data Platform Migration Form EMR To EMR on EKS — Success Story, where I had shared our successful migration to and cost saving stratrgy from EMR to EMR on EKS

Migration to “EMR on EKS” from EMR helped us improve scalability, reliability, and costing of our data platform. The next step was to facilitate the streaming jobs of Apache Hudi in EMR on EKS. We are at using Apache HUDI to maintain the ACID properties at lake level.

Apache Hudi is a rich platform to build streaming data lakes with incremental data pipelines on a self-managing database layer while being optimized for lake engines and regular batch processing.

Apache hudi
Image from AWS

This article assumes that you have experience with both AWS EKS and EMR

Setup and Configuration

We’ll use command-line utilities heavily. The following tools are required.

  • AWS CLI V2 — it is the official command-line interface that enables users to interact with AWS services.
  • eksctl — it is a CLI tool for creating and managing clusters on EKS.
  • kubectl — it is a command-line utility for communicating with the cluster API server

Set up Amazon EMR on EKS

As we can refer from Amazon EMR on the EKS development guide, Amazon EKS uses Kubernetes namespaces to separate resources between various Apps and users. A virtual cluster is a Kubernetes namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters to run jobs and host endpoints.

The following steps are taken in order to set up EMR on EKS.

EKS Cluster Setup

let's create an EKS cluster

eksctl create cluster \
--name ${CLUSTER_NAME}\
--region ${AWS_REGION}\
--with-oidc \
--ssh-access \
--ssh-public-key myKeyPair \
--instance-types=m5.4xlarge \
--managed

it will take around 25–30min to up.

view and validate resources

kubectl get nodes -o wide

it will show you available nodes

Setup To cluster access for Amazon EMR on EKS

In EKS, a specific namespace will be attached to virtual EMR. All EMR jobs will run on this namespace. That’s why it is a foremost important step is to allow Amazon EMR on EKS to access the namespace. The best way is to automate it by using eksctl and specifically the following actions are performed.

  • setting up RBAC authorization by creating a Kubernetes role and binding the role to a Kubernetes user
  • mapping the Kubernetes user to the EMR on EKS service-linked role
kubectl create namespace virtualemreksctl create iamidentitymapping \
–cluster ${CLUSTER_NAME} \
–namespace virtualemr \
–service-name “emr-containers”

now we need to the attached a role with the required permission to our EKS cluster

Create a job execution role

These permissions are made up to perform tasks on S3 and AWS Glue. We’ll also enable logging on S3 and CloudWatch so that the necessary permissions are added as well.

aws iam create-role \
–role-name ${EMR_ROLE_NAME} \
–assume-role-policy-document ‘{
“Version”: “2012-10-17”,
“Statement”: [
{
“Effect”: “Allow”,
“Principal”: {
“Service”: “elasticmapreduce.amazonaws.com”
},
“Action”: “sts:AssumeRole”
}
]
}’

aws iam put-role-policy \
–role-name ${EMR_ROLE_NAME} \
–policy-name ${EMR_ROLE_NAME}-policy \
–policy-document ‘{
“Version”: “2012-10-17”,
“Statement”: [
{
“Effect”: “Allow”,
“Action”: [
“s3:PutObject”,
“s3:GetObject”,
“s3:ListBucket”,
“s3:DeleteObject”
],
“Resource”: “*”
},
{
“Effect”: “Allow”,
“Action”: [
“glue:*”
],
“Resource”: “*”
},
{
“Effect”: “Allow”,
“Action”: [
“logs:PutLogEvents”,
“logs:CreateLogStream”,
“logs:DescribeLogGroups”,
“logs:DescribeLogStreams”
],
“Resource”: [
“arn:aws:logs:*:*:*”
]
}
]
}’

Update the trust policy of the job execution role

EMR service account is allowed to assume the job execution role by the OIDC federation. In order to enable it, we need to update the trust relationship of the role. We can update it as shown below. For more reference, we have AWS Guide

aws emr-containers update-role-trust-policy \
–cluster-name ${CLUSTER_NAME} \
–namespace virtualemr \
–role-name ${EMR_ROLE_NAME}

EMR Virtual Cluster on EKS

We will link the Amazon EKS cluster with Amazon EMR. We need to provide the EKS cluster name and namespace

aws emr-containers create-virtual-cluster \
–name ${CLUSTER_NAME} \
–container-provider ‘{
“id”: “‘${CLUSTER_NAME}'”,
“type”: “EKS”,
“info”: {
“eksInfo”: {
“namespace”: “virtualemr”
}
}
}’

Now Virtual Cluster has been created now it is time to submit our first job to EMR on the EKS cluster

aws emr-containers start-job-run \
--virtual-cluster-id ${VIRTUAL_CLUSTER_ID} \
--name=pi-2 \
--region ap-southeast-1 \
--execution-role-arn ${EMR_ROLE_ARN}\
--release-label=emr-6.4.0-latest \
--job-driver='{
"sparkSubmitJobDriver": {
"entryPoint":"local:///usr/lib/spark/examples/src/main/python/pi.py",
"sparkSubmitParameters":
"--conf spark.executor.instances=1
--conf spark.executor.memory=2G
--conf spark.executor.cores=1
--conf spark.driver.cores=1"
}
}'
aws emr-containers start-job-run \
--virtual-cluster-id ${VIRTUAL_CLUSTER_ID} \
--name ${JOB_NAME} \
--region ${AWS_REGION}\
--execution-role-arn ${EMR_ROLE_ARN}\
--release-label emr-6.4.0-latest \
--job-driver=`{
"sparkSubmitJobDriver": {
"entryPoint": "s3://${S3_BUCKET_NAME}/${file}.py","entryPointArguments":[ "–data_source", "s3://${S3_BUCKET_NAME}/data.csv", "output_uri", "s3://${S3_BUCKET_NAME}/output"],"sparkSubmitParameters": "–conf spark.executor.instances=2 \ –conf spark.executor.memory=2G \ –conf spark.executor.cores=1 \ –conf spark.driver.cores=1 \ –conf spark.driver.memory=2G" }
},
"configurationOverrides": { "monitoringConfiguration": { "cloudWatchMonitoringConfiguration": { "logGroupName": "${LOG_GROUP_NAME}", "logStreamNamePrefix": "example"},"s3MonitoringConfiguration": { "logUri": "s3://${S3_BUCKET_NAME}/logs/"}}}

Apache Hudi Configuration Jobs EMR On EKS

For HUDI Config we need to add a couple of more configurations. The point to note here is the HUDI configuration in the entry point.

  • we specified the Hudi utilities bundle. it is always the best approach to store jar files externally whether it’s s3 or any other cloud storage system
  • Table Type is COPY_ON_WRITE you can use READ_ON_MERGE it entirely depends on the use-case
  • In Spark Submit configuration, add some extra jars for delta sync avro_2.12–3.1.2-amzn-0.jar
aws emr-containers start-job-run \
--virtual-cluster-id ${VIRTUAL_CLUSTER_ID} \
--name ${JOB_NAME} \
--region ${AWS_REGION}\
--execution-role-arn ${EMR_ROLE_ARN}\
--release-label emr-6.4.0-latest \
--job-driver='{
"sparkSubmitJobDriver": {
"entryPoint":"s3://${S3_BUCKET_NAME}/config/hudi-utilities-bundle_2.12-0.10.0.jar","entryPointArguments": [
"–table-type","COPY_ON_WRITE",
"–source-ordering-field","source_ts_ms",
"–props","s3://${S3_BUCKET_NAME}/config/cdc_events_s3.properties",
"–source-class","org.apache.hudi.utilities.sources.JsonDFSSource",
"–target-base-path","s3://${S3_BUCKET_NAME}/hudi/cdc-events/",
"–target-table","cdc_events",
"–enable-hive-sync",
"–min-sync-interval-seconds","60",
"–continuous",
"–op","UPSERT"
],
"sparkSubmitParameters":
"–class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer\
–jars s3://${S3_BUCKET_NAME}/lib/spark-avro_2.12-3.1.2-amzn- 0.jar,s3://${S3_BUCKET_NAME}/config/hudi-utilities-bundle_2.12-0.10.0.jar \
–conf spark.driver.cores=1 \
–conf spark.driver.memory=2G \
–conf spark.executor.instances=2 \
–conf spark.executor.memory=2G \
–conf spark.executor.cores=1 \
–conf spark.sql.catalogImplementation=hive \
–conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
}
},
--configuration-overrides '{
"applicationConfiguration":
[ {
"classification": "spark-defaults",
"properties": {
"spark.driver.memory":"8G" }
}
],
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "'"${S3_BUCKET_NAME}"'/logs/"
}
}
}'

This Article, Primarily Focused was to exploring and running Apache HUDI Jobs on EMR on EKS. I hope my attempt to shed some light on Apache HUDI: EMR On EKS helps Data and Platform engineers to move their platforms to the most cutting edge technologies

As always, feel free to share your thoughts and feedback.

Syeda Marium Faheem.

Disclaimer:

Bazaar Technologies believes in sharing knowledge and freedom of expression, and it encourages it’s colleagues and friends to share knowledge, experiences and opinions in written form on it’s medium publication, in a hope that some people across the globe might find the content helpful. However the content shared in this post and other posts on this medium publication mostly describe and highlight the opinions of the authors, which might or might not be the actual and official perspective of Bazaar Technologies.

--

--