How to get a Beam schema from a BigQuery schema JSON file

Israel Herraiz
Google Cloud - Community
2 min read · Aug 22, 2022



Beam schemas are a way to write more concise pipelines, using Beam SQL as well as higher-level transforms (e.g. joins).

If you write a Java class for your data, it is relatively straightforward to get the corresponding schema for your pipeline. But what if you have a table schema file from BigQuery, in JSON format, and you want to apply it as the schema of a PCollection?
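As a quick illustration of the first case, Beam can infer a schema directly from an annotated Java class (a minimal sketch; the Purchase class and its fields are made up for the example):

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Illustrative class: with this annotation, Beam infers the schema
// from the public fields when the class is used in a pipeline.
@DefaultSchema(JavaFieldSchema.class)
public class Purchase {
  public String userId;
  public double amount;
}
```

That works when the schema is known at compile time; the rest of this post is about the case where it is not.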

One situation where you would want to do that is when the schema is dynamic, specified as an input parameter of your pipeline.

For instance, this is what a BigQuery JSON schema file looks like:

{
  "id": "bigquery-public-data:github_repos.commits",
  "kind": "bigquery#table",
  "numRows": "262745001",
  "schema": {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "commit",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "tree",
        "type": "STRING"
      },
      ...

The file has a field named schema that contains all the schema information. Inside it, a field named fields holds the list of table schema fields.

The TableSchema class is a GenericJson subclass that expects a list field named fields. So we can use it to parse any schema coming from BigQuery, as long as we first extract the schema field and then hand that field to TableSchema.

Apache Beam already uses Jackson internally for XML, so we can leverage Jackson to extract the schema field:

// Parse the full table JSON and keep only the "schema" node
ObjectMapper mapper = new ObjectMapper();
JsonNode topNode = mapper.readTree(schemaAsString);
JsonNode schemaRootNode = topNode.path("schema");

We can now serialize schemaRootNode back to a JSON string, and use that string to map it to a TableSchema:

// defaultJsonFactory can be obtained e.g. via JacksonFactory.getDefaultInstance()
TableSchema tableSchema = defaultJsonFactory.fromString(
    schemaRootNode.toString(),
    TableSchema.class);

And now, we can simply transform that to a Beam schema thanks to the BigQueryUtils module of BigQueryIO:

Schema schema = BigQueryUtils.fromTableSchema(tableSchema);

You can then use that Beam schema in your pipeline, for instance to parse JSON strings with dynamic schemas (producing Beam Row elements):

ParseResult parsedJson = jsonStrings.apply("Parse JSON",
    JsonToRow.withExceptionReporting(schema));
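The ParseResult returned above exposes both the successfully parsed rows and the lines that failed, so you can route failures to a dead-letter destination. A sketch of how to pull the two outputs apart (variable names are mine):

```java
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.transforms.JsonToRow.ParseResult;

// Successfully parsed rows, carrying the Beam schema.
PCollection<Row> rows = parsedJson.getResults();

// Lines that failed to parse; each failure row includes the offending
// line and an error description.
PCollection<Row> failures = parsedJson.getFailedToParseLines();
```

Writing the failures PCollection to a side output (a file, a table, a topic) keeps bad records from silently disappearing.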

The full code for parsing the BigQuery schema, including all the details of the imports, is available in a Gist on GitHub.

Now you can use BigQuery to author your schemas and use them in your pipelines to load data with dynamic schemas. I love using the create-table wizard of BigQuery to specify complex schemas visually (with lists, nested fields, etc.), create a dummy table, export that schema as a JSON file, and use it in my Beam pipelines to support dynamic schemas.
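Putting the pieces together, the whole conversion can be wrapped in one small helper (a sketch; the class and method names are mine, and I use JacksonFactory.getDefaultInstance() as a concrete choice for the JSON factory):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.schemas.Schema;

public class SchemaConverter {

  // Converts a BigQuery table description JSON (like the example above)
  // into a Beam Schema.
  public static Schema fromBigQueryJson(String schemaAsString) throws Exception {
    // Extract only the "schema" node from the full table description.
    ObjectMapper mapper = new ObjectMapper();
    JsonNode schemaRootNode = mapper.readTree(schemaAsString).path("schema");

    // Map that node onto the BigQuery API's TableSchema class...
    TableSchema tableSchema = JacksonFactory.getDefaultInstance()
        .fromString(schemaRootNode.toString(), TableSchema.class);

    // ...and let Beam convert it to its own Schema type.
    return BigQueryUtils.fromTableSchema(tableSchema);
  }
}
```

With this in place, the schema file path (or its contents) can be a plain pipeline option, read at pipeline construction time.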
