Understanding the Dataflow Quickstart for Python Tutorial

A Step-by-Step Deconstruction of the WordCount Pipeline

John Grinalds
Google Cloud - Community
11 min read · Nov 15, 2022

Introduction

The Dataflow Quickstart for Python tutorial is an excellent way to get up and running in Apache Beam and Dataflow. However, if you have no prior Apache Beam experience, you can execute the entire tutorial and… still have no idea what you just did. What is a “pipeline”? What’s with this “|” pipe operator? What is a “ParDo”?

So I took the time to break down the entire Dataflow Quickstart for Python tutorial into its basic steps and first principles, complete with a line-by-line explanation of the code required. By understanding the Python Apache Beam SDK fundamentals and constituent parts, you’ll be able to confidently design and build your own pipelines, rather than having to rely only on templates.

If you’d like to follow along, read on.

Initialization

Start by completing the steps from “Before you begin” through “Run the pipeline locally” from the Dataflow Quickstart for Python tutorial.

Now, download the wordcount.py source code from the Apache Beam GitHub repository and run this command to make sure you can run the source code locally:

python wordcount.py --output outputs

This is where we will begin.

Context

This will be an iterative exploration of the wordcount.py code, starting from first principles and explaining the Apache Beam SDK along the way. Be sure to run python wordcount.py --output outputs at each step, observing the print statement results and the output file; this will help solidify the concepts as they build on one another. Additionally, everyone is at a different level of experience; feel free to skip steps as needed or to dig deeper with your own research where necessary. Let's go!

Step 1: Hello World

We’ll start at ground zero: Hello World. Create and run this code, confirming your sanity.
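
A minimal sketch of what that starting point might look like; only the run() function name is carried over from wordcount.py, the rest is placeholder code:

# A hypothetical starting file; only run() carries over to later steps.
def run():
    print("Hello World")


if __name__ == '__main__':
    run()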

Step 2: Create the Apache Beam Pipeline

Now, import Beam and create the pipeline inside a context manager:
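
In sketch form, keeping the Hello World print inside the new with block (the exact file in the tutorial may differ slightly):

import apache_beam as beam


def run():
    # The with statement manages the pipeline's runtime context;
    # nothing is actually read or written yet.
    with beam.Pipeline() as p:
        print("Hello World")


if __name__ == '__main__':
    run()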

The beam.Pipeline() constructor returns an Apache Beam Pipeline object. From the Apache Beam docs:

A Pipeline encapsulates your entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data. All Beam driver programs must create a Pipeline.

Additionally, the with statement creates a runtime context in which the pipeline runs. This is helpful for handling errors and for cleanup in case something fails. The Python documentation on with statements offers more insight.

Note that if you run this code, nothing changes; you still get the same “Hello World” output.

Step 3: Handle the Input Arguments

Note: if you are already familiar with command-line arguments for Python, feel free to skip this step.

Now, import argparse, set argv=None as an argument in the run() function, instantiate the parser, and add arguments to listen for.
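
A sketch of those changes; the --input default below is the King Lear sample file that wordcount.py itself points at, and the two print calls are only there so you can inspect the parsed values:

import argparse

import apache_beam as beam


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        required=True,
        help='Output file to write results to.')
    # known_args holds the flags declared above; pipeline_args holds
    # everything else passed on the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)
    print(known_args)
    print(pipeline_args)

    with beam.Pipeline() as p:
        print("Hello World")


if __name__ == '__main__':
    run()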

The parse_known_args() method parses the inputs given to the file via the argv arguments. "It returns [a tuple] containing the populated namespace and the list of remaining argument strings." (Python Docs) In other words, the "namespace" contains the known/expected arguments (known_args), and the pipeline_args list includes everything else that has been passed to the Python program.

Go ahead and run the following commands, tweaking to your interest, to see how the given arguments populate the known_args and pipeline_args variables.

python wordcount_3.py --input known1 --output known2
python wordcount_3.py --input known1 --output known2 aRandomPipelineArg1 aRandomPipelineArg2

Step 4: Generate the Pipeline Options

This step imports PipelineOptions and uses it to create an object to give to the Beam Pipeline. This is how the list of misc options gets from the command line into the Beam pipeline itself.
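
Assuming the Step 3 sketch, the change might look roughly like this:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output', required=True, help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Hand the leftover command-line arguments to Beam so it can
    # interpret options such as --runner or --project later on.
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        print("Hello World")


if __name__ == '__main__':
    run()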

While these pipeline args are not necessary for running the Beam pipeline locally in this tutorial, they are relevant when you want to run the pipeline on Dataflow.

Step 5: Access the Known Arguments in the Pipeline

Now, let’s get rid of the “Hello World” and replace it with some of the values we passed in at the Python call.

In the parser.add_argument() calls, add the lines for dest='input' and dest='output'. And then in the Beam pipeline, replace "Hello World" with print(known_args.input). Notice that, for whatever you pass in for the --input flag, it will be attached to the attribute specified by dest=''. In our case, because dest='input', input is the key that we can use. Therefore, what we pass in for the --input flag will be accessible by calling known_args.input.
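
Put together, a sketch of the updated file might look like this:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        # Whatever was passed via --input lands on the attribute named
        # by dest='input', so it is reachable as known_args.input.
        print(known_args.input)


if __name__ == '__main__':
    run()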

Note: if you don’t set dest explicitly in the add_argument() call, then the namespace key will be set to whatever was used as the argument flag. In our case they are both called input, so it doesn't matter, but it's something to be aware of.

Feel free to test this code yourself to understand how these pieces interact.

Step 6: The Pipe Operator

Now we’re going to add a bit more to the pipeline using the pipe operator. Go ahead and import ReadFromText and WriteToText. We will have the pipeline simply write whatever it gets as input to the output, with no transformations.
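
A sketch of this read-then-write pipeline; the 'Read' and 'Write' labels and the logging setup mirror wordcount.py, and the >> label syntax is explained just below:

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        # Read the input into a PCollection of lines, then write those
        # lines straight back out with no transformations yet.
        lines = p | 'Read' >> ReadFromText(known_args.input)
        lines | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()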

The pipe operator is similar to a Java “apply”. In the Python world, it operates similarly to method chaining. From the Apache Beam documentation:

Each transform in the Beam SDKs has a generic apply method (or pipe operator |). Invoking multiple Beam transforms is similar to method chaining, but with one slight difference: You apply the transform to the input PCollection, passing the transform itself as an argument, and the operation returns the output PCollection.

The docs also give a really helpful, simple formula for the pipe operator:

[Output PCollection] = [Input PCollection] | [Transform]

Now you might also be wondering what the '<Text>' >> between the pipe and the Transform is for. As the user "rf-" on Stack Overflow explains:

“>>” allows you to name a step for easier display in various UIs — the string between the | and the >> is only used for these display purposes and identifying that particular application.

So, it’s for display purposes only.

I also added in logging in this step, for visibility.

Step 7: The ParDo

Now let’s get into the beam.ParDo() transform.

You create a DoFn by subclassing beam.DoFn, that is, by listing beam.DoFn as the base class in your class definition.

We’ll start with the ComputeWordLengthFn() example from the Beam documentation. This is a DoFn that simply returns the length of each element. Notice we add the 'Lengths' step in between the 'Read' and 'Write' steps in the pipeline.
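
A sketch of this stage; ComputeWordLengthFn comes from the Beam docs, the print and input calls are there for visibility while experimenting, and the rest of the scaffolding follows the earlier sketches:

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class ComputeWordLengthFn(beam.DoFn):
    """Return the length of each element, per the Beam docs example."""
    def process(self, element):
        print(element)  # show each incoming line
        input()         # pause so you can step through; remove for a full run
        return [len(element)]


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt')
    parser.add_argument('--output', dest='output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | 'Read' >> ReadFromText(known_args.input)
        lengths = lines | 'Lengths' >> beam.ParDo(ComputeWordLengthFn())
        lengths | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()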

Note that the process method in your ParDo function needs to adhere to a specific format. As explained in the Apache Beam documentation:

Inside your DoFn subclass, you’ll write a method process where you provide the actual processing logic. You don’t need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your process method should accept an argument element, which is the input element, and return an iterable with its output values. You can accomplish this by emitting individual elements with yield statements. You can also use a return statement with an iterable, like a list or a generator.

So your process method must have this signature:

def process(self, element):

Go ahead and run this code to see how it works: python wordcount.py --output outputs

Note that we have a print and input statement in the function for visibility. Also note that the output file now contains the length of each line instead of the words in the file.

Step 8: WordExtractingDoFn()

Now we will implement the WordExtractingDoFn() specified in the original wordcount.py file.

So get rid of the ComputeWordLengthFn ParDo transform and replace it with beam.ParDo(WordExtractingDoFn()).with_output_types(str).

Notice that the function definition for WordExtractingDoFn uses regex to convert each element (each "line") into a list of its constituent words. I added print statements to show each line and the resulting list.
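
A sketch of this stage; the WordExtractingDoFn class and its regex are taken from wordcount.py, while the two print calls and the surrounding scaffolding follow the earlier sketches:

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""
    def process(self, element):
        words = re.findall(r"[\w\']+", element, re.UNICODE)
        print(element)  # the raw line
        print(words)    # the list of words extracted from it
        return words


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt')
    parser.add_argument('--output', dest='output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | 'Read' >> ReadFromText(known_args.input)
        words = lines | 'Split' >> (
            beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        words | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()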

Adding this Transform takes the output from looking like this:

    KING LEAR


DRAMATIS PERSONAE


LEAR king of Britain (KING LEAR:)

KING OF FRANCE:

DUKE OF BURGUNDY (BURGUNDY:)

DUKE OF CORNWALL (CORNWALL:)

DUKE OF ALBANY (ALBANY:)

EARL OF KENT (KENT:)

EARL OF GLOUCESTER (GLOUCESTER:)

EDGAR son to Gloucester.

EDMUND bastard son to Gloucester.

CURAN a courtier.

Old Man tenant to Gloucester.

...

To this:

KING
LEAR
DRAMATIS
PERSONAE
LEAR
king
of
Britain
KING
LEAR
KING
OF
FRANCE
DUKE
OF
BURGUNDY
BURGUNDY
DUKE
OF
CORNWALL
CORNWALL
DUKE
OF
ALBANY
ALBANY
EARL
OF
KENT
KENT
EARL
OF
GLOUCESTER
GLOUCESTER

...

Note that I also added the with_output_types() method in the 'Split' transform step. As explained on Stack Overflow: "Specified output types are only used to ensure agreement with subsequent transforms." In our case it won't make a difference, but I'm adding it in to reflect the wordcount.py file.

Step 9: beam.Map()

For Step 9, we will be utilizing the beam.Map() transform; include it in the new 'PairWithOne' step.

The beam.Map() transform accepts a function and applies it to every element in the PCollection. In our case, we pass in an anonymous lambda function that returns a tuple of the input element paired with 1 (hence the step name 'PairWithOne').
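
A sketch of the pipeline at this point, assuming the Step 8 scaffolding (with the debugging prints removed):

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""
    def process(self, element):
        return re.findall(r"[\w\']+", element, re.UNICODE)


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt')
    parser.add_argument('--output', dest='output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | 'Read' >> ReadFromText(known_args.input)
        words = lines | 'Split' >> (
            beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        # Pair every word with a count of 1, e.g. 'KING' -> ('KING', 1).
        pairs = words | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        pairs | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()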

While there are other functions that could suffice here, the Map() function was selected because, according to the documentation:

If your ParDo performs a one-to-one mapping of input elements to output elements-that is, for each input element, it applies a function that produces exactly one output element, you can use the higher-level Map transform.

Adding this Transform takes the output from this:

KING
LEAR
DRAMATIS
PERSONAE
LEAR
king
of
Britain
KING
LEAR
KING
OF
FRANCE
DUKE
OF
BURGUNDY
BURGUNDY
DUKE
OF
CORNWALL
CORNWALL
DUKE
OF
ALBANY
ALBANY
EARL
OF
KENT
KENT
EARL
OF
GLOUCESTER
GLOUCESTER

...

To this:

('KING', 1)
('LEAR', 1)
('DRAMATIS', 1)
('PERSONAE', 1)
('LEAR', 1)
('king', 1)
('of', 1)
('Britain', 1)
('KING', 1)
('LEAR', 1)
('KING', 1)
('OF', 1)
('FRANCE', 1)
('DUKE', 1)
('OF', 1)
('BURGUNDY', 1)
('BURGUNDY', 1)
('DUKE', 1)
('OF', 1)
('CORNWALL', 1)
('CORNWALL', 1)
('DUKE', 1)
('OF', 1)
('ALBANY', 1)
('ALBANY', 1)
('EARL', 1)
('OF', 1)
('KENT', 1)
('KENT', 1)
('EARL', 1)
('OF', 1)
('GLOUCESTER', 1)
('GLOUCESTER', 1)

...

Step 10: beam.CombinePerKey()

Now we need a transform that will count all the instances of each word. For that we will use the beam.CombinePerKey(sum) transform.

Create the ‘GroupAndSum’ step as shown:
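
A sketch of where it might sit, assuming the scaffolding from the Step 9 sketch:

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""
    def process(self, element):
        return re.findall(r"[\w\']+", element, re.UNICODE)


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt')
    parser.add_argument('--output', dest='output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | 'Read' >> ReadFromText(known_args.input)
        words = lines | 'Split' >> (
            beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        pairs = words | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        # Sum the 1s attached to each word, so ('KING', 1) repeated
        # many times becomes a single ('KING', <total>) tuple.
        counts = pairs | 'GroupAndSum' >> beam.CombinePerKey(sum)
        counts | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()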

The CombinePerKey aggregation “accepts a function that takes a list of values as an input, and combines them for each key.” (Beam Docs) In this case, the keys are the words in each tuple, so the “sum” function adds up all the values attached to each key. Because all the values of the tuples are just 1, it’s effectively the same as counting each unique word.

Adding this Transform takes the output from this:

('KING', 1)
('LEAR', 1)
('DRAMATIS', 1)
('PERSONAE', 1)
('LEAR', 1)
('king', 1)
('of', 1)
('Britain', 1)
('KING', 1)
('LEAR', 1)
('KING', 1)
('OF', 1)
('FRANCE', 1)
('DUKE', 1)
('OF', 1)
('BURGUNDY', 1)
('BURGUNDY', 1)
('DUKE', 1)
('OF', 1)
('CORNWALL', 1)
('CORNWALL', 1)
('DUKE', 1)
('OF', 1)
('ALBANY', 1)
('ALBANY', 1)
('EARL', 1)
('OF', 1)
('KENT', 1)
('KENT', 1)
('EARL', 1)
('OF', 1)
('GLOUCESTER', 1)
('GLOUCESTER', 1)

...

To this:

('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)

...

Step 11: beam.MapTuple()

We come to the final transform in the pipeline, beam.MapTuple().

As the documentation observes: “If your PCollection consists of (key, value) pairs, you can use MapTuple to unpack them into different function arguments.”

In our case, we pass a custom formatting function, format_result, to the MapTuple() transform. This format_result function accepts the two parts of each element's tuple, "word" and "count", and returns them as a reformatted string.

The code now looks like this:
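
In sketch form, assuming the scaffolding from the earlier steps, with format_result defined inside run() as wordcount.py does:

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""
    def process(self, element):
        return re.findall(r"[\w\']+", element, re.UNICODE)


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt')
    parser.add_argument('--output', dest='output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | 'Read' >> ReadFromText(known_args.input)
        words = lines | 'Split' >> (
            beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        pairs = words | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        counts = pairs | 'GroupAndSum' >> beam.CombinePerKey(sum)

        # Unpack each (word, count) tuple into the two arguments of
        # format_result and emit a formatted string.
        def format_result(word, count):
            return '%s: %d' % (word, count)

        output = counts | 'Format' >> beam.MapTuple(format_result)
        output | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()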

This reformatting Transform takes the output from this:

('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)

...

To this:

KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
of: 447
Britain: 2
OF: 15
FRANCE: 10
DUKE: 3
BURGUNDY: 8
CORNWALL: 63
ALBANY: 67
EARL: 2
KENT: 156
GLOUCESTER: 141

...

Step 12: Chaining Transforms

We’ve reached the end of our journey! We will conclude by renaming and reformatting the steps in the Beam pipeline code to match the baseline wordcount.py file. Notice how the transforms can be chained together, with each line starting with the pipe operator:
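
A sketch of the final, chained form; the DoFn, step labels, format_result, and the save_main_session handling mirror wordcount.py, lightly trimmed:

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""
    def process(self, element):
        return re.findall(r"[\w\']+", element, re.UNICODE)


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output', dest='output', required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    # Save the main session so that DoFns relying on global context
    # (e.g. the re module imported above) still work on a distributed runner.
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | 'Read' >> ReadFromText(known_args.input)

        counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        def format_result(word, count):
            return '%s: %d' % (word, count)

        output = counts | 'Format' >> beam.MapTuple(format_result)
        output | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()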

Even though the steps look a bit different with the chaining, the same [Output PCollection] = [Input PCollection] | [Transform] flow applies and the output is the same.

Note also how in this step I include the SetupOptions class to "save the main session." As the original wordcount.py comment goes on to state: "We use the save_main_session option because one or more DoFn's in this workflow rely on global context (e.g., a module imported at module level)." When I ran this on my local machine, its absence did not cause problems, but I am including it for completeness.

That’s it! We’re done!

Addendum

To further extend your knowledge and learning, here are two more things that you can try:

  1. Modify the pipeline code to convert all word strings to lowercase before counting them. The answer is found in Step 7 of the tutorial.
  2. Try deploying your pipeline to the Dataflow runner on Google Cloud; the remaining sections of the Dataflow Quickstart tutorial walk through this.

Conclusion

This has been a step-by-step, iterative exploration of the Dataflow Quickstart for Python tutorial. I hope this approach has been useful for you in building a foundational understanding of how Apache Beam’s Python SDK works.

Let me know in the comments if there is anything I missed or any steps that need further clarification.

Take care!

Code License

All code in this guide is covered under the Apache License, Version 2.0.
