Run a Dataflow Job Based on an Eventarc GCS Trigger

Ashish Jain
Google Cloud - Community
4 min read · Sep 16, 2024

Goal:

Create an Eventarc trigger that monitors a GCS bucket for new file uploads and automatically initiates a Dataflow job defined in a template when a new file is detected.

POC Steps

Step 1 : Create a GCS bucket

When a new file is uploaded to this bucket, the event is triggered.

e.g., my-event-bucket-poc

STORAGE_BUCKET="my-event-bucket-poc"
gcloud storage buckets create gs://$STORAGE_BUCKET

Step 2 : Enable APIs

  • Make sure the required APIs are enabled in the project; see the sketch below.
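
A plausible minimal set for this POC (your project may already have some of these enabled):

gcloud services enable \
  workflows.googleapis.com \
  workflowexecutions.googleapis.com \
  eventarc.googleapis.com \
  dataflow.googleapis.com \
  pubsub.googleapis.com \
  compute.googleapis.com \
  storage.googleapis.com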

Step 3 : Create Workflow

You can also use the console to create the workflow from myEventWorkflow.yaml and proceed directly to Step 5.

Change the values below to match your project, then run the commands in Cloud Shell.

export PROJECT_ID=mydiyproject
export PROJECT_NUMBER=711473716890
export WORKFLOW_LOCATION=us-central1
export TRIGGER_LOCATION=us-central1
export MY_WORKFLOW=myEventWorkflow

gcloud config set workflows/location ${WORKFLOW_LOCATION}

gcloud config set eventarc/location ${TRIGGER_LOCATION}


export MY_SERVICE_ACCOUNT=my-service-account

gcloud iam service-accounts create ${MY_SERVICE_ACCOUNT}

# Allow the trigger's service account to receive Eventarc events
# and to invoke the workflow.
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role=roles/eventarc.eventReceiver

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role=roles/workflows.invoker

# Allow the default compute service account to write logs.
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
  --role=roles/logging.logWriter

# Allow the Cloud Storage service agent to publish Pub/Sub messages.
SERVICE_ACCOUNT="$(gcloud storage service-agent --project=${PROJECT_ID})"

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${SERVICE_ACCOUNT}" \
  --role=roles/pubsub.publisher

# Allow the Pub/Sub service agent to create authentication tokens.
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role='roles/iam.serviceAccountTokenCreator'
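
To sanity-check the bindings, you can list the roles granted to the trigger's service account (optional, just a quick verification):

gcloud projects get-iam-policy ${PROJECT_ID} \
  --flatten="bindings[].members" \
  --filter="bindings.members:${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --format="table(bindings.role)"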

myEventWorkflow.yaml

This YAML file is a workflow definition that holds the details of the Dataflow job. For POC purposes I have used the public Word Count template; you can create a template for your own pipeline and update the YAML accordingly.


# This workflow demonstrates how to use the Cloud Dataflow connector.
# It creates a word count job from a public Dataflow job template and
# uses a Cloud Storage bucket for temporary files.
# Expected successful output: "SUCCESS"

- init:
    assign:
      - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
      - location: "us-central1"
      - zone: "us-central1-a"
      - bucket_name: "mydiybucket-workflow" # this bucket must already exist
      - job_name: "mydiybucket-workflow-job-from-template-launch"
      - input_file: "gs://dataflow-samples/shakespeare/kinglear.txt"
      - output_storage_file_prefix: ${"gs://" + bucket_name + "/counts"}
      - temp_location: ${"gs://" + bucket_name + "/counts/temp"}
      - template_path: "gs://dataflow-templates-us-central1/latest/Word_Count"
- launch:
    call: googleapis.dataflow.v1b3.projects.locations.templates.launch
    args:
      projectId: ${project_id}
      location: ${location}
      gcsPath: ${template_path}
      body:
        jobName: ${job_name}
        parameters:
          inputFile: ${input_file}
          output: ${output_storage_file_prefix}
        environment:
          numWorkers: 1
          maxWorkers: 2
          zone: ${zone}
          tempLocation: ${temp_location}
- the_end:
    return: "SUCCESS"
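
If you want to sanity-check the template parameters outside Workflows first, the same classic template can be launched directly from the CLI. A sketch using the same names as the YAML above (the job name here is just an illustrative placeholder, and the temp bucket must exist):

gcloud dataflow jobs run wordcount-manual-test \
  --region=us-central1 \
  --gcs-location=gs://dataflow-templates-us-central1/latest/Word_Count \
  --staging-location=gs://mydiybucket-workflow/counts/temp \
  --parameters=inputFile=gs://dataflow-samples/shakespeare/kinglear.txt,output=gs://mydiybucket-workflow/counts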

Step 4 : Create Eventarc Trigger and Deploy Workflow

Here the source is myEventWorkflow.yaml, which I created in the previous step.

gcloud workflows deploy ${MY_WORKFLOW} \
  --source=myEventWorkflow.yaml


gcloud eventarc triggers create storage-events-trigger \
  --destination-workflow=${MY_WORKFLOW} \
  --event-filters="type=google.cloud.storage.object.v1.finalized" \
  --event-filters="bucket=my-event-bucket-poc" \
  --service-account="${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com"

Step 5 : Add an Event (a GCS File)

Here I've created a sample file, random.txt; uploading it to the GCS bucket my-event-bucket-poc should trigger the workflow.

echo "Hello World" > random.txt
gcloud storage cp random.txt \
gs://my-event-bucket-poc/random.txt


gcloud workflows executions list ${MY_WORKFLOW} --limit=5
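
To dig into the most recent execution (its state, result, or error), you can describe it; a small sketch using the full resource name returned by the list command:

LATEST=$(gcloud workflows executions list ${MY_WORKFLOW} --limit=1 --format="value(name)")
gcloud workflows executions describe ${LATEST} --workflow=${MY_WORKFLOW}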


echo "Hello World" > random.txt
gcloud storage cp random.txt \
gs://my-event-bucket-poc/random.txt

Step 6 : Validations

  1. Go to the selected workflow -> Executions and check that the run is running / succeeded.
  2. Go to the Dataflow job; it should be running / succeeded. The same checks can be done from the CLI, as shown below.
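
For example (region and output prefix match the YAML above):

gcloud dataflow jobs list --region=us-central1 --limit=5

gcloud storage ls gs://mydiybucket-workflow/counts*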

References:

Connector for Dataflow | Workflows | Google Cloud

Method: projects.locations.templates.launch | Cloud Dataflow

Dataflow API | Google Cloud

Design Dataflow pipeline workflows | Google Cloud

Creating classic Dataflow templates | Google Cloud
