Parallel executions with Google Workflows

guillaume blaquiere
Google Cloud - Community
5 min readFeb 22, 2021

Cloud components are useful and powerful. However, they are all disconnected from the others and when you want to deploy a full pipeline, you need to glue them. You can achieve this with PubSub and Cloud Functions.
However, it quickly becomes a spaghetti design with a lot of topics and functions. Having a centralized place to see, manage and configure your pipeline workflow could be great!

Google Workflows

Google Workflows takes place here. It has been announced since summer 2020 at Cloud Next on Air and is now generally available (GA) since January 2021. It’s a fully managed solution with a pay-as-you-use model. It’s a lighter (and very less expensive) solution to Cloud Composer, based on Apache Airflow. And far enough for a lot of use cases and very promising for the future.

Guillaume Laforge (Google Cloud Developer Advocate) has already shared his journey on Workflows. And with Mete Atamel (also Google Cloud Developer Advocate), they have released a great blog post.
So the topic has already been well covered and documented.

However, there is a missing piece: the capacity to parallelize steps in a workflow.

Experimental parallelization

I was involved in the Alpha program, and one of the top requests from the testers was the parallelization of calls. It has been pushed on the top priorities of the development team. However, today it’s not yet fully implemented and we only have an experimental feature that we can use: the execution map.

In this component, you need to reference

  • A workflow ID that you want to call. I will call it “callable workflow”
  • An array of arguments. Each entry in the array is an invocation of the “callable workflow”, in parallel
  • Optionally, you can specify the location and the project ID if the “callable workflow” isn’t in the same region and/or the project

As you can imagine, by design, there is 2 “limitations”

  1. You must create another callable workflow to use the execution map. You can’t directly call the API that you want.
  2. You must use the same callable workflow, only the parameters are different for each parallel call. You can’t parallelize different callable workflow, it’s always the same with custom parameters.

In reality, it’s no longer a limitation, see the Advanced workflows section to see the workarounds.

Anyway, let’s have a first try on it.

First parallel execution

To test the parallelization of the executions, we need to perform a call that takes more or less time. In the real world use case, you can imagine several BigQuery queries in parallel, each having a different execution time.
Then we need to build a callable workflow to use this app
And finally to create a workflow with the execution map experimental feature

Deploy the Go sleepy app

To wait more or less time, I created a simple Go application that does sleep according to the provided query parameter a w (in seconds).

You can find it in the Github repository. Clone it and run this command

gcloud beta run deploy --source=. --region=us-central1 \
--platform=managed --allow-unauthenticated sleepy-app

Get the Cloud Run service URL provided at the end of the deployment.

Create the callable workflow

This callable workflow is very simple. It calls the Cloud Run service with the parameter provided in argument (name wait here) and prints the result.

main:
params: [args]
steps:
- callSleepyApp:
call: http.get
args:
url: <SLEEPY-APP URL>
query:
w: ${args.wait}
result: result
- returnOutput:
return: ${result}

Replace <SLEEPY-APP URL> with the Cloud Run service URL

And deploy the workflow

gcloud workflows deploy --source=workflow/run-long-process.yaml \
--location=us-central1 run-long-process

In the GitHub repository, the workflow files are in the workflow directory. Adapt the command according to your project structure

Execute workflows in parallel

Finally, the execution map. As described, you need the callable workflow ID (here run-long-process) and to provide the correct argument.

main:
steps:
- parallel-executor:
call: experimental.executions.map
args:
workflow_id: run-long-process
arguments: [{"wait":5},{"wait":10},{"wait":15}]
result: result
- returnOutput:
return: ${result}

And deploy the workflow

gcloud workflows deploy --source=workflow/parallel-executor.yaml \
--location=us-central1 parallel-executor

Now you can test it, through the UI or by CLI

gcloud workflows execute parallel-executor

At the end, you can see that you get an array of results of each of the calls in the order that you defined them in parameter. No matter which one has finished first, the argument order is respected in the result

Advanced Workflows

It’s, in reality, quite simple to use, but not perfect. As mentioned before, you can have some limitations for advanced use cases and I propose workarounds.

Call different URL in parallel

One use case is the capacity to call different URLs in parallel. For this, one of the solution is to wrap the API call in a “callable workflow” with appropriate parameters like this

main:
params: [args]
steps:
- prepareQuery:
switch:
- condition: ${"query" in args}
assign:
- query: ${args.query}
- condition: true
assign:
- query: null
- callCustomUrl:
call: http.get
args:
url: ${args.url}
query: ${query}
result: result
- returnOutput:
return: ${result}

You can note the prepareQuery step which allows the callers to not pass some parameters (here the query parameter).

This workflow is an example that you can adapt to your use case. The call value can be different, a body can be passed to the callCustomUrl step,…

Then, you have the execution map that can be similar to this, with or without the query parameter

main:
steps:
- parallel-executor:
call: experimental.executions.map
args:
workflow_id: custom-api-call
arguments: [{"url":"<SLEEPY-APP URL>","query":{"w":5}},{"url":"https://www.google.com"}]
result: result
- logStep:
call: sys.log
args:
text: ${result}
severity: INFO
- returnOutput:
return: ${result}

Call different callable workflows in parallel

In another use case, you want to call different “callable workflows” with different arguments. Here again, the idea is to wrap the call to a specific “callable workflow” in a “callable workflow”.

Ok, it sounds like Russian dolls but it’s a workaround of an experimental feature, that use another experimental option: execution.run

main:
params: [args]
steps:
- callCustomWorkflow:
call: experimental.executions.run
args:
workflow_id: ${args.workflow}
argument: ${args.argument}
result: result
- returnOutput:
return: ${result}

Then, you can call this wrapper from an execution map, combined this with the previous existing workflows.

main:
steps:
- parallel-executor:
call: experimental.executions.map
args:
workflow_id: custom-workflow
arguments: [{"workflow":"run-long-process","argument":{"wait":5}},{"workflow":"custom-api-call","argument":{"url":"<SLEEPY-APP URL>","query":{"w":5}}}]
result: result
- returnOutput:
return: ${result}

You can find the instructions to deploy and test this the README.ms file located in the GitHub repository.

Only the beginning

As said in the introduction, the product is very new but already in GA and very promising.

Some solutions aren’t perfectly included and built-in but you can achieve a lot of use cases without too much limitations and constraints.

However, on these specific experimental features, keep it mind that are experimental and can be broken or removed in the future! But for a better solution, I’m sure!

--

--

guillaume blaquiere
Google Cloud - Community

GDE cloud platform, Group Data Architect @Carrefour, speaker, writer and polyglot developer, Google Cloud platform 3x certified, serverless addict and Go fan.