How to Deploy Your Apache Beam Pipeline in Google Cloud Dataflow
Recently the Platform Team here at Posh has been building out an analytics dashboard to help our business users visualize product usage. Posh’s main product is customer service chatbots, so our analytics dashboard allows our business users to analyze how well our bots are serving their customers. As part of this analytics dashboard, we needed to build out a data pipeline to process and store user events in real-time.
Our data pipeline was built using Apache Beam and runs on Google Cloud Dataflow. In case you’re unfamiliar with these tools, let me explain! From the Beam homepage: “Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines”. To elaborate, a Beam project consists of a series of data processing functions, known as ‘transforms’. Transforms make up a data pipeline for either batch or streaming data — hence the name “Beam”, a hybrid of batch and stream. Pipelines are built using these transforms through one of their open-source SDKs; in our case we used the Python SDK. Once a pipeline is built, it is then executed by a supported distributed processing back-end; Dataflow was used as our pipeline’s backend.
At a high-level, our pipeline takes a user event as input, processes the event, and stores it in our backend database. Customer events generally consist of bot interactions like starting a chat, asking a question, etc. When a business user of ours wants to analyze their bot’s performance, they can then make a query via our analytics dashboard which will return the relevant data from our backend database. Note that our chatbots continuously output data, meaning the input to our data pipeline is unbounded, and our pipeline is a streaming pipeline rather than a batch pipeline.
Now that we’ve got the basics out of the way, let’s discuss deployments. Once we had a pipeline working locally, it was time to deploy it and have it run in the cloud.
Dataflow templates are a way to package and stage your pipeline in Google Cloud. Once staged, a pipeline can be run by using the Google Cloud console, the gcloud command line tool, or REST API calls. Templated deployments have many benefits over non-templated Dataflow deployments. For example, you can run your pipeline without a development environment and its associated dependencies. This is useful for automated deployments, such as recurring batch jobs or jobs triggered from CI/CD pipelines.
Classic vs Flex Templates
There are two different types of templates. With Classic Templates, when a pipeline is run, a JSON template file is created by the Apache Beam SDK and uploaded to Google Cloud Storage. This template file defines an execution graph, that is, a set of data processing steps and the paths between them, based on the transforms in the pipeline code (more on that in a bit). In contrast, with Flex Templates, developers package their pipeline into a Docker image stored in their project’s Container Registry, and then use the gcloud command line tool to build the template spec file and save it to Cloud Storage. The execution graph is only created when the template is executed.
One key difference between the two is that because Flex Templates don’t automatically create the execution graph, you can make small variations to the execution graph when running the template. For example, if you wanted to execute multiple jobs with varying sink file formats, you could use the same base template file, but modify the execution graph at run-time. Note that although only Flex Templates allow for execution graph changes, both Classic and Flex Templates allow for run-time parameters to be customized at execution.
Ultimately we used Classic Templates for our pipeline. We find them easy to work with, and haven’t needed the additional functionality offered by Flex Templates.
Creating A Template
Templates are created by running a command like this one:
python -m MODULE_NAME \
    --runner DataflowRunner \
    --region us-east1 \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME
This command will run your pipeline module, create a template file, and save it to Google Cloud Storage at the location specified by the template_location flag. The staging_location flag points to a Cloud Storage path where Dataflow stages the code packages needed by workers executing the job. The temp_location flag is a Cloud Storage path where Dataflow stores temporary job files created during the execution of the pipeline. Note that the staging location contains files needed to execute the template, so they shouldn’t be deleted even after the template is created!
As mentioned earlier, Beam pipelines can be executed by many different back-ends. When running our pipeline locally, the Direct Runner was used to execute the pipeline on our local machine. To create a Dataflow template, the runner used must be the Dataflow Runner.
Specifying Pipeline Options
If you’d like your pipeline to read in a set of parameters, you can use the Apache Beam SDK’s PipelineOptions class to define pipeline options. In the template creation command shown above, the various flags passed in will be read in and parsed by the PipelineOptions class, and can then be used to dictate how the pipeline processes data. To specify custom options, you can use the add_argument() method, which behaves the same way as the add_argument() method in Python’s standard argparse module for parsing command-line options. In the snippet below you’ll see an example of how to specify your own custom options class, MyOptions, which can then be used inside your pipeline, in this case specifying where the pipeline should read input from and write output to.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input')
        parser.add_argument('--output')

# Create the options object
options = PipelineOptions()
# Specifying that the options expected come from our custom options class
my_options = options.view_as(MyOptions)

# Create the pipeline - simple read/write transforms
with beam.Pipeline(options=options) as pipeline:
    lines = pipeline | 'Read files' >> beam.io.ReadFromText(my_options.input)
    lines | 'Write files' >> beam.io.WriteToText(my_options.output)
When this module is called via the command line, flags specified in the command will be read in as PipelineOptions. So for example, in the template creation command shown earlier, you could specify an input option and give it a value when running the module: --input=/path_to_input.
For each custom option you can also specify default values and help text. The default value will be used if the option is not assigned a value, and the help text will appear when a user passes --help as a command-line argument.
parser.add_argument(
    '--input',
    help='Input for the pipeline',
)
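Because add_argument() follows argparse, the default and help behavior can be tried out without Beam installed. The sketch below uses a plain argparse parser as a stand-in for the parser that Beam passes to _add_argparse_args; the default path is a hypothetical placeholder, not a value from our pipeline.

```python
import argparse

# Stand-in for the parser Beam hands to _add_argparse_args.
# A plain argparse parser is used so the sketch runs without Beam.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',
    default='/path_to_input',  # hypothetical default, used when --input is omitted
    help='Input for the pipeline',
)

# No flag given: the default applies.
without_flag = parser.parse_args([])
print(without_flag.input)

# Flag given: the supplied value overrides the default.
with_flag = parser.parse_args(['--input', '/another_path'])
print(with_flag.input)
```

The same add_argument() call, with the same default and help keywords, is what you would place inside _add_argparse_args in your options class.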
As alluded to earlier, these options define pipeline parameters that dictate how the pipeline behaves. Pipelines can accept regular parameters and run-time parameters. While regular parameters can be specified when creating a template, run-time parameters are only available during pipeline execution. Run-time parameters are not available during pipeline construction, so you can’t use the parameter value to change your pipeline’s workflow graph. These parameters should be used when you do not know the value ahead of time, and only want to assign one when executing the pipeline.
If you want to utilize run-time parameters you can use the ValueProvider class when defining pipeline options. To do this, replace add_argument() with add_value_provider_argument(), as shown below:
parser.add_value_provider_argument(
    '--input',
    help='Input for the pipeline',
)
It’s important to note that if you don’t use the ValueProvider for your runtime arguments, then Dataflow will not use those arguments when you try to provide them at runtime.
Ok great, we have a command to create a template, and a way to specify pipeline parameters. But what if we have lots of parameters, and we want to validate that they’re given proper values? That’s where metadata files come in! Metadata files allow you to specify requirements for parameters when deploying via the Dataflow console. You can use a boolean flag to specify if a parameter is optional, and regular expressions to validate the parameter’s value.
Below is an example specification for one of our pipeline parameters. It uses a regex to specify that the session_expiry flag must be an integer value, and because the isOptional flag isn’t specified and defaults to false, it states that this parameter is required.
{
    "name": "session_expiry",
    "helpText": "Number of minutes to wait before considering a session ended",
    "label": "Session expiry minutes",
    "regexes": [
        "^[0-9]+$"
    ]
}
Metadata files should be JSON-formatted, and named <your-template-name>_metadata, with no file extension. Metadata files should be stored in the same Google Cloud Storage bucket folder as template files.
In our template creation script, right after we run the command to create the template, we run a script which uploads the metadata file corresponding to the pipeline being run.
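To make the metadata rules concrete, here is a minimal sketch that derives the metadata file path from a template path and checks supplied values against a metadata document locally. It is only an approximation for illustration, not how Dataflow performs validation: the helper names (metadata_path, validate_parameters), the top-level name and description values, and the choice to require the whole value to match each regex are all assumptions.

```python
import json
import re

# Metadata document; the parameter entry is the session_expiry example,
# while the top-level name and description are hypothetical placeholders.
METADATA = {
    'name': 'Example pipeline',
    'description': 'Hypothetical description for illustration',
    'parameters': [
        {
            'name': 'session_expiry',
            'helpText': 'Number of minutes to wait before considering a session ended',
            'label': 'Session expiry minutes',
            'regexes': ['^[0-9]+$'],
        }
    ],
}


def metadata_path(template_location):
    """Return the path of the metadata file that sits next to a template file."""
    return template_location + '_metadata'


def validate_parameters(metadata, supplied):
    """Return a list of problems found in the supplied parameter values."""
    problems = []
    for param in metadata.get('parameters', []):
        name = param['name']
        if name not in supplied:
            # isOptional defaults to false, so a missing value is a problem.
            if not param.get('isOptional', False):
                problems.append(f'missing required parameter: {name}')
            continue
        value = supplied[name]
        for pattern in param.get('regexes', []):
            # Assumption: the entire value must match the pattern.
            if not re.fullmatch(pattern, value):
                problems.append(f'{name}={value!r} does not match {pattern}')
    return problems


# The metadata file itself is just this document serialized as JSON.
serialized = json.dumps(METADATA, indent=2)
print(metadata_path('gs://BUCKET_NAME/templates/TEMPLATE_NAME'))
print(validate_parameters(METADATA, {'session_expiry': 'thirty'}))
```

A check like this can run in the template creation script before the metadata file is uploaded, so that a malformed regex or a missing required parameter is caught before anyone opens the Dataflow console.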
Managing Python Dependencies
Lastly, let’s discuss dependencies. When running a pipeline locally, the packages needed for your pipeline to run are readily available as they’re installed on your local machine. However, when running in the cloud, you have to make sure these packages are available on the remote Dataflow VM. To do this we created a setup.py file, which the Beam pipeline uses to package up the dependencies when building the Dataflow template. We pass the path to this setup.py file as the setup_file flag when running the command to create a template file. Check out this guide for step-by-step instructions.
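As a rough illustration of how the setup_file flag fits into the template creation command, the sketch below assembles the full argument list in Python. The build_template_command helper and the ./setup.py path are hypothetical; the flags themselves are the ones used in the template creation command earlier, plus --setup_file.

```python
import shlex


def build_template_command(module, project, bucket, template_name,
                           region='us-east1', setup_file=None):
    """Assemble the argument list for the template creation command."""
    command = [
        'python', '-m', module,
        '--runner', 'DataflowRunner',
        '--region', region,
        '--project', project,
        '--staging_location', f'gs://{bucket}/staging',
        '--temp_location', f'gs://{bucket}/temp',
        '--template_location', f'gs://{bucket}/templates/{template_name}',
    ]
    if setup_file:
        # Tells Beam where to find setup.py so dependencies get packaged.
        command += ['--setup_file', setup_file]
    return command


command = build_template_command(
    'MODULE_NAME', 'PROJECT_ID', 'BUCKET_NAME', 'TEMPLATE_NAME',
    setup_file='./setup.py',  # hypothetical path to the setup file
)
# Print the command as a single shell-safe line for inspection.
print(shlex.join(command))
```

Building the command as a list keeps it easy to pass to subprocess.run(command, check=True) from a template creation script, without worrying about shell quoting.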
There are many ways to execute a template file. Typically we use the Google Cloud console to select a template file from our Google Cloud Storage bucket, pictured below.
Pipeline parameters can be entered through the console, based on the criteria specified in the metadata file created earlier. However, if you prefer the utility of starting jobs from the command line, you can use the gcloud command line tool instead. To deploy our pipeline we run the gcloud dataflow jobs run <job_name> command in our local development environment, similar to the example command shown below. If you don’t want to spend time manually entering parameters in the Dataflow console, running this command locally can make your life easier, since you can save the command along with all your parameters. Especially when you’re debugging a pipeline that’s running well locally but failing when running remotely in Dataflow, deploying with the gcloud tool can be a huge time-saver.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
    --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name
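If you keep your parameters in code rather than in a saved shell command, a small helper can assemble the same gcloud invocation. This is a minimal sketch: build_run_command is a hypothetical helper, and it assumes parameter values contain no commas, since --parameters takes a comma-separated list of key=value pairs.

```python
import shlex


def build_run_command(job_name, template_path, parameters):
    """Assemble the argument list for `gcloud dataflow jobs run`."""
    # Join the parameters into the key=value,key=value form gcloud expects.
    joined = ','.join(f'{key}={value}' for key, value in parameters.items())
    return [
        'gcloud', 'dataflow', 'jobs', 'run', job_name,
        '--gcs-location', template_path,
        '--parameters', joined,
    ]


run_command = build_run_command(
    'JOB_NAME',
    'gs://YOUR_BUCKET_NAME/templates/MyTemplate',
    {
        'topic': 'projects/project-identifier/topics/resource-name',
        'table': 'my_project:my_dataset.my_table_name',
    },
)
# Print the command as one line so it can be reviewed or saved.
print(shlex.join(run_command))
```

The resulting list can be passed to subprocess.run(run_command, check=True) on a machine where gcloud is installed and authenticated.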
CI/CD Pipeline Integration
In our CI/CD pipeline we have a manual step that pushes updated template files to Google Cloud Storage. If we make a change to our pipeline code and are ready to deploy a new version of the pipeline, we manually trigger this step. When this step is triggered our CI/CD pipeline runs the template creation script which creates and stages a template file. This CI/CD pipeline establishes a more controlled process for updating our template file and makes sure that the environment used to build our template file stays consistent.
Once a pipeline is up and running, we use Google Cloud console monitoring tools to check its status. You can see a visualization of the execution graph showing the flow of data between steps in your pipeline. For each step you can see how much data came in and out, the step’s wall time, and other interesting metrics. You can also use these metrics to create alerts that can help detect and notify you about potential pipeline failures. We’ve used a small fraction of the monitoring tools available, so I’ll let you check out this in-depth guide if you’re interested in learning more.
We hope you enjoyed this guide on deploying Beam pipelines in Dataflow! If there’s something we missed, or you have tips on how to better use Dataflow, please let us know.
If you’re interested in working at an exciting startup building the future of conversational AI, apply to join our team!