You have a pipeline that writes data from PubSub/Kafka or any other source to BigQuery, and everything streams well.
Until, you are informed that your source JSON event had changed, and a new field should be added.
You can stop the job, update it with the new field and re-run it again. I’m sure you don’t want to do this for any change, you wish to find some better solution.
Here I’ll describe how I overcome it.
We will keep a GS Blob file contains the schema definition.
We will load the file in the Stream Pipeline, and track for updates.
For any event, we will take only the fields that appear in the loaded GS Blob schema. we will ignore any field in the event that doesn’t appear in the schema configuration file.
And, when we will have a change, First, we will need to add the column manually to the BigQuery table, update the Blob file contains the schema definition. (we have a script that does this, using
bq update and
gsutil cp), our job will identify that the Blob file is updated, and reload the configuration, now, it can extract any new field added to the event and send it also to BigQuery.
Why we didn’t choose a fully automated solution?
If I would take any new field in JSON event, that doesn’t appear in the target BigQuery table, I’m opened to frauds. what does it mean? imagine to yourself some hacker sends dummy events to my server with fake fields, f1,f2,f3… f10 and I will treat these fields as new fields, and create for them columns in my BigQuery table automatically, Then, my table will become garbaged and can also reach some quotas limits. I need control on this, and therefore I must enforce the manual updating of the BigQuery schema and Blob file. In this form, I can be sure I didn’t put garbaged columns in my BigQuery table.
Let’s look into the class that loads fields configuration:
You can see that this is a singleton class, that on construction loads the data and starts to wait for changes.
Field Config JSON example:
How We use the FieldConfig in the Step function itself:
You can see here, that I load in the
@setup time the config fields, this INSTANCE is being changed if a new config is loaded.
And while transforming the JsonNode event, I took only fields that appear in the configuration, and transform them by their type.
Do we have a design pattern?! yes!
So, we found a way to influence our stream by bucket change, this can be relevant to schema change or any other configuration, for example, for Geo Ip we use DB of Maxmind. the IP collection can be updated and I want to be aware of this change in my pipeline and look for IPs in the new DB file, I used the same pattern also for it, and created a provider that looks for bucket changes and load them, if I will update the bucket with a new IP DB my stream will automatically identify it and will look on the change.
One disadvantage to remember:
This Singleton class, that open up a thread to look for bucket change isn’t a function step, and you can’t look into it in the DataFlow graph. No logs no statistics….
You may try using a
Side Input collection view that looks into changes and loads them. This solution requires time sliding windows, which can be bad in case you are not processing your data in the time window.
Here is an example of
SideInput with time-slicing:
and using it:
And then, only then iterate function is changed: