Dataproc lifecycle management orchestrated by Composer

Oscar Pulido
Google Cloud - Community
4 min read · Apr 19, 2023

Ephemeral Dataproc or EMR Hadoop clusters are cloud infrastructure. However, we usually do not want to manage them as part of the infrastructure-provisioning CI/CD pipelines, using a mix of tools like Terraform, Jenkins, or GitHub Actions, but as part of the data pipeline orchestration, using Airflow or Composer.

Keeping cluster lifecycle management as part of the data pipeline orchestration lets the pipelines take advantage of the elasticity and scalability of the Cloud. It also uses resources cost-efficiently, by sharing clusters between multiple jobs and by scaling already provisioned clusters instead of creating new ones. Sharing and scaling ephemeral clusters also reduces overall job processing time, because the second and subsequent jobs running in a cluster incur no cluster provisioning time.

Imagine you have multiple Spark jobs to be scheduled and run in Dataproc clusters. The internals of each Spark job are unique to its use case, but the jobs commonly share the same execution parameters and differ only in their values. In that situation it makes sense to generate DAGs dynamically from a template.

I have put together some very simple code to illustrate those concepts.

It is a Terraform template that creates a Composer environment, together with a folder structure and code that reads JSON parameter files to generate Airflow DAGs and deploy them in the Composer environment.

With this project you can deploy multiple Airflow DAGs: you create a configuration file for each DAG, and the DAGs are generated automatically during deployment.

main.tf
...
dags/ (Autogenerated on Terraform Plan/Apply from /dag_config/ files)
├── ephemeral_cluster_job_1.py
├── ephemeral_cluster_job_2.py
jobs/
├── hello_world_spark.py
├── ... (Add your dataproc jobs here)
include/
└── dag_config
    ├── dag1_config.json
    ├── dag2_config.json
    └── ... (Add your Composer/Airflow DAGs configuration here)
...

Each DAG will have a task step to run a Dataproc Job referenced in the parameters file, and that Job will be executed in a Dataproc Cluster.

{
"DagId": "ephemeral_cluster_job_1",
...
"SparkJob":"hello_world_spark.py"
}

The Dataproc Clusters can be reused for multiple Jobs/DAGs, and you can think of it as a queue. If you want two DAGs sharing a cluster, you only need to set the same cluster name parameter in both configuration files.

{
"DagId": "ephemeral_cluster_job_1",
...
"ClusterName":"ephemeral-cluster-test",
...
"SparkJob":"hello_world_spark.py"
}
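To make the sharing rule concrete, here is a minimal sketch that groups DAG configurations by their ClusterName value, so you can see which DAGs would end up on the same cluster. It is not part of the project: the function name group_dags_by_cluster, the third DAG, and the cluster name another-cluster are hypothetical and used only for illustration.

```python
from collections import defaultdict


def group_dags_by_cluster(configs):
    """Map each ClusterName to the list of DagIds that would share it."""
    clusters = defaultdict(list)
    for config in configs:
        clusters[config["ClusterName"]].append(config["DagId"])
    return dict(clusters)


# Only the fields relevant to cluster sharing are shown here.
configs = [
    {"DagId": "ephemeral_cluster_job_1", "ClusterName": "ephemeral-cluster-test"},
    {"DagId": "ephemeral_cluster_job_2", "ClusterName": "ephemeral-cluster-test"},
    # Hypothetical third DAG that gets a cluster of its own.
    {"DagId": "ephemeral_cluster_job_3", "ClusterName": "another-cluster"},
]

shared = group_dags_by_cluster(configs)
for cluster_name, dag_ids in shared.items():
    print(cluster_name, dag_ids)
```

Any cluster name that maps to more than one DAG ID is a shared cluster.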

The automatically generated Airflow DAGs handle the Dataproc cluster lifecycle, reusing an existing cluster or creating a new one as needed. The proposed cluster configuration includes an autoscaling policy that lets the cluster scale out when multiple Jobs are running in it at the same time.

resource "google_dataproc_autoscaling_policy" "dataproc_autoscaling_policy_test" {
  project   = var.project_id
  policy_id = var.dataproc_config.autoscaling_policy_id
  location  = var.region
  worker_config {
    max_instances = 5
  }
  basic_algorithm {
    yarn_config {
      graceful_decommission_timeout = "30s"
      scale_up_factor               = 0.5
      scale_down_factor             = 0.5
    }
  }
}

This approach aims to use resources efficiently while minimizing provisioning and execution time.
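On the DAG side, the cluster settings from a configuration file have to be turned into a Dataproc cluster definition at some point. The sketch below shows one way this could look, using the ClusterMachineType and ClusterIdleDeleteTtl fields of the DAG configuration files. It is an illustration under assumptions, not the project's actual template: the function name build_cluster_config and the example project, region, and policy values are hypothetical. The dictionary keys follow the Dataproc ClusterConfig resource, and a dictionary like this could be passed as cluster_config to Airflow's DataprocCreateClusterOperator, whose use_if_exists option is one way to reuse a cluster that is already running.

```python
def build_cluster_config(config, project_id, region, autoscaling_policy_id):
    """Build a Dataproc ClusterConfig-style dict from a DAG configuration."""
    machine_type = config["ClusterMachineType"]
    policy_uri = (
        f"projects/{project_id}/locations/{region}"
        f"/autoscalingPolicies/{autoscaling_policy_id}"
    )
    return {
        # The same machine type is used for master and worker nodes.
        "master_config": {"machine_type_uri": machine_type},
        "worker_config": {"machine_type_uri": machine_type},
        # Delete the cluster after it has been idle for this many seconds.
        "lifecycle_config": {
            "idle_delete_ttl": {"seconds": int(config["ClusterIdleDeleteTtl"])}
        },
        # Attach the autoscaling policy created by Terraform.
        "autoscaling_config": {"policy_uri": policy_uri},
    }


# Hypothetical project, region, and policy ID, for illustration only.
cluster_config = build_cluster_config(
    {"ClusterMachineType": "n1-standard-4", "ClusterIdleDeleteTtl": "300"},
    project_id="my-project",
    region="europe-west1",
    autoscaling_policy_id="my-autoscaling-policy",
)
print(cluster_config)
```

Letting the idle TTL delete the cluster, instead of adding an explicit delete task to each DAG, is what allows a later job to find the cluster still running and reuse it.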

Prerequisites

  1. This blueprint deploys all its resources into the project defined by the project_id variable. Note that this project is assumed to already exist.
  2. The user deploying the project (executing terraform plan/apply) should have admin permissions in the selected project, or permissions to create all the resources defined in the Terraform scripts.

Project Folder Structure

main.tf
...
dags/ (Autogenerated on Terraform Plan/Apply from /dag_config/ files)
├── ephemeral_cluster_job_1.py
├── ephemeral_cluster_job_2.py
jobs/
├── hello_world_spark.py
├── ... (Add your dataproc jobs here)
include/
├── dag_template.py
├── generate_dag_files.py
└── dag_config
    ├── dag1_config.json
    ├── dag2_config.json
    └── ... (Add your Composer/Airflow DAGs configuration here)
...

Adding Jobs

Prepare Dataproc Jobs to be executed

  1. Clone this repository
  2. Place your Dataproc jobs in the /jobs/ folder in your local environment

Prepare Composer DAGs to be deployed

3. Place your DAG configuration files in the /include/dag_config/ folder in your local environment. DAG configuration files have the following variables:

{
"DagId": "ephemeral_cluster_job_1", --DAG name you will see in Airflow environment
"Schedule": "'@daily'", --DAG Schedule
"ClusterName":"ephemeral-cluster-test", --Dataproc Cluster to be Used/created for this DAG/Job to be executed in
"StartYear":"2022", --DAG start year
"StartMonth":"9", --DAG start month
"StartDay":"13", --DAG start day
"Catchup":"False", --DAG backfill catchup
"ClusterMachineType":"n1-standard-4", --Dataproc machine type to be used by master and worker cluster nodes
"ClusterIdleDeleteTtl":"300", --Time in seconds to delete unused Dataproc cluster
"SparkJob":"hello_world_spark.py" --Spark Job to be executed by DAG, should be placed in /jobs/ folder of this project. (if other type of Dataproc jobs modify dag_template.py)
}

4. (Optional) Run python3 include/generate_dag_files.py in your local environment if you want to review the generated DAGs before deploying them (terraform plan/apply).
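The generation step can be pictured with the following minimal sketch. It is not the project's generate_dag_files.py, whose internals are not shown here. It assumes the configuration files are plain JSON (without the explanatory -- annotations used in the listing above) and that the template marks substitution points with placeholder tokens. The token names, render_dag, and generate_dag_files are all hypothetical.

```python
import json
from pathlib import Path

# Hypothetical placeholder tokens; the real dag_template.py may use others.
PLACEHOLDERS = {
    "DagId": "DAG_ID_PLACEHOLDER",
    "Schedule": "SCHEDULE_PLACEHOLDER",
    "ClusterName": "CLUSTER_NAME_PLACEHOLDER",
    "SparkJob": "SPARK_JOB_PLACEHOLDER",
}


def render_dag(template_text, config):
    """Return the template with every placeholder replaced by its config value."""
    rendered = template_text
    for key, token in PLACEHOLDERS.items():
        rendered = rendered.replace(token, config[key])
    return rendered


def generate_dag_files(config_dir, template_path, output_dir):
    """Write one DAG file per JSON configuration file and return their paths."""
    template_text = Path(template_path).read_text()
    output = Path(output_dir)
    output.mkdir(parents=True, exist_ok=True)
    written = []
    for config_file in sorted(Path(config_dir).glob("*.json")):
        config = json.loads(config_file.read_text())
        # The generated file is named after the DAG ID, as in the dags/ folder.
        target = output / f"{config['DagId']}.py"
        target.write_text(render_dag(template_text, config))
        written.append(target)
    return written
```

With the project layout above, a call such as generate_dag_files("include/dag_config", "include/dag_template.py", "dags") would produce one file in dags/ per configuration file. Plain text substitution also explains why a value like "'@daily'" carries its own inner quotes: the value is pasted into Python source as-is.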

Deployment

  1. Set Google Cloud Platform credentials in your local environment: https://cloud.google.com/source-repositories/docs/authentication
  2. You must supply at least the project_id variable in order to deploy the project. Default Terraform variables and example values are in the variables.tf file.
  3. Run Terraform Plan/Apply
$ cd terraform/
$ terraform init
$ terraform plan
$ terraform apply
## Optionally, variables can be passed on the command line
$ terraform apply -var 'project_id=<PROJECT_ID>' \
  -var 'region=<REGION>'

Once you have run terraform apply for the first time and the Composer environment is running, you can run terraform plan/apply again after adding new DAG configuration files to generate and deploy DAGs to the existing environment.

The first time the project is deployed, resource creation will take several minutes (up to 40) because of Composer environment provisioning. You should expect successful completion along with a list of the created resources.

Running DAGs

DAGs will run according to the Schedule, start date (StartYear, StartMonth, StartDay), and Catchup settings in the DAG configuration file, or they can be triggered manually through the Airflow web console after the deployment.
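Because the configuration files store every value as a string, the scheduling fields need to be converted before Airflow can use them. The sketch below shows one possible conversion. The function name parse_schedule_settings is hypothetical, and the project's template may handle this differently, for example by pasting the values directly into the generated Python source.

```python
from datetime import datetime


def parse_schedule_settings(config):
    """Turn the string fields of a DAG configuration into typed values."""
    start_date = datetime(
        int(config["StartYear"]),
        int(config["StartMonth"]),
        int(config["StartDay"]),
    )
    # "False" is a non-empty string, so bool("False") would be True;
    # compare the text explicitly instead.
    catchup = config["Catchup"].strip().lower() == "true"
    return start_date, catchup


start_date, catchup = parse_schedule_settings(
    {"StartYear": "2022", "StartMonth": "9", "StartDay": "13", "Catchup": "False"}
)
print(start_date.date(), catchup)
```

With Catchup disabled, Airflow does not backfill the runs between the start date and the deployment date. It only schedules runs from the most recent interval onward.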
