Centralized dashboard for monitoring Data Fusion pipelines

Nilesh Jaiswal
Google Cloud - Community
Dec 21, 2022

Cloud Data Fusion is a GUI-based data integration service for building and managing data pipelines. It is based on CDAP, an open-source framework for building data analytics applications for on-premises and cloud sources. It provides a wide variety of out-of-the-box connectors to sources on GCP, other public clouds, and on-premises systems.

Use case

The customer had multiple Data Fusion instances with more than 2,500 pipelines deployed. Each instance has multiple namespaces that functionally segregate the pipelines.

It is easy to see how complex it is for an operations team to manage such a large deployment. To understand it a little better, let’s see how pipelines are managed in Data Fusion instances.

Namespaces in data fusion instance
Pipelines under a namespace ‘default’

As we can see, one has to open each namespace to see all the deployed pipelines and their run status. This becomes unwieldy when the number of namespaces is high and each namespace contains hundreds of pipelines.

To simplify the process and reduce the operations team’s pain, we developed a solution that we describe in detail in this blog.

Solution

Build a centralized monitoring dashboard in Data Studio. The dashboard shows all the pipelines running across the Data Fusion instances and provides filters to view pipelines by date, by instance name, and so on.

Now let’s see how we built this dashboard. At a very high level, the solution involves creating a log sink to BigQuery so that all Data Fusion pipeline logs are available in a BQ table, and then preparing a view that feeds a Data Studio report to render the dashboard.

  1. Create BQ log sink
  2. Create mapping BQ table
  3. Create BQ view
  4. Build Data Studio dashboard

Let’s dive deeper into each of the steps mentioned above.

Create BQ log Sink

  1. In the Cloud console, go to the Logging > Logs Explorer page.
  2. Select an existing Cloud project, folder, or organisation.
  3. Once the Logs Explorer home page opens, click Logs Router in the left pane and then click ‘CREATE SINK’ at the top of the page.
  4. Provide a sink name and description.
  5. Select ‘BigQuery dataset’ as sink destination and select an existing dataset or create a new one from the options provided.
  6. In the next option, ‘Choose logs to include in sink’, provide the filter for Data Fusion pipeline logs, i.e.

logName:"datafusion-pipeline-logs"

  7. Click the ‘Create Sink’ button and wait for the sink to be created successfully.
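
The same sink can probably also be created from the command line instead of the console. The sketch below only assembles a `gcloud logging sinks create` command that uses the filter above; it does not run it. The sink name, project ID, and dataset ID are placeholders chosen for illustration, not values from this setup. When a sink is created this way, the sink's writer identity typically still has to be granted write access on the dataset.

```python
import shlex


def build_sink_command(sink_name, project_id, dataset_id,
                       log_name="datafusion-pipeline-logs"):
    """Return the argv list for creating a BigQuery log sink with gcloud.

    All arguments are caller-supplied placeholders; only the log name
    default comes from the filter used in this post.
    """
    # BigQuery sink destinations are addressed by project and dataset.
    destination = (
        f"bigquery.googleapis.com/projects/{project_id}/datasets/{dataset_id}"
    )
    # Same inclusion filter as entered in the console.
    log_filter = f'logName:"{log_name}"'
    return [
        "gcloud", "logging", "sinks", "create", sink_name, destination,
        f"--log-filter={log_filter}",
        f"--project={project_id}",
    ]


# Hypothetical names, for illustration only.
command = build_sink_command("df-pipeline-logs-sink", "my-project", "df_monitoring")
print(shlex.join(command))
```

Printing the command rather than executing it keeps the sketch safe to run anywhere; the output can be reviewed and pasted into a shell.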

The sink pushes all the Data Fusion pipeline logs to a BigQuery table. For this blog post we assume the name of the table is DF_PIPELINE_LOGS. Below is a sample of the table’s contents.

BQ table — data fusion pipeline logs

BQ metadata table: a mapping table for Data Fusion instances and pipelines

This table is optional if all the pipelines are deployed and running on a single Data Fusion instance. In our use case, however, pipelines were distributed across multiple Data Fusion instances, so a mapping table was prepared to map each pipeline to the Data Fusion instance in which it is deployed.

For this blog post we call this table DF_INST_PIPELINE_REGISTRY. Below are sample entries.

BQ Mapping table

Note: This table has to be updated manually as and when pipelines are moved to different instances.
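
Because the view defined later in this post joins the logs to this table with an inner join, a pipeline that is missing from the registry would silently drop out of the dashboard. A small check like the sketch below can help catch that when the table is maintained by hand. The column name `pipelineName` and all sample values are assumptions for illustration; only `INSTANCE_ID` appears in the view definition.

```python
def find_unregistered(logged_pipelines, registry):
    """Return pipeline names seen in the logs that have no registry entry.

    logged_pipelines: iterable of pipeline names taken from the log table.
    registry: list of dicts, one per mapping row (hypothetical column names).
    """
    registered = {row["pipelineName"] for row in registry}
    return sorted(set(logged_pipelines) - registered)


# Hypothetical registry rows and log contents, for illustration only.
registry = [
    {"INSTANCE_ID": "df-instance-a", "pipelineName": "orders_load"},
    {"INSTANCE_ID": "df-instance-b", "pipelineName": "customers_load"},
]
seen_in_logs = ["orders_load", "customers_load", "inventory_load", "orders_load"]

# inventory_load has logs but no mapping row, so it would not appear
# in the dashboard until the registry is updated.
print(find_unregistered(seen_in_logs, registry))
```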

BigQuery view for the Data Studio report

Let’s build a BigQuery view that joins the DF_PIPELINE_LOGS and DF_INST_PIPELINE_REGISTRY tables and generates ready-to-consume output for the Data Studio report. We name the view DF_PIPELINE_STATUS_VW for this blog.

View Definition : DF_PIPELINE_STATUS_VW

WITH dfPipelineLogs AS (
  -- Status-bearing log entries from the last 30 days.
  SELECT DISTINCT
    labels._applicationid AS pipelineName,
    labels._runid AS run_id,
    CASE
      WHEN resource.type = 'cloud_dataproc_cluster' THEN resource.labels.cluster_name
      ELSE NULL
    END AS dataproc_cluster,
    CASE
      WHEN jsonPayload.message LIKE "Pipeline % running%" THEN "RUNNING"
      WHEN jsonPayload.message LIKE "Pipeline % succeeded%" THEN "SUCCEEDED"
      WHEN jsonPayload.message LIKE "Pipeline % failed%"
        OR jsonpayload_v1beta1_reportederrorevent.message LIKE "Pipeline % failed%" THEN "FAILED"
      WHEN jsonPayload.message LIKE "Pipeline % started%" THEN "STARTED"
      ELSE NULL
    END AS Status,
    FORMAT_DATETIME("%b-%d-%Y", DATETIME(timestamp)) AS log_date,
    FORMAT_DATETIME("%H:%M:%S", DATETIME(timestamp)) AS log_time,
    -- Raw timestamp, kept so that rows can be ordered chronologically.
    timestamp AS log_timestamp,
    labels._namespaceid AS namespace,
    resource.labels.project_id AS project_id
  FROM
    -- Log sink table (referred to as DF_PIPELINE_LOGS in this post).
    `<project_id>.<dataset_id>.datafusion_pipeline_logs`
  WHERE
    DATE(timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
),
namespaceInstanceMapping AS (
  -- Assumption: the registry stores the pipeline name in a column
  -- called pipelineName; adjust this to the actual column name.
  SELECT DISTINCT
    REG.INSTANCE_ID AS datafusion_instance,
    REG.pipelineName AS pipelineName
  FROM
    `<project_id>.<dataset_id>.DF_INST_PIPELINE_REGISTRY` REG
)
SELECT
  pipelineName,
  run_id,
  Status,
  datafusion_instance,
  dataproc_cluster,
  log_date,
  log_time,
  CASE WHEN r_n = 1 THEN 1 ELSE 0 END AS current_state,
  'use4' AS region_id,
  namespace,
  project_id,
  startTime,
  endTime,
  TIMESTAMP_DIFF(endTime, startTime, MINUTE) AS execution_time_minutes
FROM
  (
    SELECT
      A.pipelineName AS pipelineName,
      A.run_id,
      B.datafusion_instance,
      A.dataproc_cluster,
      A.Status,
      A.log_date,
      A.log_time,
      -- Latest status entry per run gets r_n = 1. Ordering uses the raw
      -- timestamp because the formatted date string does not sort chronologically.
      ROW_NUMBER() OVER (
        PARTITION BY A.run_id
        ORDER BY A.log_timestamp DESC
      ) AS r_n,
      A.namespace,
      A.project_id,
      -- Assumption: startTime and endTime are not defined in the query as
      -- published; here they are approximated as the first and last
      -- status-bearing log timestamps of the run.
      MIN(A.log_timestamp) OVER (PARTITION BY A.run_id) AS startTime,
      MAX(A.log_timestamp) OVER (PARTITION BY A.run_id) AS endTime
    FROM
      dfPipelineLogs A
      JOIN namespaceInstanceMapping B ON A.pipelineName = B.pipelineName
    WHERE
      A.Status IS NOT NULL
  )
WHERE
  r_n = 1

Sample output of BQ view
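
To make the logic of the view easier to follow, here is a minimal Python sketch of its two core steps: mapping a log message to a status with the same LIKE patterns, and keeping only the most recent status-bearing entry per run, which is what the ROW_NUMBER() window and the `r_n = 1` filter are meant to do. The helper names and sample messages are hypothetical, and the sketch looks at a single message field only, whereas the view also checks the reported-error-event message for failures.

```python
import re

# (LIKE pattern, status) pairs, in the same order as the CASE expression.
STATUS_PATTERNS = [
    ("Pipeline % running%", "RUNNING"),
    ("Pipeline % succeeded%", "SUCCEEDED"),
    ("Pipeline % failed%", "FAILED"),
    ("Pipeline % started%", "STARTED"),
]


def like(value, pattern):
    """Emulate SQL LIKE: % matches any run of characters, _ matches one."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


def classify(message):
    """Return the status for a log message, or None if it carries none."""
    if message is None:
        return None
    for pattern, status in STATUS_PATTERNS:
        if like(message, pattern):
            return status
    return None


def latest_status_per_run(entries):
    """entries: iterable of (run_id, timestamp, message) tuples.

    Timestamps only need to be comparable; ISO-8601 strings in the same
    time zone compare correctly as plain strings.
    """
    latest = {}
    for run_id, ts, message in entries:
        status = classify(message)
        if status is None:
            continue  # mirrors WHERE Status IS NOT NULL
        if run_id not in latest or ts > latest[run_id][0]:
            latest[run_id] = (ts, status)
    return {run_id: status for run_id, (_, status) in latest.items()}


# Hypothetical log entries, for illustration only.
sample = [
    ("run-1", "2022-12-21T10:00:00Z", "Pipeline 'orders_load' started"),
    ("run-1", "2022-12-21T10:00:05Z", "Pipeline 'orders_load' running"),
    ("run-1", "2022-12-21T10:07:00Z", "Pipeline 'orders_load' succeeded"),
    ("run-2", "2022-12-21T10:01:00Z", "Pipeline 'customers_load' started"),
    ("run-2", "2022-12-21T10:02:00Z", "Stage 1 completed"),
]
print(latest_status_per_run(sample))
```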

Data Studio Dashboard

Now let’s build a dashboard using the BigQuery view as the source dataset.

Steps to create a new dashboard with a BQ source:

  1. Go to Google Data Studio, https://datastudio.google.com
  2. Select a Blank Report and, once the report opens, click the ‘Add Data’ option in the top bar.
  3. Select the ‘BigQuery’ option.
  4. Select the project, dataset, and then the table or view you want to connect the report to.
  5. Once the report is connected to the data source, all the fields are visible in the right pane.
  6. Once the data source is added, we can add charts, tiles, and controls using the same data.

Here is what the dashboard looks like:

Data Fusion Pipeline Dashboard

The date filter can be used to filter the report for a specific date. Similarly, there are filters for pipeline name, Data Fusion instance, and so on.

Summary of a few of the charts, tables, and filters in the dashboard:

  • Successful Pipelines: Shows the total number of pipeline runs that succeeded, over a period of 30 days by default.
  • Failed Pipelines: Shows the total number of runs that failed over a period of time.
  • Executing Pipelines: Shows the total number of pipelines currently executing.
  • Distribution of pipeline runs over current status: A pie chart showing the percentage distribution of pipeline runs across the statuses SUCCEEDED, FAILED, RUNNING, and STARTED.
  • Distribution of pipeline runs over dataproc clusters: A pie chart showing the percentage distribution of pipeline runs across the Dataproc clusters used by the pipelines while executing.
  • Distribution of pipeline runs over data fusion instances: Shows the percentage distribution of pipeline runs across the different Data Fusion instances.
  • The final table shows details of pipeline runs and their statuses over a time period.
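
The scorecards and pie charts above are simple aggregations over the view's rows. As a rough sketch of what the dashboard computes, the Python below derives the same numbers from a list of rows with a `Status` field. Counting both RUNNING and STARTED as "executing" is an assumption made for this illustration, and the sample rows are invented.

```python
from collections import Counter


def summarize(rows):
    """Aggregate view rows (one per pipeline run) into dashboard figures.

    rows: list of dicts with a 'Status' key holding the run's latest status.
    """
    counts = Counter(row["Status"] for row in rows)
    total = sum(counts.values())
    # Percentage share per status, as shown in the status pie chart.
    share = (
        {status: round(100.0 * n / total, 1) for status, n in counts.items()}
        if total else {}
    )
    return {
        "successful": counts["SUCCEEDED"],
        "failed": counts["FAILED"],
        # Assumption: a run that has started or is running counts as executing.
        "executing": counts["RUNNING"] + counts["STARTED"],
        "share_pct": share,
    }


# Hypothetical rows, for illustration only.
rows = [
    {"pipelineName": "orders_load", "Status": "SUCCEEDED"},
    {"pipelineName": "customers_load", "Status": "SUCCEEDED"},
    {"pipelineName": "inventory_load", "Status": "FAILED"},
    {"pipelineName": "returns_load", "Status": "RUNNING"},
]
print(summarize(rows))
```

Grouping by `dataproc_cluster` or `datafusion_instance` instead of `Status` would give the other two distributions in the same way.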

This report refreshes every minute and shows the latest data accordingly. The Data Studio dashboard also has an option to refresh the data manually on demand.

Hope you found the article helpful! Happy learning!
