Dataplex Data Lineage | Data Governance | Part 4

Nishit Kamdar
Google Cloud - Community
Oct 20, 2023

Data Lineage:

Data Lineage depicts the lifecycle of data. It describes where the data comes from, where it moves, and how it changes as it travels through various processes and systems within your organization. It provides a traceable route, marking the data’s origin, the transformations it undergoes, and its final destination.

With rising data volumes spread across data silos, it can be challenging for organizations to give users a self-service mechanism to discover, understand, and trust the data. Organizations constantly struggle with questions such as: Where did this data originate? What transformations were applied to it along the way? Which downstream tables and reports are affected if a source changes?

To answer such questions, organizations need to track how data is sourced and transformed, which can be complex and requires significant effort.

Dataplex Data Lineage

Dataplex data lineage provides an out-of-the-box solution to these challenges. It describes each lineage relationship, detailing what happened and when, in an interactive lineage graph, providing data observability.

Simply enabling lineage tracking covers the following sources today, with more in the pipeline: BigQuery and Cloud Composer, both demonstrated below, and Spark applications, covered in the next part.

1. Data Lineage with BigQuery

To demonstrate lineage in BigQuery, we will build the following pipeline:

  1. Create a staging table from the Chicago crimes public dataset in the staging layer.
  2. Create a raw table from the staging table in the raw layer.
  3. Apply some transformations to the raw layer table and create a curated version in the curated layer.
  4. Use a curated table and create 4 data products in the Products zone.
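
Before building it, it helps to see the dependency chain these four steps create. Here is a minimal Python sketch of that lineage graph and a valid build order; the table names match this exercise, but the graph itself is illustrative, not produced by Dataplex:

```python
from graphlib import TopologicalSorter

# Upstream dependencies for each table in the exercise
# (table -> set of tables it is derived from).
lineage = {
    "crimes_staging": {"bigquery-public-data.chicago_crime.crime"},
    "chicago_crimes_raw": {"crimes_staging"},
    "chicago_crimes_curated": {"chicago_crimes_raw"},
    "chicago_crimes_by_year": {"chicago_crimes_curated"},
    "chicago_crimes_by_month": {"chicago_crimes_curated"},
    "chicago_crimes_by_day": {"chicago_crimes_curated"},
    "chicago_crimes_by_hour": {"chicago_crimes_curated"},
}

# A valid build order: every table appears after its upstream sources.
order = list(TopologicalSorter(lineage).static_order())
print(order[0])  # → bigquery-public-data.chicago_crime.crime
```

Dataplex captures exactly this kind of graph automatically as the pipeline runs, which is what the lineage UI renders later in this post.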

1.1. Create a Staging Table

bq --location=us-central1 mk \
  --dataset \
  $PROJECT_ID:oda_crimes_staging_ds

bq --location=us-central1 query \
  --use_legacy_sql=false \
  'CREATE OR REPLACE TABLE oda_crimes_staging_ds.crimes_staging AS SELECT * FROM `bigquery-public-data.chicago_crime.crime`'

Reload the BigQuery UI and you should see the table created. Query it to validate the load.

1.2. Create a Raw table

CREATE OR REPLACE TABLE oda_raw_zone.chicago_crimes_raw AS
SELECT *
FROM oda_crimes_staging_ds.crimes_staging;

1.3. Curate Chicago Crimes Data

Let’s add some temporal attributes, such as case year, month, day, and hour, to the raw Chicago crimes table; we will use them in the data products. Create the result in the curated zone.

CREATE OR REPLACE TABLE oda_curated_zone.chicago_crimes_curated AS
SELECT
  *,
  CAST(year AS INT64) AS case_year,
  FORMAT_TIMESTAMP('%B', date) AS case_month,
  FORMAT_TIMESTAMP('%d', date) AS case_day_of_month,
  FORMAT_TIMESTAMP('%k', date) AS case_hour,
  EXTRACT(DAYOFWEEK FROM date) AS case_day_of_week_nbr,
  FORMAT_TIMESTAMP('%A', date) AS case_day_of_week_name
FROM oda_raw_zone.chicago_crimes_raw;

1.4. Create Data Products

On the curated data, let’s use the new attributes to analyze crimes by year, month, day, and hour.

Note that we persist the results in the consumption zone. From an access perspective, think of this zone as a reporting mart that you can grant access to without exposing the curated data.
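
For instance, access to the product zone can be granted at the dataset level using BigQuery’s SQL DCL; the project, dataset, and group below are illustrative placeholders:

```sql
-- Hypothetical grant: analysts can read the product zone only.
GRANT `roles/bigquery.dataViewer`
ON SCHEMA `my-project.oda_product_zone`
TO "group:analysts@example.com";
```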

CREATE OR REPLACE TABLE oda_product_zone.chicago_crimes_by_year AS
SELECT
  case_year,
  COUNT(*) AS crime_count
FROM oda_curated_zone.chicago_crimes_curated
GROUP BY case_year;

CREATE OR REPLACE TABLE oda_product_zone.chicago_crimes_by_month AS
SELECT
  case_month AS month,
  COUNT(*) AS crime_count
FROM oda_curated_zone.chicago_crimes_curated
GROUP BY case_month;

CREATE OR REPLACE TABLE oda_product_zone.chicago_crimes_by_day AS
SELECT
  case_day_of_week_name AS day,
  COUNT(*) AS crime_count
FROM oda_curated_zone.chicago_crimes_curated
GROUP BY case_day_of_week_name;

CREATE OR REPLACE TABLE oda_product_zone.chicago_crimes_by_hour AS
SELECT
  case_hour AS hour_of_day,
  COUNT(*) AS crime_count
FROM oda_curated_zone.chicago_crimes_curated
GROUP BY case_hour;
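
As a sanity check on what these product tables contain: the aggregation in each one is simply a count per bucket. A quick local illustration in Python, with made-up sample rows standing in for the curated table:

```python
from collections import Counter

# Hypothetical curated rows, reduced to the attributes the products group by.
curated = [
    {"case_year": 2023, "case_month": "January"},
    {"case_year": 2023, "case_month": "March"},
    {"case_year": 2022, "case_month": "January"},
]

# Equivalent of: SELECT case_year, COUNT(*) ... GROUP BY case_year
crimes_by_year = Counter(r["case_year"] for r in curated)
print(crimes_by_year[2023])  # → 2
```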

1.5. The Lineage Graph

BigQuery lineage is captured by default, without the need for any orchestration tool.

Go to BigQuery → click on the chicago_crimes_curated table → click on the Lineage tab.

It shows the entire lineage graph of the pipeline we executed above. The curated table fans out into the 4 data product tables created from it.

Going a step back, the curated table is shown as created from the chicago_crimes_raw table.

Click on the “magnifying glass” symbol between the two tables. This is called a lineage “process”. It captures the transformation details; in our case, the query that created the curated table from the raw one.

Going further back, the graph shows the staging table that we created from the Chicago crimes public dataset, and finally the public dataset itself.

This is how Dataplex lineage for BigQuery captures the entire end-to-end lineage: fully automated and out-of-the-box, without any coding or configuration.

2. Data Lineage with Cloud Composer

Data lineage is also available for DAGs that run on Cloud Composer and use supported Airflow operators.

2.1. Enable Data Lineage on Composer Environment:

Data lineage has to be enabled individually on each Cloud Composer environment.

Once the feature is enabled, running DAGs that use any of the supported operators causes Cloud Composer to report lineage information to the Data Lineage API.
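
Assuming a Composer 2 environment, enabling the integration might look like the following; the environment name and location are placeholders for your own values:

```shell
# Enable the Data Lineage API in the project, if not already enabled.
gcloud services enable datalineage.googleapis.com

# Turn on lineage integration for an existing Composer 2 environment.
gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration
```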

Please check the following link for the supported operators: https://cloud.google.com/composer/docs/composer-2/lineage-integration#supported-operators

2.2. The Data Engineering Pipeline DAG

We will create the following DAG.

It’s very similar to the BigQuery exercise above. The main difference is that instead of running bq commands directly, we use a DAG to create the curated table from the raw one, plus the 4 data products.

2.3. Review the Airflow DAG Python script

from airflow import models
from airflow.operators import dummy_operator
from airflow.providers.google.cloud.operators import bigquery
from airflow.utils.dates import days_ago


PROJECT_ID = models.Variable.get('project_id')


CURATE_CHICAGO_CRIMES = f"""
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.oda_curated_zone.crimes_curated` AS
SELECT
  *,
  CAST(year AS INT64) AS case_year,
  FORMAT_TIMESTAMP('%B', date) AS case_month,
  FORMAT_TIMESTAMP('%d', date) AS case_day_of_month,
  FORMAT_TIMESTAMP('%k', date) AS case_hour,
  EXTRACT(DAYOFWEEK FROM date) AS case_day_of_week_nbr,
  FORMAT_TIMESTAMP('%A', date) AS case_day_of_week_name
FROM
  oda_raw_zone.crimes_raw;
"""

TREND_BY_YEAR = f"""
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.oda_product_zone.crimes_by_year` AS
SELECT
  case_year,
  COUNT(*) AS crime_count
FROM
  oda_curated_zone.crimes_curated
GROUP BY
  case_year;
"""

TREND_BY_MONTH = f"""
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.oda_product_zone.crimes_by_month` AS
SELECT
  case_month AS month,
  COUNT(*) AS crime_count
FROM
  oda_curated_zone.crimes_curated
GROUP BY
  case_month;
"""

TREND_BY_DAY = f"""
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.oda_product_zone.crimes_by_day` AS
SELECT
  case_day_of_week_name AS day,
  COUNT(*) AS crime_count
FROM
  oda_curated_zone.crimes_curated
GROUP BY
  case_day_of_week_name;
"""

TREND_BY_HOUR = f"""
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.oda_product_zone.crimes_by_hour` AS
SELECT
  case_hour AS hour_of_day,
  COUNT(*) AS crime_count
FROM
  oda_curated_zone.crimes_curated
GROUP BY
  case_hour;
"""

with models.DAG(
        'Chicago_Crime_Trends_From_BQ_With_OOB_Lineage',
        schedule_interval=None,
        start_date=days_ago(2),
        catchup=False) as dag:

    start = dummy_operator.DummyOperator(
        task_id='start',
        trigger_rule='all_success'
    )

    end = dummy_operator.DummyOperator(
        task_id='end',
        trigger_rule='all_done'
    )

    curate_chicago_crimes = bigquery.BigQueryInsertJobOperator(
        task_id="Curate_Chicago_Crimes",
        configuration={
            "query": {
                "query": CURATE_CHICAGO_CRIMES,
                "useLegacySql": False
            }
        }
    )

    trend_by_year = bigquery.BigQueryInsertJobOperator(
        task_id="Trend_By_Year",
        configuration={
            "query": {
                "query": TREND_BY_YEAR,
                "useLegacySql": False
            }
        }
    )

    trend_by_month = bigquery.BigQueryInsertJobOperator(
        task_id="Trend_By_Month",
        configuration={
            "query": {
                "query": TREND_BY_MONTH,
                "useLegacySql": False
            }
        }
    )

    trend_by_day = bigquery.BigQueryInsertJobOperator(
        task_id="Trend_By_Day",
        configuration={
            "query": {
                "query": TREND_BY_DAY,
                "useLegacySql": False
            }
        }
    )

    trend_by_hour = bigquery.BigQueryInsertJobOperator(
        task_id="Trend_By_Hour",
        configuration={
            "query": {
                "query": TREND_BY_HOUR,
                "useLegacySql": False
            }
        }
    )

    start >> curate_chicago_crimes >> [trend_by_year, trend_by_month, trend_by_day, trend_by_hour] >> end

2.4. Run the Airflow DAG

2.5. Validate the creation of tables in BigQuery

2.6. Validate the Lineage Graphs

You will see 2 sets of edges from the curated table to the data products: one from the BigQuery exercise above and the other from the current DAG run, depicted by the Composer icon.

Click on one of the Composer icons. It shows details such as the dag_id, task_id, and runs.

In summary, Dataplex lineage is a powerful tool that captures the entire end-to-end lineage of Cloud Composer DAG runs out-of-the-box.
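
If you later want to query this lineage programmatically through the Data Lineage API, assets are addressed by fully qualified names; for BigQuery tables the scheme is `bigquery:project.dataset.table`. A tiny helper to build them (the project ID below is a placeholder):

```python
def bq_fqn(project: str, dataset: str, table: str) -> str:
    """Fully qualified name the Data Lineage API uses for a BigQuery table."""
    return f"bigquery:{project}.{dataset}.{table}"

fqn = bq_fqn("my-project", "oda_curated_zone", "chicago_crimes_curated")
print(fqn)  # → bigquery:my-project.oda_curated_zone.chicago_crimes_curated
```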

To be Continued.

Dataplex Data Lineage for Spark Applications


Data and Artificial Intelligence specialist at Google. This blog is based on “My experiences from the field”. Views are solely mine.