Data Observability - Metadata Collection

Sojirath Thunprateep
CJ Express Tech (TILDI)
5 min readNov 27, 2022

Data Observability enables the visibility of the organization data’s health, we can make sure that the data in our system is trustworthy.

Data Volumes is one of the data observability pillars. It shows us the completeness of the data at each of stages in the system.
So that metadata collection is important to data platform management. Metadata will be visualized and monitored the data health.

This article will explain about how the metadata collector is implemented on the data platform I’m working with the TILDI team.

· Data pipeline overview
· What is metadata ?
· Metadata collection development
1. Write metadata
1.1 Write data function
1.2 Spark listener
Python class from Java interface
SparkListner from QueryExecutionListener
2. Record metadata

Data pipeline overview

Data pipeline overview

Let me introduce briefly about the data platform I’m working on with my team at TILDI CJ Express.

We use Airflow as the data pipeline orchestrator, deployed on Kubernetes. Google Cloud Storage is storage of data lakehouse which is managed by Apache Hudi. Python and Spark are the main programming languages to process or transform data.

What is metadata ?

Metadata is the information describes about the data. For our platform, we mainly focus on number of rows and data file size at each of stages.

Metadata collection implementation

An overview process of metadata collection is simply designed as 2 main steps, write and record.
Firstly, we get metadata from the data and write it as a metadata file on GCS. Then, the data pipeline will trigger a metadata collector DAG to get job information through Airflow API and read metadata files. Finally, metadata is recorded to MySQL database. The metadata records will be visualized on Grafana for monitoring.

We do it at each of data stages on the data pipelines. Let’s go deep into each of steps here.

1. Write metadata

Since we use Python scripts and Spark as main data-processing tools. We developed two different ways of writing metadata files, applied for the different cases.

1.1 Write data function

This case it quite simple, it’s just a Python function utilized by my team to all data pipelines. The function just accept data as an input, write data to data lake and finally it write metadata to GCS on the specific path.

But the point is how to tell the metadata reader these metadata file paths. So we utilize the Airflow XComs to push the metadata file names, which we can retrieve XCom values through Airflow API.

1.2 Spark listener

Since we take advantages of Spark distributed computing to process data, we design to use Spark listener to write metadata after finishing the data transformation. We’ve designed the metadata file name by the unique Spark application names, it means the metadata reader also can get this Spark application name for the metadata file paths from the Airflow API like the previous case.

Spark QueryExecutionListener interface has been provided as java class. Luckily, we can use Py4J to implement Python Spark listener class from the Java class.

Python class from Java interface

  • We simply add internal class called Java with the implements member, list of fully qualified name of implemented Java interfaces.
Python class from Java interface implementation
  • To use the modified class as Spark listener, we need to start a gateway and register the SparkListener by its instance. Unregistering the SparkListener and shutting down the gate way is required when stopping the Spark session.
Example of utilization of the modified SparkListener

SparkListener from QueryExecutionListener

  • Query Execution metrics(qe) provided the important information from data processing. It’s utilized from the params pass through to onSuccess method. However, the information is provided only for standard data file formats such as csv or parquet.
  • Due to the Hudi data lake, we cannot get the metadata directly from the qe metrics like the normal case. For Hudi lake, we need to summarize metadata from the Hoodie commit file, which we will discuss it the next article.

2. Record metadata

This step, we’re focusing on reading the metadata files. However, we also retrieve Airflow job information by an API. Not only the row counts and file bytes but also the job status and processing time will be recorded too.

The metadata reading is started when the data pipeline ends and triggers the collect metadata DAG, with passing the DAG id and DAG run id.

  • Airflow API is called to get DAG run status and processing time.
  • Process continues when the status is success, Airflow API is called again to retrieve the XCom values to know where the metadata files are located.
  • Metadata is extracted from the metadata file and recorded to the MySQL database.

Conclusion

Metadata collector is implemented to collect metadata from each of data pipeline stages. There are 2 main parts which are writing and recording metadata. We use Python script and SparkListener to manage the writing metadata part. For the latter part, the system reads metadata from files, calls Airflow API to get some information and finally record them to MySQL database.
We visualize metadata on Grafana to monitor data volumes, which is one of pillars of data observability. The metadata helps us to make sure the trustworthy to the data in the system.

References

--

--