Dataflow Design Pattern: Dynamic Streaming pipeline : Dealing with mutable JSON schema
Streaming into BigQuery with real-time schema update in Bigquery.
Why: Google has provided design template of Pubsub to Bigquery, where you specify PubSub subscription from where you read JSON object, then Bigquery table as the target and Bigquery schema. Work perfect, right? Ok, what if your requirement is you want to store you JSON data in structure Bigquery table, BUT your schema is unknown, or it can get updated over the period of time. So you think every time schema got updated you update the table, and reconfigure dataflow job with updated schema and a restart? Oh no, a big turn-off for using Dataflow.
Naive Solutions: So what are your options?
- Discard streaming pipeline, put data into GCS -> Then detect the schema from JSON files -> Update Bigquery table -> Load GCS to Bigquery Simple solution but it's not streaming.
- Have a streaming pipeline -> Apply a window -> Detect schema of messages in window -> Update Bigquery table -> Load into bigquery Quite efficient solution, BUT still its not real-time (or so called near realtime)
An Expert Solution : Dynamic Streaming pipeline
Lets not have a staging area like GCS, lets not let record to wait till window process Read from Pubsub -> Parse & Flatton JSON -> Validate schema with in memory schema -> Update Bigquery if needed -> Write to Bigquery
Will it work? here are challenges
- Validate schema for each record will slow down processing No, have schema definition in-memory, this will eliminate Bigquery schema pull hits
- How will you define Bigquery table sink With beam sdk 2.0, apache introduced a cool feature called DynamicDestinations for BigQueryIO. With this you can runtime decide the destination table for the record. https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#to-org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations
- How will you deal with new tables: Again same can be tackled with DynamicDestinations, once you have new table, just give it to DynamicDestinations sink. Done.
- How will you update Bigquery Once you detect change in schema : you will use Bigquery client sdk to create, update table
- How schema update will work with multiple nodes at runtime? Here is trick, when you will detect schema, there is the possibility of other workers has detected same, you can use either lock
Step 1. Read from pubsub:
Read json messages from PubSub. Provide subscription details to PubSub source
Pipeline p = Pipeline.create(pipelineOptions);
Collection<String> inLogsFromPubSub = p.apply(PubsubIO.readStrings().fromSubscription(StaticValueProvider.of(config.getProperty("subscription"))));
Step 2: Parse JSON message to load into map
This is an important step in the pattern, where you will do following things,
- Column name : Convert json path into column name eg. user_info.user_city -> userinfo_usercity
- Table name : Determine the tablename based on message type. eg. event_type can be usered determine table. Put it in tablename=’prefix’+event_type
- Partition time : Catpture the timestamp field to get Partition time. eg. event_time can be used as timestamp. Put it in partition=date(event_time)
- Error messages : It will not be the case always you will get good messages, so be prepared, mark error messages to error table
Beauty of this pipeline is you can do anything with this parser. If you want static table, instead of seperate table just change implementation of tablename name calculation. If you want to process xml instead of JSON, just change implementation of Parsor to process xml. Your flow will be same.
Step 3. Check BigQuery schema
This is the most tricky part in the pipeline.
- To capture schema in in-memory, use a hash map with tablename as key and schema as value
- Pseudo code will be like a. Check if table is present in hashmap, if not hit bigquery, fetch schema now again check if its there, if not create BigQuery table b. Check if schema of event is same as schema in Hashmap, if not fetch latest schema from BigQuery check again, if still has difference update BigQuery c. After each fetch schema, Create Table and update table request we need to update hashmap for the future record In the diagram, we can see grey boxes represents BigQuery hit. BigQuery only gets hit if there is any mismatch in schema. So, changes in schema will get handled gracefully without any manual intervention
Step 4. Route message to BigQuery table
Now we have all the information ready, we have a table name for the record, now how will write into a sink? DynamicDestinations is to rescue. Use dynamic destination, implement getTable, getPartition & get schema method to process the BigQuery write.
- Column Datatype: As you cannot detect column type from one value, you can not have a correct datatype, hence instead of actual datatype we need to use String type for any new column. Although if tables are pre-created with existing types, it will work fine with it.
Enhancements: Currently parser provided classes are not used like interfaces, we can change it to an interface instead of class type.
Conclusion: This solution works great with any type of data. We have tried and tested this pipeline. This generalized pipeline helps to integrate Pubsub to Bigquery with just a config file. You can save around 2weeks of the estimate with this. Also, it is very possible to write custom JSON parser to have a custom column name, custom destination table, and custom partition.mn name, custom destination table, and custom partition.
Any feedback and questions are welcome.
Source Code : GitHub