Amal Kumar
6 min read · Jul 4, 2022

Google Cloud Dataflow Dynamic (Batch & Streaming) Pipeline using Flex Templates (Python)

Dataflow Flex Templates allow you to package pipelines as Docker images and stage them in a Google Cloud project’s Container Registry or Artifact Registry.

After a pipeline has been staged, it can be run using the gcloud command line tool or the REST API. Another benefit is that users can self-serve by using the Google Cloud console to trigger a pipeline.

During execution, Flex Templates generate a dynamic execution graph based on the runtime parameters provided by the user.

Let’s see this with a working example!

Use Case

Dynamically Triggering Batch/Streaming Pipelines

  • Use a Dataflow Flex Template to switch between a batch and a streaming pipeline dynamically at runtime
  • The execution graph is created at runtime, based on the runtime arguments supplied when the templated pipeline is executed
  • If the source is BigQuery and sink is Cloud Storage the template triggers a batch pipeline
  • If the source is Pub/Sub and sink is BigQuery the template triggers a streaming pipeline
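
The selection rule in the bullets above can be sketched as a small pure function. This is only an illustration and not part of the article's pipeline; the name `pipeline_mode` and the returned labels are assumptions made for this sketch.

```python
# Hypothetical helper: maps a (source, destination) pair to a pipeline mode.
VALID_ROUTES = {
    ("BigQuery", "GCS"): "batch",
    ("PubSub", "BigQuery"): "streaming",
}


def pipeline_mode(source: str, destination: str) -> str:
    """Return 'batch' or 'streaming', or raise ValueError for an unsupported pair."""
    try:
        return VALID_ROUTES[(source, destination)]
    except KeyError:
        raise ValueError(
            f"Argument Mismatch -> source: {source} & destination: {destination}"
        ) from None


if __name__ == "__main__":
    print(pipeline_mode("BigQuery", "GCS"))
    print(pipeline_mode("PubSub", "BigQuery"))
```

Keeping the allowed pairs in one table makes it easy to see which combinations the template supports and to reject everything else before any pipeline is built.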

Sample Code

The code is not production-ready; it only demonstrates the power of Flex Templates and supports this article.

import argparse
import logging
import datetime
from apache_beam import window
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions


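def filter_out_nones_bq(elem):
    # Predicate for beam.Filter: keep rows that have a user_id.
    # (A generator function here would always be truthy, so nothing would be filtered.)
    return elem['user_id'] is not None


def filter_out_nones_pubsub(element):
    # Predicate for beam.Filter: keep rows whose third column (user id) is present.
    # Fields come from str.split(','), so a missing value is '' rather than None.
    return len(element) > 2 and element[2] not in (None, '')


def add_load_timestamp(elem):
    # elem is a (user_id, count) pair; append the load time
    return (elem[0], elem[1], datetime.datetime.now())


def to_json(element_tuple):
    # Build the row dictionary passed to WriteToBigQuery
    json_str = {"user_id": element_tuple[0],
                "total_no_of_tnx": element_tuple[1],
                "load_datetime": element_tuple[2]
                }
    return json_str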

# Table schema for the BigQuery table
table_schema = {
    'fields': [{
        'name': 'user_id', 'type': 'STRING', 'mode': 'REQUIRED'
    }, {
        'name': 'total_no_of_tnx', 'type': 'INTEGER', 'mode': 'NULLABLE'
    }, {
        'name': 'load_datetime', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'
    }
    ]
}


def run(argv=None):
    # Parse the runtime arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--source',
                        dest='source',
                        required=True,
                        help='Pipeline Source. BigQuery | PubSub')
    parser.add_argument('--destination',
                        dest='destination',
                        required=True,
                        help='Pipeline Sink. GCS | BigQuery')
    parser.add_argument('--output',
                        dest='output',
                        required=False,
                        help='GCS Bucket Name')
    parser.add_argument('--pubsub_subscription',
                        dest='pubsub_subscription',
                        required=False,
                        help='Pub/Sub Subscription')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # Basic validation of the source and sink inputs
    if (known_args.source == 'BigQuery' and known_args.destination != 'GCS') or \
            (known_args.source == 'PubSub' and known_args.destination != 'BigQuery'):
        raise ValueError(f"Argument Mismatch -> "
                         f"source: {known_args.source} & destination: {known_args.destination}")

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Enable streaming mode if the source is Pub/Sub
    if known_args.source == 'PubSub':
        pipeline_options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=pipeline_options) as pipeline:

        # Batch pipeline
        if known_args.source == 'BigQuery':
            result_data = (
                pipeline
                # Read from BigQuery using Standard SQL into a PCollection of dictionaries
                | 'Query Table Std-SQL' >> beam.io.ReadFromBigQuery(
                    project='GCP_PROJECT_ID',  # placeholder: replace with your project ID
                    query='SELECT * FROM '
                          '`GCP_PROJECT_ID.GCP_BQ_DATASET.sample_transactions`',
                    use_standard_sql=True)
                # Keep rows created within the last 365 days
                | 'filter for last few days' >> beam.Filter(
                    lambda elem:
                    (datetime.datetime.now() - datetime.timedelta(days=365))
                    <= elem['created_at'].replace(tzinfo=None)
                    <= datetime.datetime.now())

                | 'Filter Out None UserIds'
                >> beam.Filter(filter_out_nones_bq)

                | 'User & Tnx Key Value pair'
                >> beam.Map(lambda elem: (elem['user_id'], elem['transaction_id']))
                | 'Count No of Transactions per user' >> beam.combiners.Count.PerKey()

                | 'Write To GCS' >> beam.io.WriteToText(known_args.output)
            )
        # Streaming pipeline
        elif known_args.source == 'PubSub':
            pubsub_data = (
                pipeline
                | 'Read from pub sub'
                >> beam.io.ReadFromPubSub(subscription=known_args.pubsub_subscription)

                | 'Decode'
                >> beam.Map(lambda x: x.decode('utf-8'))

                | 'Remove extra chars'
                >> beam.Map(lambda data: data.strip())

                | 'Split Row'
                >> beam.Map(lambda row: row.split(','))

                | 'Filter Out None UserIds'
                >> beam.Filter(filter_out_nones_pubsub)

                | 'User & Tnx Key Value pair'
                >> beam.Map(lambda elem: (elem[2], elem[0]))

                # Fixed windows of 20 seconds
                | 'Window'
                >> beam.WindowInto(window.FixedWindows(20))

                | 'Count No of Transactions per user'
                >> beam.combiners.Count.PerKey()

                | 'Add Load Timestamp'
                >> beam.Map(add_load_timestamp)

                | 'Convert to json'
                >> beam.Map(to_json)

                | beam.io.WriteToBigQuery(
                    table='output_bq_table', dataset='GCP_BQ_DATASET', project='GCP_PROJECT_ID',
                    schema=table_schema,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                )
            )

        else:
            raise ValueError(f"Invalid source: {known_args.source}")



if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
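
To sanity-check the streaming branch's per-window logic without launching Dataflow, the same steps (decode, strip, split, key by user, count) can be mimicked in plain Python. This is a sketch under assumptions: the message layout (transaction ID in column 0, user ID in column 2) is inferred from the indices used in the pipeline, the middle column and the sample payloads are made up, and `count_transactions` is a hypothetical name. It does not exercise Beam itself.

```python
from collections import Counter


def count_transactions(messages):
    """Mimic one fixed window of the streaming branch on a list of raw payloads."""
    counts = Counter()
    for payload in messages:
        # Same order as the pipeline: decode, strip whitespace, split on commas
        row = payload.decode("utf-8").strip().split(",")
        # Skip rows that have no user ID in column 2
        if len(row) <= 2 or not row[2]:
            continue
        counts[row[2]] += 1
    return dict(counts)


if __name__ == "__main__":
    # Made-up payloads in the assumed layout: transaction_id, amount, user_id
    window = [b"t1,10.5,u1\n", b"t2,3.0,u2", b" t3,7.2,u1 ", b"t4,1.0,"]
    print(count_transactions(window))
```

A check like this is cheap to run locally and helps confirm the expected message layout before publishing test messages to the subscription.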

Creating the Flex Template

Step 1 : Dockerfile

The Pipeline dependencies are packaged in a Docker image and pushed to the Project’s Container Registry.

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY requirements.txt .
COPY beam_batch_stream_pipeline.py .

# Do not include `apache-beam` in requirements.txt
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/beam_batch_stream_pipeline.py"

# Install apache-beam and other dependencies to launch the pipeline
RUN pip install --upgrade pip
RUN pip install apache-beam[gcp]
RUN pip install -U -r ./requirements.txt

Step 2 : Build Image and Upload to Registry

Cloud Build is used to build the Docker image, tag it, and push it to the Container Registry. We can also use docker commands.

export TEMPLATE_IMAGE="<IMAGE_NAME>"
gcloud builds submit --tag $TEMPLATE_IMAGE .

Step 3 : Metadata File for custom parameters validation

The metadata file ensures that custom parameters are validated when the template is run.

The “regexes” element in the JSON holds regular expressions, as strings, that are used for parameter validation.

Place the metadata file in the same Cloud Storage folder as the template.

{
  "name": "Batch and Streaming beam Python flex template",
  "description": "Batch and Streaming beam Pipeline using flex template. BigQuery -> GCS Or PubSub -> BigQuery",
  "parameters": [
    {
      "name": "source",
      "label": "Pipeline Source. BigQuery | PubSub",
      "helpText": "Pipeline Source. BigQuery Or PubSub",
      "isOptional": false,
      "regexes": [
        "BigQuery|PubSub"
      ]
    },
    {
      "name": "destination",
      "label": "Pipeline Sink. GCS | BigQuery",
      "helpText": "Pipeline Sink. GCS Or BigQuery",
      "isOptional": false,
      "regexes": [
        "GCS|BigQuery"
      ]
    },
    {
      "name": "output",
      "label": "GCS Bucket Name, If Source is BigQuery",
      "helpText": "GCS Bucket Name, If Source is BigQuery",
      "isOptional": true,
      "regexes": []
    },
    {
      "name": "pubsub_subscription",
      "label": "Pub/Sub Subscription, If Source is PubSub",
      "helpText": "Pub/Sub Subscription, If Source is PubSub",
      "isOptional": true,
      "regexes": []
    }
  ]
}
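
To get a feel for what the “regexes” entries accept, the parameter definitions above can be checked locally before a job is submitted. The following is a minimal sketch, not Dataflow's own validator: `validate_parameters` is a hypothetical helper, and it assumes the whole value must match each pattern (hence `re.fullmatch`), which may differ from how the service applies the expressions.

```python
import re

# Trimmed copy of the parameter definitions from the metadata file above
METADATA_PARAMETERS = [
    {"name": "source", "isOptional": False, "regexes": ["BigQuery|PubSub"]},
    {"name": "destination", "isOptional": False, "regexes": ["GCS|BigQuery"]},
    {"name": "output", "isOptional": True, "regexes": []},
    {"name": "pubsub_subscription", "isOptional": True, "regexes": []},
]


def validate_parameters(params, definitions=METADATA_PARAMETERS):
    """Return a list of error strings; an empty list means the parameters pass."""
    errors = []
    for definition in definitions:
        name = definition["name"]
        if name not in params:
            if not definition["isOptional"]:
                errors.append(f"missing required parameter: {name}")
            continue
        for pattern in definition["regexes"]:
            # Assumption: the entire value has to match the pattern
            if re.fullmatch(pattern, params[name]) is None:
                errors.append(f"{name}={params[name]!r} does not match {pattern!r}")
    return errors


if __name__ == "__main__":
    print(validate_parameters({"source": "BigQuery", "destination": "GCS"}))
    print(validate_parameters({"source": "Kafka", "destination": "GCS"}))
```

Note that these patterns validate each parameter on its own; the cross-check between source and destination still happens inside the pipeline code.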

Step 4 : Building the Template

Use the gcloud dataflow flex-template build command to build the Flex Template.

Note: The template metadata JSON file must be present in the GCS bucket (TEMPLATE_PATH). Refer to the previous step.

export TEMPLATE_PATH="gs://<TEMPLATE_FILEPATH_WITH_NAME>.json"
gcloud dataflow flex-template build $TEMPLATE_PATH \
--image "$TEMPLATE_IMAGE" \
--sdk-language "PYTHON" \
--metadata-file "metadata.json"

Executing the Pipeline

A Flex Template Dataflow pipeline can be triggered in multiple ways. This article covers only gcloud and the Cloud Console.

Although out of scope for this article, you can also use an Airflow operator, namely “DataflowStartFlexTemplateOperator”, to trigger Flex Template batch Dataflow pipelines.
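
For readers curious about the Airflow route, the operator takes a `body` argument that mirrors the Dataflow `flexTemplates.launch` REST request. The sketch below only builds such a body as a plain dictionary, so it runs without Airflow installed; `build_launch_body` is a hypothetical helper, the bucket paths are placeholders, and the field names should be checked against the REST reference for your environment.

```python
import json
from datetime import datetime, timezone


def build_launch_body(template_path, parameters, job_prefix="batch"):
    """Build a request body in the shape of the flexTemplates.launch REST call."""
    job_name = f"{job_prefix}-{datetime.now(timezone.utc):%Y%m%d-%H%M%S}"
    return {
        "launchParameter": {
            "jobName": job_name,
            "containerSpecGcsPath": template_path,
            "parameters": dict(parameters),
        }
    }


if __name__ == "__main__":
    body = build_launch_body(
        "gs://<TEMPLATE_FILEPATH_WITH_NAME>.json",  # placeholder, as in the article
        {
            "source": "BigQuery",
            "destination": "GCS",
            "output": "gs://<GCS_OUTPUT_LOCATION>",  # placeholder
        },
    )
    print(json.dumps(body, indent=2))
```

In a DAG, a dictionary like this would be passed as `body=` to the operator, together with the region and project.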

Triggering a Batch Pipeline

gcloud

gcloud dataflow flex-template run "batch-`date +%Y%m%d-%H%M%S`" \
--project $GCP_PROJECT_ID \
--region $GCP_REGION \
--template-file-gcs-location "$TEMPLATE_PATH" \
--parameters temp_location="gs://<GCS_TEMP_LOCATION>" \
--parameters staging_location="gs://<GCS_STAGING_LOCATION>" \
--parameters source="BigQuery" \
--parameters destination="GCS" \
--parameters output="gs://<GCS_OUTPUT_LOCATION>"

Cloud Console: Create Job from the Template

Because we defined the parameters in metadata.json earlier, they are visible in the console and are validated when the job is submitted.

Job Execution Graph

The job execution graph is generated dynamically, and a batch Dataflow pipeline is triggered based on the input provided, i.e. BigQuery -> Transforms -> GCS.

Triggering a Streaming Pipeline

gcloud

gcloud dataflow flex-template run "stream-beam-`date +%Y%m%d-%H%M%S`" \
--project $GCP_PROJECT_ID \
--region $GCP_REGION \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters temp_location="gs://<GCS_TEMP_LOCATION>" \
--parameters staging_location="gs://<GCS_STAGING_LOCATION>" \
--parameters source="PubSub" \
--parameters destination="BigQuery" \
--parameters pubsub_subscription="projects/$GCP_PROJECT_ID/subscriptions/test_sub"
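
The batch and streaming invocations above differ only in the job name prefix and the parameters, so a script that triggers both can share one small builder. This is an illustrative sketch: `flex_template_run_command` is a hypothetical helper, it uses only the flags shown in the commands above, and the project, region, and paths in the example are placeholders.

```python
import shlex
from datetime import datetime


def flex_template_run_command(job_prefix, project, region, template_path, parameters):
    """Return the argv list for `gcloud dataflow flex-template run`."""
    job_name = f"{job_prefix}-{datetime.now():%Y%m%d-%H%M%S}"
    argv = [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        "--project", project,
        "--region", region,
        "--template-file-gcs-location", template_path,
    ]
    # One --parameters flag per key=value pair, as in the commands above
    for key, value in parameters.items():
        argv += ["--parameters", f"{key}={value}"]
    return argv


if __name__ == "__main__":
    argv = flex_template_run_command(
        "stream-beam",
        "my-project",           # placeholder project ID
        "us-central1",          # placeholder region
        "gs://my-bucket/template.json",  # placeholder template path
        {
            "source": "PubSub",
            "destination": "BigQuery",
            "pubsub_subscription": "projects/my-project/subscriptions/test_sub",
        },
    )
    # Print the command; pass argv to subprocess.run(argv, check=True) to execute it
    print(shlex.join(argv))
```

Building an argument list instead of a shell string avoids quoting mistakes such as an unbalanced double quote around the template path.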

Cloud Console: Create Job from the Template

Job Execution Graph

Here, too, the job execution graph is generated dynamically, and a streaming Dataflow pipeline is triggered based on the input provided, i.e. PubSub -> Transforms -> BigQuery.

I hope the article was useful to you and you learned something new. 😁

Thank you for reading. Happy Building!!