How to load XML data into BigQuery using Python Dataflow

Parse the XML into a Python dictionary and use Apache Beam’s BigQueryIO

Lak Lakshmanan
Jun 29, 2020 · 3 min read

If your data is in Avro, JSON, Parquet, etc., loading it into BigQuery is as easy as running a federated query or using bq load. But what if your data is in XML?

The code for this article is on GitHub in the repository for the book BigQuery: The Definitive Guide.

Input XML document

Assume that the data is available in a file called orders.xml and it contains an example document of Orders (this example is taken from Microsoft’s XSD documentation):

<Root>
  <Orders>
    <Order>
      <CustomerID>GREAL</CustomerID>
      <EmployeeID>6</EmployeeID>
      <OrderDate>1997-05-06T00:00:00</OrderDate>
      <RequiredDate>1997-05-20T00:00:00</RequiredDate>
      <ShipInfo ShippedDate="1997-05-09T00:00:00">
        <ShipVia>2</ShipVia>
        <Freight>3.35</Freight>
        <ShipName>Great Lakes Food Market</ShipName>
        <ShipAddress>2732 Baker Blvd.</ShipAddress>
        <ShipCity>Eugene</ShipCity>
        <ShipRegion>OR</ShipRegion>
        <ShipPostalCode>97403</ShipPostalCode>
        <ShipCountry>USA</ShipCountry>
      </ShipInfo>
    </Order>
    <Order>
    ...
  </Orders>
</Root>

Parse into a Python dictionary

To parse this into a Python dictionary, we’ll use a Python package called xmltodict:

def parse_into_dict(xmlfile):
    import xmltodict
    with open(xmlfile) as ifp:
        doc = xmltodict.parse(ifp.read())
        return doc

Given this dict, we can simply dereference items to get values. For example, to get the first order, we’d do:

doc = parse_into_dict('orders.xml')
doc['Root']['Orders']['Order'][0] # first order
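
Other fields work the same way: nested elements become nested dictionary lookups. The values below come from the sample document above:

first_order = doc['Root']['Orders']['Order'][0]
first_order['CustomerID']            # 'GREAL'
first_order['ShipInfo']['ShipCity']  # 'Eugene'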

Specify schema

Specify the schema of the output table in BigQuery. This table will contain Order information, so we simply have to represent the structure of an Order. Because xmltodict creates an OrderedDict, it is very important that you maintain the exact order in which the elements appear in the XML representation of an Order:

table_schema = {
    'fields': [
        {'name' : 'CustomerID', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'EmployeeID', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'OrderDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'RequiredDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'ShipInfo', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
            {'name' : 'ShipVia', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name' : 'Freight', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name' : 'ShipName', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name' : 'ShipAddress', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name' : 'ShipCity', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name' : 'ShipRegion', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name' : 'ShipPostalCode', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name' : 'ShipCountry', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name' : 'ShippedDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        ]},
    ]
}

A few things to note:

  1. The schema loads every leaf element as a STRING to keep the example simple; you can cast to timestamps and numbers later in BigQuery if needed.
  2. ShippedDate appears in the XML as an attribute of ShipInfo, so xmltodict stores it under the key @ShippedDate, and the attribute is optional. Because BigQuery column names can’t start with @, we clean up each order before writing it:

def cleanup(x):
    import copy
    y = copy.deepcopy(x)
    if '@ShippedDate' in x['ShipInfo']:  # optional attribute
        y['ShipInfo']['ShippedDate'] = x['ShipInfo']['@ShippedDate']
        del y['ShipInfo']['@ShippedDate']
    return y
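
As a quick check, running cleanup on the first order in the sample document moves the attribute into an ordinary field:

order = cleanup(doc['Root']['Orders']['Order'][0])
order['ShipInfo']['ShippedDate']     # '1997-05-09T00:00:00'
'@ShippedDate' in order['ShipInfo']  # False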

Given the XML document, we can get the orders one-by-one using:

def get_orders(doc):
    for order in doc['Root']['Orders']['Order']:
        yield cleanup(order)
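
You can sanity-check these functions locally, outside of Beam, by iterating over the generator directly:

doc = parse_into_dict('orders.xml')
for order in get_orders(doc):
    print(order['CustomerID'], order['ShipInfo']['ShipName'])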

Beam Pipeline

Putting all this together, the Beam pipeline to take an XML file and use it to populate a BigQuery table is:

with beam.Pipeline(argv=pipeline_args) as p:
    orders = (p
        | 'files' >> beam.Create(['orders.xml'])
        | 'parse' >> beam.Map(lambda filename: parse_into_dict(filename))
        | 'orders' >> beam.FlatMap(lambda doc: get_orders(doc)))

    orders | 'tobq' >> beam.io.WriteToBigQuery(known_args.output,
                           schema=table_schema,
                           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
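
The snippet above references known_args.output and pipeline_args without showing where they come from; the full program is in the GitHub repository. A minimal sketch, assuming a single --output flag of the form project:dataset.table and passing every other flag through to Beam:

import argparse
import apache_beam as beam

parser = argparse.ArgumentParser()
parser.add_argument('--output', required=True,
                    help='BigQuery table to write to, e.g. project:dataset.fromxml')
known_args, pipeline_args = parser.parse_known_args()  # remaining flags go to Beam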

Output BigQuery table

When the Beam job runs (either locally or in Dataflow), the table gets populated and you can query it. For example:

SELECT * EXCEPT(ShipInfo), ShipInfo.* 
FROM advdata.fromxml
WHERE CustomerID = 'LETSS'

gives us the orders for that customer, with the nested ShipInfo fields flattened into top-level columns.

Next steps

  1. Try the full code (available on GitHub) in AI Platform Notebooks.
  2. Add --runner DataflowRunner to the pipeline arguments to run it in Dataflow (see the sketch after this list).
  3. To learn more about loading data into BigQuery, read Chapter 4 of BigQuery: The Definitive Guide. The book is periodically updated with these blog posts so that it remains, well, definitive.
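
For step 2, the extra Dataflow options can be appended to pipeline_args; the project, region, and bucket below are placeholders:

pipeline_args += [
    '--runner=DataflowRunner',
    '--project=my-project',                # placeholder: your GCP project id
    '--region=us-central1',                # placeholder: your Dataflow region
    '--temp_location=gs://my-bucket/tmp',  # placeholder: a GCS bucket you own
]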

Enjoy!
