Google Cloud Dataflow Using Java

Shreya Patil
Globant
Published in
5 min readMay 31, 2023
Photo by NASA on Unsplash

In this article, we will see how we can create a data pipeline using Apache Beam in Java.

Data is generated in real-time from different sources but capturing, processing, and analyzing it is not easy because it’s usually not in the desired format for downstream systems.

The solution for this issue is Dataflow, a serverless, fast, and cost-effective data-processing service for stream and batch data. Which removed operational overhead by automating the infrastructure provisioning and auto-scaling as your data grows.

We can handle data better way by using Dataflow. You read data from the source, transform it and write it back into a sink. It provides portability with the processing pipeline created using open-source Apache Beam libraries in the language of your choice and applying it as a dataflow job which then executes the processing on worker virtual machines. You can create and run dataflow jobs using the Cloud Console UI, gCloud CLI or the API’s.

Apache Beam

Apache Beam is an open-source model for defining batch and streaming data pipelines. The pipeline object encapsulates computations required in reading transforming and writing data.

The following are the main Apache Beam processing concepts:

  • Pipeline: A pipeline encapsulates the entire series of computations involved in reading input data, transforming that data, and writing output data. The input source and output sink can be the same or of different types, allowing you to convert data from one format to another. Apache Beam programs start by constructing a Pipeline object and then using that object as the basis for creating the pipeline’s datasets. Each pipeline represents a single, repeatable job.
  • PCollection: A PCollection represents a potentially distributed, multi-element dataset that acts as the pipeline’s data. Apache Beam transforms use PCollection objects as inputs and outputs for each step in your pipeline. A PCollection can hold a fixed-size dataset or an unbounded dataset from a continuously updating data source.
  • Transforms: A transform represents a processing operation that transforms data. A transform takes one or more PCollections as input, performs an operation that you specify on each element in that collection, and produces one or more PCollections as output. A transform can perform nearly any kind of processing operation, including performing mathematical computations on data, converting data from one format to another, grouping data, reading and writing data, filtering data to output only the elements you want, or combining data elements into single values.
  • Windows: Windowing a PCollection divides the elements into windows based on the associated event time for each element. This is especially useful for PCollection with unbounded size since it allows operating on a sub-group of the elements placed into a related window. For PCollection with a bounded size (aka. conventional batch mode), by default, all data is implicitly in a single window, unless Window is applied. The available window types are fixed-time, sliding, session, global, and finally calendar-based.
  • ParDo: ParDo is the core parallel processing operation in the Apache Beam SDKs, invoking a user-specified function on each of the elements of the input PCollection. ParDo collects the zero or more output elements into an output PCollection. The ParDo transform processes elements independently and possibly in parallel.
  • Pipeline I/O: Apache Beam I/O connectors let you read data into your pipeline and write output data from your pipeline. For example Kafka, Parquet. An I/O connector consists of a source and a sink. All Apache Beam sources and sinks are transforming that let your pipeline work with data from several different data storage formats. You can also write a custom I/O connector.
  • Aggregation: Aggregation is the process of computing some value from multiple input elements. The primary computational pattern for aggregation in Apache Beam is to group all elements with a common key and window. Then, it combines each group of elements using an associative and commutative operation.
  • User-defined functions (UDFs): Some operations within Apache Beam allow executing user-defined code as a way of configuring the transform. For ParDo, user-defined code specifies the operation to apply to every element, and for Combine, it specifies how values should be combined. A pipeline might contain UDFs written in a different language than the language of your runner. A pipeline might also contain UDFs written in multiple languages.
  • Runners: Runners are the software that accepts a pipeline and executes it. Most runners are translators or adapters to massively parallel big-data processing systems. Other runners exist for local testing and debugging.
  • Source: A transform that reads from an external storage system. A pipeline typically reads input data from a source. The source has a type, which may be different from the sink type, so you can change the data format as it moves through the pipeline.
  • Sink: A transform that writes to an external data storage system, like a file or a database.

Example of implementing pipeline on Local:

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
public class BeamdemoApplication {
public static void main(String[] args) {
/**
* Created pipleline using PipelineOptionsFactory
*/
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
final List < String > input = Arrays.asList("New_words", "New_List");
/**
* Output file generate with the intial name of element which contatins in the list and followed with .txt format to the specified local path
*/
pipeline.apply(Create.of(input)).apply(TextIO.write()
.to(INPUT_DIR)
.withSuffix(".txt"));
pipeline.run().waitUntilFinish();
}
}

You can see output files will be generated in the specified path with a demo name and the suffix will be .txt.

Example of implementing pipeline (On GCP bucket):

public class BeamdemoApplication {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
/**
* Pipeline read data from specified bucket location and output each word
* encountered into the output PCollection to the specified bucket location.
*/
pipeline.apply(TextIO.read().from("gs://some/path/inputdata.txt")).apply(ParDo.of(new ExtractWordsFn()))
.apply(TextIO.write().to("gs://some/path/inputdata.txt"));
pipeline.run().waitUntilFinish();
}
}
public class ExtractWordsFn extends DoFn < String, String > {
public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
@ProcessElement
public void processElement(ProcessContext c) {
for (String word: c.element().split(TOKENIZER_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}

In both examples, you can see the path is different. This can be a local path (if running locally), or a Google Cloud Storage filename with the format “gs://<bucket>/<filepath>”

Example of implementing pipeline (With PubSub Topic):

public class BeamdemoApplication {
public static void main(String[] args) {
// Creation of custom pipline using PipelineOptions and StreamingOptions
PubSubToGCSOptions pubSubToGCSOptions = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(PubSubToGCSOptions.class);
Pipeline p = Pipeline.create(pubSubToGCSOptions);
PCollection < String > lines = p
.apply(PubsubIO.readStrings().fromTopic(pubSubToGCSOptions.getInputTopic())
.apply(ParDo.of(new ExtractWordsFn()))
.apply(PubsubIO.writeStrings().to(pubSubToGCSOptions.getOutputTopic(); p.run().waitUntilFinish();
}
}
public class ExtractWordsFn extends DoFn < String, String > {
public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
@ProcessElement
public void processElement(ProcessContext c) {
for (String word: c.element().split(TOKENIZER_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}

Topic Interface:

//Interface of custom pipeline implementation
public interface PubSubToGCSOptions extends PipelineOptions, StreamingOptions {
String getInputSubscription();
void setInputSubscription(String inputSubscription);
String getInputTopic();
void getOutputTopic();
}

Command to run locally:

mvn compile exec:java \
-D exec.mainClass=com.demo.BeamdemoApplication \
-D exec.args=" - output=counts"

Command to run for dataflow:

mvn -P dataflow-runner compile exec:java \
-D exec.mainClass=com.demo.BeamdemoApplication \
-D exec.args=" - project=PROJECT_ID \
- gcpTempLocation=gs://BUCKET_NAME/temp/ \
- output=gs://BUCKET_NAME/output \
- runner=DataflowRunner \
- region=REGION"

when you run the command you will get an output file in the specified location of the GCP bucket.

Conclusion

In this above article, we cover how we can creates a data pipeline for either batch or streaming data by using Apache Beam.

Hope you enjoy it !!

Referred Documents:

https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline

https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options

https://www.youtube.com/watch?v=4OnTliMIqTY

https://www.youtube.com/watch?v=XdsuDOQ9nkU

--

--