Leveraging Google Workflows to orchestrate Dataflow in parallel and trigger via Pub/Sub

Jul 21, 2023


Author: Tanmoy Kanungo, GCP Architect

LinkedIn: https://www.linkedin.com/in/tanmoykanungo/

Google Cloud Workflows is a serverless orchestration tool offered by Google. Cloud Composer is the established tool in this space, but as companies become more cost conscious and show more interest in serverless services, Workflows can be a good option to leverage.

The intention of this article is to walk through one use case and show how to orchestrate Dataflow jobs in parallel and set dependencies between them.

Use Case:

  1. Create two Dataflow jobs that run in parallel and set dependencies to check their execution status.
  2. Dataflow 1 reads data from a GCS bucket and writes to BigQuery (BQ).
  3. Dataflow 2 reads data from MySQL and writes to BigQuery (BQ).
  4. Both Dataflow jobs write to the same BQ table.
  5. Write custom code that waits for the DF jobs to complete.
  6. Trigger the Workflow based on a Pub/Sub event.
  7. All runtime parameters are passed at run time in the Pub/Sub message.
  8. Parameter details: Input_file is the GCS input path, Output_table is the BQ table (Project:Dataset.tableName).
  9. Create a parent workflow to call the different child workflows.

Orchestrating parent-child workflow

Refer to the Google documentation: https://cloud.google.com/workflows/docs/tutorials/execute-workflows-from-workflow

Pre-Requisites:

  1. Dataflow Flex Templates for DF1 and DF2.
     Reference link: https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
  2. Enable the required APIs and grant the required permissions (check the Google documentation: https://cloud.google.com/workflows/docs/tutorials/run/cloud-run).
  3. A Pub/Sub topic to trigger the Workflow (a command sketch for items 2 and 3 follows this list).
  4. A MySQL server hosted in GCP (Cloud SQL); however, this step can be skipped.
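
For prerequisites 2 and 3, here is a minimal gcloud sketch; the exact list of APIs depends on your project, and the topic name wf-trigger-topic is an assumption used only for illustration in the examples that follow.

# Sketch: enable the services this setup typically relies on (adjust as needed).
gcloud services enable \
    workflows.googleapis.com \
    workflowexecutions.googleapis.com \
    dataflow.googleapis.com \
    pubsub.googleapis.com \
    eventarc.googleapis.com

# Hypothetical topic used later to trigger the main workflow.
gcloud pubsub topics create wf-trigger-topic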

Workflow Orchestration:

Main Workflow Orchestration

Now we will go through the steps of the workflow configuration.

- callDataflowparallelPipeline:
    parallel:
      branches:
        - firstPipeline:
            steps:
              - runFirstPipeline:
                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                  args:
                    workflow_id: wf-job-gcs-to-bq
                    argument:
                      project: ${project}
                      region: ${region}
                      jobname: "df-gcs-to-bigqury"
                      template: "gs://<<GCS Template Bucket>>/gcs-to-bigquery/gcs-to-bigqury.json"
                      input_file: ${inputFile} # "gs://<<GCS Bucket>>/input/us-500.csv"
                      output_table: ${outputTable} # "Project:Dataset.mysql_customer"
                  result: firstJobId
              - waitFirstDone:
                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                  args:
                    workflow_id: wf-df-job-status
                    argument:
                      project: ${project}
                      region: ${region}
                      jobId: ${firstJobId}
                      status: "JOB_STATE_DONE"
        - secondPipeline:
            steps:
              - runSecondPipeline:
                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                  args:
                    workflow_id: wf-job-mysql-to-bq
                    argument:
                      project: ${project}
                      region: ${region}
                      jobname: "df-mysql-to-bigqury"
                      template: "gs://<<GCS Template Bucket>>/mysql-to-bigqury.json"
                      output_table: ${outputTable}
                  result: secondJobId
              - waitSecondDone:
                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                  args:
                    workflow_id: wf-df-job-status
                    argument:
                      project: ${project}
                      region: ${region}
                      jobId: ${secondJobId}
                      status: "JOB_STATE_DONE"

In this workflow I'm creating a step called callDataflowparallelPipeline. This step launches two DF jobs that run in parallel; those branches are defined as firstPipeline and secondPipeline.

As both pipelines follow the same pattern, we will only discuss firstPipeline.

- firstPipeline:
    steps:
      - runFirstPipeline:
          call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
          args:
            workflow_id: wf-job-gcs-to-bq
            argument:
              project: ${project}
              region: ${region}
              jobname: "df-gcs-to-bigqury"
              template: "gs://<<GCS Template Location>>/gcs-to-bigquery/gcs-to-bigqury.json"
              input_file: ${inputFile}
              output_table: ${outputTable}
          result: firstJobId
      - waitFirstDone:
          call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
          args:
            workflow_id: wf-df-job-status
            argument:
              project: ${project}
              region: ${region}
              jobId: ${firstJobId}
              status: "JOB_STATE_DONE"

In firstPipeline we execute two steps, runFirstPipeline and waitFirstDone, where the second step depends on the first. The first step starts the Dataflow job; the second step takes the Dataflow job ID, waits for the job to finish, and finally returns the job's run status.

There are two child workflows being called from this branch of the main workflow: wf-job-gcs-to-bq and wf-df-job-status.

For wf-job-gcs-to-bq we will create another YAML file, named wf-job-gcs-to-bq.yaml, that defines the child workflow.

main:
  params: [args] # [project, region, jobname, template, input_file, output_table]
  steps:
    - launch:
        call: http.post
        args:
          url: ${"https://dataflow.googleapis.com/v1b3/projects/"+args.project+"/locations/"+args.region+"/flexTemplates:launch"}
          auth:
            type: OAuth2
          body:
            launchParameter:
              jobName: ${args.jobname}
              parameters:
                input_file: ${args.input_file}
                output_table: ${args.output_table}
              environment:
                numWorkers: 1
                maxWorkers: 8
              containerSpecGcsPath: ${args.template}
        result: dataflowResponse
        next: jobCreated
    - jobCreated:
        return: ${dataflowResponse.body.job.id}

In this workflow we take parameters from the main workflow: 1. project, 2. region, 3. job name, 4. template (Dataflow template location), 5. input file location (the GCS bucket defined in the main workflow), and 6. output table (BQ table).

This workflow will start the Dataflow job and return its job ID.
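
To sanity-check this child workflow in isolation, one can execute it directly with gcloud, passing the same arguments as JSON. This is only a sketch: the project, bucket, dataset, and table values are placeholders, not the resources used in this article.

# Sketch: run the launcher child workflow with test arguments (all values are placeholders).
gcloud workflows run wf-job-gcs-to-bq \
    --location=us-central1 \
    --data='{
      "project": "my-project",
      "region": "us-central1",
      "jobname": "df-gcs-to-bigqury-test",
      "template": "gs://my-template-bucket/gcs-to-bigquery/gcs-to-bigqury.json",
      "input_file": "gs://my-input-bucket/input/us-500.csv",
      "output_table": "my-project:my_dataset.mysql_customer"
    }'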

Workflow wf-df-job-status is the other child workflow we have created:

main:
  params: [args] # [project, region, jobId, status]
  steps:
    - init:
        assign:
          - currentStatus: ""
          - failureStatuses: ["JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_UPDATED", "JOB_STATE_DRAINED"]
    - check_condition:
        switch:
          - condition: ${currentStatus in failureStatuses}
            next: exit_fail
          - condition: ${currentStatus != args.status}
            next: iterate
        next: exit_success
    - iterate:
        steps:
          - sleep30s:
              call: sys.sleep
              args:
                seconds: 30
          - getJob:
              call: http.get
              args:
                url: ${"https://dataflow.googleapis.com/v1b3/projects/"+args.project+"/locations/"+args.region+"/jobs/"+args.jobId}
                auth:
                  type: OAuth2
              result: getJobResponse
          - getStatus:
              assign:
                - currentStatus: ${getJobResponse.body.currentState}
          - log:
              call: sys.log
              args:
                text: ${"Current job status="+currentStatus}
                severity: "INFO"
        next: check_condition
    - exit_success:
        return: ${currentStatus}
    - exit_fail:
        raise: ${"Job in unexpected terminal status "+currentStatus}

In this workflow we define the job status that counts as a successful run, take it as a parameter, and check the current status of the Dataflow job started in the prior steps every 30 seconds. Once the Dataflow job reaches the expected state, the workflow returns the result; if the job ends in a failure state, it raises an error.
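
The same state field can be inspected from the command line if you want to cross-check what the workflow is polling; the job ID and region below are placeholders.

# Sketch: show the current state of a Dataflow job (replace <JOB_ID> with the launched job's ID).
gcloud dataflow jobs describe <JOB_ID> \
    --region=us-central1 \
    --format="value(currentState)"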

In this manner we have implemented two Dataflow jobs running in parallel.

As wf-df-job-status takes the same input parameters in both branches, we have defined only one status-check child workflow and call it with different parameters.

But as the Dataflow jobs are different and take different parameters, we have created two launcher workflows; however, they could be optimized and combined into a single workflow.

Implementation steps:

First we need to deploy the child workflows. In this use case there are three workflows, which are as follows:

  1. wf-job-gcs-to-bq — DF1 workflow to read from GCS and write to BQ
  2. wf-job-mysql-to-bq — DF2 workflow to read from mySql and write to BQ
  3. wf-df-job-status — Check the DF job status and wait until it finishes

To deploy these we can use the gcloud command line:

gcloud workflows deploy wf-job-gcs-to-bq --source=wf-job-gcs-to-bq.yaml
gcloud workflows deploy wf-job-mysql-to-bq --source=wf-job-mysql-to-bq.yaml
gcloud workflows deploy wf-df-job-status --source=wf-df-job-status.yaml

It will use the project's default region, but one can pass the location flag as well.
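
For example, a minimal sketch deploying one of the child workflows into an assumed region:

gcloud workflows deploy wf-df-job-status \
    --source=wf-df-job-status.yaml \
    --location=us-central1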

And finally we will deploy the main workflow

gcloud workflows deploy main-workflow --source=wf-main.yaml

Triggering the workflow via Pub/Sub:

To trigger the workflow via Pub/Sub we have to set up an Eventarc trigger; please follow the Eventarc documentation for details.
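
As a rough sketch, an Eventarc trigger routing Pub/Sub messages to the main workflow could be created as follows; the trigger name, the topic wf-trigger-topic, and the service account are assumptions, and the service account needs permission to invoke Workflows.

# Sketch: create an Eventarc trigger from the (assumed) topic to the main workflow.
gcloud eventarc triggers create trigger-main-workflow \
    --location=us-central1 \
    --destination-workflow=main-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=wf-trigger-topic \
    --service-account=workflow-trigger-sa@my-project.iam.gserviceaccount.com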

In our code we need to catch this event and decode it.

In wf-main.yaml we have to add these lines:

main:
  params: [event]
  steps:
    - init:
        assign:
          - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - region: "us-central1"
          - topic: "PubSub Topic"

    - decode_pubsub_message:
        assign:
          - base64: ${base64.decode(event.data.message.data)}
          - message: ${json.decode(base64)}
          - inputFile: ${message.input_file}
          - outputTable: ${message.output_table}

In this case we read the incoming message, decode it, and extract the parameters needed for the next steps.
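
To exercise the end-to-end flow, it should be enough to publish a message whose JSON body carries input_file and output_table; the topic, bucket, and table below are placeholders.

# Sketch: publish a test message to the (assumed) trigger topic.
gcloud pubsub topics publish wf-trigger-topic \
    --message='{"input_file": "gs://my-input-bucket/input/us-500.csv", "output_table": "my-project:my_dataset.mysql_customer"}'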

Here is the entire wf-main.yaml file

main:
  params: [event]
  steps:
    - init:
        assign:
          - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - region: "us-central1"
          - topic: "PubSub Topic"

    - decode_pubsub_message:
        assign:
          - base64: ${base64.decode(event.data.message.data)}
          - message: ${json.decode(base64)}
          - inputFile: ${message.input_file}
          - outputTable: ${message.output_table}

    - callDataflowparallelPipeline:
        parallel:
          branches:
            - firstPipeline:
                steps:
                  - runFirstPipeline:
                      call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                      args:
                        workflow_id: wf-job-gcs-to-bq
                        argument:
                          project: ${project}
                          region: ${region}
                          jobname: "df-gcs-to-bigqury"
                          template: "gs://<<GCS template location>>/gcs-to-bigquery/gcs-to-bigqury.json"
                          input_file: ${inputFile}
                          output_table: ${outputTable}
                      result: firstJobId
                  - waitFirstDone:
                      call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                      args:
                        workflow_id: wf-df-job-status
                        argument:
                          project: ${project}
                          region: ${region}
                          jobId: ${firstJobId}
                          status: "JOB_STATE_DONE"
            - secondPipeline:
                steps:
                  - runSecondPipeline:
                      call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                      args:
                        workflow_id: wf-job-mysql-to-bq
                        argument:
                          project: ${project}
                          region: ${region}
                          jobname: "df-mysql-to-bigqury"
                          template: "gs://<<GCS template location>>/mysql-to-bigqury.json"
                          output_table: ${outputTable}
                      result: secondJobId
                  - waitSecondDone:
                      call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                      args:
                        workflow_id: wf-df-job-status
                        argument:
                          project: ${project}
                          region: ${region}
                          jobId: ${secondJobId}
                          status: "JOB_STATE_DONE"

Monitoring:

We can monitor the jobs from the Workflows dashboard in the Google Cloud console.
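
The same information is also available from the CLI; for example (assuming the workflow name deployed above and an execution ID taken from the list output):

# List recent executions of the main workflow and inspect one of them.
gcloud workflows executions list main-workflow --location=us-central1
gcloud workflows executions describe <EXECUTION_ID> \
    --workflow=main-workflow \
    --location=us-central1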

Credit and acknowledgement
