Stop Scheming, Start Streaming

The Power of BigQuery Event Stream Tables

Lee Doolan
Appsbroker CTS Google Cloud Tech Blog

--

Public domain: https://www.flickr.com/photos/lakeclarknps/25256076866

Introduction

In earlier blog posts, I’ve often discussed how I like getting data into Google Cloud’s BigQuery and making it available for consumption as early as possible.

With that in mind, I want to talk about a data ingestion and wrangling method that’s been a game-changer for me (and my clients), namely BigQuery Event Stream tables.

What are Event Stream tables?

An Event Stream table is simply a BigQuery table into which event data is only ever inserted, never updated, and is made available immediately for querying.

Isn’t that just a logging table?

Well, not quite. Event Stream tables are a powerful tool for capturing all your data as events, regardless of source, format, or schema. Think of it as a central nervous system for your data pipelines.

Here’s the gist:

  • One Table to Rule Them All: We ditch all predefined schemas and complex transformations usually associated with data ingestion. Instead, we shove everything (pipeline metadata, logs, actual data, etc.) into a single pre-built BigQuery table called the Event Stream.
  • Schema on the Fly: This approach lets us ingest data of any structure, even if it changes over time. We figure out what things mean later using BigQuery’s extensive function library, including string & JSON manipulation functions.
  • Streaming Power: Gone are the days of waiting for data to be processed before being able to query it. With BigQuery streaming inserts, we can ensure data is available for analysis as soon as it arrives.

This approach has some serious benefits:

  • Decoupling Data Ingestion and Curation: We can get data flowing into BigQuery ASAP, without worrying about upfront schema design. Transformation and/or cleaning of the data happens later, as a separate process.
  • Flexibility for Everyone: External suppliers can send data via webhooks directly into BigQuery. No more scrambling to fit their data into our existing pipelines.
  • Aid Analytics Requirements Gathering: As proactive data engineers, we can work with data producers early in the development cycle, usually many months before analysts and data scientists get a call. Future BI and analytics requirements gathering is significantly easier with real data already sitting there to interrogate.

For info and further reading, I’ve blogged about getting data into BigQuery using some of these methods before . . .

PubSub streaming into BigQuery just got a whole lot easier . . .

Gateway to PubSub . . . and beyond . . .

Under the hood

An Event Stream table is simply a BigQuery table, defined as:

CREATE TABLE IF NOT EXISTS `<project_id>.<dataset_id>.EventStream`
(
  EventDateTime TIMESTAMP,
  EventID STRING,
  EventPayload STRING,
  EventExecutionID STRING
)
PARTITION BY DATE(EventDateTime)
CLUSTER BY EventID
OPTIONS (require_partition_filter = true);

Notice it is a very narrow table with only 4 columns, but with every event being dropped in here, it will get VERY LONG and BIG fast!

To counter this, partitioning by date and enforcing filtering on queries will optimise cost and performance.

Each column in the table has an important part to play:

  • EventDateTime: This is a compulsory column containing a UTC timestamp of when the data arrives and is inserted. This should never be a business logical date and will be used to partition data by date and to help filter/optimise downstream data curation.

Don’t be tempted to insert logical business dates here, such as geo-local order date times. Place them in the EventPayload column instead!

  • EventID: This is a compulsory column which denotes what the event is. For good visibility and to simplify queries, group similar events together using dot notation aligned with your business processes, e.g. sales.order, sales.return or pipeline.start, pipeline.status, pipeline.end.
  • EventPayload: This is an optional column, but perhaps the ‘secret sauce’ of the Event Stream table. Data is dumped in here as a string and extracted later as required. The data could be simple text or a JSON block dumped as a string, for example.

Notice that this column isn’t compulsory — an event which is inserted without any additional payload data could be information enough!

  • EventExecutionID: This is an optional column, used for information and/or to group associated events that are streamed separately but belong to the same parent process. For example, a pipeline execution ID could be inserted here by the ‘start’ and ‘end’ events of a specific data pipeline process, and then tied together more easily via SQL when required.

The `EventPayload` column can often be huge, so make sure to use the other Event Stream table columns in your queries to help filter rows down.

Reducing a record set down before reading the payload and applying complex functions is better for both cost and performance.

Inserting Data

As mentioned earlier, the best way to use this table and make data immediately available is to use streaming inserts.

You can read more about BigQuery streaming inserts below:

https://cloud.google.com/bigquery/docs/loading-data#streaming

https://cloud.google.com/bigquery/docs/write-api

For demo purposes, I enclose an example Python script below. You can view other streaming insert code samples here.

# imports
import json
from datetime import datetime, timezone

from google.cloud import bigquery

# build event row
# use a UTC timestamp, and serialise the payload with json.dumps
# so BigQuery's JSON functions can parse it later
event = dict(
    EventDateTime=datetime.now(timezone.utc).isoformat(),
    EventID="pipeline.status",
    EventPayload=json.dumps({"status": "success"}),
    EventExecutionID="<pipeline_execution_id>",
)
event_row = [event]

# stream event row into bigquery
client = bigquery.Client()
table_id = "<project_id>.<dataset_id>.EventStream"

errors = client.insert_rows_json(table_id, event_row)

if not errors:
    print("New rows have been added.")
else:
    print("Encountered errors inserting rows: {}".format(errors))

Example of Data Usage

So how do we use this data once it’s in the Event Stream table? We can use BigQuery’s powerful SQL engine and functions to help us slice and dice the event data on the fly.

Sample Data

To help demonstrate usage I will use some very small and contrived sample data, as below.

Event Stream Sample Data
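
As a rough, purely illustrative sketch, events shaped like the ones used in the examples below could be loaded like so (a plain DML insert is used here just for readability; in practice these rows would arrive via streaming inserts, and the payloads and execution IDs are hypothetical):

-- illustrative only: hypothetical events shaped to match the example queries
insert into `<project_id>.<dataset_id>.EventStream`
  (EventDateTime, EventID, EventPayload, EventExecutionID)
values
  (current_timestamp(), 'other.event', null, '<execution-id-1>'),
  (current_timestamp(), 'pipeline.start', null, '<pipeline-execution-id-1>'),
  (current_timestamp(), 'pipeline.end', '{"status": "success"}', '<pipeline-execution-id-1>'),
  (current_timestamp(), 'sales.order',
   '{"order-ref": "ORD-001", "order-lines": [{"order-line-value": "10.50"}, {"order-line-value": "5.25"}]}',
   '<execution-id-2>');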

Examples

Usage Example 1: Count of distinct events
This example demonstrates how we can count distinct events.

In this case, we will deliberately count an event with no payload, using the EventExecutionID column to dedupe, just in case it was inserted more than once.

with
-- use this first CTE to filter rows down
-- in this case we will look for events in last 2 days
EventStream as (
  select *
  from `<project_id>.<dataset_id>.EventStream`
  where date(EventDateTime) >= date(current_timestamp()) - 2
    and EventID = 'other.event'
)
select count(distinct EventExecutionID) as event_count
from EventStream

Usage Example 2: Return a single attribute from a single event
This example demonstrates how best to query the Event Stream table for a single event and return an attribute embedded in the EventPayload column.

In this case, we will return the status of a single pipeline.end event.

Note the use of the qualify clause in the first CTE block to filter down to the last row for each EventID and EventExecutionID pair.

Many streaming and messaging technologies follow the ‘at-least-once delivery’ guarantee, so the potential for duplicates means it’s good practice to include this where applicable.

with
-- use this first CTE to filter rows down
-- we know roughly when event was inserted
-- use of qualify statement to dedupe
EventStream as (
  select *
  from `<project_id>.<dataset_id>.EventStream`
  where date(EventDateTime) >= date(current_timestamp()) - 2
    and EventID = 'pipeline.end'
  qualify row_number() over (
    partition by EventID, EventExecutionID
    order by EventDateTime desc) = 1
)
select json_value(EventPayload, '$.status') as status
from EventStream
where EventExecutionID = '<pipeline-execution-id-1>'

Usage Example 3: Manipulating non-standard JSON payloads
The whole premise of the EventStream table is to accept data as it arrives, with no upfront data manipulation. This example demonstrates how we can use other BigQuery functions to ‘fix’ payload data and make it usable.

In this case, an event has arrived that needs various corrections before we can apply any BigQuery JSON functions to extract the attributes we need.

with
-- use this first CTE block to filter rows down
EventStream as (
  select *
  from `<project_id>.<dataset_id>.EventStream`
  where date(EventDateTime) >= date(current_timestamp()) - 2
    and EventID = 'mangled_json.event'
),
-- use this CTE block to clean the payload column
EventStream_Clean as (
  select * replace(
    replace(replace(EventPayload, '\\', ''),
      'False', "'False'") as EventPayload
  )
  from EventStream
)
select safe_cast(
  json_value(EventPayload, '$.boolean_flag') as bool) as boolean_flag
from EventStream_Clean

Usage Example 4: Handling nested JSON payloads
Many of the payloads received will include multiple levels of nesting and more complex JSON structures.

This example will demonstrate how we can unnest some of these payloads and aggregate them across multiple events.

In this case, we are going to sum the value of some sales orders, first un-nesting the order lines attribute in the payload before extracting the order line value attributes.

with
-- use this first CTE block to filter rows down
EventStream as (
  select *
  from `<project_id>.<dataset_id>.EventStream`
  where date(EventDateTime) >= date(current_timestamp()) - 2
    and EventID = 'sales.order'
),
-- use this CTE block to extract payload values
-- and dedupe just in case!
EventStream_Extract_Dedupe as (
  select
    json_value(EventPayload, '$.order-ref') as order_ref,
    json_extract_array(EventPayload, '$.order-lines') as order_lines
  from EventStream
  qualify row_number() over (
    partition by order_ref
    order by EventDateTime desc) = 1
),
-- use this CTE block to unnest down to order lines
-- and cast value to numeric
Order_Lines as (
  select
    o.order_ref,
    safe_cast(
      json_extract(ol, '$.order-line-value') as numeric) as order_line_value
  from EventStream_Extract_Dedupe o,
    unnest(o.order_lines) as ol
)
select sum(order_line_value) as sum_order_values
from Order_Lines

Usage Example 5: Getting value from multiple event types
Many parent processes will create many event types that individually may not be valuable, but when combined can offer a fuller picture.

This is why it’s useful to ensure the parent process IDs are captured and stored with each associated event. The Event Stream table encourages this with the EventExecutionID column.

This example will demonstrate how we can query multiple event types and extract value from the combined dataset.

In this case, we will query all events associated with a pipeline process and return each pipeline with its running duration.

with
-- use this first CTE block to filter rows down
-- notice how we can utilise the event id dot notation
-- and return all events associated with a pipeline
EventStream as (
  select *
  from `<project_id>.<dataset_id>.EventStream`
  where date(EventDateTime) >= date(current_timestamp()) - 2
    and EventID like 'pipeline.%'
)
-- we can group by the execution id here
-- and avoid touching the bigger payload column
select
  EventExecutionID as pipeline_execution_id,
  timestamp_diff(max(EventDateTime), min(EventDateTime), SECOND) as pipeline_duration
from EventStream
group by 1

Usage Example 6: Managing disparate data schemas
Occasionally a data producer may decide to change the data schema, or simply rename an attribute. On other occasions, events may be provided by multiple suppliers with varying schemas.

Using SQL we can standardise and query disparate schemas using various techniques. I will demonstrate this using the sample dataset below.

Sample disparate data schemas
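
As a rough illustration only (the payload values are hypothetical; the attribute names match the query below), the three schema variants could look something like this:

-- illustrative only: three 'dummy.event' payloads with differing schemas
insert into `<project_id>.<dataset_id>.EventStream`
  (EventDateTime, EventID, EventPayload, EventExecutionID)
values
  (current_timestamp(), 'dummy.event',
   '{"dummy-event-logical-ref": "ref-001", "dummy-event-data-source": "supplier-a"}', '<exec-1>'),
  (current_timestamp(), 'dummy.event',
   '{"dummy-event-logical-id": "id-002", "dummy-event-data-source": "supplier-b"}', '<exec-2>'),
  (current_timestamp(), 'dummy.event',
   '{"dummy-event-logical-ref": "ref-003"}', '<exec-3>');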

Notice in the sample data that we have three different payload schemas. We can standardise these using SQL similar to the query below.

with
-- use this first CTE block to filter rows down
EventStream as (
  select *
  from `<project_id>.<dataset_id>.EventStream`
  where date(EventDateTime) >= date(current_timestamp()) - 2
    and EventID = 'dummy.event'
),
-- use this CTE block to extract payload values
-- json_value will default to null if attribute cannot be found
EventStream_Extract as (
  select
    json_value(EventPayload, '$.dummy-event-logical-ref') as logical_ref,
    json_value(EventPayload, '$.dummy-event-logical-id') as logical_id,
    json_value(EventPayload, '$.dummy-event-data-source') as data_source
  from EventStream
)
-- standardise output and add defaults for missing data
select
  coalesce(logical_ref, logical_id) as logical_ref,
  coalesce(data_source, '<supplier-unknown>') as data_source
from EventStream_Extract

Notice how I said various techniques could be used.

In other scenarios, we may be able to (or choose to) use ‘flags’ to help us determine the schema. A flag would then indicate to the analyst how that event should be transformed.

If you’re lucky, this flag will be an obvious one already present in the data, such as a schema_version attribute, or you could make use of the data_source attribute, for example.

It could also be derived from other columns such as the EventDateTime column if you know when a schema was changed.
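
As a sketch only (the schema_version attribute is hypothetical and not part of the sample data above), such a flag could drive the extraction logic along these lines:

-- sketch only: branch the extraction on a hypothetical schema_version attribute
select
  case json_value(EventPayload, '$.schema_version')
    when '2' then json_value(EventPayload, '$.dummy-event-logical-id')
    else json_value(EventPayload, '$.dummy-event-logical-ref')
  end as logical_ref
from `<project_id>.<dataset_id>.EventStream`
where date(EventDateTime) >= date(current_timestamp()) - 2
  and EventID = 'dummy.event'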

Recommended Usage

Whilst I agree that those SQL queries might look complex and unwieldy, they unlock the power of utilising an Event Stream table. For everyday use, you might prefer a cleaner interface.

This is where tools like dbt (data build tool) or Dataform come in. These fantastic transformation layers sit on top of your BigQuery Event Stream table, allowing you to build and manage reusable transformations with pre-built views and materialised tables. Think of them as shortcuts for complex SQL queries, making your data analysis a breeze.
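
As a sketch only, the pipeline duration query from Usage Example 5 could be wrapped into a reusable view (the view name below is an illustration; in dbt or Dataform the same SQL would live in a model or SQLX file instead):

-- sketch only: a reusable view over the Event Stream table
-- the partition filter is baked in to satisfy require_partition_filter
create or replace view `<project_id>.<dataset_id>.v_pipeline_durations` as
select
  EventExecutionID as pipeline_execution_id,
  timestamp_diff(max(EventDateTime), min(EventDateTime), SECOND) as pipeline_duration
from `<project_id>.<dataset_id>.EventStream`
where date(EventDateTime) >= date(current_timestamp()) - 2
  and EventID like 'pipeline.%'
group by 1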

For further information on Google Cloud Dataform, why not check out my blog here.

Conclusion

BigQuery Event Stream tables offer a powerful and flexible way to ingest and analyse data without getting bogged down in upfront schema design.

It’s the perfect foundation for a modern data platform that can adapt and evolve as your needs change.

Thanks

A big thank you to my colleagues Max Buckmire-Monro and Jenn Calland for helping proofread and sanity-check this post. Make sure you check out other blogs from our great colleagues at Appsbroker CTS here.

About Appsbroker CTS

Appsbroker CTS is the largest Google Cloud-only digital consultancy in Europe, and we’re building talented teams ready to change the world using Google technologies. So if you’re passionate, curious and keen to get stuck in — take a look at our Careers Page and join us for the ride!

--

Lee Doolan
Appsbroker CTS Google Cloud Tech Blog

Cloud Data Warehouse Architect & Data Engineer | UK Based | https://www.linkedin.com/in/leedoolan77 | Thoughts are my own and not of my employer