Apache Beam with GCP Dataflow and Pubsub

Harsh Sharma
Published in Globant
11 min read · Sep 6, 2022

Introduction

In this article, we’ll look at how to create Dataflow pipelines using Apache Beam in Java. We’ll also see how to build and deploy the pipelines, write integration tests with the Beam test framework, and use a more advanced concept: bifurcating (branching) a pipeline with tuple tags.

  • Apache Beam: a framework for building production-ready distributed batch and streaming pipelines quickly. It is flexible, supports multiple sources and sinks, and runs on several distributed processing back-ends, including Apache Flink, Apache Spark, and Google Cloud Dataflow.

Beam is particularly useful for embarrassingly parallel data processing tasks, in which the problem can be decomposed into many smaller bundles of data that can be processed independently and in parallel. You can also use Beam for Extract, Transform, and Load (ETL) tasks and pure data integration. These tasks are useful for moving data between different storage media and data sources, transforming data into a more desirable format, or loading data onto a new system.

  • GCP Dataflow: a managed service for running Apache Beam pipelines. It is one of the runners that Apache Beam supports.

Use Case

Let’s assume we want to build a Dataflow job that reads messages from the GCP Pub/Sub topic input-topic through a subscription named input-topic-sub, converts each message to another format, and publishes the result to a second Pub/Sub topic named output-topic. If the transformation fails, for example with a validation error or a malformed-JSON error, the job must build an error message and publish it to the Pub/Sub topic error-topic. In other words, we have to bifurcate the pipeline based on success and error.

E.g., let’s say we have an input message in JSON format with only two fields, firstname and lastname:

{"firstname": "harsh", "lastname": "sharma"}

The transformed message should have three fields, fullname, id, and length:

{"fullname": "Harsh Sharma", "id": "Unique-Id", "length": 12}

If firstname or lastname is empty or does not match the validation criteria, we should publish an error message to the error topic:

{"fields": [{"field": "firstname", "message": "Should not be empty"}, {"field": "lastname", "message": "Should not contain any special characters"}], "error": "Validation Error"}
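
The rules above can be sketched in plain Java, without Beam, to make the success and error paths concrete. This is only an illustration: the class NameRulesSketch and its methods are hypothetical, "special characters" is assumed to mean anything that is not a letter, and the capitalization of the full name is inferred from the example output rather than from the article's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper for illustration; not part of the article's code base.
final class NameRulesSketch {

    // Assumption: a value is valid when it is non-blank and contains letters only.
    static List<String> validate(String field, String value) {
        List<String> errors = new ArrayList<>();
        if (value == null || value.trim().isEmpty()) {
            errors.add(field + ": Should not be empty");
        } else if (!value.chars().allMatch(Character::isLetter)) {
            errors.add(field + ": Should not contain any special characters");
        }
        return errors;
    }

    // Upper-cases the first character; only called on validated, non-empty input.
    static String capitalize(String s) {
        return s.substring(0, 1).toUpperCase(Locale.ROOT) + s.substring(1);
    }

    // Returns fullname/id/length on success, or error/fields when validation fails.
    static Map<String, Object> transform(String firstname, String lastname) {
        List<String> errors = new ArrayList<>(validate("firstname", firstname));
        errors.addAll(validate("lastname", lastname));

        Map<String, Object> result = new LinkedHashMap<>();
        if (!errors.isEmpty()) {
            result.put("error", "Validation Error");
            result.put("fields", errors);
            return result;
        }
        String fullname = capitalize(firstname) + " " + capitalize(lastname);
        result.put("fullname", fullname);
        result.put("id", UUID.randomUUID().toString());
        result.put("length", fullname.length());
        return result;
    }

    public static void main(String[] args) {
        System.out.println(transform("harsh", "sharma")); // success path
        System.out.println(transform("", "sh@rma"));      // error path with two field errors
    }
}
```

In the real job the two outcomes go to different Pub/Sub topics; here they are simply two shapes of the returned map.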

Code In Action

  • Defining Pipeline Options

Create an interface named NameTransformerOptions that extends DataflowPipelineOptions. Declare getter and setter methods for inputSubscription, outputTopic, and outputErrorTopic, and configure the other parameters as follows:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;

public interface NameTransformerOptions extends DataflowPipelineOptions { // specific to Dataflow Runner

    // inputSubscription - for input PubSubMessage
    @Description("The Cloud Pub/Sub subscription to consume from. "
            + "The name should be in the format of "
            + "projects/<project-id>/subscriptions/<subscription-name>.")
    @Validation.Required
    String getInputSubscription();
    void setInputSubscription(String inputSubscription);

    // outputTopic - for output PubSubMessage
    @Description("The Cloud Pub/Sub topic to publish to. The name should be in the format of projects/<project-id>/topics/<topic-name>.")
    @Validation.Required
    String getOutputTopic();
    void setOutputTopic(String outputTopic);

    // outputErrorTopic - for errors during message processing
    @Description("The Output error topic Pub/Sub to publish errors to")
    @Validation.Required
    String getOutputErrorTopic();
    void setOutputErrorTopic(String outputErrorTopic);

    @Description("Windowing delay in minutes for allowed lateness.")
    @Default.Long(60)
    Long getAllowedWindowDelayInMins();
    void setAllowedWindowDelayInMins(Long allowedWindowDelayInMins);

    @Description("Window time in seconds. ")
    @Default.Long(10)
    Long getWindowTimeInSeconds();
    void setWindowTimeInSeconds(Long windowTimeInSeconds);

    @Description("Max No. of elements in window")
    @Default.Integer(1000)
    Integer getWindowElementsSizeLimit();
    void setWindowElementsSizeLimit(Integer windowElementsSizeLimit);

    @Description("Pub/Sub max batch size ")
    @Default.Integer(10)
    Integer getPubSubMaxBatchSize();
    void setPubSubMaxBatchSize(Integer pubSubMaxBatchSize);
}

  • Create DTOs for input and output

import java.io.Serializable;
import java.util.List;
import lombok.Data;
// @NotBlank is the Bean Validation constraint (javax.validation.constraints or
// jakarta.validation.constraints, depending on your stack); @SpecialChar is a custom validator.

@Data
public class InputDto implements Serializable { // Serializable is needed for PTransform
    private static final long serialVersionUID = -33331355L;

    @NotBlank(message = "Should not be empty")
    @SpecialChar(message = "Should not contain any special char") // custom validator
    private String firstname;

    @NotBlank(message = "Should not be empty")
    @SpecialChar(message = "Should not contain any special char")
    private String lastname;
}

@Data
public class OutputDto implements Serializable {
    private static final long serialVersionUID = -13331355L;
    private String id;
    private String fullname;
    private Integer length;
    private ErrorDto errorDto; // report error
}

@Data
public class ErrorDto implements Serializable {
    private static final long serialVersionUID = -23331355L;
    private List<FieldError> fields;
    private String error;
}

@Data
public class FieldError implements Serializable {
    private static final long serialVersionUID = -23331355L;
    private String field;
    private String message;
}

  • Create a pipeline

We create the pipeline with the required options and register coders. Coders are needed because data in Apache Beam moves: it can be persisted to disk or shuffled between workers. A coder handles both cases, since it defines how an object is written to disk or sent over the network and how it is converted back into an object. Every PCollection<T> therefore needs a coder. Without one, the pipeline won’t execute because the runner won’t know how to translate Java objects into a persistable format.

For example, the InputDtoCoder below serializes and deserializes the input DTO:

import com.demo.dto.InputDto;
import com.demo.util.JsonUtil;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.StreamUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class InputDtoCoder extends CustomCoder<InputDto> {
    private static final long serialVersionUID = -83328263L;

    @Override
    public void encode(InputDto value, OutputStream outStream) throws IOException {
        if (value == null) {
            throw new CoderException("* cannot encode a null object of type InputDto");
        }
        String json = JsonUtil.serialize(value);
        // Use an explicit charset so encode and decode agree on every platform.
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);

        outStream.write(bytes);
    }

    @Override
    public InputDto decode(InputStream inStream) throws IOException {
        byte[] bytes = StreamUtils.getBytesWithoutClosing(inStream);
        String json = new String(bytes, StandardCharsets.UTF_8);
        return JsonUtil.deserialize(json, InputDto.class);
    }
}

Similarly, you can create a coder for the output DTO. Beam’s libraries also ship some ready-made coders, such as PubsubMessageWithAttributesCoder.
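
To make the encode/decode contract concrete, here is a small round trip written with JDK streams only. It is not Beam's Coder API, and the class RoundTripSketch is hypothetical. The length prefix is a design choice of this sketch (one common way to keep values separable when several share a stream), not something the coder above does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of "object -> bytes -> object"; not a Beam coder.
final class RoundTripSketch {

    // Writes the UTF-8 bytes of the JSON text, preceded by their length.
    static void encode(String json, DataOutputStream out) throws IOException {
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    // Reads exactly one length-prefixed value back from the stream.
    static String decode(DataInputStream in) throws IOException {
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Encodes two values into one buffer and decodes them in the same order.
    static String[] roundTrip(String first, String second) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                encode(first, out);
                encode(second, out);
            }
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return new String[] {decode(in), decode(in)};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] decoded = roundTrip("{\"firstname\":\"harsh\"}", "{\"lastname\":\"sharma\"}");
        System.out.println(decoded[0]);
        System.out.println(decoded[1]);
    }
}
```

The same idea applies to any DTO: whatever encode writes, decode must be able to read back unambiguously, using the same charset on both sides.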

Now, to use these coders, register them via the CoderRegistry:

CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForClass(PubsubMessage.class, PubsubMessageWithAttributesCoder.of());
coderRegistry.registerCoderForClass(InputDto.class, new InputDtoCoder());
coderRegistry.registerCoderForClass(OutputDto.class, new OutputDtoCoder());

The pipeline uses two transformation functions, NameTransformerFunction and NameTransformerPubSubFunction.

  • NameTransformerFunction

It extracts the payload from the input Pub/Sub message, deserializes it into an InputDto, and then converts that into an OutputDto.

I have created a util class for creating the output and error objects, as shown below:

public static OutputDto createOutputDto(String firstName, String lastName) {
    String fullName = firstName.concat(" ").concat(lastName);
    OutputDto outputDto = new OutputDto();
    // Lombok's @Data generates setFullname for the field "fullname"
    outputDto.setFullname(fullName);
    outputDto.setId(UUID.randomUUID().toString());
    outputDto.setLength(fullName.length());
    return outputDto;
}

public static ErrorDto createError(String error) {
    ErrorDto errorDto = new ErrorDto();
    // ErrorDto declares the field "error", so the generated setter is setError
    errorDto.setError(error);
    return errorDto;
}

public static ErrorDto createError(String error, List<FieldError> list) {
    ErrorDto errorDto = new ErrorDto();
    errorDto.setError(error);
    // Any field errors
    errorDto.setFields(list);
    return errorDto;
}

  • NameTransformerPubsubFunction

It converts the OutputDto message to a Pub/Sub message and bifurcates messages based on two tuple tags, validNameTag and errorNameTag.

To emit a message with a specific tag, pass that tag to the output method of the process context. If you don’t specify a tag, the message goes to the default (main) output tag.

Who decides which tag is the default? When we build the pipeline and call withOutputTags(validNameTag, TupleTagList.of(errorNameTag)), the first argument is registered as the main output tag. That is why only errorNameTag is passed to TupleTagList.of(...), which lists the additional outputs.

Also note that the error object is wrapped inside the OutputDto object. The reason is that we want the bifurcation to happen only at the very last stage of the pipeline.

For conversion, we use the Jackson ObjectMapper to serialize and deserialize the messages.

Both function classes extend NameFunction, a custom class that holds some message conversion methods and common constants.
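
The tagging mechanics described above can be sketched as a small pipeline. This is not the original job: the class TupleTagPipelineSketch and its RouteFn are hypothetical stand-ins that route on a trivial check instead of real validation, windowing is left out, and NameTransformerOptions is assumed to be the interface defined earlier in this article. It needs the Beam SDK, the Google Cloud IO module, and the Dataflow runner on the classpath.

```java
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class TupleTagPipelineSketch {

    // Anonymous subclasses ({}) keep the element type available for coder inference.
    static final TupleTag<PubsubMessage> VALID_NAME_TAG = new TupleTag<PubsubMessage>() {};
    static final TupleTag<PubsubMessage> ERROR_NAME_TAG = new TupleTag<PubsubMessage>() {};

    /** Hypothetical stand-in for the article's functions: routes on a trivial check. */
    static class RouteFn extends DoFn<PubsubMessage, PubsubMessage> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            PubsubMessage message = context.element();
            String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
            if (payload.trim().startsWith("{")) {
                context.output(message);                 // no tag: main output (VALID_NAME_TAG)
            } else {
                context.output(ERROR_NAME_TAG, message); // explicit tag: error output
            }
        }
    }

    public static void main(String[] args) {
        NameTransformerOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(NameTransformerOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        pipeline.getCoderRegistry()
                .registerCoderForClass(PubsubMessage.class, PubsubMessageWithAttributesCoder.of());

        // The first argument of withOutputTags is the main output; the list holds the others.
        PCollectionTuple routed = pipeline
                .apply("ReadInput", PubsubIO.readMessagesWithAttributes()
                        .fromSubscription(options.getInputSubscription()))
                .apply("Route", ParDo.of(new RouteFn())
                        .withOutputTags(VALID_NAME_TAG, TupleTagList.of(ERROR_NAME_TAG)));

        routed.get(VALID_NAME_TAG)
                .apply("WriteValid", PubsubIO.writeMessages().to(options.getOutputTopic()));
        routed.get(ERROR_NAME_TAG)
                .apply("WriteErrors", PubsubIO.writeMessages().to(options.getOutputErrorTopic()));

        pipeline.run();
    }
}
```

Each branch of the PCollectionTuple is an ordinary PCollection, so the success and error paths can get their own sinks or further transforms.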

Integration Test

Testing is an essential part of any technology, and Apache Beam has good support for testing pipelines end to end through its testing framework.

Here I focus only on writing integration tests with the Apache Beam test framework.

  1. Prepare the test data

Create a JSON file containing the data you want to test with under the test resources directory, e.g. src/test/resources/validInputData.json.

Create a utility class and add a method that reads the file and converts its content into a PubsubMessage:

public PubsubMessage pubSubMessage(String fileName) throws IOException {
    String json = Resources.toString(Resources.getResource(fileName), StandardCharsets.UTF_8);

    try {
        // Parse only to log the DTO; the payload is always the raw file content.
        InputDto inputDto = JsonUtil.deserialize(json, InputDto.class);
        LOGGER.log(LEVEL_INFO, fileName + " File Data " + json + " inputDto " + inputDto);
    } catch (JsonProcessingException e) {
        // Invalid JSON is sent as-is in the PubsubMessage so the error path can be tested.
    }
    return new PubsubMessage(json.getBytes(StandardCharsets.UTF_8), new HashMap<>());
}

2. Define the test pipeline

Create a class named NameTransformerPipelineIntegrationTest and annotate it with @RunWith(JUnit4.class). Then create the test pipeline and define the tests.

3. Asserting the data

To assert the data, you can either check the message count or validate the output payload against the expected data.

For asserting the payload, I have created the hasData() method in the PubSubAssertUtil class.

If you want to assert an attribute, get it via msg.getAttribute("attr1"); the rest of the logic is the same.
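
As a rough illustration of both assertion styles, here is a minimal JUnit 4 test built on Beam's TestPipeline and PAssert. It is a sketch rather than the article's test: the class FullNameSketchTest and its comma-separated input are hypothetical, and it does not use PubSubAssertUtil or the real transform functions. It assumes JUnit 4 and a local runner such as the direct runner are on the test classpath.

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class FullNameSketchTest {

    // TestPipeline is a JUnit rule; "transient" keeps it out of serialized DoFn closures.
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void buildsFullNamesAndCountsThem() {
        // Hypothetical input: "firstname,lastname" pairs instead of Pub/Sub messages.
        PCollection<String> fullNames = pipeline
                .apply(Create.of("harsh,sharma", "jane,doe"))
                .apply(MapElements.into(TypeDescriptors.strings())
                        .via((String line) -> line.replace(',', ' ')));

        // Payload assertion: the output must contain exactly these elements.
        PAssert.that(fullNames).containsInAnyOrder("harsh sharma", "jane doe");

        // Count assertion: two inputs should yield two outputs.
        PAssert.thatSingleton(fullNames.apply(Count.<String>globally())).isEqualTo(2L);

        pipeline.run().waitUntilFinish();
    }
}
```

For the real job, the Create step would emit the PubsubMessage objects built from the test resource files, and the assertions would target the valid and error branches separately.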

Build and Deploy

Pre-requisite

Before deploying your job to GCP Dataflow, set up your account as follows:

a. Log in to your GCP account.
b. Create a Cloud Storage bucket and, inside it, two folders named temp and stag.
c. Create three topics: one for input and two for output (success and error).
d. Create a subnet in your region.
e. Create a service account with the roles below.

  • “roles/pubsub.subscriber”
  • “roles/pubsub.publisher”
  • “roles/pubsub.viewer”
  • “roles/storage.objectCreator”
  • “roles/storage.objectViewer”
  • “roles/dataflow.admin”
  • “roles/iam.serviceAccountUser”
  • “roles/dataflow.worker”

f. Set up Google application credentials for local authentication as mentioned here.

g. Install the gcloud utility locally and authenticate with the commands below:

  • gcloud auth login
  • gcloud auth application-default login

In order to deploy your pipeline, you have two options —

*Note — I am using Gradle as a build tool

  1. Using fat/uber Jar file

To deploy your pipeline this way, you first need to create a fat jar. I am using the shadowJar plugin for Gradle; if you are using Maven, use the Shade plugin and refer to the doc here.

plugins {
    id 'com.github.johnrengelman.shadow' version '7.1.2'
}

// other code for dependencies

shadowJar {
    archiveBaseName.set('name-transformer-job')
    archiveClassifier.set('')
    archiveVersion.set('')
    exclude 'META-INF/INDEX.LIST'
    exclude 'META-INF/LICENSE'
    exclude 'META-INF/*.RSA'
    exclude 'META-INF/*.SF'
    exclude 'META-INF/*.DSA'
    mergeServiceFiles()
    manifest {
        attributes 'Main-Class': 'com.demo.dataflow.NameTransformerDataflowApplication'
        attributes 'Description': 'Dataflow job for name transformer'
    }
}

Now, use the following command to create the build: gradle clean shadowJar

Once the build succeeds, go to the build/libs folder, where you will find the generated jar file. With the shadowJar settings above it is named name-transformer-job.jar. Run the command below to deploy your pipeline.

java -jar name-transformer-job.jar --runner=DataflowRunner --project=<project-name> --gcpTempLocation=gs://dataflow-test-bkt/temp --stagingLocation=gs://dataflow-test-bkt/stag --inputSubscription=projects/<project-name>/subscriptions/input-topic-sub --outputTopic=projects/<project-name>/topics/output-topic --region=us-east1 --windowTimeInSeconds=120 --windowElementsSizeLimit=1000 --jobName=name-transformer-job --serviceAccount=name-transformer-service-account@<project-name>.iam.gserviceaccount.com --defaultWorkerLogLevel=DEBUG --subnetwork=regions/us-east1/subnetworks/default --usePublicIps=false --outputErrorTopic=projects/<project-name>/topics/error-topic

After running the above command, go to the Dataflow section of the GCP console; you will see a newly created job named name-transformer-job.

2. Templated Approach

In the templated approach, we first create a template and then use that template to deploy the job. To create a template, we first add a new Gradle task named execute.

task execute(type: JavaExec) {
    main = System.getProperty("mainClass")
    classpath = sourceSets.main.runtimeClasspath
    systemProperties System.getProperties()
    args System.getProperty("exec.args").split()
}

Run the task with the command below to generate a template:

gradlew execute -DmainClass=com.demo.dataflow.NameTransformerDataflowApplication -Dexec.args="--runner=DataflowRunner --project=<project-name> --gcpTempLocation=gs://dataflow-test-bkt/temp --stagingLocation=gs://dataflow-test-bkt/stag --inputSubscription=projects/<project-name>/subscriptions/input-topic-sub --outputTopic=projects/<project-name>/topics/output-topic --region=us-east1 --windowTimeInSeconds=120 --windowElementsSizeLimit=1000 --jobName=name-transformer-job --serviceAccount=name-transformer-service-account@<project-name>.iam.gserviceaccount.com --defaultWorkerLogLevel=DEBUG --subnetwork=regions/us-east1/subnetworks/default --usePublicIps=false --outputErrorTopic=projects/<project-name>/topics/error-topic --templateLocation=gs://dataflow-test-bkt/name-transformer-job.template"

Note the new argument, templateLocation, which holds the bucket location and file name of the template. Once you run the above command, a template file named name-transformer-job.template is created inside your bucket.

Now, to deploy the created template, use the command below:

gcloud beta dataflow jobs run name-transformer-job --project=<project-name> --region=us-east1 --staging-location=gs://dataflow-test-bkt/stag --service-account-email=name-transformer-service-account@<project-name>.iam.gserviceaccount.com --gcs-location=gs://dataflow-test-bkt/name-transformer-job.template

After executing the above command go to the dataflow section of the GCP cloud console and you will see a newly created job with the name name-transformer-job.

The templated approach is most suitable if you want to create a CI/CD pipeline for your job.

Graph View of Dataflow Job

Processing large volumes of data

As you already know, Dataflow is used mainly for big data use cases that deal with large volumes of data, which are mostly batch (bounded data) or streaming (unbounded data). Now let’s understand how Dataflow handles large volumes of data and what configuration our Beam pipeline needs to get the best out of it.

The answer is simple: Dataflow is serverless and can divide work among multiple worker nodes based on processing load. It automatically spins up new nodes based on load, but if you want to control the worker nodes and other settings, Apache Beam offers many options, for example:

  • DataflowPipelineWorkerPoolOptions has an option, setMaxNumWorkers, to set the maximum number of worker nodes.
  • To change the autoscaling algorithm, set AutoscalingAlgorithmType to BASIC or THROUGHPUT_BASED.
  • To change the optimization goal, set setFlexRSGoal to SPEED_OPTIMIZED or COST_OPTIMIZED.
  • For streaming operations, you can change the windowing strategy to suit your use case.
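
The first three options in the list can also be set in code. The fragment below is a minimal sketch under stated assumptions: the class WorkerTuningSketch and the chosen values are illustrative, not a recommendation. As far as I know, FlexRS is aimed at batch jobs, so that line would not belong in the streaming job built in this article. The same settings can be passed as command-line flags instead, e.g. --maxNumWorkers=5.

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions.FlexResourceSchedulingGoal;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WorkerTuningSketch {
    public static void main(String[] args) {
        DataflowPipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);

        // Upper bound for autoscaling (illustrative value).
        options.setMaxNumWorkers(5);

        // Scale the worker pool based on throughput.
        options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);

        // Assumption: only meaningful for batch pipelines; omit for streaming jobs.
        options.setFlexRSGoal(FlexResourceSchedulingGoal.COST_OPTIMIZED);

        System.out.println("maxNumWorkers = " + options.getMaxNumWorkers());
    }
}
```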

Conclusion

So this is how we can bifurcate pipelines using tuple tags and deploy production-ready pipelines via Apache Beam with GCP Dataflow. We have also seen how to write integration tests using the Apache Beam test framework.

I hope you liked the write-up. Please leave a comment if you have any questions or feedback!
