How to load XML data into BigQuery using Python Dataflow

Parse the XML into a Python dictionary and use Apache Beam’s BigQueryIO

If your data is in Avro, JSON, Parquet, etc., loading it into BigQuery is as easy as running a federated query or using bq load. But what if your data is in XML?

The code for this article is on GitHub in the repository for the book BigQuery: The Definitive Guide.

Input XML document

Assume that the data is available in a file called orders.xml and it contains an example document of Orders (this example is taken from Microsoft’s XSD documentation):

<Root>
  <Orders>
    <Order>
      <CustomerID>GREAL</CustomerID>
      <EmployeeID>6</EmployeeID>
      <OrderDate>1997-05-06T00:00:00</OrderDate>
      <RequiredDate>1997-05-20T00:00:00</RequiredDate>
      <ShipInfo ShippedDate="1997-05-09T00:00:00">
        <ShipVia>2</ShipVia>
        <Freight>3.35</Freight>
        <ShipName>Great Lakes Food Market</ShipName>
        <ShipAddress>2732 Baker Blvd.</ShipAddress>
        <ShipCity>Eugene</ShipCity>
        <ShipRegion>OR</ShipRegion>
        <ShipPostalCode>97403</ShipPostalCode>
        <ShipCountry>USA</ShipCountry>
      </ShipInfo>
    </Order>
    <Order>
      ...
    </Order>
  </Orders>
</Root>

Parse into a Python dictionary

To parse this into a Python dictionary, we’ll use a Python package called xmltodict:

def parse_into_dict(xmlfile):
    import xmltodict
    with open(xmlfile) as ifp:
        doc = xmltodict.parse(ifp.read())
        return doc

Given this dict, we can simply dereference items to get values. For example, to get the first order, we'd do:

doc = parse_into_dict('orders.xml')
doc['Root']['Orders']['Order'][0] # first order
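To see the shape that xmltodict produces, here is a small self-contained sketch (the XML snippet is a trimmed-down version of the orders document). Two behaviors matter for the rest of this article: repeated elements become a Python list, and XML attributes become keys prefixed with '@'. One caveat worth knowing: if the document contained only a single <Order>, xmltodict would return a dict instead of a list (its force_list option can be used to avoid that).

```python
import xmltodict

# A trimmed-down version of orders.xml
doc = xmltodict.parse("""
<Root>
  <Orders>
    <Order>
      <CustomerID>GREAL</CustomerID>
      <ShipInfo ShippedDate="1997-05-09T00:00:00">
        <ShipVia>2</ShipVia>
      </ShipInfo>
    </Order>
    <Order>
      <CustomerID>LETSS</CustomerID>
    </Order>
  </Orders>
</Root>""")

orders = doc['Root']['Orders']['Order']       # repeated <Order> elements become a list
print(orders[0]['CustomerID'])                # GREAL
print(orders[0]['ShipInfo']['@ShippedDate'])  # attributes get an '@' prefix
```

Note also that every value comes back as a string (e.g. ShipVia is '2', not 2), which is why the BigQuery schema below declares everything as STRING.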

Specify schema

Specify the schema of the output table in BigQuery. This table will contain Order information, so we simply have to represent the structure of an Order. Because xmltodict creates an OrderedDict, it is very important that you maintain the exact order in which the elements appear in the XML representation of an Order:

table_schema = {
    'fields': [
        {'name': 'CustomerID', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'EmployeeID', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'OrderDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'RequiredDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'ShipInfo', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
            {'name': 'ShipVia', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Freight', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipName', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipAddress', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipCity', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipRegion', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipPostalCode', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipCountry', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShippedDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        ]},
    ]
}

A few things to note:

  • Fields like CustomerID, EmployeeID, etc. are defined as primitive fields.
  • The example document stores the EmployeeID as <EmployeeID>6</EmployeeID>, so xmltodict treats it as text; we therefore have to define it as a STRING.
  • ShipInfo has child elements, so we make it a RECORD (struct) in BigQuery.
  • The attribute ShippedDate comes at the very end, following all the child elements. xmltodict stores attributes under keys like @ShippedDate, but the @ symbol is not legal in a BigQuery column name, so we'll have to clean it up to remove the symbol:
def cleanup(x):
    import copy
    y = copy.deepcopy(x)
    if '@ShippedDate' in x['ShipInfo']:  # optional attribute
        y['ShipInfo']['ShippedDate'] = x['ShipInfo']['@ShippedDate']
        del y['ShipInfo']['@ShippedDate']
    return y

Given the XML document, we can get the orders one-by-one using:

def get_orders(doc):
    for order in doc['Root']['Orders']['Order']:
        yield cleanup(order)
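We can check the cleanup logic without a full XML file by running get_orders over a hand-built dict shaped like xmltodict's output (the order values here are abbreviated for illustration):

```python
import copy

def cleanup(x):
    # Rename the '@ShippedDate' attribute key to a legal BigQuery column name
    y = copy.deepcopy(x)
    if '@ShippedDate' in x['ShipInfo']:  # optional attribute
        y['ShipInfo']['ShippedDate'] = x['ShipInfo']['@ShippedDate']
        del y['ShipInfo']['@ShippedDate']
    return y

def get_orders(doc):
    for order in doc['Root']['Orders']['Order']:
        yield cleanup(order)

# Minimal hand-built doc mimicking xmltodict's output for two orders
doc = {'Root': {'Orders': {'Order': [
    {'CustomerID': 'GREAL',
     'ShipInfo': {'@ShippedDate': '1997-05-09T00:00:00', 'ShipVia': '2'}},
    {'CustomerID': 'LETSS',
     'ShipInfo': {'ShipVia': '3'}},  # no ShippedDate attribute
]}}}

rows = list(get_orders(doc))
print(rows[0]['ShipInfo']['ShippedDate'])   # 1997-05-09T00:00:00
print('@ShippedDate' in rows[0]['ShipInfo'])  # False
```

Each yielded row now contains only legal BigQuery column names, so it can be handed directly to BigQueryIO.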

Beam Pipeline

Putting all this together, the Beam pipeline to take an XML file and use it to populate a BigQuery table is:

with beam.Pipeline(argv=pipeline_args) as p:
    orders = (p
              | 'files' >> beam.Create(['orders.xml'])
              | 'parse' >> beam.Map(parse_into_dict)
              | 'orders' >> beam.FlatMap(get_orders)
              | 'tobq' >> beam.io.WriteToBigQuery(
                    known_args.output,
                    schema=table_schema,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
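The pipeline refers to known_args.output and pipeline_args without showing where they come from. A typical setup (this is an assumed sketch, not code from the book's repository) parses the output table from the command line and passes every unrecognized flag through to Beam:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--output', required=True,
                    help='BigQuery table to write to, e.g. PROJECT:dataset.table')

# Flags we don't declare (e.g. --runner) are passed through to Beam untouched.
# The explicit argument list here stands in for sys.argv[1:] in a real script.
known_args, pipeline_args = parser.parse_known_args(
    ['--output', 'myproject:advdata.fromxml', '--runner', 'DirectRunner'])

print(known_args.output)   # myproject:advdata.fromxml
print(pipeline_args)       # ['--runner', 'DirectRunner']
```

parse_known_args is what makes this split work: argparse keeps the flags it knows about and returns the rest as a list for beam.Pipeline(argv=pipeline_args).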

Output BigQuery table

When the Beam job runs (either locally or in Dataflow), the table gets populated and you can query it. For example:

SELECT * EXCEPT(ShipInfo), ShipInfo.* 
FROM advdata.fromxml
WHERE CustomerID = 'LETSS'

gives us the matching order, with the ShipInfo fields flattened into top-level columns.

Next steps

  1. Try the full code (available in GitHub) in AI Platform Notebooks.
  2. Pass --runner DataflowRunner on the command line to run the pipeline in Dataflow.
  3. To learn more about loading data into BigQuery, read Chapter 4 of BigQuery: The Definitive Guide. The book is periodically updated with these blog posts so that it remains, well, definitive.
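For step 2, the invocation would look something like the following. The script name here is hypothetical, and the --project, --region, and --temp_location flags are the usual Dataflow requirements; substitute your own project, region, bucket, and table:

```shell
# Hypothetical invocation; replace the project, bucket, and table with your own.
python xml_to_bq.py \
  --output myproject:advdata.fromxml \
  --runner DataflowRunner \
  --project myproject \
  --region us-central1 \
  --temp_location gs://mybucket/tmp/
```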

Enjoy!

A collection of technical articles and blogs published or curated by Google Cloud Developer Advocates. The views expressed are those of the authors and don't necessarily reflect those of Google.

Lak Lakshmanan