Run a Dataflow Job Based on an Eventarc GCS Trigger
Goal:
Create an Eventarc trigger that monitors a GCS bucket for new file uploads and automatically initiates a Dataflow job defined in a template when a new file is detected.
POC Steps
Step 1 : Create a GCS bucket
Uploading a new file to this bucket is the event that fires the trigger.
e.g. my-event-bucket-poc
STORAGE_BUCKET="my-event-bucket-poc"
gcloud storage buckets create gs://${STORAGE_BUCKET}
Step 2 : Enable APIs
- Make sure the APIs this POC uses (Eventarc, Workflows, Dataflow, Pub/Sub, Compute Engine, Cloud Storage) are enabled in the project.
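The original list of APIs is not reproduced here; based on the services the following steps use, enabling roughly this set should cover it (run in Cloud Shell):

```shell
# Enable the services this POC relies on. This set is inferred from the
# steps below (Eventarc trigger, Workflows, Dataflow template launch,
# Pub/Sub transport, Compute default SA, GCS bucket); adjust if your
# project already has some of these on.
gcloud services enable \
  eventarc.googleapis.com \
  workflows.googleapis.com \
  workflowexecutions.googleapis.com \
  dataflow.googleapis.com \
  pubsub.googleapis.com \
  compute.googleapis.com \
  storage.googleapis.com
```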
Step 3 : Create Workflow
You can also create the workflow in the console using myEventWorkflow.yaml and proceed directly to Step 5.
Change the values below to match your project, then run the commands in Cloud Shell.
export PROJECT_ID=mydiyproject
export PROJECT_NUMBER=711473716890
export WORKFLOW_LOCATION=us-central1
export TRIGGER_LOCATION=us-central1
export MY_WORKFLOW=myEventWorkflow
gcloud config set workflows/location ${WORKFLOW_LOCATION}
gcloud config set eventarc/location ${TRIGGER_LOCATION}
export MY_SERVICE_ACCOUNT=my-service-account
gcloud iam service-accounts create ${MY_SERVICE_ACCOUNT}
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role="roles/eventarc.eventReceiver"

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role="roles/workflows.invoker"

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
  --role="roles/logging.logWriter"

SERVICE_ACCOUNT="$(gcloud storage service-agent --project=${PROJECT_ID})"

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${SERVICE_ACCOUNT}" \
  --role="roles/pubsub.publisher"

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role="roles/iam.serviceAccountTokenCreator"
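Before moving on, it can help to read back the roles actually granted to the new service account. A small sketch (the `get-iam-policy` call is guarded so the snippet is copy-paste safe outside Cloud Shell; the filter expression is just one way to slice the policy):

```shell
# Build the full service-account email the bindings above refer to.
PROJECT_ID=mydiyproject
MY_SERVICE_ACCOUNT=my-service-account
SA_EMAIL="${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com"
echo "${SA_EMAIL}"

# List the roles currently granted to that account on the project
# (only runs where the gcloud CLI is available).
if command -v gcloud >/dev/null 2>&1; then
  gcloud projects get-iam-policy "${PROJECT_ID}" \
    --flatten="bindings[].members" \
    --filter="bindings.members:serviceAccount:${SA_EMAIL}" \
    --format="value(bindings.role)"
fi
```

You should see roles/eventarc.eventReceiver and roles/workflows.invoker in the output before creating the trigger.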
myEventWorkflow.yaml
This YAML file defines the workflow that launches the Dataflow job. For POC purposes I have used the public word-count template; create a template for your own job and update the YAML accordingly.
The Workflows console also renders this definition as a flow chart.
# This workflow demonstrates how to use the Cloud Dataflow connector.
# It launches a word-count job from a public Dataflow job template and
# uses a Cloud Storage bucket for temp files.
# Expected successful output: "SUCCESS"
- init:
    assign:
      - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
      - location: "us-central1"
      - zone: "us-central1-a"
      - bucket_name: "mydiybucket-workflow"
      - job_name: "mydiybucket-workflow-job-from-template-launch"
      - input_file: "gs://dataflow-samples/shakespeare/kinglear.txt"
      - output_storage_file_prefix: ${"gs://" + bucket_name + "/counts"}
      - temp_location: ${"gs://" + bucket_name + "/counts/temp"}
      - template_path: "gs://dataflow-templates-us-central1/latest/Word_Count"
- launch:
    call: googleapis.dataflow.v1b3.projects.locations.templates.launch
    args:
      projectId: ${project_id}
      location: ${location}
      gcsPath: ${template_path}
      body:
        jobName: ${job_name}
        parameters:
          inputFile: ${input_file}
          output: ${output_storage_file_prefix}
        environment:
          numWorkers: 1
          maxWorkers: 2
          zone: ${zone}
          tempLocation: ${temp_location}
- the_end:
    return: "SUCCESS"
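If you want to sanity-check the template and its parameters before wiring up the workflow, the same public template can be launched once directly from the CLI. A sketch, reusing the bucket and input assumed in the YAML above (the job name here is made up for the test run):

```shell
# One-off launch of the public Word_Count template, mirroring the
# templates.launch call in the workflow. The bucket must already exist.
BUCKET="mydiybucket-workflow"
gcloud dataflow jobs run wordcount-manual-test \
  --gcs-location="gs://dataflow-templates-us-central1/latest/Word_Count" \
  --region="us-central1" \
  --parameters="inputFile=gs://dataflow-samples/shakespeare/kinglear.txt,output=gs://${BUCKET}/counts"
```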
Step 4 : Create Eventarc Trigger and Deploy Workflow
The source here is myEventWorkflow.yaml, which I created in the previous step.
gcloud workflows deploy ${MY_WORKFLOW} \
  --source=myEventWorkflow.yaml

gcloud eventarc triggers create storage-events-trigger \
  --destination-workflow=${MY_WORKFLOW} \
  --event-filters="type=google.cloud.storage.object.v1.finalized" \
  --event-filters="bucket=my-event-bucket-poc" \
  --service-account="${MY_SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com"
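A newly created trigger can take a minute or two to become active. Describing it confirms the filters and destination that were actually registered:

```shell
# Inspect the trigger created above; the output should show the
# google.cloud.storage.object.v1.finalized filter, the bucket filter,
# and the workflow destination.
gcloud eventarc triggers describe storage-events-trigger \
  --location=us-central1
```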
Step 5 : Add an Event — A GCS file
Here I create a sample file random.txt and upload it to the GCS bucket my-event-bucket-poc, which should trigger the workflow.
echo "Hello World" > random.txt
gcloud storage cp random.txt \
gs://my-event-bucket-poc/random.txt
gcloud workflows executions list ${MY_WORKFLOW} --limit=5
Step 6 : Validations
- Go to the selected workflow -> Executions and check that the execution is running or has succeeded.
- Go to the Dataflow jobs page; the launched job should be running or have succeeded.
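The same checks can be done from the CLI (workflow name and region assumed from the earlier steps):

```shell
# Recent executions of the workflow; the newest one should be ACTIVE
# or SUCCEEDED shortly after the upload.
gcloud workflows executions list myEventWorkflow --limit=5

# The word-count job launched by the workflow should appear here.
gcloud dataflow jobs list \
  --region=us-central1 \
  --limit=5
```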
Reference(s) :
Connector for Dataflow | Workflows | Google Cloud
Method: projects.locations.templates.launch | Cloud Dataflow