Streaming JSON messages into BigQuery JSON-type column

Olejniczak Lukasz
Google Cloud - Community
Jul 15, 2022

Google recently announced a new data type for BigQuery: JSON. As a result, BigQuery users can insert semi-structured JSON messages as they arrive, without providing a message schema and without flattening the messages into a structured format. However, as we will show in this article, there is much more to it than just the ability to store JSON in BigQuery table cells:

  1. BigQuery engineers made it very easy to parse JSON structures with plain SQL, using simple dot notation to access substructures and attributes.
  2. Unlike most cloud data warehouses, BigQuery represents JSON elements as individual virtual columns at the storage level, so it can apply columnar compression to JSON columns as well.
  3. As a consequence of #2, when users query a table with a JSON column and select only a subset of the elements available in the JSON message, BigQuery does not need to read the whole message; it fetches just the requested elements. This greatly reduces the number of bytes processed, which improves performance and … reduces costs on the on-demand BigQuery plan, where pricing depends specifically on the volume of processed bytes.
  4. And watch this: users can stream JSON messages directly into a BigQuery table with a JSON column.

This article is a practical guide that demonstrates all of the advantages listed above.

Context

Let's imagine we have a fraud detection system that validates e-commerce transactions. Every time there is a new payment request, the e-commerce payment service asks our system to score the risk of fraud. Scoring is handled by a machine learning algorithm that is retrained regularly to learn patterns from new data. Training a machine learning model for fraud detection is out of scope for this article; instead, we will show how to build a bridge between our system and the data analytics backend where such models are cooked. Because machine learning models learn from data, the main responsibility of our bridge will be to pass the payload of incoming scoring requests to a data lake, which is then used by our data and machine learning engineers.

Just for this article, everything that happens between the click on the pay button and the machine learning model for risk scoring is abstracted by a Dataflow streaming job. This job is instantiated from the Streaming Data Generator template and generates JSON messages representing the payload of scoring requests.

Messages are sent to a distributed queue handled by the GCP PubSub service, which decouples our fraud detection system from the data analytics backend.

We highly recommend reading about how Spotify migrated its event delivery system from Kafka to GCP PubSub: https://cloud.google.com/blog/products/gcp/spotifys-journey-to-cloud-why-spotify-migrated-its-event-delivery-system-from-kafka-to-google-cloud-pubsub

We then need something that will pull messages from the GCP PubSub topic as they arrive and push them into BigQuery. This task will also be handled by a Dataflow job.

Setup

The following is a high-level list of things that need to happen in the Google Cloud console to reproduce this setup:

1. Create a PubSub topic for our messages. Our topic is named random-events:
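The topic can be created in the console or, if you prefer code, with the Pub/Sub client library. Below is a minimal sketch, assuming a recent version of the google-cloud-pubsub package; the project id is mine, so replace it with your own.

from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

project_id = "bigquerydemos-337515"  # replace with your own GCP project id

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, "random-events")

# Create the topic that the Streaming Data Generator job will publish to.
topic = publisher.create_topic(request={"name": topic_path})
print(f"Created topic: {topic.name}")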

2. Create a service account that will be able to read and write messages from/to your PubSub topic.

3. Our JSON messages will have a well-defined schema, but the values will be quite random. To generate such messages we will use the data generator library that powers the Streaming Data Generator template. The template requires a path to a so-called schema file, which describes the fields of our messages. The data generation library allows us to use various faker functions for each schema field:

Our schema consists of the following fields:

{
"id": "{{uuid()}}",
"email": "{{email()}}",
"ip": "{{ipv4()}}",
"phone": "{{phone()}}",
"age": "{{integer(1,50)}}",
"price": "{{double(1,1000)}}",
"tid": "{{timestamp()}}",
"country": "{{country()}}"
}

Save this file and upload it to a Google Cloud Storage bucket.
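If you would rather do the upload from code than through the console, a minimal sketch with the google-cloud-storage client could look like this; the bucket name and object path are hypothetical placeholders.

from google.cloud import storage  # pip install google-cloud-storage

client = storage.Client()
bucket = client.bucket("your-schema-bucket")  # hypothetical bucket name
blob = bucket.blob("schemas/random-events-schema.json")  # hypothetical object path

# Upload the schema file saved in the previous step.
blob.upload_from_filename("schema.json")
print(f"Schema uploaded to gs://{bucket.name}/{blob.name}")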

4. Create a Dataflow streaming job from the Streaming Data Generator template.

This template has two required parameters:

  • GCS path of schema location, which is the location of the schema file for our messages.
  • Output QPS, which is the expected number of output messages per second. We used 100.

Click the Run Job button. Your Dataflow streaming job should start streaming messages to the PubSub topic.

5. Create a BigQuery table with a JSON column. The syntax for the JSON type is rather straightforward:
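For our table it boils down to a single statement of standard BigQuery DDL; the fully qualified name below matches the project, dataset and table used throughout this article.

CREATE TABLE `bigquerydemos-337515.dataflowsink.jevents` (
  event JSON
);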

In this statement, bigquerydemos-337515 is the GCP project id (please note that you will need your own GCP project; bigquerydemos-337515 is mine), dataflowsink is the BigQuery dataset name, and jevents is the BigQuery table name.

Conclusions

We will work through items 1–4 from the end, meaning we will first set up a streaming job between the PubSub topic and BigQuery to stream our JSON messages into a BigQuery table with a JSON column.

4. Users can stream JSON messages directly into a BigQuery table with a JSON column.

This time, however, there is no Dataflow template ready to be reused. We need to code a so-called data processing pipeline, which can be understood as a graph of data processing steps.

Just as a reminder: Dataflow is a managed service for Apache Beam, a unified model for defining both batch and streaming data-parallel processing pipelines. Apache Beam is just that: a proposition for how to define data processing pipelines so that the same code can be executed everywhere, locally but also on one of the many supported distributed execution engines like Spark, Flink and … Dataflow. Yes, Dataflow is also a very scalable, distributed execution engine.

We can implement the Dataflow processing graph in Java or Python. Here we used Python.

Our pipeline consists of five steps (a minimal sketch of such a pipeline follows the list):

  • Pull messages from the PubSub topic (handled by beam.io.ReadFromPubSub).
  • Convert each PubSub message, which is just a stream of bytes, into a UTF-8 encoded string.
  • The BigQuery writer expects each BigQuery table row to be represented as JSON made of <key, value> pairs, where the key is the column name and the value is the value for that column (https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.html). Our table has just one column, named event, of type JSON. We therefore need to tell BigQuery that we want to inject our message as the value of the event column, and to comply with TableRowJsonCoder we build a JSON object with a single <key, value> pair: event as the key and the string representation of our JSON message as the corresponding value. We apply the json.dumps function to our JSON message to convert it into an escaped string. For example: {"event": { "id": 10, "email": "foo@abc.com"} } becomes { "event": "{ \"id\": 10, \"email\": \"foo@abc.com\"}" }
  • Convert the string representing the TableRow JSON into a JSON object.
  • Write (stream) the messages, represented as TableRow objects, into BigQuery using the beam.io.WriteToBigQuery module.
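The sketch below shows what such a pipeline could look like in Python. It is a minimal illustration, not the exact listing from our project: the step labels and the command-line option names (--input_topic, --output_table) are our own, while the topic and table follow the naming used throughout this article.

import argparse
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    # Illustrative option names; adjust them to your own setup.
    parser.add_argument('--input_topic', required=True,
                        help='e.g. projects/bigquerydemos-337515/topics/random-events')
    parser.add_argument('--output_table', required=True,
                        help='e.g. bigquerydemos-337515:dataflowsink.jevents')
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        (
            p
            # 1. Pull raw messages (bytes) from the PubSub topic.
            | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
            # 2. Convert each message into a UTF-8 encoded string.
            | 'DecodeUtf8' >> beam.Map(lambda payload: payload.decode('utf-8'))
            # 3. Build the TableRow JSON: a single <key, value> pair with event as the key
            #    and the escaped string representation of the message as the value.
            | 'BuildTableRowJson' >> beam.Map(lambda msg: '{"event": %s}' % json.dumps(msg))
            # 4. Convert the TableRow JSON string into a JSON (dict) object.
            | 'ParseTableRow' >> beam.Map(json.loads)
            # 5. Stream the rows into the BigQuery table with the JSON column.
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                known_args.output_table,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
        )


if __name__ == '__main__':
    run()

Launched with the Dataflow runner (for example with --runner=DataflowRunner plus your project, region and a temp location), this becomes a streaming job that keeps running until it is cancelled.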

Once executed, this code will run continuously as a Dataflow streaming job, streaming messages pulled from the PubSub topic into the BigQuery table.

One thing we would like to emphasize here is that we do not modify the original JSON message; we just pass it through and specify which column it should be assigned to. There is no need to reverse engineer our JSON messages.

The last step here is simply to query our target BigQuery table:

1. BigQuery engineers made it very easy to parse JSON structures with plain SQL, using simple dot notation to access substructures and attributes.

Let's go to the Google Cloud console and use the BigQuery SQL workspace to query our target table, selecting just a subset of attributes from our JSON messages using dot notation:
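A query along these lines does the job (dot access on a JSON column returns JSON values; JSON_VALUE extracts a scalar as a STRING):

SELECT
  event.id,
  event.country,
  JSON_VALUE(event.email) AS email
FROM `bigquerydemos-337515.dataflowsink.jevents`
LIMIT 10;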

Although our JSON messages have a quite simple schema, dot notation in BigQuery can also be applied to traverse much more complex structures.

2. Unlike most cloud data warehouses, BigQuery represents JSON elements as individual virtual columns at the storage level, so it can apply columnar compression to JSON columns as well.

3. As a consequence of #2, when users query a table with a JSON column and select only a subset of the elements available in the JSON message, BigQuery does not need to read the whole message; it fetches just the requested elements. This greatly reduces the number of bytes processed, which improves performance and reduces costs, because in the BigQuery on-demand plan the cost depends specifically on the volume of processed bytes.

To verify this, let's send two queries to our target BigQuery table. The first one will be a classic select star, so BigQuery will read all JSON attributes. The second query will ask only for the age attribute:
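The two queries could look roughly like this:

-- Query 1: select star, reads every JSON attribute
SELECT *
FROM `bigquerydemos-337515.dataflowsink.jevents`;

-- Query 2: reads only the age attribute
SELECT event.age
FROM `bigquerydemos-337515.dataflowsink.jevents`;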

The Job Information tab contains everything needed to draw some conclusions here. Indeed, with the select star query, BigQuery must read the whole JSON message, and the number of bytes processed amounts to 13.78 MB. However, when we query for just a single attribute of our JSON messages, BigQuery needs to process only 376 KB and returns results much faster. This simple experiment shows that BigQuery does not persist our messages internally as a single blob but rather as individual virtual columns.
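As a side note, if you prefer SQL to the UI, the bytes processed by recent queries can also be inspected through BigQuery's INFORMATION_SCHEMA.JOBS_BY_PROJECT view; adjust the region qualifier to the region where your jobs run.

SELECT
  job_id,
  query,
  total_bytes_processed
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
  AND job_type = 'QUERY'
ORDER BY creation_time DESC;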

This article is authored by Lukasz Olejniczak, Customer Engineer at Google Cloud. The views expressed are those of the author and don't necessarily reflect those of Google.

Please clap for this article if you enjoyed reading it. For more about Google Cloud, data science, data engineering, and AI/ML follow me on LinkedIn.
