Apache Beam Dataflow GCP Pipeline Setup Framework
The purpose of this article is to provide the boilerplate code needed to set up a pipeline in GCP (Google Cloud Platform) using Apache Beam, so that developers do not have to rewrite similar code across multiple projects. Developers pass Dataflow options and coders to the framework, and the framework takes care of applying them: it sets custom and default coders (for PubsubMessage) and returns a Pipeline instance ready for use in the dataflow.
The reader of this article is expected to have some knowledge of Apache Beam pipelines and their terminology. This article is aimed at readers working in the domains of Data Science and BigData processing.
Supporting batching and streaming of data
BigData is expanding on three fronts: data volume, data velocity, and data variety, and it is processed via both batching and streaming. Apache Beam is an open-source, unified programming model for both batch (finite) and streaming (continuous) use cases. Using Apache Beam, you can create BigData processing pipelines with its SDKs (Java, Python, and Go, with Scala supported through the Scio library).
An Apache Beam pipeline can execute on the most popular data processing engines, such as GCP Dataflow, Apache Spark, and Apache Flink, and it addresses use cases like ETL (Extract, Transform, and Load), analytics, and stream processing. The pipeline is the primary medium through which messages flow in cloud systems.
Reasons for using Apache Beam
We will look at the use cases and situations below that qualify for Apache Beam.
- Message-driven systems: In message-driven systems with different message types, we can create different pipelines to convert messages from one form to another and publish them to relevant topics in GCP PubSub.
- Duplex and multiplex pipelines for processing messages: Pipelines can be duplex or multiplex in nature, listening for messages at either end and processing them independently.
- Asynchronous message processing: Process large data in the form of messages asynchronously.
- Focus on business logic requirements: The developers can focus on core business logic instead of worrying about how messages are handled and flow through the system.
- System performance under load: Once message processing is delegated to an Apache Beam pipeline on GCP, developers no longer need to worry about system performance under load, as scaling is managed by GCP.
Building Blocks of the framework
The sections below show the classes for pipelines and coders.
Pipeline class
The AcmePipeline class provides a set of variables, constructors, and methods for pipeline setup, as well as coders for the pipeline.
class AcmePipeline {
    Pipeline pipeline;
    DataflowPipelineOptions options;
    PipelineOptionsFactory.Builder pipelineBuilder;

    AcmePipeline(PipelineOptionsFactory.Builder pipelineBuilder)
    AcmePipeline(String[] args, Class<? extends DataflowPipelineOptions> clazz, List<AcmeCoder> coders)
    AcmePipeline(String[] args, Class<? extends DataflowPipelineOptions> clazz, List<AcmeCoder> coders, boolean streaming)
    AcmePipeline(String[] args, Class<? extends DataflowPipelineOptions> clazz)
    createPipeline(DataflowPipelineOptions options)
    createPipeline(DataflowPipelineOptions options, boolean streaming)
    createPipeline(DataflowPipelineOptions options, List<AcmeCoder> coders)
    createPipeline(DataflowPipelineOptions options, List<AcmeCoder> coders, boolean streaming)
    registerCoders(Pipeline pipeline, List<AcmeCoder> coders)
    void setOptions(DataflowPipelineOptions options)
    DataflowPipelineOptions getOptions()
    Pipeline getPipeline()
}
Coder class
The AcmeCoder class provides methods for registering custom coders and registering default coders.
class AcmeCoder {
    Class<?> clazz;
    Coder<?> coder;

    void registerCoders(CoderRegistry coderRegistry, List<AcmeCoder> coders)
    void registerDefaultCoders(CoderRegistry coderRegistry)
}
We have created the AcmePipeline and AcmeCoder classes to set up our coders in the pipeline. The default coder is PubsubMessageWithAttributesCoder for PubsubMessage.class with its attributes.
Setting up the pipeline
The pipeline can be set up as below:
public AcmePipeline(String[] args,
        Class<? extends DataflowPipelineOptions> clazz, List<AcmeCoder> coders) {
PipelineOptionsFactory.register(clazz);
this.options = (DataflowPipelineOptions) PipelineOptionsFactory
.fromArgs(args).withValidation().as(clazz);
this.options.setStreaming(true);
this.pipeline = Pipeline.create(this.options);
log.info("Register coders called");
this.registerCoders(this.pipeline, coders);
this.setOptions(this.options);
}
In the above, the AcmePipeline() constructor takes the command-line arguments carrying the dataflow options, a pipeline-specific implementation of DataflowPipelineOptions, and the List<AcmeCoder> of coders the pipeline uses for message transformations. The default operating mode for the pipeline is streaming (streaming = true) because the pipeline works on streaming data.
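For illustration, a typical invocation might pass standard Dataflow pipeline option flags on the command line. The jar name, project, and bucket below are placeholders, not values from the framework:

```shell
# Hypothetical invocation; the flag names are standard DataflowPipelineOptions flags.
java -jar acme-pipeline.jar \
  --runner=DataflowRunner \
  --project=my-gcp-project \
  --region=us-central1 \
  --tempLocation=gs://my-bucket/temp
```

PipelineOptionsFactory.fromArgs(args).withValidation() parses these flags and fails fast if a required option is missing.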
Registering the coders
The coders can be set up as below:
The registerCoders() call gets the CoderRegistry from the pipeline and calls registerCoders() on each AcmeCoder so that it registers itself in the pipeline. At the end, it calls registerDefaultCoders(), shown below.
public static void registerDefaultCoders(
CoderRegistry coderRegistry) {
coderRegistry.registerCoderForClass(PubsubMessage.class,
PubsubMessageWithAttributesCoder.of());
}
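The bookkeeping behind registerCoders() boils down to mapping each class to its coder. Below is a simplified, Beam-free sketch of that pattern; AcmeCoder and the registry are stood in by minimal illustrative classes, not Beam's actual API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoderRegistrySketch {
    // Minimal stand-in for AcmeCoder: pairs a class with a coder.
    // The real class holds a Coder<?> instance; a name suffices here.
    static class AcmeCoder {
        final Class<?> clazz;
        final String coderName;
        AcmeCoder(Class<?> clazz, String coderName) {
            this.clazz = clazz;
            this.coderName = coderName;
        }
    }

    // Minimal stand-in for Beam's CoderRegistry: a class-to-coder map.
    static class Registry {
        final Map<Class<?>, String> coders = new HashMap<>();
        void registerCoderForClass(Class<?> clazz, String coder) {
            coders.put(clazz, coder);
        }
    }

    // Mirrors the registerCoders() loop: walk the list, register each entry.
    static Registry registerCoders(List<AcmeCoder> coders) {
        Registry registry = new Registry();
        for (AcmeCoder c : coders) {
            registry.registerCoderForClass(c.clazz, c.coderName);
        }
        return registry;
    }

    public static void main(String[] args) {
        Registry r = registerCoders(List.of(
                new AcmeCoder(String.class, "StringCoder"),
                new AcmeCoder(Integer.class, "IntCoder")));
        System.out.println(r.coders.get(String.class)); // prints StringCoder
    }
}
```

In the real framework, the map lives inside Beam's CoderRegistry and the values are Coder<?> instances, but the registration flow is the same.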
The dataflow adapter can initialize the pipeline by calling initDataflowPipeline().
PubsubMessageWithAttributesCoder is the coder needed to serialize and deserialize PubsubMessage, the predefined message format in GCP. We can treat it as a required default coder, and it is provided by the Apache Beam library.
public void initDataflowPipeline(String[] args) {
if (null != args && 0 < args.length) {
AcmePipeline acmePipeline = new AcmePipeline(args,
AcmeDataflowOptions.class, getCoders());
AcmeDataflowOptions options = (AcmeDataflowOptions) acmePipeline
.getOptions();
executePipeline(options, acmePipeline.getPipeline());
}
}
The list of coders could be as below:
private List<AcmeCoder> getCoders() {
    List<AcmeCoder> coders = new ArrayList<>();
    coders.add(new AcmeCoder(HBMessage.class, new HBMessageCoder()));
    coders.add(new AcmeCoder(HBData.class, new HBDataCoder()));
    coders.add(new AcmeCoder(DeviceShutdownMessage.class,
        new DeviceShutdownMessageCoder()));
    return coders;
}
Each coder class extends org.apache.beam.sdk.coders.CustomCoder and implements encode() and decode(). These are required to serialize and deserialize messages during conversion: encode() handles serialization and decode() handles deserialization.
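To make the encode()/decode() contract concrete, here is a simplified, Beam-free sketch. HBMessage is reduced to a single String payload (an assumption for illustration; the real message carries more fields), and the stream handling mirrors what a CustomCoder subclass does inside encode() and decode():

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class HBMessageCoderSketch {
    // Hypothetical message type; the real HBMessage has more fields.
    static class HBMessage {
        final String payload;
        HBMessage(String payload) { this.payload = payload; }
    }

    // Mirrors CustomCoder.encode(T, OutputStream): write fields in a fixed order.
    static byte[] encode(HBMessage msg) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeUTF(msg.payload);
        out.flush();
        return bos.toByteArray();
    }

    // Mirrors CustomCoder.decode(InputStream): read fields back in the same order.
    static HBMessage decode(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return new HBMessage(in.readUTF());
    }

    public static void main(String[] args) throws IOException {
        HBMessage original = new HBMessage("heartbeat-42");
        HBMessage roundTripped = decode(encode(original));
        System.out.println(roundTripped.payload); // prints heartbeat-42
    }
}
```

The key invariant, which holds for real CustomCoder subclasses too, is that decode() reads fields in exactly the order encode() wrote them, so a round trip reproduces the original message.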
Creating a pipeline
A pipeline can be created as below:
public AcmePipeline(String[] args,
        Class<? extends DataflowPipelineOptions> clazz, List<AcmeCoder> coders) {
PipelineOptionsFactory.register(clazz);
options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(clazz);
options.setStreaming(true);
pipeline = Pipeline.create(options);
registerCoders(pipeline, coders);
setOptions(options);
}
public void createPipeline(DataflowPipelineOptions options,
boolean streaming) {
options.setStreaming(streaming);
pipeline = Pipeline.create(options);
}
Conclusion
I hope this article has given you an idea of how to develop a framework that makes setting up a Beam pipeline in GCP seamless.
Thank you for reading!