How to Execute Dataform Workflows with Parameters from Cloud Run Function
In the last article, “How to trigger the Dataform workflow execution from Cloud Functions,”[1] we discussed how to trigger a previously configured Dataform workflow from Cloud Functions (now called Cloud Run Functions). This article explores more options for executing a Dataform workflow using Python code.
This article assumes readers are familiar with Cloud Run Functions and basic Dataform concepts.
Suppose we have a data pipeline in which a Cloud Run Function generates a JSON file and uploads it to GCS. The next step is to process the file further in Dataform, so we need to trigger the Dataform workflow execution from the Cloud Run Function and pass the file path as a parameter.
In the previous article, we discussed the Dataform code lifecycle[2], which involves multiple steps to execute the workflow after the action code is created and committed to Git.
1. Create the release configuration based on the Git branch.
2. Create the release compilation based on the release configuration.
3. Create the workflow configuration based on the release compilation.
4. Run the workflow configuration execution by creating the workflow invocation.
The previous article focused on step 4: executing a workflow by creating a workflow invocation from an existing workflow configuration, which runs the previously compiled action code.
Now, to update the action code before execution, we need to pass a new variable value for use in the action queries. This can be achieved with compilation variables in the release configuration[3]. Moreover, the Dataform API enables us to collapse the four steps into two by merging steps 1 with 2 and steps 3 with 4.
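Concretely, the merge means two API calls instead of four. Here is a minimal sketch with placeholder values (the full, runnable function code follows later in the article):

from google.cloud import dataform_v1beta1

client = dataform_v1beta1.DataformClient()
repo = 'projects/my_project/locations/us-east1/repositories/my_repo_name'

# Steps 1+2: compile straight from a Git branch, injecting variables.
compilation = client.create_compilation_result(
    parent=repo,
    compilation_result=dataform_v1beta1.CompilationResult(
        git_commitish='my_branch',
        code_compilation_config=dataform_v1beta1.CodeCompilationConfig(
            vars={'inbound_json_file_path': 'gs://my_bucket/my_file.json'}
        ),
    ),
)

# Steps 3+4: invoke the workflow straight from that compilation result.
client.create_workflow_invocation(
    parent=repo,
    workflow_invocation=dataform_v1beta1.WorkflowInvocation(
        compilation_result=compilation.name
    ),
)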
Defining the parameter variable
First, open the dataform.json file in the development workspace and add the new variable to be used as the parameter.
{
  "defaultSchema": "dataform",
  "assertionSchema": "df_assertions",
  "warehouse": "bigquery",
  "defaultDatabase": "my_project",
  "defaultLocation": "us-east1",
  "vars": {
    "inbound_json_file_path": "gs://my_bucket/test_file_for_dev_env.json"
  }
}
We can set any variable value in the development workspace. For example, we can specify the path of a test file that will only be used when we execute the action in the development workspace. Since we declare the variable in the main configuration file, it's essential to choose a clear variable name (oh, it's a pity that the JSON format does not support comments).
Dataform action code
Next, we write the action code.
config {
  type: "incremental",
  schema: "my_dataset",
  tags: ["load_file_from_gcs"],
  protected: true
}

pre_operations {
  IF LENGTH('${dataform.projectConfig.vars.inbound_json_file_path}') > 1 THEN
    LOAD DATA INTO TEMP TABLE loaded_data_table
    FROM FILES (
      format = 'JSON',
      uris = ["${dataform.projectConfig.vars.inbound_json_file_path}"]);
  ELSE RETURN;
  END IF;
}

SELECT
  CURRENT_TIMESTAMP as created_at,
  "${dataform.projectConfig.vars.inbound_json_file_path}" as file_path,
  TO_JSON_STRING(a, true) as json_data
FROM loaded_data_table a
In the config block, we use the incremental table type so that each new file's data is appended to the table. The action tag is load_file_from_gcs; we will use it in the calling code. The protected option prevents a full refresh from rebuilding the table, protecting us from losing already-loaded data.
In the pre_operations block, the IF-ELSE construction checks whether the inbound_json_file_path value is valid. If it is, the LOAD DATA statement executes; otherwise, the action execution stops at the RETURN statement. The LOAD DATA statement loads the JSON file at the path held by the variable into a temporary table.
In the SELECT statement, the data loaded into the temp table is written to the json_data field, with the creation timestamp and the file path added as extra columns.
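After a successful run, a quick query confirms that each execution appended a new batch. Here is a minimal sketch for checking from Python; the table name loaded_files is hypothetical, since in Dataform it comes from the action's .sqlx file name, which isn't shown here:

from google.cloud import bigquery

client = bigquery.Client(project='my_project')
# 'loaded_files' is a hypothetical action name; substitute your own.
query = """
SELECT created_at, file_path
FROM `my_project.my_dataset.loaded_files`
ORDER BY created_at DESC
LIMIT 5
"""
for row in client.query(query).result():
    print(row.created_at, row.file_path)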
Function code
Let’s consider the Cloud Run Function code. We aim to trigger the Dataform workflow execution, sending the JSON file path as the parameter.
from google.cloud import dataform_v1beta1

df_client = dataform_v1beta1.DataformClient()


def create_compilation_result(repo_uri, git_branch, params):
    # Initialize the request argument(s): compile straight from
    # the Git branch, injecting the compilation variables.
    compilation_result = dataform_v1beta1.CompilationResult()
    compilation_result.git_commitish = git_branch
    compilation_result.code_compilation_config.vars = params
    # Add compilation overrides if needed
    # compilation_result.code_compilation_config.schema_suffix = "prod"
    request = dataform_v1beta1.CreateCompilationResultRequest(
        parent=repo_uri,
        compilation_result=compilation_result,
    )
    response = df_client.create_compilation_result(request=request)
    return response.name


def create_workflow_invocation(repo_uri, compilation_name, tag_list):
    # Invoke the workflow from the fresh compilation result,
    # limited to the actions carrying the given tags.
    workflow_invocation = dataform_v1beta1.types.WorkflowInvocation(
        compilation_result=compilation_name
    )
    workflow_invocation.invocation_config.included_tags = tag_list
    request = dataform_v1beta1.CreateWorkflowInvocationRequest(
        parent=repo_uri,
        workflow_invocation=workflow_invocation
    )
    response = df_client.create_workflow_invocation(request=request)
    return 0


def main():
    gcp_project = 'my_project'
    location = 'us-east1'
    repo_name = 'my_repo_name'
    repo_uri = f'projects/{gcp_project}/locations/{location}/repositories/{repo_name}'
    git_branch = 'my_branch'
    # The key must match the variable declared in dataform.json.
    params = {
        "inbound_json_file_path": "gs://my_bucket/my_file.json"
    }
    tag_list = ['load_file_from_gcs']
    compilation_name = create_compilation_result(repo_uri, git_branch, params)
    create_workflow_invocation(repo_uri, compilation_name, tag_list)
    return 0
In the main function, we start by setting the values of the gcp_project, location, and repo_name variables and then use them to compose the repo_uri. After that, we set the git_branch value and create the params object, which includes the inbound_json_file_path value. Note that the key must match the variable name declared in dataform.json, and any other compilation variables you use (e.g., environment) should also be added to the params object.
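For completeness, here is a minimal sketch of what the surrounding Cloud Run Function entry point might look like when the function is triggered by a GCS upload. The trigger type, function name, and event handling are my assumptions, not part of the code above; it reuses create_compilation_result and create_workflow_invocation:

import functions_framework

@functions_framework.cloud_event
def on_file_uploaded(cloud_event):
    # Hypothetical CloudEvent handler for a GCS "object finalized"
    # trigger: the event payload carries the bucket and object name.
    data = cloud_event.data
    params = {"inbound_json_file_path": f"gs://{data['bucket']}/{data['name']}"}
    repo_uri = 'projects/my_project/locations/us-east1/repositories/my_repo_name'
    compilation_name = create_compilation_result(repo_uri, 'my_branch', params)
    create_workflow_invocation(repo_uri, compilation_name, ['load_file_from_gcs'])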
Next, we call the create_compilation_result function with repo_uri, git_branch, and params as arguments. Let's take a closer look at its code. It begins by initializing a compilation_result object from the CompilationResult class[4]. The class description shows that, in addition to the standard method of creating a compilation based on a release configuration, it allows us to set compilation parameters directly through the git_commitish and code_compilation_config attributes. The git_commitish is set to the value of git_branch, and code_compilation_config.vars is filled with the params. If you use compilation overrides (e.g., schema_suffix), you can also include them according to the CodeCompilationConfig specification. A request is then generated, linking the repo_uri and the initialized compilation_result. Once the request is sent, the function returns the name of the created compilation result (response.name).
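As an optional hardening step (my addition, not part of the code above), the returned CompilationResult also exposes a compilation_errors field, so we can fail fast instead of invoking a broken compilation. A sketch of such a variant, reusing the import and df_client from above:

def compile_or_raise(repo_uri, git_branch, params):
    # Variant of create_compilation_result that fails fast on
    # compilation errors reported by the API.
    compilation_result = dataform_v1beta1.CompilationResult(
        git_commitish=git_branch,
        code_compilation_config=dataform_v1beta1.CodeCompilationConfig(vars=params),
    )
    response = df_client.create_compilation_result(
        parent=repo_uri, compilation_result=compilation_result
    )
    if response.compilation_errors:
        # Each CompilationError carries a human-readable message.
        messages = [err.message for err in response.compilation_errors]
        raise RuntimeError(f'Dataform compilation failed: {messages}')
    return response.name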
In the next step, we call the create_workflow_invocation function with the repo_uri, compilation_name, and tag_list arguments. This function initializes a workflow_invocation object from the WorkflowInvocation class[5]. Its compilation_result attribute is set to the value of compilation_name, and the included_tags field of the invocation_config is populated with tag_list. A request object is created with the repo_uri as the parent and the initialized workflow_invocation object. The request is sent to start the workflow invocation.
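The create_workflow_invocation call returns immediately, while the workflow itself runs asynchronously. If the caller needs the outcome, the invocation can be polled. A minimal sketch, assuming create_workflow_invocation is changed to return response.name instead of 0:

import time

def wait_for_invocation(invocation_name, poll_seconds=10):
    # Poll the invocation (reusing df_client from above) until it
    # reaches a terminal state.
    terminal = {
        dataform_v1beta1.WorkflowInvocation.State.SUCCEEDED,
        dataform_v1beta1.WorkflowInvocation.State.FAILED,
        dataform_v1beta1.WorkflowInvocation.State.CANCELLED,
    }
    while True:
        invocation = df_client.get_workflow_invocation(name=invocation_name)
        if invocation.state in terminal:
            return invocation.state
        time.sleep(poll_seconds)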
After we run the Cloud Run Function, we can see the Dataform job execution in the Workflow Execution Logs tab.
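The same check can be done programmatically; a small sketch using df_client and repo_uri from the code above:

# List recent workflow invocations for the repository and print
# their states instead of opening the console UI.
for invocation in df_client.list_workflow_invocations(parent=repo_uri):
    print(invocation.name, invocation.state.name)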
Summarizing
In this article, we have discussed how to use Python code to dynamically trigger Dataform workflow executions with parameters. By utilizing the Dataform API, we can integrate with systems like Cloud Run Functions and pass parameters to be processed during workflow execution. As Dataform continues to evolve, expect new features and optimizations to further improve this workflow execution process.
Thanks for reading.
Related Links
- [1] Alex Feldman, How to trigger the Dataform workflow execution from Cloud Functions, Medium.
- [2] Google Cloud Docs, Introduction to code lifecycle in Dataform.
- [3] Google Cloud Docs, Dataform: View details of a release configuration.
- [4] Google Cloud Docs, Dataform: Class CodeCompilationConfig.
- [5] Google Cloud Docs, Dataform: Class WorkflowInvocation.