How to get a Beam schema from a BigQuery schema JSON file
Beam schemas are a way to write more concise pipelines, using Beam SQL as well as higher-level APIs (e.g. joins).
If you write a Java class for your data, it is relatively straightforward to derive the corresponding schema for your pipeline. But what if you have a table schema file from BigQuery, in JSON format, and you want to apply it as the schema of a PCollection?
One situation where you would want to do that is when a dynamic schema is passed as an input parameter to your pipeline.
For instance, this is what a BigQuery JSON schema file looks like:
{
  "id": "bigquery-public-data:github_repos.commits",
  "kind": "bigquery#table",
  "numRows": "262745001",
  "schema": {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "commit",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "tree",
        "type": "STRING"
      },
      ...
The file has a field named schema that contains all the schema information. That field in turn contains a field named fields, which is a list of table schema fields.
The TableSchema class is a GenericJson class that expects a list field named fields. So we can use it to parse any schema coming from BigQuery, as long as we first extract the schema field and then let TableSchema parse it.
Apache Beam already uses Jackson internally (for its XML handling, among other things), so we can leverage Jackson to extract the schema field:
ObjectMapper mapper = new ObjectMapper();
JsonNode topNode = mapper.readTree(schemaAsString);
JsonNode schemaRootNode = topNode.path("schema");
We can now serialize schemaRootNode back to a JSON string, and use that string to map it to a TableSchema:
JsonFactory defaultJsonFactory = JacksonFactory.getDefaultInstance();
TableSchema tableSchema = defaultJsonFactory.fromString(
    schemaRootNode.toString(),
    TableSchema.class);
And now we can simply transform that into a Beam schema, thanks to the BigQueryUtils class of BigQueryIO:
Schema schema = BigQueryUtils.fromTableSchema(tableSchema);
You can then use that Beam schema in your pipeline, for instance to parse JSONs with dynamic schemas (using the Beam Row class):
ParseResult parsedJson = jsonStrings.apply("Parse JSON",
JsonToRow.withExceptionReporting(schema));
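The ParseResult returned above exposes both the successfully parsed rows and the input lines that failed to parse. A small sketch of how you might consume both sides (method names per Beam's JsonToRow.ParseResult):

```java
// Successfully parsed rows, carrying the Beam schema derived above.
PCollection<Row> rows = parsedJson.getResults();

// Lines that failed to parse, as rows describing each bad line,
// useful for logging or routing to a dead-letter sink.
PCollection<Row> failures = parsedJson.getFailedToParseLines();
```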
Now you can use BigQuery to author your schemas and use them in your pipelines to load data with dynamic schemas. I love using the BigQuery create-table wizard to specify complex schemas visually (with lists, nested fields, etc.), create a dummy table, export its schema as a JSON file, and use it in my Beam pipelines to support dynamic schemas.
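Putting the pieces together, the whole conversion fits in one small helper. This is a sketch, assuming the Google API client and the Beam GCP IO module are on the classpath (the class and method names here are my own; the API calls are the ones shown above):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.schemas.Schema;

public class BigQueryJsonToBeamSchema {

  /** Converts a BigQuery table-description JSON string into a Beam Schema. */
  public static Schema fromJson(String schemaAsString) throws Exception {
    // Parse the full table description with Jackson.
    ObjectMapper mapper = new ObjectMapper();
    JsonNode topNode = mapper.readTree(schemaAsString);

    // Extract the "schema" sub-object of the table description.
    JsonNode schemaRootNode = topNode.path("schema");

    // Let the Google API client parse it into a TableSchema...
    TableSchema tableSchema = JacksonFactory.getDefaultInstance()
        .fromString(schemaRootNode.toString(), TableSchema.class);

    // ...and let Beam convert that into a Beam Schema.
    return BigQueryUtils.fromTableSchema(tableSchema);
  }
}
```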