Launch an Apache Beam pipeline locally using DirectRunner

Jitendra Jaladi
Google Cloud - Community
2 min read · Sep 20, 2023

Apache Beam is a unified programming model for batch and streaming data processing: a single, portable API that can be used to build both batch and streaming pipelines.

The DirectRunner is an execution mode for Apache Beam pipelines that runs the pipeline locally on your machine. It is a good option for testing and debugging, as it lets you iterate on your code quickly and see the results immediately.

To launch an Apache Beam pipeline using the DirectRunner, follow these steps (a minimal sketch follows the list):

  1. Create a new Apache Beam pipeline.
  2. Add the DirectRunner to the pipeline options.
  3. Run the pipeline.
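As a minimal sketch of these three steps (using a toy in-memory input instead of a file), the runner can be selected directly on the PipelineOptions:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Step 2: select the DirectRunner in the pipeline options.
options = PipelineOptions(runner='DirectRunner')

# Step 1: create the pipeline; step 3: it runs when the with-block exits.
with beam.Pipeline(options=options) as p:
    (p
     | beam.Create(['hello', 'world'])  # toy in-memory input
     | beam.Map(print))                 # print each element locally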

Below is a simple Beam pipeline that we will use as an example of launching a pipeline locally with the DirectRunner.

This pipeline will read a CSV file, transform the data, and write the results to a new CSV file.

import apache_beam as beam
from apache_beam.io import WriteToText, ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
import argparse


class transformcsv(beam.DoFn):
    def process(self, element):
        # Split the row on commas, drop the last column, and re-join the
        # remaining fields so the output stays valid CSV.
        fields = element.split(',')[:-1]
        yield ','.join(fields)


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_file_name', dest='input_file_name', help='source filename')
    parser.add_argument('--output_file_name', dest='output_file_name', help='target filename')

    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    with beam.Pipeline(options=pipeline_options) as p:
        # Reads each row of the file as an element and passes it downstream.
        rows = p | ReadFromText('./' + known_args.input_file_name)

        (rows
         | beam.ParDo(transformcsv())
         | WriteToText('./' + known_args.output_file_name))


if __name__ == '__main__':
    run()

The ReadFromText transform reads the input file and returns a PCollection of strings, one per line. The transformcsv DoFn splits each row on commas, drops the last column, and re-joins the remaining fields into a CSV row. The WriteToText transform writes the resulting PCollection to the output file.

The run function parses the command-line arguments, builds the PipelineOptions from any remaining flags (such as --runner), and constructs the pipeline. The with statement ensures that the pipeline is run, and its completion waited on, when the block exits. The rows variable is the PCollection of input lines that flows through transformcsv and into WriteToText.
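Because the DirectRunner executes everything in the local process, you can verify the DoFn quickly with Beam's testing utilities. Below is a minimal sketch using a couple of made-up sample rows:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

class transformcsv(beam.DoFn):  # same DoFn as in the pipeline above
    def process(self, element):
        yield ','.join(element.split(',')[:-1])

# TestPipeline uses the DirectRunner by default.
with TestPipeline() as p:
    result = (p
              | beam.Create(['a,b,c', '1,2,3'])  # hypothetical sample rows
              | beam.ParDo(transformcsv()))
    # Each output row should have its last column dropped.
    assert_that(result, equal_to(['a,b', '1,2']))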

The above pipeline can be launched with the following command.

 python3 -m app --runner DirectRunner --input_file_name=input.csv --output_file_name=output.csv

The command assumes that the app.py file is present in the current directory where the command is executed, and it also uses the current directory as the staging location for the Beam pipeline.

The pipeline reads the input.csv file from the current directory where it is running and creates the output file in that same directory.
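For illustration, suppose input.csv contains the following (hypothetical) rows:

id,name,score
1,alice,90
2,bob,85

The pipeline drops the last column from each row, so the output contains id,name, 1,alice, and 2,bob. Note that WriteToText shards its output by default, so the result is written to a file such as output.csv-00000-of-00001.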

In the next blogs we will discuss:

  1. How to run the same pipeline on GCP using the Dataflow runner.
  2. How to create a flex template out of the same code.
  3. How to launch a flex template.
  4. How to create a CI/CD pipeline for a flex template.
