Transforming Data Pipelines: dbt+PowerBI+Airflow

Nuno Pinela
Towards Data Engineering
9 min read · Jul 30, 2024

Leveraging Airflow’s scheduling and workflow management capabilities is essential for managing complex, interdependent data tasks efficiently. This post shares the key achievements within our organization’s open data mesh platform ecosystem in the finance domain after implementing this orchestration. Notably, we have seen a significant reduction in data loading times, enhanced reliability with processes starting exactly when needed, and increased transparency. These advancements have strengthened trust in our data products and reinforced our position as the single source of truth within our data domain. Furthermore, automated management of task dependencies, real-time monitoring, and improved scalability and flexibility have contributed to more streamlined processes and optimized resource utilization. The following sections focus on the critical components involved — dbt transformations, necessary configurations, and Power BI refresh processes — detailing the steps and strategies applied.

Introduction

Within the finance domain data projects, our focus is on robust data warehousing and efficient transformations using Snowflake and dbt. With over 50 SAP ERP systems contributing data, our approach centers on consolidating and harmonizing this volume of data into a cohesive structure. At the heart of these projects lies a central project known as dom_finance, which serves as the primary source consolidating data from all these different systems. It is crucial to ensure that dom_finance is fully updated and synchronized before initiating the loads for all other projects, to maintain data accuracy and consistency.

This necessity led us to choose Airflow to orchestrate our processes and ensure they all run in the correct sequence. Additionally, all our frontend solutions, built on Power BI, need to be refreshed immediately after the backend data is updated. This includes not only the large fact tables but also all the dimension tables, as we adhere to the Kimball approach, particularly leveraging the star schema model. Airflow ensures that these updates occur seamlessly and in the correct order, maintaining data integrity and consistency across our entire data pipeline.

Just as dom_finance centralizes all relevant data upstream, we also use it to centralize all processes in Airflow. All relevant data needed from each project is selected from this central project. We achieve this by sharing the relevant views from each project with dom_finance, ensuring that all follow the same structure and nomenclature. This strategy allows us to maintain only one connection to Snowflake through a single technical user, streamlining the process and reducing complexity.

dbt Loads

The first critical component of our data orchestration workflow is the handling of dbt loads, and we have implemented a streamlined process to manage it effectively.

To begin with, we have an initial DAG dedicated to the dbt load of dom_finance. Once the central project load is complete, we use the TriggerDagRunOperator to initiate a separate DAG for each downstream project, as sketched below. This operator allows us to manage dependencies effectively, ensuring that each project starts processing only after the central data is fully updated and synchronized.
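For illustration, here is a minimal sketch of this chaining pattern; the DAG IDs, schedule, and project names are placeholders rather than our actual configuration, and the EmptyOperator simply stands in for the dbt Cloud job task (assumes Airflow 2.4+).

```python
# Minimal sketch of the dom_finance DAG triggering downstream project DAGs.
# DAG IDs, schedule, and project names are hypothetical.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="dbt_dom_finance_load",          # hypothetical DAG ID
    start_date=datetime(2024, 1, 1),
    schedule="0 2 * * *",                   # example schedule
    catchup=False,
) as dag:
    # Placeholder for the dbt Cloud job that loads the central dom_finance project
    dom_finance_load = EmptyOperator(task_id="run_dom_finance_dbt_job")

    # One trigger per downstream project DAG; each starts only after the
    # central data is fully updated and synchronized.
    trigger_downstream = [
        TriggerDagRunOperator(
            task_id=f"trigger_{project}",
            trigger_dag_id=f"dbt_{project}_load",   # hypothetical downstream DAG IDs
            wait_for_completion=False,
        )
        for project in ["project_a", "project_b"]   # illustrative project names
    ]

    dom_finance_load >> trigger_downstream
```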

To maintain robustness and reliability in our workflow, we have incorporated the DbtCloudRerunJobOperator, a custom Apache Airflow operator that enables us to rerun jobs from the start or from the point of failure in dbt Cloud (available since version 1.6). While the ability to restart from a failure point has been present in dbt for some time, this operator automates the rerun process, eliminating the need for manual intervention.

This operator ensures that any disruptions or errors in the dbt load process can be managed efficiently, minimizing downtime and ensuring data integrity. The rerun process involves checking the job’s status and rerunning it from the failure point if necessary, thereby preserving progress and reducing the need for manual intervention. To interact with dbt Cloud and manage HTTP requests, the operator uses both DbtCloudHook and HttpHook, and the monitoring and rerun attempts are throttled with sleep intervals to avoid excessive API calls. A simplified sketch of the operator follows the steps below.

Execution (execute method):

Hook Setup:

  • DbtCloudHook: Used to interact with dbt Cloud
  • HttpHook: Used for making HTTP POST requests not available through the DbtCloudHook, such as the ‘Retry Failed Job’ API Call

Trigger Job Run:

  • A dbt job is triggered using the DbtCloudHook.trigger_job_run method.
  • The response from this trigger contains details of the job run, including a run_id which is used to monitor the job status.

Monitoring the Job Run:

  • The status of the job run is polled at regular intervals (every 10 seconds).
  • If the job fails (is_success is False and in_progress is False), a rerun is attempted.

Rerun the Job:

  • The job is rerun from the point of failure using an HTTP POST request to the rerun endpoint.
  • The status of the rerun is then monitored in a similar fashion (every 5 seconds) until it completes.
  • If the rerun succeeds, the process ends; otherwise, an exception is raised indicating the rerun failed.
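The condensed sketch below shows what such an operator can look like. It assumes the dbt Cloud Airflow provider’s DbtCloudHook, an HTTP connection (named dbt_cloud_api here) pointing at the dbt Cloud API, and a retry endpoint at /api/v2/accounts/{account_id}/runs/{run_id}/retry/; it is an illustration of the flow, not our production operator.

```python
import time

from airflow.models import BaseOperator
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus
from airflow.providers.http.hooks.http import HttpHook


class DbtCloudRerunJobOperator(BaseOperator):
    """Trigger a dbt Cloud job and, on failure, retry it from the failed step."""

    def __init__(self, job_id, account_id, dbt_cloud_conn_id="dbt_cloud_default",
                 http_conn_id="dbt_cloud_api", poll_interval=10, **kwargs):
        super().__init__(**kwargs)
        self.job_id = job_id
        self.account_id = account_id
        self.dbt_cloud_conn_id = dbt_cloud_conn_id
        self.http_conn_id = http_conn_id
        self.poll_interval = poll_interval

    def _wait_for_run(self, hook, run_id, interval):
        # Poll the run status until it reaches a terminal state.
        while True:
            status = hook.get_job_run(run_id=run_id, account_id=self.account_id) \
                         .json()["data"]["status"]
            if status == DbtCloudJobRunStatus.SUCCESS.value:
                return True
            if status in (DbtCloudJobRunStatus.ERROR.value,
                          DbtCloudJobRunStatus.CANCELLED.value):
                return False
            time.sleep(interval)  # throttle API calls between polls

    def execute(self, context):
        dbt_hook = DbtCloudHook(self.dbt_cloud_conn_id)

        # Trigger the job and capture the run_id from the response.
        run = dbt_hook.trigger_job_run(job_id=self.job_id,
                                       cause="Triggered by Airflow",
                                       account_id=self.account_id)
        run_id = run.json()["data"]["id"]

        if self._wait_for_run(dbt_hook, run_id, self.poll_interval):
            return run_id

        # Rerun from the point of failure via the dbt Cloud retry endpoint
        # (assumed path; base URL and token come from the HTTP connection).
        http_hook = HttpHook(method="POST", http_conn_id=self.http_conn_id)
        retry = http_hook.run(
            endpoint=f"/api/v2/accounts/{self.account_id}/runs/{run_id}/retry/")
        retry_run_id = retry.json()["data"]["id"]  # assumed to return the (new) run

        if not self._wait_for_run(dbt_hook, retry_run_id, interval=5):
            raise RuntimeError(f"dbt Cloud rerun for run {run_id} failed")
        return retry_run_id
```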

Power BI Refresh Strategy

Due to the substantial size of our tables, performing a full update on every load would be impractical and inefficient. To optimize this process, we have adopted a partition-based update strategy. Partitions segment the dataset into smaller, independent logical parts based on specific criteria, enabling each partition to be processed separately. This approach offers the following key advantages:

· Faster refreshes: Refreshing only the most recent data changes increases the overall process speed

· Increased reliability: Faster data queries to the source reduce the potential for network issues

· Reduced consumption: Having smaller tables in Snowflake that contain only the relevant partitions results in faster queries from Power BI to Snowflake, lowering the consumption of Snowflake credits

· Large data models: Models with billions of rows can grow without the need for full model refreshes each time

To find an optimal solution for our datasets, we evaluated various partitioning strategies and identified the most effective configurations. Different models benefit from different partition setups based on their performance metrics, and this flexibility allows us to tune each model for efficient data processing. By applying the optimal partitioning strategy to each model, we achieve significant performance improvements and ensure the system’s overall efficiency.

By adopting a partition-based update strategy, we ensure that our data processing is not only efficient but also scalable and robust. Here is an outline of the whole process in place:

Prepare the Partitions in dbt + Snowflake

The identification of partitions that require updating, based on the current state of the final table, is handled in dbt. Firstly, we give teams the ability to configure which partitions they consider relevant for each model, as well as to specify from which tables these partitions should be sourced. Additionally, teams can configure the dimensions they want to refresh in Power BI alongside the fact table.

For each data model, a dbt model (materialized as a table) is created. This model contains all the relevant partitions that need to be loaded into Power BI along with their counts, serving as a tracking mechanism for reporting and comparison purposes. The dbt process also incorporates Jinja coding to accurately retrieve the necessary sources.

On top of these individual models, there is a unified model that consolidates all of them. This unified view is then shared with the centralized project dom_finance.

Retrieve Data from Snowflake + Convert the Data to JSON Format

In the second step of this process, we fetch the relevant data partitions from Snowflake by using the SnowflakeOperator to run SQL selects on the centralized table containing the partitions by model. This data then needs to be converted into JSON format, and for that we have Python routines that ensure the output adheres to the format expected by Power BI. Additionally, other checks are performed on the data to ensure the partitions are not empty and there are no formatting issues, as these problems could affect the entire Power BI data model.
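A minimal sketch of this fetch-and-convert step is shown below, written as a Python callable using SnowflakeHook for brevity rather than the SnowflakeOperator itself. The table and column names are hypothetical, and the payload shape follows the “objects” list used by the Power BI enhanced refresh API; our actual routines differ in the details.

```python
# Sketch: fetch partitions flagged for refresh and build the Power BI payload.
# PBI_PARTITIONS and its columns are hypothetical names.
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook


def build_refresh_payload(model_name: str,
                          snowflake_conn_id: str = "snowflake_default") -> dict:
    hook = SnowflakeHook(snowflake_conn_id=snowflake_conn_id)

    # Fetch the partitions that need to be refreshed for this data model.
    rows = hook.get_records(
        "SELECT TABLE_NAME, PARTITION_NAME "
        "FROM DOM_FINANCE.REPORTING.PBI_PARTITIONS "   # hypothetical table
        "WHERE MODEL_NAME = %(model)s",
        parameters={"model": model_name},
    )

    # Sanity checks: no empty partition list and no blank names, since a bad
    # entry could break the refresh of the whole Power BI data model.
    if not rows:
        raise ValueError(f"No partitions found for model {model_name}")
    objects = []
    for table_name, partition_name in rows:
        if not table_name or not partition_name:
            raise ValueError(f"Malformed partition entry: {table_name!r}/{partition_name!r}")
        objects.append({"table": table_name, "partition": partition_name})

    # JSON payload later passed to the Power BI refresh operator.
    return {"type": "full", "objects": objects, "applyRefreshPolicy": False}
```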

Initiating a Dataset Refresh in Power BI

To refresh a Power BI dataset, we use a combination of custom Airflow operators: PbiRefreshDatasetsOperator and PBIRefreshStatusCheck.

The PbiRefreshDatasetsOperator is responsible for initiating the refresh and passing partition information to Power BI, ensuring that only relevant data is refreshed. It uses the following parameters for initialization:

  • pbi_tenant_id: Power BI tenant ID
  • pbi_client_id: Client ID for the Power BI service principal
  • pbi_client_secret: Client secret for the Power BI service principal
  • pbi_dataset_id: ID of the dataset to be refreshed
  • pbi_workspace_id: ID of the Power BI workspace
  • payload: JSON payload containing information for the refresh

After execution, the operator checks the response status. If the status code is 202 (Accepted), it logs the request ID and other details. If the refresh is unsuccessful, it logs the error and raises an exception. The operator returns the request ID of the refresh operation, which is then used by the PBIRefreshStatusCheck operator.
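As an illustration, the core of such refresh-initiation logic might look like the sketch below, using the standard Azure AD client-credentials flow and the Power BI REST refreshes endpoint. The header from which the request ID is read is an assumption and may differ from our actual operator.

```python
# Sketch: acquire a service-principal token and start a partition-scoped refresh.
import requests


def start_pbi_refresh(pbi_tenant_id, pbi_client_id, pbi_client_secret,
                      pbi_workspace_id, pbi_dataset_id, payload) -> str:
    # 1. Acquire an access token for the Power BI service principal.
    token_resp = requests.post(
        f"https://login.microsoftonline.com/{pbi_tenant_id}/oauth2/v2.0/token",
        data={
            "grant_type": "client_credentials",
            "client_id": pbi_client_id,
            "client_secret": pbi_client_secret,
            "scope": "https://analysis.windows.net/powerbi/api/.default",
        },
        timeout=30,
    )
    token_resp.raise_for_status()
    token = token_resp.json()["access_token"]

    # 2. Start the dataset refresh, passing the partition payload built earlier.
    refresh_resp = requests.post(
        f"https://api.powerbi.com/v1.0/myorg/groups/{pbi_workspace_id}"
        f"/datasets/{pbi_dataset_id}/refreshes",
        headers={"Authorization": f"Bearer {token}"},
        json=payload,
        timeout=30,
    )

    # 3. A 202 (Accepted) means the refresh was queued; anything else is an error.
    if refresh_resp.status_code != 202:
        raise RuntimeError(
            f"Refresh request failed: {refresh_resp.status_code} {refresh_resp.text}")
    return refresh_resp.headers.get("RequestId", "")  # assumed header name
```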

The PBIRefreshStatusCheck operator, responsible for monitoring the refresh process to ensure its successful completion, sends a GET request to the Power BI API using the request ID to check the status of the refresh operation. It evaluates the response as follows:

  • If the status is “Completed,” it logs success and returns True.
  • If the status is “Failed,” it raises an exception.
  • If the status is still in progress, it waits and retries after a delay (polling).

This polling loop continues at regular intervals until a final status is received.
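A minimal sketch of this polling loop is shown below. It assumes a freshly acquired access token (obtained as in the previous snippet) and the refresh execution details endpoint of the Power BI REST API; status names and intervals mirror the behaviour described above.

```python
# Sketch: poll the refresh status until it completes or fails.
import time

import requests


def wait_for_pbi_refresh(token: str, pbi_workspace_id: str, pbi_dataset_id: str,
                         request_id: str, poll_interval: int = 30) -> bool:
    url = (f"https://api.powerbi.com/v1.0/myorg/groups/{pbi_workspace_id}"
           f"/datasets/{pbi_dataset_id}/refreshes/{request_id}")
    while True:
        resp = requests.get(url, headers={"Authorization": f"Bearer {token}"},
                            timeout=30)
        resp.raise_for_status()
        status = resp.json().get("status")

        if status == "Completed":
            return True                            # refresh finished successfully
        if status == "Failed":
            raise RuntimeError(f"Power BI refresh {request_id} failed")
        time.sleep(poll_interval)                  # still in progress: wait and retry
```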

Cleaning Up XComs After the Process is Complete

After the process is complete, it is essential to clean up XComs to maintain a clean and efficient Airflow environment by removing temporary data. The goal is to prevent any residual data from affecting subsequent runs or system performance. This involves utilizing Airflow’s XCom cleanup functionalities to delete the intermediate data stored during the process.
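One possible way to implement this cleanup as a final task is sketched below, deleting the current DAG run’s XCom rows from the metadata database; this is an illustrative approach rather than the exact mechanism we use.

```python
# Sketch: delete all XCom rows produced by the current DAG run.
from airflow.decorators import task
from airflow.models import XCom
from airflow.utils.session import provide_session


@task(trigger_rule="all_done")  # run even if upstream tasks have failed
def cleanup_xcoms(dag_run=None):
    """Remove the intermediate XCom data stored during this DAG run."""

    @provide_session
    def _delete(session=None):
        session.query(XCom).filter(
            XCom.dag_id == dag_run.dag_id,
            XCom.run_id == dag_run.run_id,
        ).delete(synchronize_session=False)

    _delete()

# Added as the last task of each DAG, downstream of the refresh status check.
```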

Power BI Model Backup and Restoration via Azure DevOps Pipeline

Beyond the standard refresh of Power BI models, our Airflow orchestration process includes the capability to efficiently create and restore Power BI model backups. This capability is particularly crucial for disaster recovery scenarios, where rapid recovery is essential to minimize downtime and maintain business continuity. Orchestrating it through Airflow also brings enhanced reliability, thanks to the regular backups, and a reduction in manual intervention and errors.

To achieve this, two distinct pipelines have been prepared in Azure DevOps:

Backup Creation Pipeline — responsible for creating backups of Power BI models. It allows teams to decide the recurrence of these backups, ensuring they are taken at appropriate intervals based on their specific needs. When triggered, it takes a snapshot of the current Power BI model. This backup includes the semantic model itself, which contains the data, relationships, calculations, and other metadata, and it is stored in a secure location, ensuring that it is readily available for restoration.

Backup Restoration Pipeline — designed to restore Power BI models from backups. It is triggered on demand, typically in response to data corruption or system failure. This process is significantly faster than reloading the entire model from the original data sources, especially for large models.
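As an illustration of how such a pipeline can be triggered on demand from Airflow, the sketch below calls the Azure DevOps “Runs” REST API through a SimpleHttpOperator. The connection name, project, pipeline ID, and branch are placeholders, and authentication is assumed to use a personal access token stored in the Airflow HTTP connection; the actual trigger mechanism in our setup may differ.

```python
# Sketch: trigger the hypothetical restoration pipeline in Azure DevOps.
from airflow.providers.http.operators.http import SimpleHttpOperator

trigger_restore = SimpleHttpOperator(
    task_id="trigger_pbi_model_restore",
    http_conn_id="azure_devops",   # base URL https://dev.azure.com/<organization>, PAT in the connection
    endpoint="finance-bi/_apis/pipelines/42/runs?api-version=7.1-preview.1",  # placeholder project / pipeline ID
    method="POST",
    headers={"Content-Type": "application/json"},
    data='{"resources": {"repositories": {"self": {"refName": "refs/heads/main"}}}}',
    log_response=True,
)
```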

Conclusion

In conclusion, the implementation of our workflow, utilizing Apache Airflow, has led to several significant achievements. The most notable accomplishment is the drastic reduction in loading times by eliminating idle periods where no activity occurs. This efficiency boost ensures that our data processing is as fast and effective as possible.

Additionally, the real-time monitoring capabilities now available, coupled with customizable notifications, enable us to keep a close watch on the workflow’s performance and address issues promptly. This setup ensures data integrity and minimizes downtime, which is crucial for maintaining business operations.

Our end-to-end process is not only scalable and flexible, but it is also designed to adapt to future business needs and growth. This readiness for business evolution ensures that our data infrastructure can handle increasing demands and complexity.

Looking ahead, we see promising opportunities to further optimize our system. By leveraging Airflow, we can investigate and potentially reduce Snowflake credit consumption, leading to significant cost savings. Furthermore, there is room to enhance our issue handling mechanisms, making the system even more robust and resilient.

Overall, the integration of these advanced tools and methodologies has transformed our data processing workflow, making it more efficient, reliable, and adaptable to future challenges and opportunities.
