Run a Dataflow Job Based on an Eventarc GCS Trigger

Ashish Jain
Google Cloud - Community
4 min read · Sep 16, 2024

Goal:

Create an Eventarc trigger that monitors a GCS bucket for new file uploads and automatically initiates a Dataflow job defined in a template when a new file is detected.

POC Steps

Step 1 : Create a GCS bucket

When a new file is uploaded to this bucket, the event is triggered.

e.g., my-event-bucket-poc

STORAGE_BUCKET="my-event-bucket-poc"
gcloud storage buckets create gs://$STORAGE_BUCKET

Step 2 : Enable APIs

  • Make sure the APIs required by this POC are enabled in the project; one way to enable them is sketched below.
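
The exact list depends on your project, but a reasonable set for this POC covers Workflows, Workflow Executions, Eventarc, Dataflow, Compute Engine, Pub/Sub, and Cloud Storage:

gcloud services enable \
  workflows.googleapis.com \
  workflowexecutions.googleapis.com \
  eventarc.googleapis.com \
  dataflow.googleapis.com \
  compute.googleapis.com \
  pubsub.googleapis.com \
  storage.googleapis.com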

Step 3 : Create Workflow

You can also use the console to create the workflow from myEventWorkflow.yaml and then proceed directly to Step 5.

Change the values below as per your project, and then run them in Cloud Shell.

export PROJECT_ID=mydiyproject
export PROJECT_NUMBER=711473716890
export WORKFLOW_LOCATION=us-central1
export TRIGGER_LOCATION=us-central1
export MY_WORKFLOW=myEventWorkflow

gcloud config set workflows/location ${WORKFLOW_LOCATION}

gcloud config set eventarc/location ${TRIGGER_LOCATION}


export MY_SERVICE_ACCOUNT=my-service-account

gcloud iam service-accounts create ${MY_SERVICE_ACCOUNT}

# Allow the trigger's service account to receive Eventarc events
# and to invoke workflows.
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role=roles/eventarc.eventReceiver

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role=roles/workflows.invoker

# Allow the default compute service account to write logs.
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
  --role=roles/logging.logWriter

# Grant the Cloud Storage service agent permission to publish Pub/Sub messages.
SERVICE_ACCOUNT="$(gcloud storage service-agent --project=${PROJECT_ID})"

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${SERVICE_ACCOUNT}" \
  --role=roles/pubsub.publisher

# Grant the Pub/Sub service agent permission to create authentication tokens.
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role='roles/iam.serviceAccountTokenCreator'
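
As a quick sanity check, the roles granted to the trigger's service account can be listed (a minimal verification sketch using standard gcloud IAM filtering):

# List only the roles bound to the trigger's service account.
gcloud projects get-iam-policy ${PROJECT_ID} \
  --flatten="bindings[].members" \
  --filter="bindings.members:${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --format="table(bindings.role)"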

myEventWorkflow.yaml

This YAML file is the workflow definition, which contains the details of the Dataflow job. For POC purposes I have used a public word-count template. You can create a template for your own job to deploy the pipeline and update the YAML file accordingly (a sketch of staging a custom template follows the YAML below).


# This workflow demonstrates how to use the Cloud Dataflow connector.
# The workflow creates a word count job using a Dataflow public job template
# and uses a Cloud Storage bucket as temporary storage for temp files.
# Expected successful output: "SUCCESS"

- init:
    assign:
      - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
      - location: "us-central1"
      - zone: "us-central1-a"
      - bucket_name: "mydiybucket-workflow"
      - job_name: "mydiybucket-workflow-job-from-template-launch"
      - input_file: "gs://dataflow-samples/shakespeare/kinglear.txt"
      - output_storage_file_prefix: ${"gs://" + bucket_name + "/counts"}
      - temp_location: ${"gs://" + bucket_name + "/counts/temp"}
      - template_path: "gs://dataflow-templates-us-central1/latest/Word_Count"
- launch:
    call: googleapis.dataflow.v1b3.projects.locations.templates.launch
    args:
      projectId: ${project_id}
      location: ${location}
      gcsPath: ${template_path}
      body:
        jobName: ${job_name}
        parameters:
          inputFile: ${input_file}
          output: ${output_storage_file_prefix}
        environment:
          numWorkers: 1
          maxWorkers: 2
          zone: ${zone}
          tempLocation: ${temp_location}
- the_end:
    return: "SUCCESS"
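
If you want to launch your own pipeline instead of the public word-count template, stage a classic template to GCS and point template_path at it. A minimal sketch, assuming a hypothetical Apache Beam Python pipeline my_pipeline.py:

# Stage a classic template to GCS; template_path in the YAML above would
# then point at gs://${STORAGE_BUCKET}/templates/my_template.
python my_pipeline.py \
  --runner=DataflowRunner \
  --project=${PROJECT_ID} \
  --region=us-central1 \
  --staging_location=gs://${STORAGE_BUCKET}/staging \
  --temp_location=gs://${STORAGE_BUCKET}/temp \
  --template_location=gs://${STORAGE_BUCKET}/templates/my_template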

Step 4 : Create Eventarc Trigger and Deploy Workflow

Here the source is myEventWorkflow.yaml, which I created in the previous step.

gcloud workflows deploy ${MY_WORKFLOW} \
  --source=myEventWorkflow.yaml
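
Optionally, confirm that the workflow deployed and is in the ACTIVE state (a quick check using the standard describe command):

gcloud workflows describe ${MY_WORKFLOW} \
  --location=${WORKFLOW_LOCATION}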


gcloud eventarc triggers create storage-events-trigger \
  --destination-workflow=${MY_WORKFLOW} \
  --event-filters="type=google.cloud.storage.object.v1.finalized" \
  --event-filters="bucket=my-event-bucket-poc" \
  --service-account="${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com"

Step 5 : Add an Event - a GCS File

Here I’ve created a sample file, random.txt, and uploaded it to the GCS bucket my-event-bucket-poc; this should trigger the workflow.

echo "Hello World" > random.txt
gcloud storage cp random.txt gs://my-event-bucket-poc/random.txt

gcloud workflows executions list ${MY_WORKFLOW} --limit=5
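
To drill into a specific run, describe it by ID (EXECUTION_ID is a placeholder for an ID copied from the list output above):

gcloud workflows executions describe EXECUTION_ID \
  --workflow=${MY_WORKFLOW}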



Step 6 : Validations

  1. Go to the selected workflow -> Executions and check whether the execution is running / succeeded.
  2. Go to the Dataflow job; it should be running / succeeded. The same check can be done from the CLI, as sketched below.
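
A minimal CLI sketch for the second check, listing active Dataflow jobs in the region used by the workflow:

gcloud dataflow jobs list \
  --region=us-central1 \
  --status=active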

Reference(s) :

Connector for Dataflow | Workflows | Google Cloud

Method: projects.locations.templates.launch | Cloud Dataflow

Dataflow API | Google Cloud

Design Dataflow pipeline workflows | Google Cloud

Creating classic Dataflow templates | Google Cloud
