DataDoc — The Criteo Data Observability Platform

How we regained control of our data ecosystem and tackled governance issues

Maxime Kestemont
Criteo R&D Blog
Published in
16 min readJul 2, 2020

--

A bit of history

Data is at the core of Criteo’s business, as this is powering our ML models and predictive capacities. Leveraging the most of this asset is thus critical and as such, we have heavily invested in developing tools to reduce the barrier to create new datasets. Among others, we developed our own internal schedulers (one being open-sourced), we abstracted a lot of the complexity of data processing engines, and we are developing our very own self-service data production platform using a high-level declarative approach with integrated scheduling, regression testing and monitoring. All of that so that we can democratize data production at Criteo.

This direction is nothing new, and we already leaned towards it several years ago when we provided our analysts with a simple platform, which was mostly scheduling SQL files and triggering hive jobs based on them. This was, from a pure data production perspective, a massive success. Several hundreds of datasets were quickly added in production by various people or teams, not necessarily technical ones (which was one of the very first goal of this initiative).

However, this had the pretty nasty consequence of growing into a mess, through issues such as hidden dependencies between datasets, no monitoring or alerting whatsoever, unclear ownership, or redundant datasets. Easing datasets creation while not doing the same thing on the observability part quickly revealed its limitations. It probably peaked the day when our data engineering manager got called during his holidays because a critical client-facing dashboard was not updated for the last days, and he and his teams struggled during several days just so they could find what was the datasets lineage leading to the production of this final dashboard. This war story sparked the creation of our first data catalog tool, which has greatly evolved since its inception.

In retrospect, we basically hit, in our very own way and on a similar time frame, the traditional governance problems that a lot of big companies have faced in the last years. Indeed, there has been a growing interest lately among the industry on getting better control over one’s data ecosystem and improving its operational efficiency. This is further accentuated by the increasing regulatory and compliance requirements (such as GDPR), which highlights, even more, the need for such tooling. Applications such as Amundsen (Lyft), DataHub (Linkedin), Databook (Uber) or Metacat (Netflix) are, among others, some of the answers big tech companies have developed to tackle this issue.

Introducing DataDoc

When investigating the existing data catalog/data discovery/metadata engine, we came to the conclusion that our own attempt, named DataDoc, was better tailored to Criteo’s needs, especially as we wanted to push it further than only a data discovery tool. Proposing an actual data observability platform, that would answer the needs of both data producers and consumers, have been our long term vision fueling this effort and as such, we want to present the key features of our solution as well as share our vision for its future roadmap.

The intent of DataDoc is to solve data governance issues. In the remainder of this article we will dive into its main features and the specific problems that they help to solve:

  • Data discovery, being able to easily find and discover new datasets that are relevant to your use case and then getting insights and context about those datasets
  • Data availability, so that data consumers know which part of the dataset can be accessed and data producers can monitor their workflow and its progress
  • Data lineage, making dependencies between datasets explicit and supporting transversal analysis over multiple datasets at once

On tackling data discovery

At Criteo, data analytics and data science is an integral part of our business. The performed analysis range from getting a macro view of market ebbs and flows to gain detailed insights into customer campaigns and user behavior. It is ironic then how it is not until relatively recently that we have been able to perform a similar analysis on Criteo’s data graph as a whole.
Historically, analysts and data scientists would hit upon a wall when trying to perform a search for data. Criteo’s data-ecosystem is comprised of thousands of datasets that exist not in one system but across multiple heterogeneous systems (Hive, Presto, HDFS, MS SQL, Vertica,…) with varying degree of searchability. A simple search by name would prove to be a Sisyphean task, where the user would have to invoke a search query in different flavors towards multiple different systems.

To address this data discovery issue, DataDoc proposition is a federated view of these different systems, enabling one search query to make a lookup in all of them. For instance, there exists a large number of tables centered around ad clicks at Criteo (being an adtech company…), most of them, therefore, containing “clicks” in their name. A search on “clicks” will thus leverage this naming convention and give the user an immediate overview of click-centered datasets. However, searching by name only goes so far, and by making the metadata describing our datasets searchable, users are able to ask more specific questions:

  • analysts and data scientists frequently perform exploratory work where new combinations of metrics are needed to gain insights. Rather than performing endless joins and aggregations, they will start by searching for a dataset that could satisfy this need. By extending search on datasets schema metadata, users can now request datasets containing a specific set of fields or even more complex conditional statements. The benefit is two-fold: the search helps users leverage already existing data whilst reducing the waste of building pipelines producing datasets that to a great extent overlap with already existing datasets.
  • DataDoc provides a generic tag mechanism to extend the technical metadata with business and user-defined metadata. This enrichment of metadata enables a number of different use-cases, such as searching for datasets given a specific topic. As an example, due to the vast size of the Criteo data graph, it is hard to guarantee perfect data quality across the board. What has been done is to identify a subgraph of critical datasets for which a high level of data quality is enforced and to tag those datasets as such in DataDoc. As a result, data consumers can search for datasets where a high level of data quality is guaranteed.
Searching for clicks datasets with a “click_id” field and data available in the 2020–01–01 and 2020–06–01 range

The first iteration of Criteo’s data catalog provided the federated search along with rich technical metadata (such as schema definition, location, or sampled values). This alleviated the pain of data discovery to some extent. Nonetheless, one of the major pain points for data consumers and producers alike is that technical metadata alone does not capture the semantics of a dataset. Data consumers would dive into datasets with promising names and familiar-sounding fields but quite soon realize that the queries they ran produced incorrect results, or that those familiar fields were not actually representing what they were expecting. In addition, organizational factors made the pain worse. Seeing that Criteo is a geographically distributed company spanning multiple time-zones, finding the right owner and inquire about a dataset would often be a days-long ordeal.

Consequently, for the second iteration, the focus was put into enriching the technical metadata with business and user-defined metadata. This way, the semantics and thereby the higher-level intent of the dataset could be conveyed. The analogy used when pitching these features was that in traditional software engineering, documentation of source code signals a mature and well-maintained codebase that facilitates ease of use for clients. The very same principles should be applied to datasets production, they should be well-documented to make the datasets clear and easy to use, and as such we arrived at the name DataDoc.

Example of dataset documentation, with a mix of technical and business metadata

DataDoc was introduced on top of an already existing ecosystem of thousands of mostly undocumented datasets. As such, the decision was taken to support a collaborative form of documentation. Users now have the means to directly contribute and see past contributions to the user-defined metadata. Part of the bootstrapping of the documentation effort was to identify a subset of the data graph where usage was the heaviest and the enrichment of metadata would have the most impact. This approach revealed itself to be quite successful, as DataDoc is now widely used at Criteo and is the primary tool to share and refer to datasets documentation.

Data availability

At Criteo, most datasets are computed in a time series fashion, meaning that each week/day/hour, new partition (or subpartition) will be computed and added to the existing dataset. Getting an accurate view on what partitions have been computed for a given dataset can thus be quite tedious, especially when you need to be careful of potential holes (missing partitions) that could impact your analysis (for instance when requesting multiple days of data). Not all systems provide utility functions to get a grasp on this data availability, and even when provided (e.g. Hive “show partitions” function), it can be very error-prone and time-consuming as you can easily fail to spot a gap in your dataset.

Moreover, lacking a source of truth for data availability has always been a major pain point at Criteo. Some of our schedulers have their own internal state about whether a partition has been computed or not, which can diverge from the physical state on HDFS — our main storage system. Analysts rely mostly on Hive, but the Hive Metastore can diverge as well from HDFS. Or they use Vertica (distributed SQL query engine widely used at Criteo), which again can diverge as it has its own storage and datasets first need to be exported there. Even production jobs can be impacted, as they often rely on inputs data coming from other jobs, but actually detecting if the needed data is partially or fully present requires domain knowledge that can be outside of the scope of the job. For instance, one job can rely on the daily data outputted by another job, but this latter job produces its outputs by day/country partitions. The first job should only start when the full day has been computed, but it has no notion of “country” — so no way to know when it should start (except by reimplementing this logic in the job itself, or by explicitly depending on the first job code).

Those divergences in availability between systems, as well as the fact that there is no centralized definition of “done” for a specific date/partition, can lead to some nasty consequences and more generally to the introduction of several hacks and workarounds to support particular edge cases. Consequently, we spend significant times nailing down this issue in DataDoc, as it was impacting both data producers and consumers.

Data availability for the last 4 months: several holes are present (missing partitions) and the blue color indicates that this specific partition is being processed right now

In the above visualization, we display data availability through a calendar view, with a color code indicating if a dataset is fully available or incomplete for a given day or hour. Additionally, users have the possibility to drill down on a specific box to get a more granular view displaying sub-partitions info as well. A search feature supporting regex filtering is present as well so that power users can easily perform more advanced queries on the availability of a dataset (for instance, filtering on all data related to a specific country partition).

This feature has been a massive success, especially for data consumers. So much that we realized at some point that even data producers were starting to use our application to get insights about their job executions, by indirectly looking at the dataset availability state to know if a job was finished or not. We thus decided to extend this calendar view by strengthening the integration with our scheduler tools, so that we could display information related to job executions as well (displaying the fact that a partition is currently being computed or backfilled, if it has recently failed, etc.). As data producers are mainly monitoring their jobs from a workflow perspective (i.e. a set of jobs sharing the same business or technical purpose), we also introduced a new “workflow” entity to DataDoc, supporting an aggregated view of data availability for multiple datasets at once. This is easier to grasp visually, so here it is:

Workflow view: at the top, aggregated view (all jobs together) across several months; at the bottom, daily view for all jobs, with one job being untested to display availability at a more granular partition level

Lineage

As stated in the introduction, the very first thing that sparked the creation of our data catalog was the need to better understand our data lineage. Data lineage is a feature that a lot of companies are trying to get right, as it has huge untapped potential. Indeed, it provides valuable information about the context and is a crucial tool to reach a good understanding of your data. Among others, it enables:

  • data consumers to understand where the data they are using is coming from, acting as constantly up-to-date documentation, and giving hints about the data transformation logic
  • data producers to get insights about the teams and users that are using their datasets. This is fundamental, especially when rolling out potentially breaking changes
  • addressing issues that can affect multiple datasets (potentially the whole data graph), typically deprecating/renaming some fields or analyzing some privacy concerns
  • supporting complex debugging process, in particular when one needs to understand how a particular faulty piece of data was generated and what were its impacts

This lineage information can be captured at different granularities, mainly table and field ones: for each table/column, knowing what are the input and output dependencies (respectively, which tables/columns are used to compute this table/column, and which tables/columns are using this table/column as inputs). Both can be further extended by introducing partition level (which partitions were used to generate a specific partition), which is especially useful for debugging purposes as it enables the tracking of the actual computation flow and not only functional dependencies.

The crux of the problem is that this lineage needs to be reliable, meaning that its coverage should be good enough that users can trust it and potentially even rely on it in their code or to deprecate fields. However, the big limitation here is a technical one: while automatically inferring lineage can work for some systems and execution engines (mostly the ones based on a high-level declarative syntax, i.e. SQL-like such as Hive or Presto), tracing field lineage and the transforms applied in a data processing job written in a low-level turing-complete job is a non-trivial problem.

We have however been able to solve this issue for partition table lineage in a very reliable and automated way, without having to infer imperative programs. Our solution is based on Garmadon, which is an open-sourced Criteo tool (https://github.com/criteo/garmadon) that provides Hadoop cluster introspection. Without diving too much into the details (this will be the subject of another blog post), Garmadon allows us to track every read and write performed by programs running on our Hadoop cluster by instrumenting Yarn containers. By leveraging those read and write data, we were able to rebuild our full data graph and its dependencies, per partition. Finally, we integrated with other tools (such as our Hive → Vertica exporter ) to get the last missing dependencies that are outside of the Hadoop realm, and we managed to get a 100%, fully reliable, table lineage.

While the technical part is quite complex, the UI one is not trivial either, as it spawns some neat UX challenges. Displaying potentially thousands of datasets, with multiple connections for each, can quickly become a visual mess:

As such, we provide some cool UX tricks, where users can select the display depth, search or filter on specific nodes so that it only shows the graph between 2 datasets (the focused and the filtered one).

Navigating the lineage graph by filtering on specific nodes
Filtered lineage, only showing the path between 2 selected nodes and what is upstream and downstream

As stated above, data lineage can help us doing transversal analysis, looking at multiple datasets at once. Indeed, this visualization is a starting block for the more complex use cases. For instance, when looking at the data availability for a dataset, users often wonder why a specific hour or day has not been computed yet. Understanding the root of this latency can be a very tedious and time-wasting process, and the issue is often not located at the dataset level but coming from above: one upstream dataset being delayed for some reason, and blocking the processing of all its downstream dependencies. While investigating such an issue could take one hour or more to an analyst or data engineer before, it is now as trivial as simply looking at our lineage graph: the blocking node is very apparent (the first red one in the flow), and the delay cascades from it to its children.

An unavailable dataset blocking all its downstream dependencies

Another interesting use case (among others) of our lineage will bring us back to the root of the application, i.e. being a data catalog. Automatically detecting which dataset should and should not be exposed in the catalog is not as trivial as it sounds, in particular for systems like HDFS where there is no clear distinction between a random file, and user-specific or test dataset, and a production one. In theory, clear naming and convention could help to work around this issue (for instance by writing all production datasets in specifically-named sub-folder), but the reality is always a bit messier, especially at the scale of Criteo. White-listing datasets is another approach, which was the one used by DataDoc up to recently, but it requires manual inputs and is thus more error-prone.

Thanks to our lineage, we have been able to replace most of the white-listing mechanism. Indeed, as in the end, production datasets are explicitly exposed on some systems (Hive, Vertica, our ML platform,…), we can start from there and go up our lineage graph to detect all their upstream dependencies (direct and indirect). If we reach a dataset that is exposed to HDFS, we know that this is an actual production dataset (as used as input for another production table) and not a test-specific one, and it should thus be exposed on DataDoc.

Future

DataDoc has received pretty successful results, especially among data consumers, and its adoption rate is continuously increasing. Across the last year, we increased by 5 our average unique users, and while we plan to strengthen the core proposal of DataDoc, we believe that there is room for other ambitious features that could improve even further the productivity of data users at Criteo but also unlock new use cases.

One example of such a feature would be the work we already started around resource usage and its monitoring. Criteo is operating at a massive scale (around 180 PB of actual data on HDFS only, without factoring replication), which translates into very significant infrastructure costs. There is thus a constant need for optimizing our data usage, and this starts by actually getting a clear picture of the consumption of each dataset. We already support this in DataDoc by leveraging the job execution metrics related to the processing of each dataset, and exposing those through the following graphs:

Usage repartition of the dataset
Resource used (cpu, memory) to compute the dataset

This is however only a first step, as we plan to quickly introduce trends analysis to better monitor our usage or operational issues, as a sudden increase in resource usage is often symptomatic of an issue either linked to the data itself or to the execution platform.

On the subject of improved monitoring and issue detection, we have a big opportunity to leverage our lineage there. Indeed, issues are often impacting multiple datasets at once, and while our data lineage is currently a very helpful tool to reactively address such an issue, we could be much more proactive thereby cascading alerts and notifications through our graph to all relevant users. If we consider a data quality problem for instance, where we are able to detect that there is a sudden spike in term of rows or aggregated value in a specific table, we could then display a warning to all downstream datasets that use this dataset as input so that the users are aware that they should be careful when using those partitions. Or even go further and temporarily block the processing of those downstream datasets as the input data is probably invalid and computing the rest of the graph would be counterproductive and potentially lead to even more issues.

While table lineage can already be a very powerful asset, as illustrated by this example, a huge use case still remains locked behind field lineage. Due to the constantly evolving data ecosystem that we face at Criteo, where new datasets are being frequently added or updated (or made obsolete), we have up to now be unable to deprecate fields (or even tables) in a “scalable” way, i.e. without requiring too much manual work (having to analyze 10 or 100s of different repos to make sure that the field is indeed not used anymore). This field deprecation challenge is further accentuated by the fact that we are promoting flexibility and innovation speed, meaning that users can easily and independently create new datasets, adding (unknown) dependencies on existing ones. And due to the nature of our work, the logs from where we would gain the most to deprecate fields (as they are the biggest in terms of volume) are also the most complex to work around, as they are the most upstream datasets.

Getting an actual field lineage would thus help tremendously, and this is a big part of our current roadmap. As it can however only be realistically automated for declarative systems and we cannot rely on our table lineage approach here (instrumenting yarn containers) as it is not granular enough, we need user inputs to cover the imperative systems such as spark or TensorFlow. While this is a tedious task for users, this is not trivial either on the automated part (inferring field dependencies in declarative systems). Moreover, this whole combination of manual and automated works need to be reliable, especially if we want to rely on this lineage as a safety net to deploy potentially breaking changes or as the main vector to propagate alerts or warnings. The manual (and tedious) approach can thus only work if it is strictly enforced (meaning that the dataset cannot be deployed/will fail in production if its dependencies are not properly defined), and this is something we plan to achieve by enforcing this documentation as the scheduler level, where only a manually declared projection of the data could be read and used by production jobs.

Thanks to the Criteo engineers, Anton Lin and Jean-Benoit Joujoute, who are reviewers of this post and also the main contributors to this application.

--

--