QUpid — End-to-end data quality pipelines (Part 2)

Zakariae EL MESAOUDI
Published in Nemo Technology
7 min read · Jan 11, 2022

In the first part, I talked about the different approaches we can opt for when doing data quality assurance. In this second part, I will present the third and final approach.

3 — The fully automatic approach: End-to-end automation with QUpid

Given that Deequ is a library built on top of Apache Spark, a team that wants to adopt this solution needs technical profiles to write the Deequ code, as well as data engineers to deploy and execute the jobs and exploit their results.

Despite what the name might suggest, QUpid is not an ancient Roman god (for the curious minds, it’s a play on the phrase “Quality pipelines for data”). It’s a tool designed to complement Deequ by relieving the team of this technical burden. QUpid offers:

  • A tool that connects with the DBMS, automatically generates and executes Spark/Deequ code, and exports the results in the form of indices and dashboards.
  • A graphical user interface that can be used by data profiles who do not necessarily have a technical background.

The next chapter will go into detail about the main concepts, architecture, and how we integrated QUpid with the client’s pre-existing pipeline.

IV — How does QUpid work?

1 — Code generation

As we saw earlier, Deequ helps you write unit tests for your data. You do that by writing Spark code that uses the functions provided by the Deequ library. This is a tedious manual step that has to be repeated whenever there is new data to check or whenever a previous job needs to be updated.
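
To give an idea of the kind of code this involves, here is a minimal hand-written sketch using Deequ’s AnalysisRunner (the table and column names are made up for illustration):

```scala
import com.amazon.deequ.analyzers.{Completeness, Compliance, PatternMatch}
import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import org.apache.spark.sql.SparkSession

object ManualDeequJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("manual-deequ-checks").getOrCreate()
    val data = spark.table("some_table") // illustrative table name

    // Compute a few data quality metrics by hand -- the kind of code
    // that has to be rewritten for every new table or rule change.
    val analysis = AnalysisRunner
      .onData(data)
      .addAnalyzer(Completeness("some_column"))
      .addAnalyzer(Compliance("allowed codes", "some_code IN ('N', 'P')"))
      .addAnalyzer(PatternMatch("some_identifier", "^[A-Za-z]{2}$".r))
      .run()

    // Turn the computed metrics into a DataFrame we can inspect or store.
    AnalyzerContext.successMetricsAsDataFrame(spark, analysis).show()
  }
}
```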

What if we could have a magic pet that deterministically generates correct Spark/Deequ code whenever we need it to? That would be the dream, right? Well, that’s exactly what QUpid does: code generation.

QUpid takes a JSON metadata file as input and runs it through its CodeGen Engine module. Let’s treat this module as a black box for now. Its job is to parse the file and make sure the meta definitions specified in it are correct. Once the parsing is done, the module takes the validated metadata and generates Spark/Deequ code that is ready to be executed on our data lake cluster.

The JSON Metadata file schema:

The metadata file follows this schema:
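
Roughly speaking, it is shaped like the sketch below; the key names and values here are illustrative, not necessarily QUpid’s exact ones:

```json
{
  "tables": [
    {
      "name": "<table name>",
      "dataSource": "<data source>",
      "retrievalMode": "<how to retrieve the data, e.g. full or last batch>",
      "columns": ["<columns to load, or *>"],
      "analyzers": [
        {
          "metric": "<Completeness | Compliance | PatternMatch | ...>",
          "rules": [
            { "column": "<target column>", "constraint": "<metric-specific constraint>" }
          ]
        }
      ]
    }
  ]
}
```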

The file contains a list of table objects. Each table object includes a set of fields that define how to retrieve the data from the given table: table name, retrieval mode, columns, data source, etc. In addition to these fields, the table has to contain an analyzers field, which is a list of analyzer objects. Each analyzer is specific to a particular metric and contains a list of rule objects that define how the metric is to be computed and on which columns.

Here is an example of a Metadata file:
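
Sketched with the same illustrative key names as above, such a file could look like this:

```json
{
  "tables": [
    {
      "name": "orc_actionnaire",
      "dataSource": "hive",
      "retrievalMode": "last_batch",
      "columns": ["*"],
      "analyzers": [
        {
          "metric": "Completeness",
          "rules": [{ "column": "numero_personne_host", "constraint": "not_null" }]
        },
        {
          "metric": "Compliance",
          "rules": [{ "column": "code_type_piece_identite", "constraint": "in ('N', 'P')" }]
        },
        {
          "metric": "PatternMatch",
          "rules": [{ "column": "code_type_identifiant", "constraint": "^[A-Za-z]{2}$" }]
        }
      ]
    }
  ]
}
```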

The file tells us to generate one Spark/Deequ job for a given table (orc_actionnaire). It specifies the appropriate retrieval mode (the last batch ingested) and the columns needed (*). The job will contain three analyzers for three different columns: Completeness (column numero_personne_host cannot have null values), Compliance (column code_type_piece_identite has to be one of the values [‘N’, ‘P’]), and PatternMatch (column code_type_identifiant must contain two-letter values only).

CodeGen Engine:

Earlier, I asked you to consider this module a black box so we could follow the flow of data that runs in and out of it. Let’s open it up now. The following is an overview of the elements that make up the CodeGen Engine:

The first step is the metadata file loader/parser. This class takes the JSON file, ensures that it complies with the schema specified earlier, and verifies that the defined rules are implementable. The parsed JSON object is then fed to the Jinja-based templating engine.
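
QUpid’s actual parser isn’t shown here, but a minimal sketch of this step, assuming circe for JSON decoding and hypothetical case classes mirroring the illustrative schema above, might look like:

```scala
import io.circe.generic.auto._
import io.circe.parser.decode
import scala.io.Source

// Hypothetical case classes mirroring the illustrative schema above.
case class Rule(column: String, constraint: String)
case class Analyzer(metric: String, rules: List[Rule])
case class Table(name: String, dataSource: String, retrievalMode: String,
                 columns: List[String], analyzers: List[Analyzer])
case class Metadata(tables: List[Table])

object MetadataLoader {
  // Metrics the CodeGen Engine knows how to turn into Deequ analyzers.
  private val supportedMetrics = Set("Completeness", "Compliance", "PatternMatch", "Uniqueness")

  def load(path: String): Either[String, Metadata] = {
    val raw = Source.fromFile(path).mkString
    // Decode the JSON, then reject any metric the generator cannot implement.
    decode[Metadata](raw).left.map(_.getMessage).flatMap { metadata =>
      val unknown = metadata.tables.flatMap(_.analyzers).map(_.metric).filterNot(supportedMetrics)
      if (unknown.isEmpty) Right(metadata)
      else Left(s"Unsupported metrics: ${unknown.mkString(", ")}")
    }
  }
}
```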

The templating engine calls two generator classes: AnalyzerGenerator and MetricsAggregatorGenerator. As the names indicate, the AnalyzerGenerator generates as many Analyzer classes (see the example in the third chapter) as there are tables defined in the metadata file. The MetricsAggregatorGenerator generates aggregator code that takes the data frames produced by the analyzer classes and creates a global data frame, along with the logic necessary to store it as an Elasticsearch index.
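
As a rough idea, the aggregation logic in the generated code might boil down to something like this sketch (class and column names are illustrative):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{current_timestamp, lit}

object MetricsAggregator {
  def aggregate(perTableMetrics: Map[String, DataFrame]): DataFrame = {
    perTableMetrics
      .map { case (table, metrics) =>
        // Tag each table's metrics with the table it came from...
        metrics.withColumn("table_name", lit(table))
      }
      // ...union everything into one global data frame...
      .reduce(_ unionByName _)
      // ...and stamp it with the computation time so dashboards can filter by run.
      .withColumn("computed_at", current_timestamp())
  }
}
```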

The last step takes care of bundling the generated classes into Scala code that can be plugged into a pre-existing Spark job.

This way, we automated the most significant part of the workload: we take a metadata file and generate ready-to-run Deequ/Spark jobs from it. Now, I’m sure you’re thinking that writing the metadata file is still a technical task that requires someone with knowledge of the schema and the data. And even if we find someone willing to take on the tedious job of writing a file that might easily surpass thousands of lines, they would still have to take the generated Spark job and run it manually on the cluster. For full automation, these are two bottlenecks that we cannot ignore.

2 — QUpid (Finally!)

Well, we thought about those bottlenecks, and here is how we put the last pieces of the puzzle together to ensure end-to-end automation:

  • User Interface: We took the code generation logic and built an interactive graphical user interface on top of it. With a few clicks, the user can connect to a database and select the tables, the columns, and the rules required. The interface takes care of generating the metadata file automatically. That’s the heart of QUpid.
  • CI/CD: Instead of just bundling the code into a Deequ/Spark job and stopping there, we plugged QUpid into the client’s pre-existing CI/CD pipeline. This way, once the job is generated, QUpid pushes the code to Git, and the pipeline takes care of building the binaries and running them on the cluster.

The following is the architecture of the end-to-end pipeline:

The scenario goes as follows:

  1. The user interacts with the QUpid interface to create the data quality job required.
  2. QUpid saves the changes, generates the JSON Metadata file, and pushes it to Git.
  3. The code push triggers a Jenkins job responsible for calling the CodeGen Engine and generating the Deequ/Spark code.
  4. The generated code is bundled and pushed to a pre-existing Spark job repository.
  5. The second code push triggers another Jenkins job, which tests, builds, and deploys the generated Spark job to the data lake using Ansible.
  6. Ansible downloads the binaries, unarchives them, and launches the Spark job using Oozie.
  7. The job writes the aggregated data frame into Elasticsearch, and the results can be displayed in Kibana.
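
For step 7, a minimal sketch of that write, assuming the elasticsearch-spark connector and an illustrative index name, could look like this:

```scala
import org.apache.spark.sql.DataFrame

object MetricsSink {
  // Persist the aggregated metrics so Kibana can visualize them.
  def writeMetrics(globalMetrics: DataFrame): Unit = {
    globalMetrics.write
      .format("org.elasticsearch.spark.sql")
      .option("es.resource", "data_quality_metrics") // illustrative index name
      .mode("append")
      .save()
  }
}
```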

QUpid — UI

The interface offers the possibility of choosing a database (Hive database in our case) and adding tables from it.

Once the database and the table are picked, the user can choose the columns they want to load along with the appropriate retrieval mode.

The user can now tick the boxes of the metrics they want to use in their job. Each metric has its specificities, and QUpid takes that into consideration.

The following is an example that defines Completeness, Compliance, and PatternMatch metrics:

Kibana dashboards

The dashboard shows gauges with thresholds that represent the global average by metric (in this case: Completeness, Compliance, PatternMatch, Uniqueness). We notice, for instance, that only 64.4% of the data present on the cluster respects the Compliance rules we have defined.

The following charts represent the same average metrics, but per table this time. This view allows us to see which tables affect the global average and cause it to drop.

We can go even further into our analysis by displaying the rule scores per table per metric. This way, we can put our finger right on the issue and fix it right away.

V — What are the next steps?

QUpid was designed and developed by OCTO Technology as an internal data quality management tool within the data lake of a well-known African bank.

The tool is, therefore, natively dependent on the data lake’s internal plumbing: the DBMS (Apache Hive), the codebase (internal GitLab), the CI/CD pipeline (Jenkins), and Elasticsearch/Kibana (internal ES cluster).

Having said that, QUpid addresses a need that is not specific to the context it was born in, but one shared by a much broader audience.

So what are we suggesting?

Since it might serve a wide range of use cases, we suggest taking QUpid out of its original context and open-sourcing it!

The tool will help data profiles (technical or not) achieve large-scale data quality for free and without code hassles.

Stay tuned for the launch of QUpid!
