Getting started with Data Lineage
What is data lineage and how does it provide visibility at Dailymotion?
Do you know your data lake inside out? Are you keeping track of every change applied across all your tables and processes? Can you quickly teach a newcomer how to navigate between your different datasets? Building a “data lineage” platform could be a good start.
Data lineage is a cutting-edge subject in the data world but remains in its early stages. At Dailymotion, we began tackling the topic a year ago. The purpose of this article is to share the thought-process of the Data Engineering Team and our current outlook on a solution. We hope this can trigger a conversation with other teams facing similar issues (don’t hesitate to comment or reach out).
In this article, an entity represents one or a collection of attributes that define a specific representation of data, similar to a Class definition in programming or a table in databases (inspired by Apache Atlas). An entity instance is the instantiation of an entity on a particular date (more explanations below with an example).
If you are a data junkie who wants to have an overview of what data lineage is, the first part will interest you. Otherwise jump straight to the second part, if you want to learn how our data engineers addressed the data lineage problem.
Data lineage overview
Data lineage includes the data’s origins, what happens to it and where it moves over time. Data lineage provides visibility while greatly simplifying the ability to trace errors back to the root cause in a data analytics process. (Wikipedia)
Data lineage is based upon a graph of entities. Take this example: aberrant values are seen in a dashboard. With a data lineage solution, a data engineer is able to know exactly which upstream entities need to be monitored. Once the root cause is found, the data engineer also knows which entities need to be reprocessed.
In the figure above, entity_13 is the dashboard containing the aberrant values. We mark it in red to signify it is corrupted, and we suspect entity_11, entity_6, entity_7, entity_3, entity_4 and entity_5 to be the root cause. We find that it is entity_3, so we can reprocess all downstream entities, even entity_10, which did not seem to be related to the issue at first glance.
A data lineage solution also includes the notion of temporality. Entity_13 could have been monthly, entity_11 daily and entity_3 hourly. As we find the issue in entity_3 during particular hours, we can directly know which date(s) need to be reprocessed on all downstream entities. If the issue has lasted for 10 hours, we’d need to recompute 10 hours of entity_3 and 1 day of entity_11.
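This conversion of a tainted range between periodicities can be sketched in a few lines of Python (dates and entity names are illustrative, not from our actual platform):

```python
from datetime import date, datetime, timedelta

def impacted_days(start, n_hours):
    """Distinct daily instances covered by a range of tainted hourly instances."""
    hours = [start + timedelta(hours=i) for i in range(n_hours)]
    return sorted({h.date() for h in hours})

# 10 tainted hours of entity_3 starting at 08:00 all fall on the same day,
# so a single daily instance of entity_11 has to be recomputed.
impacted_days(datetime(2019, 3, 4, 8), 10)  # -> [date(2019, 3, 4)]
```

Note that if the tainted range crossed midnight, two daily instances of entity_11 would be impacted instead of one.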
With a graph of entities, the first use cases are:
- Dependencies visualization: if entities are tagged with metadata, a user can visualize the graph at different abstraction levels (product dependencies, team dependencies…), visualize a partial graph by applying filters based on specific criteria, or visualize the graph as it was at a specific time in the past.
- Automatic job reprocessing: if the graph of entities is linked to a programmatic workflow framework (for example Apache Airflow), the user can backfill all dependencies starting from tainted entity instances.
The 3 main challenges are:
- gathering all entities and their metadata, spread across the entire infrastructure (Google BigQuery, Aerospike, MySQL, Google Cloud Storage…)
- building a solution which integrates well with all the different systems in the company and is simple enough not to discourage building new entities (otherwise the framework would be bypassed)
- finding the right entity model
Data lineage feedback
In this section, we’ll cover the Data team’s motivation behind data lineage and give an overview of our first attempt to build a graph of entities. Then we’ll go in-depth into our second prototype. This includes a definition of our entity model which we will illustrate and an explanation of our solution integrated to Apache Airflow.
Data lineage motivation
At Dailymotion, the data team’s most prevalent use-case is to trace the origin of an error and automatically relaunch the downstream workflows.
The Data Team has hundreds of Airflow DAGs (Directed Acyclic Graphs) generating tens of thousands of DAG Runs every day. Many DAGs depend on data built by other ones (DAGs are often chained through sensors on partitions or files in buckets, rather than through trigger_dag). When one DAG fails, downstream DAGs also start failing once their retries expire. Similarly, when we discover a bug in data, we want to mark that data as tainted. In both cases, the challenge resides in determining the impacted downstream DAGs (possibly having to convert between DAG periodicities) and then clearing them.
As our data lake is Google BigQuery, our first thought was to use BigQuery audit logs. Once configured, Google Cloud Platform saves all the activity in a BigQuery table composed of over seven hundred fields. The data is raw and it can be hard to extract the correct information from it. From each BigQuery job, we were able to get the source tables, the destination table and the user email, which was enough to build a first draft of the dependencies graph. We pushed all the data into Neo4j and visualized it.
It was not usable as it was. All the interactive queries from BigQuery users are logged, as well as all temporary tables. Tables suffixed with a date (before partitioning was available) were each a distinct entity. BigQuery views were also entities. Some views add new functional sense, so we want them in the graph; other views just reference the last available table and bring no new information, so we don’t want them in the graph…
We realised a lot of cleaning specific to BigQuery storage was necessary, so we didn’t go further and preferred to find another solution which covers a larger scope than BigQuery.
Second (in progress) prototype
Rebuilding a solution from scratch is not ideal but we’ve not yet found something that suits our needs. Our idea is to implement the following solution on top of our Apache Airflow Scheduler:
- Create a graph of our entity class model and the pipelines producing their (time-based) instances, so that an Airflow sensor can, through an API, know the status of an entity’s parents before running a task that creates new entity instances for specific dates.
- Build a status table that describes the state of each entity instance produced by our DAGs (valid, estimated, tainted, etc.). We think this can be done through the “on_success_callback” of Apache Airflow.
- From the graph of entities and the entity status table we can handle the failure use-case: we can create an API that takes a DAG and an execution date as input and returns the list of tasks to clear and the DAG Runs to start downstream.
- From this we can handle the tainted/backfill use-case: we can build an on-the-fly @once DAG which will update the data status table to taint all the downstream data sources built from a corrupted one and then clear all dependent DAGs from the corrupted source.
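Finding the data sources to taint boils down to a traversal of the entity graph. A toy sketch in Python, reusing the entity names from the dashboard example above (the graph itself is made up for illustration):

```python
from collections import deque

# parent -> children, i.e. each child depends on the parent
downstream = {
    "entity_3": ["entity_6", "entity_10"],
    "entity_6": ["entity_11"],
    "entity_11": ["entity_13"],
}

def tainted_downstream(graph, root):
    """Every entity reachable downstream from a corrupted root entity."""
    seen, queue = set(), deque([root])
    while queue:
        node = queue.popleft()
        for child in graph.get(node, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen

tainted_downstream(downstream, "entity_3")
# -> {"entity_6", "entity_10", "entity_11", "entity_13"}
```

In the real solution, each entity found this way would be marked as tainted in the status table before the dependent DAG Runs are cleared.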
Entity Model by example (word count)
Before defining what an entity model looks like, let’s take the example of a simple Extract Transform Load (ETL) pipeline. Let’s suppose that we receive daily articles written by authors in a specific file form and we need to count the number of times a word appears in each article. In order to achieve this we load in BigQuery all the articles, count the number of words and write the output in another BigQuery table.
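The transform step of this toy pipeline boils down to a per-article word count, which can be sketched in a few lines of Python (the article contents are made up for the example):

```python
from collections import Counter

# One row per (author, article), as loaded from the daily files.
articles_by_author = {
    ("alice", "article_1"): "data lineage helps trace data errors",
    ("bob", "article_2"): "data pipelines fail and data must be reprocessed",
}

# Count the number of times each word appears in each article.
words_count = {
    key: Counter(text.split()) for key, text in articles_by_author.items()
}

words_count[("alice", "article_1")]["data"]  # -> 2
```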
We have two tables, each of them providing specific information, which we can model as two entities: articles_by_author and words_count_by_articles_by_author (words_count for short). The entity words_count depends on articles_by_author.
The most basic schema for an entity is:
- name: a unique identifier for the entity
- granularity: the minimal range for the entity
- states: list of possible states of an entity instance (estimated, partial, completed…)
The model of articles_by_author and words_count is written like this:
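A sketch of such a configuration, assuming a YAML format with the fields of the basic schema above:

```yaml
- name: articles_by_author
  granularity: "@daily"
  states:
    - entity_state: completed

- name: words_count_by_articles_by_author
  granularity: "@daily"
  states:
    - entity_state: completed
      depends_on:
        - name: articles_by_author
          dates: all_day
```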
As shown above, two daily entities are defined and linked through a depends_on section. Both entities have only one possible state (completed), as the words_count declaration clarifies. To be in the completed state, words_count requires one day of articles_by_author.
The states section adds complexity to the configuration but is really explicit and modular. So how do we take advantage of this?
The State Paradigm
Let’s assume that these articles are Dailymotion Dev Portal posts and by looking at past articles, we are able to estimate the number of reads an article will have at the end of the first week depending on the words: simple word, complicated word, buzz word, new word, technical word…(this is an assumption made for the sake of the example and is, by no means, true or feasible).
We introduce 3 entities:
- Word_prominence: a list of words, each with a value giving its prominence. The more prominent words an article contains, the more likely it is to be read. This entity is updated each month.
- Nb_read_exact: the exact number of reads per article, available only at the end of each week.
- User_weekly_reads: for each user, the total number of reads across all of their articles, aggregated by week. At the end of the week, once we know exactly how many times each article was read, the table is enriched with the exact amount of reads.
In other words, the entity user_weekly_reads passes through two states: estimated at the beginning of the week and completed at the end of the week.
Let’s write the definition of the entity user_weekly_reads:
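A sketch of this definition, assuming the same YAML format as above (the line numbers discussed below refer to this sketch):

```yaml
name: user_weekly_reads
period: "@weekly"

states:
  - entity_state: estimated
    depends_on:
      - name: words_count
        dates: all_week
      - name: word_prominence
        dates: last_available

  - entity_state: completed
    depends_on:
      - name: words_count
      - name: nb_read_exact
        dates: all_week
```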
Let’s decompose this configuration file line by line:
- Line 1: name is the unique identifier for the entity
- Line 2: period is weekly, meaning the data available for this entity will be aggregated at a weekly level
- Line 4: states is a new section describing the different possible states for instances of this entity
- Lines 5 & 12: entity_state is the state of the entity instances (estimated, completed…)
- Lines 6 to 10: depends_on specifies the dependencies. To be estimated, the entity needs one week of data from words_count; as words_count is daily, we need the all_week. It also depends on the last available version of word_prominence
- Lines 14 to 16: to be completed, the entity needs words_count and the full week of nb_read_exact. As nb_read_exact is weekly, we need the all_week
Entity Model template
If we sum up what we define in the example above, a template of the configuration can be written as:
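Assuming the same YAML format as in the examples, the template might look like:

```yaml
name: <unique entity identifier>
period: <@hourly | @daily | @weekly | ...>

states:
  - entity_state: <state name>              # estimated, partial, completed...
    depends_on:
      - name: <parent entity name>
        dates: <date range>                 # e.g. all_day, all_week, last_available
        states: [<accepted parent states>]  # defaults to [completed]
```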
Each state is associated with a set of dependencies. It is possible to specify for each dependency:
- dates: the range of entity_date needed for the dependency. This is calculated dynamically in our model.
- states: the different states accepted for the dependency (estimated, partial, completed…). Defaults to [completed].
Until now we have seen that an entity can enter different states and can depend on other entities belonging to other date ranges.
Now that we have defined our entities, we need to instantiate them for specific dates, in order to update our status table, which notifies when an instance is available and whether its data is valid or not. The schema of this table is as follows.
Entity instance table schema
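A minimal sketch of this schema (field names are illustrative, except states and valid which are detailed below):

```yaml
entity_name: STRING     # e.g. user_weekly_reads
entity_date: TIMESTAMP  # the date of the entity instance
states: ARRAY<STRING>   # e.g. [estimated], [estimated, partial]
valid: BOOLEAN          # whether this instance can be used downstream
```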
In this schema, two concepts stand out:
- States: is a list because an entity instance can have multiple non-exclusive states at the same time (for example it can be estimated and partial if some attributes are initially missing).
- Valid: A new field to specify if the data (i.e. entity instance) on this date is valid for use. If the field valid is set to false it could mean two things: the data is not available yet or data is corrupted and waiting to be fixed.
Our experience with our analytics platform has led us to group our entities into different categories:
- TimeSeries: (@hourly, @monthly…) for example, every hour we receive the data generated during the last hour.
- Sliding: computed every day over a rolling window, for instance the last 30 days or 2 months.
- Snapshot: keeps a full referential at a specific point in time (the result of a consolidation of changes received, such as a videos catalog).
- Static: a referential table with no specific date (for example country_name / country_code).
- Cumulative: the period of data differs from the period of process (the information on views can be corrected each day in our weekly table, i.e. Monday=Monday, Tuesday=Monday+Tuesday, Wednesday=Monday+Tuesday+Wednesday…).
To take into account these different types of entity, the notion of granularity is not sufficient. To solve this problem, we’ve introduced the notion of “period”, which is the frequency of entity refreshment.
For instance, we cannot specify a granularity for a snapshot (we always get the full table), but we can specify at which frequency it is refreshed.
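For example, a snapshot entity would declare a period but no granularity (again assuming the YAML format used above):

```yaml
- name: videos_catalog       # snapshot: each instance is the full table
  period: "@daily"

- name: articles_by_author   # time series: each instance covers one day of data
  period: "@daily"
  granularity: "@daily"
```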
The DAG definition to update an entity name test daily looks like this:
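A sketch of what this definition might look like (LineageSensor and the entities parameter are in-house extensions built on top of Airflow, so the exact signatures here are assumptions, as are the callables):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# `entities` and LineageSensor are in-house extensions, not stock Airflow.
dag = DAG(
    dag_id="update_test",
    schedule_interval="@daily",
    start_date=datetime(2019, 1, 1),
    entities=["test"],  # links this DAG's tasks to the entity they produce
)

# The sensor reads the entity configuration and waits until all parents
# of `test` are in an accepted state for the execution date.
wait_for_parents = LineageSensor(task_id="wait_for_parents", entity="test", dag=dag)

update_test = PythonOperator(
    task_id="update_test",
    python_callable=compute_test_entity,      # hypothetical ETL function
    on_success_callback=update_status_table,  # marks the instance in the status table
    dag=dag,
)

wait_for_parents >> update_test
```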
We add an entities parameter to the DAG definition and use it to link Airflow tasks to entities. We no longer need to write the dependencies manually, as they are declared in the configuration file: the LineageSensor takes care of that.
We are moving step by step on this project and we keep a list of open questions to tackle as we go along:
- How to reduce the configuration overhead of data lineage to the minimum for each new entity? And how to make sure it is on the critical path?
- How to keep the data storage and the status table synchronized if data is deleted in storage?
- In order to backfill, do we need to build an on-the-fly @once Airflow DAG, or can we leave the responsibility to the DAG itself to relaunch its own downstream dependencies?
- How to increase the precision of our data lineage from table to attribute?
- What do we need to add in order to have the exact data lake status at any given time in the past?
All in all, tackling data lineage has helped us put words on concepts which were implicit until now and define patterns for them, thus making our pipelines more homogeneous. More work needs to be done to retrace exactly what happened to specific data at any exact time in our data lake, but we have laid the foundations for this to happen in the near future.
- Talk: Democratizing Data at Airbnb — Chris Williams and John Bodley, Airbnb
- Doc: Apache Airflow lineage
- Post: Building and Scaling Data Lineage at Netflix to Improve Data Infrastructure Reliability, and Efficiency