Protobuf to BigQuery with Apache Beam

William Attache
The AB Tasty Tech & Product Blog
Sep 18, 2023

A few days ago, Apache released Beam 2.50, which was announced to come with support for writing protocol buffer objects into BigQuery tables, thanks to the new writeProtos method.

Dataflow, the managed Apache Beam service of Google Cloud Platform, already has a Pub/Sub to BigQuery template that consumes messages from Pub/Sub using Java classes generated from .proto files. However, under the hood, some conversions are performed to turn the Protobuf objects into BigQuery rows.

This is a lot of code to write and maintain when you want to write your own ingestion pipeline. On our end at AB Tasty, we also have to maintain POJO representations of our Protobuf-generated Java objects in order to build TableRow instances from them through the .withFormatFunction() call. This is again a lot of code to maintain, and a native sink would help a lot.
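For illustration, the pre-2.50 style looks roughly like the following sketch (the table name is the sample one used later in this story, and the field mapping is heavily simplified compared to our real pipelines):

BigQueryIO.<Event>write()
    .to("my_project:my.protosSimpleTest")
    // Hand-written Protobuf -> TableRow mapping; real mappings are much larger.
    .withFormatFunction(event -> new TableRow()
        .set("id", event.getId())
        .set("cid", event.getCid())
        .set("eventTime", new TableRow()
            .set("seconds", event.getEventTime().getSeconds())
            .set("nanos", event.getEventTime().getNanos())));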

Protobuf to BigQuery using Java

In this story we will briefly go through this new functionality with a very simple use case.

First we define an Event Protobuf message which represents an action made by a customer at a certain timestamp:

syntax = "proto3";

import "google/protobuf/timestamp.proto";

package example.protobuf;

option java_multiple_files = true;
option java_package = "com.example.protobuf";
option java_outer_classname = "EventProto";

message Event {
  string id = 1;
  string cid = 2;
  google.protobuf.Timestamp eventTime = 3;
}

Then we use the namely/protoc-all image to generate the associated Java classes:

# Assuming the previous protobuf message is in the proto/event.proto file, and
# we want to generate our code into src/main/java/com/example/protobuf
docker run --platform linux/amd64 --rm -v $(pwd):/defs \
namely/protoc-all \
-l java \
-o src/main/java \
-f proto/event.proto

From there we can write our dataflow:

import com.example.protobuf.Event;
import com.google.protobuf.Timestamp;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public final class ProtoToBigQuery {

    // Data sample
    static final List<Event> LINES = Arrays.asList(
        Event.newBuilder()
            .setId("1")
            .setCid("cid1")
            .setEventTime(Timestamp.newBuilder().setSeconds(System.currentTimeMillis() / 1000).build())
            .build()
    );

    public static void main(String[] args) {

        // Create the pipeline.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // Generate data and inject it into BigQuery
        pipeline
            .apply(Create.of(LINES))
            .apply("ProtosToBQ",
                BigQueryIO.<Event>writeProtos(Event.class)
                    .to(String.format("%s:%s.%s", "my_project", "my", "protosSimpleTest"))
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .withMaxRetryJobs(1)
                    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            );

        pipeline.run();
    }
}

If you run in streaming mode using the Storage Write API, you should also use withNumStorageWriteApiStreams and withTriggeringFrequency.
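A minimal sketch of what that configuration could look like (the stream count and triggering frequency below are arbitrary values, not recommendations):

BigQueryIO.<Event>writeProtos(Event.class)
    .to(String.format("%s:%s.%s", "my_project", "my", "protosSimpleTest"))
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    // Number of parallel Storage Write API streams to use.
    .withNumStorageWriteApiStreams(3)
    // How often buffered elements are flushed to BigQuery.
    .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5));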

You can use Gradle to run this:

./gradlew :run -Pargs="\
--defaultSdkHarnessLogLevel=INFO \
--experiments=use_runner_v2 \
--streaming=true \
--runner=DirectRunner"

You will finally end up with your data in BigQuery.

Please note that I did not create my table prior to running my Dataflow job; I simply allowed the sink to create it by calling the withCreateDisposition() method with BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED.

More complex Protobuf messages

So far we have only used messages whose fields are strings (plus the well-known Timestamp). What if we use enum types, nested messages, repeated fields, or maps?

Enum

We now change our Protobuf definition to:

syntax = "proto3";

import "google/protobuf/timestamp.proto";

package example.protobuf;

option java_multiple_files = true;
option java_package = "com.example.protobuf";
option java_outer_classname = "EventProto";

enum EventType {
  VIEW = 0;
  PURCHASE = 1;
}

message Event {
  string id = 1;
  string cid = 2;
  google.protobuf.Timestamp eventTime = 3;
  optional EventType eventType = 4;
}

We also include this new field into our sample data:

static final List<Event> LINES = Arrays.asList(
    Event.newBuilder()
        .setId("1")
        .setCid("cid1")
        .setEventTime(Timestamp.newBuilder().setSeconds(System.currentTimeMillis() / 1000).build())
        .setEventType(EventType.VIEW)
        .build()
);

We end up with the error message java.lang.UnsupportedOperationException: proto type ENUM is unsupported. This is quite disappointing, as most of our Protobuf message definitions at AB Tasty use enum types. We can also note that the destination table is not created in BigQuery, which is a good point. I also tried creating the table in BigQuery first, but the result is the same.

Workaround

After trying multiple things and reading some code snippets, I finally ended up finding a workaround.

First, one can set the schema of the table while inserting by calling the withSchema() method. Executing again will display the error java.lang.RuntimeException: Enumerations not supported.

TableSchema tableSchema = new TableSchema();
tableSchema.setFields(Arrays.asList(
    new TableFieldSchema().setName("id").setType("STRING"),
    new TableFieldSchema().setName("cid").setType("STRING"),
    new TableFieldSchema().setName("eventTime").setType("RECORD").setFields(Arrays.asList(
        new TableFieldSchema().setName("seconds").setType("INTEGER"),
        new TableFieldSchema().setName("nanos").setType("INTEGER")
    )),
    new TableFieldSchema().setName("eventType").setType("STRING")
));

The issue comes from the TableRowToStorageApiProto.jsonValueFromMessageValue() method. I just added a ParDo in my code to convert my Event into a TableRow with a slight modification: all ENUM values are cast to String. You can use more complex logic to meet your requirements.

public class ToTableRow extends DoFn<Message, TableRow> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        c.output(tableRowFromMessage(c.element(), true));
    }

    public static TableRow tableRowFromMessage(Message message, boolean includeCdcColumns) {
        // Same code as the original
    }

    public static Object jsonValueFromMessageValue(
            Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue, boolean expandRepeated) {
        // Same code as the original, only `case ENUM` changes
        switch (fieldDescriptor.getType()) {
            case ENUM:
                return fieldValue.toString();
            // ... all other cases are kept unchanged from the original
        }
    }
}

Then, using BigQueryIO.writeTableRows() instead of BigQueryIO.writeProtos() fixes the issue 🎉
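Putting the pieces together, the write step could look like the following sketch (it assumes the tableSchema built above and the ToTableRow DoFn sketched just before, with the elided helper methods copied over in full):

pipeline
    .apply(Create.of(LINES))
    // Convert each Protobuf Event into a TableRow, casting enums to String.
    .apply("ToTableRow", ParDo.of(new ToTableRow()))
    .apply("RowsToBQ",
        BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s", "my_project", "my", "protosSimpleTest"))
            // The schema is now provided explicitly, since it can no longer
            // be derived from the Protobuf descriptor.
            .withSchema(tableSchema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    );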

Nested messages (records)

Now, we include another message Item as a field of our Event message:

syntax = "proto3";

import "google/protobuf/timestamp.proto";

package example.protobuf;

option java_multiple_files = true;
option java_package = "com.example.protobuf";
option java_outer_classname = "EventProto";

message Item {
  string id = 1;
  string color = 2;
  int32 count = 3;
}

message Event {
  string id = 1;
  string cid = 2;
  google.protobuf.Timestamp eventTime = 3;
  Item item = 4;
}

The sample data is now:

static final List<Event> LINES = Arrays.asList(
    Event.newBuilder()
        .setId("1")
        .setCid("cid1")
        .setEventTime(Timestamp.newBuilder().setSeconds(System.currentTimeMillis() / 1000).build())
        .setItem(Item.newBuilder().setId("1").setColor("red").setCount(3).build())
        .build()
);

In this case, this works pretty well: the BigQuery table is created and the data lands in it.

It’s worth mentioning that eventTime was already a RECORD, but it’s nice to test with our own data structure, one that includes STRING fields and not only INTEGER ones.

Repeated fields

Let’s consider a new field pages in our Event which lists all pages seen by a user on our website.

syntax = "proto3";

import "google/protobuf/timestamp.proto";

package example.protobuf;

option java_multiple_files = true;
option java_package = "com.example.protobuf";
option java_outer_classname = "EventProto";

message Event {
  string id = 1;
  string cid = 2;
  google.protobuf.Timestamp eventTime = 3;
  repeated string pages = 4;
}

A corresponding data sample could be:

static List<String> pages = Arrays.asList("https://page1.com", "https://page2.com");

static final List<Event> LINES = Arrays.asList(
    Event.newBuilder()
        .setId("1")
        .setCid("cid1")
        .setEventTime(Timestamp.newBuilder().setSeconds(System.currentTimeMillis() / 1000).build())
        .addAllPages(pages)
        .build()
);

Guess what: this works perfectly, even if the table was not created beforehand, and the pages column comes out as a REPEATED STRING.

Maps

Let’s play a bit more and add an entry to our data which associates purchase content with a purchase id.

The Protobuf definition looks like:

syntax = "proto3";

import "google/protobuf/timestamp.proto";

package example.protobuf;

option java_multiple_files = true;
option java_package = "com.example.protobuf";
option java_outer_classname = "EventProto";

message Item {
  string id = 1;
  string color = 2;
  int32 count = 3;
}

message Event {
  string id = 1;
  string cid = 2;
  google.protobuf.Timestamp eventTime = 3;
  map<int32, Item> purchases = 4;
}

And a possible representation of it could be:

static Map<Integer, Item> purchases = new HashMap<>();
static {
    purchases.put(63037, Item.newBuilder().setId("1").setColor("blue").setCount(2).build());
    purchases.put(35239, Item.newBuilder().setId("2").setColor("red").setCount(3).build());
}

static final List<Event> LINES = Arrays.asList(
    Event.newBuilder()
        .setId("1")
        .setCid("cid1")
        .setEventTime(Timestamp.newBuilder().setSeconds(System.currentTimeMillis() / 1000).build())
        .putAllPurchases(purchases)
        .build()
);

Again, this is very good news: it works perfectly, without creating the table prior to injecting the data! Note that Protobuf maps are encoded on the wire as repeated key/value messages, so the resulting column should be a REPEATED RECORD with key and value fields.

Conclusion

This new release seems very promising for handling the challenges we face at AB Tasty. We still regret that enum types are not supported, but it is going in the right direction.

Another drawback I faced during my experiments is that, using DirectRunner, the pipeline never hands back control; it keeps running for what seems to be forever. This is likely because it is launched with --streaming=true, and streaming pipelines run until they are cancelled.
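A possible mitigation, sketched below as a general Beam pattern rather than something from my original runs, is to bound the wait yourself instead of blocking indefinitely:

import java.io.IOException;
import org.apache.beam.sdk.PipelineResult;
import org.joda.time.Duration;

// Wait at most two minutes, then cancel the pipeline ourselves.
PipelineResult result = pipeline.run();
result.waitUntilFinish(Duration.standardMinutes(2));
try {
    result.cancel();
} catch (IOException e) {
    // Cancellation is best-effort on a local runner.
}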

If you ever test with more complex data structures, I would be very interested in your feedback!
