Apache Beam GCP Pipeline Message Conversion Framework

Milind Kulkarni
Globant
Published in
4 min read · Nov 25, 2022

Big data keeps growing on three fronts: volume, velocity, and variety. It is processed in both batch and streaming modes. Apache Beam is an open-source, unified programming model that covers both batch and streaming use cases. With Apache Beam, you can build big data processing pipelines using its SDKs (Java, Python, and Go; Scala is available through the third-party Scio library).

The Apache Beam message framework described here handles message conversion, message processing, and message building. Readers are expected to have some knowledge of Apache Beam pipelines and pipeline terminology. The article should be useful to readers working in data science and big data processing.

This article outlines a framework that provides the boilerplate code needed for Pub/Sub message conversion, serialization, deserialization, and error handling.

Relevant use cases

We will look at the following use cases:

  • Serialize and deserialize a PubsubMessage.
  • Build the required message attribute(s) needed to publish a message.
  • Build an error message from an inbound (incoming) PubsubMessage.
  • Build message attribute(s) from the error message.
  • Create a PubsubMessage for publishing (outgoing) to a GCP (Google Cloud Platform) topic.
  • Validate a PubsubMessage with the required attribute(s).
  • Handle exceptions raised while processing a message.

Critical components of the framework

The sections below show the important classes, fields, and methods for message conversion.

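  • The ObjectMapper field and its configuration for serialization and deserialization using the Jackson JSON library.
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

// Shared, pre-configured mapper, declared as a member of the framework's utility class.
public static final ObjectMapper OBJECT_MAPPER;
static {
    OBJECT_MAPPER = new ObjectMapper();
    objectMapperDefaultConfiguration(OBJECT_MAPPER);
    // NON_EMPTY skips null values as well as empty strings and collections.
    // Each call to setSerializationInclusion replaces the previous setting,
    // so a single call is enough.
    OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
}

public static void objectMapperDefaultConfiguration(ObjectMapper objectMapper) {
    // Ignore JSON properties that have no matching field in the target class.
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    // Pretty-print the generated JSON.
    objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
    // Write dates as ISO-8601 strings instead of numeric timestamps.
    objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    // Add support for java.time types such as Instant and LocalDate.
    objectMapper.registerModule(new JavaTimeModule());
}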
  • Create a PubsubMessage, falling back to defaults for a null payload or null attributes.
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

public static PubsubMessage createPubSubMessage(byte[] payload,
        Map<String, String> attributes) {
    // A null payload is replaced by the placeholder error text, encoded as UTF-8.
    byte[] safePayload = Objects.nonNull(payload)
            ? payload
            : ERROR_EMPTY_PUBSUB_MESSAGE.getBytes(StandardCharsets.UTF_8);
    // Null attributes are replaced by an empty, mutable map.
    Map<String, String> safeAttributes = Objects.nonNull(attributes)
            ? attributes
            : new HashMap<>();
    return new PubsubMessage(safePayload, safeAttributes);
}
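The defaulting rules above can be tried without a Beam dependency. The following is a minimal JDK-only sketch: `SimpleMessage` is a hypothetical stand-in for Beam's PubsubMessage (it only holds a payload and an attribute map), and the text of ERROR_EMPTY_PUBSUB_MESSAGE is assumed because the article does not show it.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Beam's PubsubMessage: payload bytes plus attributes.
final class SimpleMessage {
    private final byte[] payload;
    private final Map<String, String> attributes;

    SimpleMessage(byte[] payload, Map<String, String> attributes) {
        this.payload = payload;
        this.attributes = attributes;
    }

    byte[] getPayload() {
        return payload;
    }

    Map<String, String> getAttributeMap() {
        return attributes;
    }
}

final class MessageFactorySketch {
    // Assumed placeholder text; the real constant is not shown in the article.
    static final String ERROR_EMPTY_PUBSUB_MESSAGE = "Empty PubsubMessage";

    // A null payload becomes the placeholder text; null attributes become an empty map.
    static SimpleMessage create(byte[] payload, Map<String, String> attributes) {
        byte[] safePayload = payload != null
                ? payload
                : ERROR_EMPTY_PUBSUB_MESSAGE.getBytes(StandardCharsets.UTF_8);
        Map<String, String> safeAttributes =
                attributes != null ? attributes : new HashMap<>();
        return new SimpleMessage(safePayload, safeAttributes);
    }
}
```

Calling `MessageFactorySketch.create(null, null)` yields a message whose payload is the placeholder text and whose attribute map is empty, so downstream steps never see a null.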
  • Convert a PubsubMessage payload to a readable String.
public static String readablePubSubMessage(PubsubMessage pubSubMessage) {
    // A missing message or payload is reported with the placeholder error text.
    if (Objects.isNull(pubSubMessage)
            || Objects.isNull(pubSubMessage.getPayload())) {
        return ERROR_EMPTY_PUBSUB_MESSAGE;
    } else {
        // Decode the payload bytes as UTF-8 text.
        return new String(pubSubMessage.getPayload(), StandardCharsets.UTF_8);
    }
}
  • Read the PubsubMessage payload into a StringBuilder.
public static StringBuilder pubSubMessagePayload(PubsubMessage pubSubMessage) {
    // Start with a 500-character capacity to limit resizing for typical payloads.
    StringBuilder sb = new StringBuilder(500);
    // A missing message or payload leaves the builder empty.
    if (Objects.nonNull(pubSubMessage)
            && Objects.nonNull(pubSubMessage.getPayload())) {
        sb.append(new String(pubSubMessage.getPayload(), StandardCharsets.UTF_8));
    }
    return sb;
}
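Both helpers decode the payload with an explicit UTF-8 charset. As a small JDK-only sketch of why that matters, the hypothetical `PayloadCodecSketch` class below encodes and decodes with the same charset, so non-ASCII characters survive the round trip regardless of the platform default.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of the article's framework) showing a UTF-8 round trip.
final class PayloadCodecSketch {
    // Encode text to payload bytes with an explicit charset.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode payload bytes with the same charset; a null payload yields an empty string.
    static String decode(byte[] payload) {
        return payload == null ? "" : new String(payload, StandardCharsets.UTF_8);
    }
}
```

For example, `PayloadCodecSketch.decode(PayloadCodecSketch.encode("café"))` returns the original text, whereas relying on a platform default charset on either side could corrupt the accented character.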
  • The ErrorMessageBuilder class to build an ErrorMessage (Message) using a builder pattern.
import java.io.Serializable;
import java.util.List;

public class ErrorMessage implements Serializable {
    private String details;
    private String cause;
    private String step;
    private String system;
    private String messageSource;
    private String messageVersion;
    private String messageType;
    private String messageFormat;
    private List<RequiredField> fieldErrors;

    // Copies every value collected by the builder.
    public ErrorMessage(ErrorMessageBuilder builder) {
        this.details = builder.details;
        this.cause = builder.cause;
        this.step = builder.step;
        this.system = builder.system;
        this.messageSource = builder.messageSource;
        this.messageVersion = builder.messageVersion;
        this.messageType = builder.messageType;
        this.messageFormat = builder.messageFormat;
        this.fieldErrors = builder.fieldErrors;
    }

    // Nested builder: each with*() method sets one field and returns the builder for chaining.
    public static class ErrorMessageBuilder {
        private String details;
        private String cause;
        private String step;
        private String system;
        private String messageSource;
        private String messageVersion;
        private String messageType;
        private String messageFormat;
        private List<RequiredField> fieldErrors;

        public ErrorMessageBuilder withDetails(String details) {
            this.details = details;
            return this;
        }

        public ErrorMessageBuilder withCause(String cause) {
            this.cause = cause;
            return this;
        }

        public ErrorMessageBuilder withStep(String step) {
            this.step = step;
            return this;
        }

        public ErrorMessageBuilder withSystem(String system) {
            this.system = system;
            return this;
        }

        public ErrorMessageBuilder withMessageSource(String messageSource) {
            this.messageSource = messageSource;
            return this;
        }

        public ErrorMessageBuilder withMessageVersion(String messageVersion) {
            this.messageVersion = messageVersion;
            return this;
        }

        public ErrorMessageBuilder withMessageType(String messageType) {
            this.messageType = messageType;
            return this;
        }

        public ErrorMessageBuilder withMessageFormat(String messageFormat) {
            this.messageFormat = messageFormat;
            return this;
        }

        // The element type matches the fieldErrors field (List<RequiredField>).
        public ErrorMessageBuilder withFieldErrors(List<RequiredField> fieldErrors) {
            this.fieldErrors = fieldErrors;
            return this;
        }

        // Creates the ErrorMessage from the values set so far.
        public ErrorMessage build() {
            return new ErrorMessage(this);
        }
    }
}
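To illustrate how such a builder is typically called, here is a reduced, self-contained sketch. `ErrorMessageSketch` is a hypothetical cut-down version of the class above with only three of its fields; the getters and the sample values are assumptions added for the illustration.

```java
import java.io.Serializable;

// Reduced, hypothetical version of ErrorMessage: three fields, a nested builder, and getters.
final class ErrorMessageSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String details;
    private final String cause;
    private final String step;

    private ErrorMessageSketch(Builder builder) {
        this.details = builder.details;
        this.cause = builder.cause;
        this.step = builder.step;
    }

    String getDetails() { return details; }
    String getCause() { return cause; }
    String getStep() { return step; }

    static final class Builder {
        private String details;
        private String cause;
        private String step;

        Builder withDetails(String details) { this.details = details; return this; }
        Builder withCause(String cause) { this.cause = cause; return this; }
        Builder withStep(String step) { this.step = step; return this; }

        ErrorMessageSketch build() { return new ErrorMessageSketch(this); }
    }

    // Example of chaining the builder calls; the values are made up for the illustration.
    static ErrorMessageSketch example() {
        return new Builder()
                .withDetails("Required attribute 'messageType' is missing")
                .withCause("ValidationException")
                .withStep("validate-inbound")
                .build();
    }
}
```

With the full class, the equivalent call would start from `new ErrorMessage.ErrorMessageBuilder()` and chain whichever with*() methods apply before calling build(). Fields that are never set simply stay null.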
  • Message attribute validator method.
public static boolean validateMessageAttributes(Map<String, String> attributes)
  • The PubsubMessage error handler.
public static PubsubMessage pubSubErrorHandler(
        String payload, Map<String, String> attributes, Exception e)
  • Create an ErrorMessage using PubsubMessage payload and other attributes.
public static Message createErrorMessage(
        String payload, String cause, String type, String version, String... args)
  • The PubsubMessage exception handler.
public static Message<?> pubSubExceptionHandler(
        PubsubMessage pubSubMessage, String cause, String type, Exception e)
  • Define generic String message templates whose placeholders are filled in using MessageFormat.format().
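As a rough sketch of what such message templates could look like, the hypothetical `MessageTemplatesSketch` class below defines two template strings and fills them with java.text.MessageFormat. The template texts and method names are assumptions; the article does not list the actual messages.

```java
import java.text.MessageFormat;

// Hypothetical message templates; {0}, {1} are MessageFormat placeholders.
final class MessageTemplatesSketch {
    // A literal single quote must be doubled ('') inside a MessageFormat pattern.
    static final String MISSING_ATTRIBUTE =
            "Required attribute ''{0}'' is missing from message type {1}";
    static final String STEP_FAILED = "Step {0} failed: {1}";

    static String missingAttribute(String attribute, String messageType) {
        return MessageFormat.format(MISSING_ATTRIBUTE, attribute, messageType);
    }

    static String stepFailed(String step, String reason) {
        return MessageFormat.format(STEP_FAILED, step, reason);
    }
}
```

Passing the arguments as Strings keeps values such as version numbers unchanged, because MessageFormat applies locale-specific formatting to numeric arguments.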

The implementations of validateMessageAttributes(), pubSubErrorHandler(), createErrorMessage(), and pubSubExceptionHandler() are intentionally left to the developer, who can extend these method definitions to suit their needs.
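As one possible starting point, the sketch below shows how validateMessageAttributes() might be written. It is an assumption, not the author's implementation: the list of required keys is hypothetical (borrowed from the ErrorMessage field names), and the missingAttributes() helper is added only for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical validator: checks that every required attribute is present and non-blank.
final class AttributeValidatorSketch {
    // Assumed required keys; a real pipeline would define its own list.
    static final List<String> REQUIRED_ATTRIBUTES =
            Arrays.asList("messageType", "messageVersion", "messageSource");

    // Returns the required keys that are absent, null, or blank.
    static List<String> missingAttributes(Map<String, String> attributes) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_ATTRIBUTES) {
            String value = attributes == null ? null : attributes.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Same signature as the framework method: true only when nothing is missing.
    static boolean validateMessageAttributes(Map<String, String> attributes) {
        return missingAttributes(attributes).isEmpty();
    }
}
```

Returning the list of missing keys alongside the boolean check makes it easier for an error handler to report exactly which attributes failed validation.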

Conclusion

I hope this article has given you an idea of how to develop a framework that facilitates message processing and conversion using Apache Beam.

Thank you for reading!
