Apache Beam Dataflow GCP Pipeline Setup Framework

Milind Kulkarni
Globant
Nov 21, 2022 · 5 min read
Apache Beam Dataflow Pipeline

The purpose of this article is to provide the boilerplate code needed when setting up a pipeline in GCP (Google Cloud Platform) using Apache Beam, so that developers do not need to write similar code again across multiple projects. Developers pass Dataflow options and coders to the framework, and the framework takes care of setting them. It sets custom and default coders (for PubsubMessage) and returns a Pipeline instance to use in the dataflow.

Schematic of GCP Pipeline

The reader of this article is expected to have some knowledge of Apache Beam pipelines and pipeline terminology. This story is helpful for readers working in the domains of Data Science and Big Data processing.

Supporting batching and streaming of data

Big Data is expanding on three fronts: data volume, data velocity, and data variety, and it is processed via both batching and streaming. Apache Beam is an open-source, advanced, unified programming model for both batch (finite) and streaming (continuous) use cases. Using Apache Beam, you can create Big Data processing pipelines with its SDKs (Java, Python, and Go, with Scala supported via the Scio library).

An Apache Beam pipeline can execute on the most popular data processing systems, such as GCP Dataflow, Spark, Flink, and Hadoop MapReduce, and addresses use cases like ETL (Extract, Transform, and Load), analytics, and stream processing. The pipeline is the primary medium through which messages flow in cloud systems.

Reasons for using Apache Beam

We will look at the use cases and situations below that qualify for using Apache Beam.

  • Message-driven systems: In message-driven systems with different message types, we can create different pipelines to convert messages from one form to another and publish them to relevant topics in GCP PubSub.
  • Duplex and multiplex pipelines for processing messages: A pipeline can be duplex or multiplex in nature, listening for messages at both the front and the back and processing them independently.
  • Asynchronous message processing: Process large data in the form of messages asynchronously.
  • Focus on business logic requirements: The developers can focus on core business logic instead of worrying about how messages are handled and flow through the system.
  • System performance issues: Once message processing is delegated to the GCP Apache Beam pipeline, developers no longer need to worry about system performance under load, as it is all managed by GCP.

Building Blocks of the framework

The sections below show the classes for pipelines and coders.

Pipeline class

The AcmePipeline class provides a set of variables, constructors, methods for pipeline setup, and coders for the pipeline.

class AcmePipeline {
    Pipeline pipeline;
    DataflowPipelineOptions options;
    PipelineOptionsFactory.Builder pipelineBuilder;

    AcmePipeline(PipelineOptionsFactory.Builder pipelineBuilder)
    AcmePipeline(String[] args,
        Class<? extends DataflowPipelineOptions> clazz,
        List<AcmeCoder> coders)
    AcmePipeline(String[] args,
        Class<? extends DataflowPipelineOptions> clazz,
        List<AcmeCoder> coders, boolean streaming)
    AcmePipeline(String[] args,
        Class<? extends DataflowPipelineOptions> clazz)

    createPipeline(DataflowPipelineOptions options)
    createPipeline(DataflowPipelineOptions options, boolean streaming)
    createPipeline(DataflowPipelineOptions options, List<AcmeCoder> coders)
    createPipeline(DataflowPipelineOptions options, List<AcmeCoder> coders,
        boolean streaming)

    registerCoders(Pipeline pipeline, List<AcmeCoder> coders)
    void setOptions(DataflowPipelineOptions options)
    DataflowPipelineOptions getOptions()
    Pipeline getPipeline()
}

Coder class

The AcmeCoder class provides a set of methods for registering custom coders and registering default coders.

class AcmeCoder {
    Class<?> clazz;
    Coder<?> coder;

    void registerCoders(CoderRegistry coderRegistry, List<AcmeCoder> coders)
    void registerDefaultCoders(CoderRegistry coderRegistry)
}

We have created the classes AcmePipeline and AcmeCoder to set up our coders in the pipeline. The default coder is PubsubMessageWithAttributesCoder for PubsubMessage.class with its attributes.

Setting up the pipeline

The pipeline can be set up as below:

public AcmePipeline(String[] args,
        Class<? extends DataflowPipelineOptions> clazz,
        List<AcmeCoder> coders) {
    // Register the custom options interface so its fields can be parsed.
    PipelineOptionsFactory.register(clazz);
    this.options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(clazz);
    // The framework defaults to streaming mode.
    this.options.setStreaming(true);
    this.pipeline = Pipeline.create(this.options);
    log.info("Register coders called");
    this.registerCoders(this.pipeline, coders);
    this.setOptions(this.options);
}

In the above, the AcmePipeline() constructor takes the command-line arguments that carry the Dataflow options, the pipeline-specific implementation class of DataflowPipelineOptions, and the List<AcmeCoder> of coders the pipeline uses for message transformations. The default operating mode for the pipeline is streaming (true) because the pipeline works on streaming data.
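For illustration, a hypothetical entry point might wire this constructor to real command-line flags. The Main class, flag values, and empty coder list below are placeholders; the Dataflow flags themselves are standard Beam options.

import java.util.Collections;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;

public class Main {
    // Typical invocation (placeholder values):
    //   java -jar pipeline.jar --runner=DataflowRunner --project=my-gcp-project \
    //       --region=us-central1 --tempLocation=gs://my-bucket/temp
    public static void main(String[] args) {
        AcmePipeline acmePipeline = new AcmePipeline(args,
            DataflowPipelineOptions.class, Collections.emptyList());
        // Transforms would be applied to acmePipeline.getPipeline() here.
        acmePipeline.getPipeline().run();
    }
}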

Registering the coders

The coders can be set up as below:

The registerCoders() call gets the CoderRegistry from the pipeline and has each AcmeCoder register itself in the pipeline, calling registerDefaultCoders() at the end, as shown below.
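The article shows only registerDefaultCoders(), so here is a minimal sketch of the surrounding registration flow, under the assumption that AcmeCoder exposes the registration helpers listed in the class outline above (the bodies below are illustrative, not the author's actual code):

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;

// Assumed sketch of AcmePipeline.registerCoders(): fetch the registry
// and delegate registration to AcmeCoder.
void registerCoders(Pipeline pipeline, List<AcmeCoder> coders) {
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    AcmeCoder.registerCoders(coderRegistry, coders);
}

// Assumed sketch of AcmeCoder.registerCoders(): each entry maps its
// message class to its custom coder; default coders are registered last.
static void registerCoders(CoderRegistry coderRegistry, List<AcmeCoder> coders) {
    for (AcmeCoder acmeCoder : coders) {
        coderRegistry.registerCoderForClass(acmeCoder.clazz, acmeCoder.coder);
    }
    registerDefaultCoders(coderRegistry);
}

registerDefaultCoders(), which the sketch calls last, is implemented as below.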

public static void registerDefaultCoders(CoderRegistry coderRegistry) {
    coderRegistry.registerCoderForClass(PubsubMessage.class,
        PubsubMessageWithAttributesCoder.of());
}

The Dataflow adapter can initialize the pipeline by calling initDataflowPipeline().

PubsubMessageWithAttributesCoder is the coder needed to serialize and deserialize PubsubMessage, the predefined message format in GCP. We can treat this coder as the default coder we require; it is provided by the Apache Beam library.
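To illustrate why the attributes-aware coder matters (hypothetical values below): a PubsubMessage carries a byte[] payload plus a map of string attributes, and this coder preserves both, whereas a payload-only coder would lose the attributes.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

// Hypothetical helper showing a PubsubMessage with attributes;
// the attribute key and payload are placeholders.
static PubsubMessage sampleMessage() {
    Map<String, String> attributes = new HashMap<>();
    attributes.put("deviceId", "device-42");
    return new PubsubMessage(
        "{\"status\":\"OK\"}".getBytes(StandardCharsets.UTF_8), attributes);
}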

public void initDataflowPipeline(String[] args) {
    if (null != args && 0 < args.length) {
        AcmePipeline acmePipeline = new AcmePipeline(args,
            AcmeDataflowOptions.class, getCoders());
        AcmeDataflowOptions options =
            (AcmeDataflowOptions) acmePipeline.getOptions();
        executePipeline(options, acmePipeline.getPipeline());
    }
}
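executePipeline() is not shown in the article. Assuming it applies the transforms and runs the job, and assuming hypothetical getInputSubscription()/getOutputTopic() getters on AcmeDataflowOptions, a minimal sketch might look like this:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Assumed sketch: read from Pub/Sub, transform, publish, then run.
private void executePipeline(AcmeDataflowOptions options, Pipeline pipeline) {
    pipeline
        .apply("ReadMessages", PubsubIO.readMessagesWithAttributes()
            .fromSubscription(options.getInputSubscription()))
        .apply("TransformMessages", ParDo.of(new MessageTransformFn()))
        .apply("PublishMessages",
            PubsubIO.writeMessages().to(options.getOutputTopic()));
    pipeline.run();
}

// Hypothetical pass-through DoFn; real logic would transform the message.
static class MessageTransformFn extends DoFn<PubsubMessage, PubsubMessage> {
    @ProcessElement
    public void processElement(@Element PubsubMessage msg,
            OutputReceiver<PubsubMessage> out) {
        out.output(msg);
    }
}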

The list of coders could be as below:

private List<AcmeCoder> getCoders() {
    List<AcmeCoder> coders = new ArrayList<>();
    coders.add(new AcmeCoder(HBMessage.class, new HBMessageCoder()));
    coders.add(new AcmeCoder(HBData.class, new HBDataCoder()));
    coders.add(new AcmeCoder(DeviceShutdownMessage.class,
        new DeviceShutdownMessageCoder()));
    return coders;
}

Each coder class extends org.apache.beam.sdk.coders.CustomCoder and implements encode() and decode(). This is required to serialize and deserialize messages during conversion: encode() is for serialization and decode() is for deserialization.
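As an illustration, here is a minimal sketch of such a coder, assuming a hypothetical HBMessage POJO with toJson()/fromJson() helpers (the article does not show the real HBMessageCoder):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.CustomCoder;

public class HBMessageCoder extends CustomCoder<HBMessage> {

    @Override
    public void encode(HBMessage value, OutputStream outStream)
            throws IOException {
        // Serialize the message to UTF-8 JSON and write it length-prefixed.
        byte[] bytes = value.toJson().getBytes(StandardCharsets.UTF_8);
        DataOutputStream out = new DataOutputStream(outStream);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    @Override
    public HBMessage decode(InputStream inStream) throws IOException {
        // Read the length prefix, then the payload, and deserialize.
        DataInputStream in = new DataInputStream(inStream);
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return HBMessage.fromJson(new String(bytes, StandardCharsets.UTF_8));
    }
}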

Creating a pipeline

Besides the constructor shown earlier, a pipeline can be created (or re-created) from existing options as below:

public void createPipeline(DataflowPipelineOptions options,
        boolean streaming) {
    options.setStreaming(streaming);
    pipeline = Pipeline.create(options);
}
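For a bounded (batch) job, the same method can be called with streaming set to false, for example (reusing the options parsed earlier):

// Hypothetical usage: switch the existing pipeline options to batch mode.
acmePipeline.createPipeline(options, false);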

Conclusion

I hope this article has given you an idea of how to develop a framework that facilitates setting up a Beam pipeline seamlessly in GCP. For a better understanding of Apache Beam basics, please read the story linked below.

Thank you for reading!
