How to load XML data into BigQuery using Python Dataflow

Parse the XML into a Python dictionary and use Apache Beam’s BigQueryIO

Lak Lakshmanan
Google Cloud - Community
3 min read · Jun 29, 2020


If your data is in Avro, JSON, Parquet, etc., loading it into BigQuery is as easy as running a federated query or using bq load. But what if your data is in XML?
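For contrast, here is what the easy path looks like for newline-delimited JSON, using the BigQuery Python client library. This is a minimal sketch; the bucket, dataset, and table names are placeholders:

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,  # let BigQuery infer the schema
)
# Load a newline-delimited JSON file straight from Cloud Storage.
client.load_table_from_uri(
    'gs://my-bucket/orders.json',  # placeholder bucket/file
    'mydataset.orders',            # placeholder dataset.table
    job_config=job_config,
).result()  # wait for the load job to complete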

The code for this article is on GitHub in the repository for the book BigQuery: The Definitive Guide.

Input XML document

Assume that the data is available in a file called orders.xml that contains an example document of Orders (this example is taken from Microsoft’s XSD documentation):

<Root>
  <Orders>
    <Order>
      <CustomerID>GREAL</CustomerID>
      <EmployeeID>6</EmployeeID>
      <OrderDate>1997-05-06T00:00:00</OrderDate>
      <RequiredDate>1997-05-20T00:00:00</RequiredDate>
      <ShipInfo ShippedDate="1997-05-09T00:00:00">
        <ShipVia>2</ShipVia>
        <Freight>3.35</Freight>
        <ShipName>Great Lakes Food Market</ShipName>
        <ShipAddress>2732 Baker Blvd.</ShipAddress>
        <ShipCity>Eugene</ShipCity>
        <ShipRegion>OR</ShipRegion>
        <ShipPostalCode>97403</ShipPostalCode>
        <ShipCountry>USA</ShipCountry>
      </ShipInfo>
    </Order>
    <Order>
      ...
    </Order>
  </Orders>
</Root>

Parse into a Python dictionary

To parse this into a Python dictionary, we’ll use a Python package called xmltodict:

def parse_into_dict(xmlfile):
    import xmltodict
    with open(xmlfile) as ifp:
        doc = xmltodict.parse(ifp.read())
    return doc

Given this dict, we can simply dereference items to get values. For example, to get the first order, we’d do:

doc = parse_into_dict('orders.xml')
doc['Root']['Orders']['Order'][0] # first order
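xmltodict follows a predictable convention: child elements become dictionary keys, repeated elements become lists, and XML attributes are prefixed with @. A quick sketch, with the values taken from the sample file above:

order = doc['Root']['Orders']['Order'][0]
print(order['CustomerID'])                # GREAL
print(order['ShipInfo']['ShipCity'])      # Eugene
print(order['ShipInfo']['@ShippedDate'])  # attribute: note the @ prefix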

Specify schema

Specify the schema of the output table in BigQuery. This table will contain Order information, so we simply have to represent the structure of an Order. Because xmltodict creates an OrderedDict, it is very important that you maintain the exact order in which the elements appear in the XML representation of an Order:

table_schema = {
    'fields': [
        {'name': 'CustomerID', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'EmployeeID', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'OrderDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'RequiredDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'ShipInfo', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
            {'name': 'ShipVia', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Freight', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipName', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipAddress', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipCity', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipRegion', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipPostalCode', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipCountry', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShippedDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        ]},
    ]
}

A few things to note:

  • Fields like CustomerID, EmployeeID, etc. are defined as primitive fields.
  • The example document stores the EmployeeID as <EmployeeID>6</EmployeeID>, so xmltodict treats it as text; therefore, we define it as a STRING.
  • ShipInfo has child elements, so we make it a RECORD (struct) in BigQuery.
  • The attribute ShippedDate is at the very end, following all the child elements. xmltodict stores attributes as @ShippedDate, but the @ symbol is not legal in a BigQuery column name, so we’ll have to clean it up to remove the symbol:
def cleanup(x):
    import copy
    y = copy.deepcopy(x)
    if '@ShippedDate' in x['ShipInfo']:  # optional attribute
        y['ShipInfo']['ShippedDate'] = x['ShipInfo']['@ShippedDate']
        del y['ShipInfo']['@ShippedDate']
    return y
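As a quick sanity check, running cleanup on the first order shows the attribute renamed. Note that because assigning a new key appends to the OrderedDict, ShippedDate ends up last, matching its position in the schema above:

order = doc['Root']['Orders']['Order'][0]
clean = cleanup(order)
assert '@ShippedDate' not in clean['ShipInfo']
assert clean['ShipInfo']['ShippedDate'] == '1997-05-09T00:00:00'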

Given the XML document, we can get the orders one-by-one using:

def get_orders(doc):
    for order in doc['Root']['Orders']['Order']:
        yield cleanup(order)

Beam Pipeline

Putting all this together, the Beam pipeline to take an XML file and use it to populate a BigQuery table is:

with beam.Pipeline(argv=pipeline_args) as p:
    orders = (p
        | 'files' >> beam.Create(['orders.xml'])
        | 'parse' >> beam.Map(lambda filename: parse_into_dict(filename))
        | 'orders' >> beam.FlatMap(lambda doc: get_orders(doc)))

    orders | 'tobq' >> beam.io.WriteToBigQuery(known_args.output,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
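The pipeline refers to known_args.output and pipeline_args without showing where they come from. A minimal sketch of the argument parsing (the --output flag name here is an assumption; check the full code on GitHub for the exact flags):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--output', required=True,
                    help='BigQuery table to write to, e.g. project:dataset.fromxml')
# Any flags Beam itself understands (such as --runner) pass through untouched.
known_args, pipeline_args = parser.parse_known_args()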

Output BigQuery table

When the Beam job runs (either locally or in Dataflow), the table gets populated and you can query it. For example:

SELECT * EXCEPT(ShipInfo), ShipInfo.* 
FROM advdata.fromxml
WHERE CustomerID = 'LETSS'

gives us the matching order, with the ShipInfo record flattened into top-level columns.

Next steps

  1. Try the full code (available on GitHub) in AI Platform Notebooks.
  2. Add --runner DataflowRunner to the command line to run it in Dataflow (see the sketch after this list).
  3. To learn more about loading data into BigQuery, read Chapter 4 of BigQuery: The Definitive Guide. The book is periodically updated with these blog posts so that it remains, well, definitive.
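Because the pipeline is constructed with beam.Pipeline(argv=pipeline_args), switching runners is just a matter of passing extra flags. A sketch of the Dataflow options, with placeholder project, region, and bucket names:

pipeline_args = [
    '--runner', 'DataflowRunner',
    '--project', 'my-project',                # placeholder project id
    '--region', 'us-central1',
    '--temp_location', 'gs://my-bucket/tmp',  # placeholder staging bucket
]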

Enjoy!
