Apache Beam: Stateful Streaming and BigQuery Side Input with DataFlow

Dmitry Turchenkov
5 min read · Feb 2, 2024


Suppose you want to track the total number of purchases or clicks a user makes in your application over time. You also want this data in real time, with a complete history and an easy way to recover the state if something goes wrong. In this article, I will show how to achieve this using Apache Beam with Stateful Streaming and a Side Input from BigQuery.

Infrastructure

As the foundation of our infrastructure, we will use the following components, most of them from Google Cloud Platform:

  • PubSub as the event source for our Streaming Pipeline;
  • BigQuery as the DWH;
  • Apache Beam as the framework for real-time data processing;
  • DataFlow as the runner for our pipeline.

This setup allows us to handle real-time events efficiently, process them using Apache Beam, and store the results in BigQuery for further analysis. PubSub serves as the starting point for our streaming data, and DataFlow executes the pipeline.

For the sake of illustration, we’ll simulate users purchasing snacks at given prices.

Here is a simplified schema of our infrastructure:

The AVRO schema used to create the transactions topics (input and output):

{
  "type": "record",
  "name": "transaction",
  "fields": [
    {
      "name": "uid",
      "type": "string"
    },
    {
      "name": "item",
      "type": "string"
    },
    {
      "name": "price",
      "type": "double"
    },
    {
      "name": "transaction_id",
      "type": "string"
    },
    {
      "name": "transaction_ts",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

We create a target table in BigQuery to store all transactions arriving from the transactions-output topic. Take note of how AVRO types are converted to BigQuery types (for example, the timestamp-millis field becomes a TIMESTAMP column). More details can be found here.

create table `ods.transactions` (
  uid STRING,
  item STRING,
  price FLOAT64,
  transaction_id STRING,
  transaction_ts TIMESTAMP,
  subscription_name STRING,
  message_id STRING,
  publish_time TIMESTAMP,
  data JSON,
  attributes JSON
)

Next, we create subscriptions for the corresponding topics:

  1. transactions-input-sub: This subscription will be used by our Streaming Pipeline to calculate the accumulated UserState.
  2. transactions-output-sub: This subscription writes incoming transactions to the BigQuery table ods.transactions. It is configured to use the AVRO schema of the topic as the data structure.

For this test scenario, we can leave all other parameters at their default values.

Apache Beam Pipeline

I won’t delve into the detailed implementation of parsing incoming messages from PubSub, class serialization, and PCollections in Apache Beam. There are already numerous articles on these topics, and the specific implementation heavily depends on the programming language used. Those interested in exploring the complete code for this POC in Java can download it from my repository on GitHub.

For the successful implementation of this pipeline, we’ll need the following abstractions in Apache Beam:

Side input. For the initial setup at pipeline launch, we will use the Side Input abstraction. A side input is an additional, read-only input that a DoFn can access every time it processes an element of the main PCollection. In our case, we will use a Map<String, UserState> as a shared cache of the initial state. The values for this cache are fetched once from BigQuery during the pipeline’s initialization.

PCollectionView<Map<String, UserState>> bqInitialState = p
    .apply("Read Initial State from BigQuery",
        BigQueryIO.readTableRows()
            // Aggregate the full transaction history once, at pipeline start.
            .fromQuery("select uid, round(sum(price),2) as total, count(uid) as n from `%s` group by uid"
                .formatted(options.getTransactionsTable()))
            .usingStandardSql())
    .apply("TableRow to UserState", ParDo.of(new DoFn<TableRow, KV<String, UserState>>() {
        @ProcessElement
        public void processElement(@Element TableRow row, OutputReceiver<KV<String, UserState>> receiver)
                throws Exception {
            // One KV per user: uid -> (total spent, number of purchases).
            KV<String, UserState> kv = KV.of(
                (String) row.get("uid"),
                new UserState((Double) row.get("total"), Long.parseLong(row.get("n").toString())));

            receiver.output(kv);
        }
    }))
    // Expose the result as a Map view that other transforms can read as a side input.
    .apply(View.asMap());

Stateful Processing. The UserState consists of two variables:

  • count: The total number of purchases made by the user over time.
  • total: The cumulative cost of all purchases made by the user.

To simplify our handling of these variables, we’ll create a dedicated class called UserState to store the state data. We will use ValueState<UserState>, which lets us maintain user statistics separately for each key (customer uid); Beam scopes this state to the key and window.
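The UserState class itself is not shown here. Below is a minimal sketch of what such a holder might look like. The field types, the plus helper, and the toJson name are assumptions made for illustration (the pipeline code calls a method spelled toJosn), so the version in the repository may differ. Implementing Serializable is one simple way to let Beam fall back to SerializableCoder for the state value.

```java
import java.io.Serializable;
import java.util.Locale;

// Hypothetical sketch of the state holder; not the class from the repository.
final class UserState implements Serializable {
    private static final long serialVersionUID = 1L;

    private final double total; // cumulative cost of all purchases
    private final long count;   // total number of purchases

    UserState(double total, long count) {
        this.total = total;
        this.count = count;
    }

    double getTotal() {
        return total;
    }

    long getCount() {
        return count;
    }

    // Returns a new state with one more purchase added; instances are immutable.
    UserState plus(double price) {
        return new UserState(total + price, count + 1);
    }

    // Hand-rolled JSON for logging; Locale.ROOT keeps the decimal separator a dot.
    String toJson() {
        return String.format(Locale.ROOT, "{\"total\":%.2f,\"count\":%d}", total, count);
    }
}
```

Keeping the class immutable means every update produces a fresh object, which matches the read, compute, write cycle used with ValueState in the stateful step that follows.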

.apply("Update UserState", ParDo.of(new DoFn<KV<String, UserTransaction>, UserTransaction>() {
    // Per-key state cell holding the running total and count.
    @StateId("userState")
    private final StateSpec<ValueState<UserState>> userStateSpec = StateSpecs.value();

    @ProcessElement
    public void processElement(
            ProcessContext context,
            @Element KV<String, UserTransaction> kv,
            @StateId("userState") ValueState<UserState> currentState,
            OutputReceiver<UserTransaction> receiver) throws Exception {

        // Snapshot of the history loaded from BigQuery at pipeline start.
        Map<String, UserState> storage = context.sideInput(bqInitialState);

        Double newTotal;
        Long newCount;
        UserState oldStat = currentState.read();
        if (oldStat != null) {
            // State already exists for this key: increment it.
            newTotal = oldStat.getTotal() + kv.getValue().getPrice();
            newCount = oldStat.getCount() + 1;
        } else {
            // No state yet for this key: seed it from the BigQuery snapshot if possible.
            UserState storageStat = storage.get(kv.getKey());

            if (storageStat != null) {
                LOG.info("[GET FROM INITIAL STATE][%s]: %s".formatted(kv.getKey(), storageStat.toJosn()));
                newTotal = storageStat.getTotal() + kv.getValue().getPrice();
                newCount = storageStat.getCount() + 1;
            } else {
                // Brand-new user: start from this transaction.
                newTotal = kv.getValue().getPrice();
                newCount = 1L;
            }
        }
        UserState newStat = new UserState(newTotal, newCount);
        currentState.write(newStat);

        LOG.info("[USER STATE][%s]: %s".formatted(kv.getKey(), newStat.toJosn()));

        receiver.output(kv.getValue());
    }
}).withSideInputs(bqInitialState))

Here, we declare the user state with the @StateId annotation and pass it as an argument to the processElement function. Beam keeps a separate state cell per key, so each transaction is processed against the state of its own user. Note that this works only if the incoming PCollection is keyed, that is, has the structure KV<K, V> (here, KV<String, UserTransaction>).

We log the updated UserState once the update succeeds. The processed transactions are then published to the transactions-output topic, from which the transactions-output-sub subscription writes them to BigQuery automatically.
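To make the order of lookups easier to follow, the branching inside processElement can be isolated in plain Java without any Beam dependency. This is only a sketch: the class and method names are hypothetical, a HashMap stands in for the per-key ValueState, and a second map stands in for the side input loaded from BigQuery.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the per-key update; names are hypothetical.
final class StateFallbackSketch {
    // Minimal stand-in for UserState.
    static final class Stat {
        final double total;
        final long count;

        Stat(double total, long count) {
            this.total = total;
            this.count = count;
        }
    }

    private final Map<String, Stat> initial;                // plays the role of the side input
    private final Map<String, Stat> live = new HashMap<>(); // plays the role of ValueState per key

    StateFallbackSketch(Map<String, Stat> initial) {
        this.initial = initial;
    }

    // Applies one purchase and returns the updated statistics for the user.
    Stat update(String uid, double price) {
        Stat base = live.get(uid);           // 1. prefer the live state
        if (base == null) {
            base = initial.get(uid);         // 2. otherwise seed from the initial snapshot
        }
        if (base == null) {
            base = new Stat(0.0, 0L);        // 3. otherwise this is a brand-new user
        }
        Stat next = new Stat(base.total + price, base.count + 1);
        live.put(uid, next);
        return next;
    }
}
```

Once a key has live state, the snapshot is never consulted again for that key, which is why the initial values are counted only once per user.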

Let’s check that everything works as expected:

[Figure: BigQuery results for random users]
[Figure: DataFlow logs]

To load test our pipeline, let’s implement a simple service in Go. Its task is to send JSON messages with random values to the transactions-input topic. We can use the excellent gofakeit library to avoid implementing random data generation by hand.

For a rough estimation of the time lag introduced by this streaming pipeline, you can set the Write metadata flag in the PubSub subscription. By knowing transaction_ts and publish_time, we can calculate the average message processing time.
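The calculation itself is just the mean of publish_time minus transaction_ts over the stored rows. Here is a small sketch of that arithmetic, assuming the two timestamps have already been read from the table into memory; the class and field names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Hypothetical helper for estimating the average end-to-end lag.
final class LagEstimate {
    // One stored transaction, reduced to the two timestamps we need.
    static final class Row {
        final Instant transactionTs; // set by the event producer
        final Instant publishTime;   // metadata written by the PubSub subscription

        Row(Instant transactionTs, Instant publishTime) {
            this.transactionTs = transactionTs;
            this.publishTime = publishTime;
        }
    }

    // Mean of (publishTime - transactionTs), truncated to whole milliseconds.
    static Duration averageLag(List<Row> rows) {
        if (rows.isEmpty()) {
            return Duration.ZERO;
        }
        long totalMillis = 0;
        for (Row row : rows) {
            totalMillis += Duration.between(row.transactionTs, row.publishTime).toMillis();
        }
        return Duration.ofMillis(totalMillis / rows.size());
    }
}
```

Keep in mind that transaction_ts comes from the producer’s clock and publish_time from PubSub, so any clock skew between the two ends up in the estimate.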

In my case, the load profile was as follows (DataFlow vCPU = 4, memory = 15 GB):

{
  "messagesPerWorker": 10000,
  "sleep": 50,
  "workers": 20,
  "userNumber": 2000
}

I measured a delay of roughly 300 milliseconds, which I consider an acceptable result.

Conclusion

In this example, we used default settings for topics and subscriptions. This means that the order in which messages are published and processed is not guaranteed. In our case, this is not critical; however, if we want to record the UserState as it was at the time of each transaction, we need message ordering enabled on the subscription and an ordering key set on the publisher side. Unfortunately, DataFlow users may then encounter an unpleasant surprise in the form of the following warning:

Pub/Sub subscription transactions-input-sub has message ordering enabled, but Dataflow does not preserve ordering when processing messages. The pipeline will work, but possibly not in the way that is expected. Also, using ordering increases latency and decreases performance when reading from Pub/Sub in Dataflow

How to overcome this issue will be covered in upcoming articles.

Happy data-engineering!
