How to Execute Dataform Workflows with Parameters from Cloud Run Function

Alex Feldman
Google Cloud - Community
5 min read · Sep 25, 2024

In the last article, “How to trigger the Dataform workflow execution from Cloud Functions,”[1] we discussed how to trigger a previously configured Dataform workflow from a Cloud Function (now called a Cloud Run function). This article covers more options for executing Dataform workflows using Python code.


This article assumes readers are familiar with Cloud Run Functions and basic Dataform concepts.

Suppose we have a data pipeline that generates a JSON file and uploads it to GCS using a Cloud Run function. The next step is to process the file further in Dataform, so we need to trigger the Dataform workflow execution from the Cloud Run function and pass the file path as a parameter.

In the previous article, we discussed the Dataform code cycle[2], which involves multiple steps to execute the workflow after creating and saving the action’s code to Git.

1. Create the release configuration based on the Git branch.
2. Create the release compilation based on the release configuration.
3. Create the workflow configuration based on the release compilation.
4. Run the workflow configuration execution by creating the workflow invocation.

The previous article focused on step 4: executing a workflow by its workflow configuration name, where the workflow configuration runs the previously compiled action code by creating a workflow invocation.

Now, to update the action code before execution, we need to pass a new variable value to be used in the action queries. This can be achieved with compilation variables, which are normally set in the release configuration[3]. The Dataform API also lets us combine the four steps into two, merging steps 1 and 2 into a single compilation call and steps 3 and 4 into a single invocation call.

Defining the parameter variable

First, open the dataform.json file in the development workspace and add the new variable to be used as the parameter.

{
  "defaultSchema": "dataform",
  "assertionSchema": "df_assertions",
  "warehouse": "bigquery",
  "defaultDatabase": "my_project",
  "defaultLocation": "us-east1",
  "vars": {
    "inbound_json_file_path": "gs://my_bucket/test_file_for_dev_env.json"
  }
}

We can set any value for the variable in the development workspace, for example, the path of a test file that is only used when we execute the action there. Since the variable is declared in the main configuration file, it’s essential to use a clear, descriptive name (it’s a pity that the JSON format does not support comments).

Dataform action code

Next, we write the action code.

config {
  type: "incremental",
  schema: "my_dataset",
  tags: ["load_file_from_gcs"],
  protected: true
}

pre_operations {

  IF LENGTH('${dataform.projectConfig.vars.inbound_json_file_path}') > 1 THEN

    LOAD DATA INTO TEMP TABLE loaded_data_table
    FROM FILES (
      format = 'JSON',
      uris = ["${dataform.projectConfig.vars.inbound_json_file_path}"]);

  ELSE RETURN;

  END IF;

}

SELECT CURRENT_TIMESTAMP as created_at,
  "${dataform.projectConfig.vars.inbound_json_file_path}" as file_path,
  TO_JSON_STRING(a, true) as json_data
FROM loaded_data_table a

In the config block, the table type is incremental so that each new file’s data is appended to the table. The action tag is load_file_from_gcs; we will use it in the calling code. The protected option prevents the table from being rebuilt from scratch, so previously loaded data is not lost.

In the pre_operations block, we use an IF-ELSE statement to check whether the inbound_json_file_path value holds a non-empty path. If it does, the LOAD DATA statement loads the JSON file at that path into a temporary table; otherwise, the action execution is stopped by the RETURN statement.

The SELECT statement then inserts the data loaded into the temporary table into the json_data field, adding the creation timestamp and the file path as extra columns.

Function code

Let’s now look at the Cloud Run function code. The goal is to trigger the Dataform workflow execution, passing the JSON file path as a parameter.

from google.cloud import dataform_v1beta1

df_client = dataform_v1beta1.DataformClient()


def create_compilation_result(repo_uri, git_branch, params):

    # Initialize request argument(s)
    compilation_result = dataform_v1beta1.CompilationResult()
    compilation_result.git_commitish = git_branch
    compilation_result.code_compilation_config.vars = params

    # Add compilation overrides if needed
    # compilation_result.code_compilation_config.schema_suffix = "prod"

    request = dataform_v1beta1.CreateCompilationResultRequest(
        parent=repo_uri,
        compilation_result=compilation_result,
    )

    response = df_client.create_compilation_result(request=request)

    return response.name


def create_workflow_invocation(repo_uri, compilation_name, tag_list):

    workflow_invocation = dataform_v1beta1.types.WorkflowInvocation(
        compilation_result=compilation_name
    )
    workflow_invocation.invocation_config.included_tags = tag_list

    request = dataform_v1beta1.CreateWorkflowInvocationRequest(
        parent=repo_uri,
        workflow_invocation=workflow_invocation
    )

    response = df_client.create_workflow_invocation(request=request)

    # Return the invocation name so callers can check its status later
    return response.name

def main():
    gcp_project = 'my_project'
    location = 'us-east1'
    repo_name = 'my_repo_name'

    repo_uri = f'projects/{gcp_project}/locations/{location}/repositories/{repo_name}'

    git_branch = 'my_branch'

    # The key must match the variable name declared in dataform.json
    params = {
        "inbound_json_file_path": "gs://my_bucket/my_file.json"
    }

    tag_list = ['load_file_from_gcs']

    compilation_name = create_compilation_result(repo_uri, git_branch, params)

    create_workflow_invocation(repo_uri, compilation_name, tag_list)

    return 0

In the main function, we start by setting the gcp_project, location, and repo_name variables and then use them to compose the repo_uri. After that, we set the git_branch value and create the params dictionary, which contains the inbound_json_file_path value; the key must match the variable name declared in dataform.json. Note that any other compilation variables your project uses (e.g., environment) should also be added to params, as in the sketch below.
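For example, a params dictionary that overrides the file path together with one extra variable could look like this (a hedged sketch; the environment variable is hypothetical and only makes sense if it is also declared in dataform.json):

params = {
    # Keys must match the variable names declared in the "vars" section of dataform.json
    "inbound_json_file_path": "gs://my_bucket/my_file.json",
    "environment": "prod"  # hypothetical extra compilation variable
}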

Next, we call the create_compilation_result function with repo_uri, git_branch, and params as arguments. The function initializes a compilation_result object from the CompilationResult class[4]. In addition to the standard approach of compiling from a release configuration, this class lets us pass compilation settings directly through the git_commitish and code_compilation_config attributes. We set git_commitish to the git_branch value and fill code_compilation_config.vars with params. If you use compilation overrides (e.g., schema_suffix), you can set them here as well, following the CodeCompilationConfig specification; see the sketch below. Finally, a request is built linking repo_uri and the initialized compilation_result, it is sent, and the name of the created compilation result (response.name) is returned.
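As a rough illustration (the override values here are my own assumptions, not taken from the example project), such overrides would be set on the same code_compilation_config object before building the request:

# Illustrative compilation overrides per the CodeCompilationConfig fields
compilation_result.code_compilation_config.default_database = "my_project"
compilation_result.code_compilation_config.schema_suffix = "prod"  # my_dataset -> my_dataset_prod
compilation_result.code_compilation_config.table_prefix = "stg"    # my_table -> stg_my_table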

In the next step, we call the create_workflow_invocation function with the repo_uri, compilation_name, and tag_list arguments. This function initializes a workflow_invocation object from the WorkflowInvocation class[5], sets its compilation_result attribute to compilation_name, and populates the included_tags field of the invocation_config with tag_list. A request object is then created with repo_uri as the parent and the initialized workflow_invocation, and sending it creates the workflow invocation that starts the execution. To run this logic as an actual Cloud Run function, we also need an entry point, as sketched below.
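Here is a minimal sketch of such an entry point, assuming an HTTP-triggered function built with the Functions Framework for Python; the trigger_dataform name and the request body shape are my own assumptions, and the sketch reuses the functions defined above:

import functions_framework


@functions_framework.http
def trigger_dataform(request):
    # Hypothetical HTTP entry point: expects a JSON body like {"file_path": "gs://..."}
    body = request.get_json(silent=True) or {}
    file_path = body.get('file_path')
    if not file_path:
        return 'Missing file_path', 400

    repo_uri = 'projects/my_project/locations/us-east1/repositories/my_repo_name'
    params = {'inbound_json_file_path': file_path}

    compilation_name = create_compilation_result(repo_uri, 'my_branch', params)
    create_workflow_invocation(repo_uri, compilation_name, ['load_file_from_gcs'])

    return 'Workflow invocation created', 200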

After we run the Cloud Run function, we can see the Dataform job execution in the Workflow Execution Logs tab of the repository. We can also check the invocation status from code, as in the sketch below.
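A minimal polling sketch, assuming create_workflow_invocation returns the invocation name (response.name) as shown above and reusing df_client from the function code; the helper name, timeout, and polling interval are my own choices:

import time


def wait_for_invocation(invocation_name, timeout_sec=600, poll_sec=15):
    # Poll the workflow invocation until it leaves the RUNNING state
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        invocation = df_client.get_workflow_invocation(name=invocation_name)
        if invocation.state != dataform_v1beta1.WorkflowInvocation.State.RUNNING:
            return invocation.state  # e.g., SUCCEEDED, FAILED, CANCELLED
        time.sleep(poll_sec)
    raise TimeoutError(f'Invocation {invocation_name} is still running after {timeout_sec} seconds')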

Summarizing

In this article, we have discussed how to use Python code to dynamically trigger Dataform workflow executions with parameters. By utilizing the Dataform API, we can integrate with systems like Cloud Run Functions and pass parameters to be processed during workflow execution. As Dataform continues to evolve, expect new features and optimizations to further improve this workflow execution process.

Thanks for reading.

Related Links
