Transforming XML to JSON for BigQuery Integration

Naresh Erukulla

As a Data Engineer, I often encounter challenges that require innovative solutions. Recently, I faced a problem where streaming data came in XML format, but our data processing pipeline, which uses Google Cloud Dataflow and BigQuery, requires data in JSON format. To address this, I developed a Python solution to convert incoming XML data to JSON, ensuring smooth integration with BigQuery. In this article, I’ll walk you through the solution, provide a sample implementation, and share some insights for professionals in the field.

The Problem

XML (Extensible Markup Language) is a widely used format for data interchange. However, BigQuery does not natively support XML: its load jobs accept formats such as newline-delimited JSON, CSV, Avro, and Parquet. The data therefore has to be converted, and JSON (JavaScript Object Notation) is a natural target because it is lightweight and can carry the nested structure of an XML document. The challenge was to design a system that listens to a Pub/Sub topic, processes XML messages, converts them to JSON, and then loads them into BigQuery.

The Solution

The solution involves a Python script that subscribes to a Google Cloud Pub/Sub topic, processes the XML messages, converts them to JSON, and prepares them for loading into BigQuery. The following is a detailed explanation of the implementation.

  • Setting Up Pub/Sub Subscriber

We start by setting up a Pub/Sub subscriber to listen for incoming messages:

import json
import xml.etree.ElementTree as ET

from google.cloud import pubsub_v1

project_id = "your-project-id"
subscription_id = "your-subscription-id"

subscriber_client = pubsub_v1.SubscriberClient()
subscription_path = subscriber_client.subscription_path(project_id, subscription_id)

  • Parsing XML to JSON

The core functionality involves parsing XML data into a dictionary format, which is then easily convertible to JSON. Here’s the function to parse XML:

def get_local_name(tag):
    # ElementTree writes namespaced tags as '{uri}Name'; keep only 'Name'.
    return tag.split('}')[-1]


def parse_xml_to_dict(elem):
    # Leaf element with text: return the text itself.
    if not list(elem) and elem.text:
        return elem.text.strip()

    result = {}
    for child in elem:
        child_tag = get_local_name(child.tag)
        if not child.attrib:
            child_result = parse_xml_to_dict(child)
            if child_tag in result:
                # Repeated tag: collect the values in a list.
                if not isinstance(result[child_tag], list):
                    result[child_tag] = [result[child_tag]]
                result[child_tag].append(child_result)
            else:
                result[child_tag] = child_result
        else:
            child_result = parse_xml_to_dict(child)
            # Each attribute value is stored as 'id' next to the child's content.
            for key, value in child.attrib.items():
                attributes = {}
                attributes['id'] = value
                attributes['value'] = child_result
                if child_tag in result:
                    if not isinstance(result[child_tag], list):
                        result[child_tag] = [result[child_tag]]
                    result[child_tag].append(attributes)
                else:
                    result[child_tag] = attributes

    # Attributes on the element itself go under a separate key.
    if elem.attrib:
        attributes = {get_local_name(key): value for key, value in elem.attrib.items()}
        result['@attributes'] = attributes

    return result
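
To see why `get_local_name` is needed: ElementTree reports a namespaced tag as `{namespace-uri}LocalName`, and those raw tags would make awkward JSON keys. Here is a minimal sketch using only the standard library. The three-element envelope is an illustrative fragment, not real traffic.

```python
import xml.etree.ElementTree as ET


def get_local_name(tag):
    # Drop the '{namespace-uri}' prefix that ElementTree adds to tags.
    return tag.split('}')[-1]


# Illustrative fragment: a default namespace applies to every element.
xml_data = (
    '<Envelope xmlns="http://schemas.xmlsoap.org/soap/envelope/">'
    '<Body><Message/></Body>'
    '</Envelope>'
)

root = ET.fromstring(xml_data)
raw_tags = [elem.tag for elem in root.iter()]
local_tags = [get_local_name(tag) for tag in raw_tags]

print(raw_tags[0])   # {http://schemas.xmlsoap.org/soap/envelope/}Envelope
print(local_tags)    # ['Envelope', 'Body', 'Message']
```

Tags without a namespace contain no `}`, so the function returns them unchanged.
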
  • Handling Messages and Conversion

The callback function handles incoming messages, decodes the XML, converts it to JSON, and acknowledges the message:

def callback(message):
    print(f"Received message: {message}")
    try:
        # Pub/Sub delivers the payload as bytes.
        xml_data = message.data.decode('utf-8')
        root = ET.fromstring(xml_data)

        # Only the SOAP Body is converted; the rest of the envelope is ignored.
        body = root.find('.//{http://schemas.xmlsoap.org/soap/envelope/}Body')
        if body is None:
            raise ValueError("SOAP Body not found in the message")

        filename = 'converted_json_message.json'
        # Append mode: each message adds one line of newline-delimited JSON.
        with open(filename, 'a') as file:
            json_data = json.dumps(parse_xml_to_dict(body))
            file.write(json_data + '\n')

        message.ack()
    except Exception as e:
        print(f"Error processing message: {e}")
        # nack() tells Pub/Sub the message was not processed, so it is redelivered.
        message.nack()
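
BigQuery's JSON load path expects newline-delimited JSON: one complete object per line. One way to keep every converted message in a single file is to open it in append mode, as in the sketch below. The helper names and the temporary path are hypothetical and chosen only for illustration.

```python
import json
import os
import tempfile


def append_record(path, record):
    """Append one JSON object as a single line."""
    with open(path, 'a', encoding='utf-8') as f:
        f.write(json.dumps(record) + '\n')


def read_records(path):
    """Read the file back, parsing each non-empty line as JSON."""
    with open(path, encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]


# Hypothetical output location; a real pipeline would pick its own path.
path = os.path.join(tempfile.mkdtemp(), 'converted_json_messages.json')

append_record(path, {"Message": {"ID": "12345", "Content": "Hello, world!"}})
append_record(path, {"Message": {"ID": "12346", "Content": "Second message"}})

records = read_records(path)
print(len(records))  # 2
```

Because `json.dumps` escapes newlines inside strings, each record stays on a single line.
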
  • Running the Subscriber

Finally, the subscriber listens for incoming messages:

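streaming_pull_future = subscriber_client.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}...")
try:
    # Block the main thread; callbacks run on background threads.
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()  # Trigger the shutdown.
    streaming_pull_future.result()  # Wait until the shutdown is complete.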
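
Running the subscriber needs a live subscription, but the ack/nack logic can be exercised without one. The sketch below assumes a hypothetical `FakeMessage` class (not part of `google-cloud-pubsub`) that exposes only `data`, `ack()` and `nack()`. Its `handle` function repeats the decode-and-find steps of the callback and leaves out the file write.

```python
import xml.etree.ElementTree as ET

SOAP_BODY = './/{http://schemas.xmlsoap.org/soap/envelope/}Body'


class FakeMessage:
    """Hypothetical stand-in exposing only data, ack() and nack()."""

    def __init__(self, data):
        self.data = data
        self.outcome = None

    def ack(self):
        self.outcome = 'ack'

    def nack(self):
        self.outcome = 'nack'


def handle(message):
    """Decode, locate the SOAP Body, then ack or nack (file write omitted)."""
    try:
        root = ET.fromstring(message.data.decode('utf-8'))
        if root.find(SOAP_BODY) is None:
            raise ValueError("SOAP Body not found in the message")
        message.ack()
    except Exception as e:
        print(f"Error processing message: {e}")
        message.nack()


envelope = b'<Envelope xmlns="http://schemas.xmlsoap.org/soap/envelope/">%s</Envelope>'
good = FakeMessage(envelope % b'<Body><Message/></Body>')
no_body = FakeMessage(envelope % b'<Header/>')
malformed = FakeMessage(b'<Envelope>')

for msg in (good, no_body, malformed):
    handle(msg)

print(good.outcome, no_body.outcome, malformed.outcome)  # ack nack nack
```
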
  • Dummy Data Example

To illustrate, let’s consider a sample XML message:

<Envelope xmlns="http://schemas.xmlsoap.org/soap/envelope/">
  <Body>
    <Message>
      <ID>12345</ID>
      <Content>Hello, world!</Content>
    </Message>
  </Body>
</Envelope>

The script converts this to the following JSON:

{
  "Message": {
    "ID": "12345",
    "Content": "Hello, world!"
  }
}
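
The sample contains each tag only once. When a tag repeats under the same parent, the parsing approach collects the values into a list, so the JSON shape depends on how many siblings a message carries. The sketch below uses `to_dict`, a trimmed-down variant of the parser written for this illustration (no namespace or attribute handling), on a made-up `<Order>` document.

```python
import json
import xml.etree.ElementTree as ET


def to_dict(elem):
    """Trimmed-down illustration: child elements only, no attributes."""
    if not list(elem) and elem.text:
        return elem.text.strip()
    result = {}
    for child in elem:
        value = to_dict(child)
        if child.tag in result:
            # Second occurrence of a tag: promote the entry to a list.
            if not isinstance(result[child.tag], list):
                result[child.tag] = [result[child.tag]]
            result[child.tag].append(value)
        else:
            result[child.tag] = value
    return result


# Made-up document with a repeated <Item> tag.
xml_data = "<Order><Item>pen</Item><Item>ink</Item><Note>rush</Note></Order>"
converted = to_dict(ET.fromstring(xml_data))
single = to_dict(ET.fromstring("<Order><Item>pen</Item></Order>"))

print(json.dumps(converted))  # {"Item": ["pen", "ink"], "Note": "rush"}
print(json.dumps(single))     # {"Item": "pen"}
```

Because the same tag yields a string in one message and a list in another, it is worth checking the target table's schema against both shapes before loading.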

Conclusion

This Python-based solution converts XML messages received from Pub/Sub into JSON that is ready to load into BigQuery. It demonstrates a practical approach to handling format conversion in data pipelines, which is crucial for real-time data processing.

Feel free to connect with me on LinkedIn or Medium for more insights and updates. Sharing knowledge and learning from the community are vital parts of our growth as professionals. Let’s continue to innovate and solve challenges together!

I hope this article helps you understand the process of converting XML data to JSON for use in BigQuery using Python. If you have any questions or need further assistance, please leave a comment or reach out directly. Happy coding!
