Orchestrating SQL Files From Git on DBSQL

Databricks SQL SME
DBSQL SME Engineering
6 min read · May 5, 2024

A standout feature of orchestrating version-controlled SQL queries

Orchestrating SQL Files Directly From Git on DBSQL with Workflows

Authors

Saurabh Shukla — Sr Specialist Solution Architect @ Databricks
Mrityunjay Kumar — Resident Solutions Architect @ Databricks

Introduction

Organizations across industries are adopting the lakehouse architecture and using a unified platform for all their data, analytics and AI workloads. Databricks Workflows is the unified orchestration solution for the Lakehouse.

One of its standout features is the ability to orchestrate SQL queries defined in files on Databricks SQL warehouses, as well as to run alerts and update dashboards. The SQL file task also lets you store .sql files in a Git repository: every time the job runs, the latest version of the file is fetched from a specific branch. This functionality makes it easy to co-version notebooks and SQL queries together. Using Git for these artifacts improves collaboration between team members and reduces the risk of errors.

In this blog, we walk through an example pipeline that is published in a Git repo and can be reproduced by anyone.

You can use this functionality to regularly execute SQL commands (e.g., to optimize a table), or to orchestrate data transformations that benefit from versioned SQL.

Development Flow

Step 1: Set up Git integration

Git Integration In User Setting within Workspace

Step 2: Create .sql files in the Git repository for a simple data pipeline

(You can import and use the provided repo, which already contains the example files.)

Git Repository Root
├── 1.Create_Tables.sql
├── 2.Load_Data.sql
├── 3.Query_Fact_Sales.sql
└── 4.Create_Workflow_through_CLI.py

Task Breakdown

1.Create_Tables.sql

USE CATALOG MAIN;
USE SCHEMA DEFAULT;


-- 1/ Create the Dimension & Fact Tables in Unity Catalog

-- STORE DIMENSION
CREATE OR REPLACE TABLE dim_store(
  store_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  store_name STRING,
  address STRING
);

-- PRODUCT DIMENSION
CREATE OR REPLACE TABLE dim_product(
  product_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  sku STRING,
  description STRING,
  category STRING
);

-- CUSTOMER DIMENSION
CREATE OR REPLACE TABLE dim_customer(
  customer_id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 0 INCREMENT BY 10) PRIMARY KEY,
  customer_name STRING,
  customer_profile STRING,
  address STRING
);

-- FACT SALES
CREATE OR REPLACE TABLE fact_sales(
  sales_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  product_id BIGINT NOT NULL CONSTRAINT dim_product_fk FOREIGN KEY REFERENCES dim_product,
  store_id BIGINT NOT NULL CONSTRAINT dim_store_fk FOREIGN KEY REFERENCES dim_store,
  customer_id BIGINT NOT NULL CONSTRAINT dim_customer_fk FOREIGN KEY REFERENCES dim_customer,
  price_sold DOUBLE,
  units_sold INT,
  dollar_cost DOUBLE
);

This step runs the DDL that creates the tables for our pipeline's data model.

2.Load_Data.sql

DECLARE OR REPLACE VARIABLE catalog_name STRING DEFAULT 'main';
DECLARE OR REPLACE VARIABLE schema_name STRING DEFAULT 'default';


SET VAR catalog_name = {{catalog_param}};
SET VAR schema_name = {{schema_param}};


-- 2/ Let's add some data to the Dimension Tables
INSERT INTO
  IDENTIFIER(catalog_name || '.' || schema_name || '.' || 'dim_store')
  (store_name, address)
VALUES
  ('City Store', '1 Main Rd, Whoville');

INSERT INTO
  IDENTIFIER(catalog_name || '.' || schema_name || '.' || 'dim_product')
  (sku, description, category)
VALUES
  ('1000001', 'High Tops', 'Ladies Shoes'),
  ('7000003', 'Printed T', 'Ladies Fashion Tops');

INSERT INTO
  IDENTIFIER(catalog_name || '.' || schema_name || '.' || 'dim_customer')
  (customer_name, customer_profile, address)
VALUES
  ('Al', 'Al profile', 'Databricks - Queensland Australia'),
  ('Quentin', 'REDACTED_PROFILE', 'Databricks - Paris France');

-- 3/ Let's add some data to the Fact Table
INSERT INTO
  IDENTIFIER(catalog_name || '.' || schema_name || '.' || 'fact_sales')
  (product_id, store_id, customer_id, price_sold, units_sold, dollar_cost)
VALUES
  (1, 1, 0, 100.99, 2, 2.99),
  (2, 1, 0, 10.99, 2, 2.99),
  (1, 1, 0, 100.99, 2, 2.99),
  (1, 1, 10, 100.99, 2, 2.99),
  (2, 1, 10, 10.99, 2, 2.99);

This task is a small example of inserting data into tables whose names are resolved dynamically: job parameters are pushed down from the parent job into SQL variables, which are then used to build the table identifiers.
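To make the substitution concrete, here is a minimal Python sketch that mimics what the SQL above does: the job parameter values end up in variables, and the `||` concatenation builds the three-part name that `IDENTIFIER()` resolves. The helper name `qualified_name` and its validation rule are illustrative assumptions, not part of Databricks.

```python
import re

# Assumption for this sketch: each name part is a plain, unquoted identifier.
_NAME_PART = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def qualified_name(catalog: str, schema: str, table: str) -> str:
    """Mirror catalog_name || '.' || schema_name || '.' || '<table>'."""
    for part in (catalog, schema, table):
        if not _NAME_PART.match(part):
            raise ValueError(f"unexpected identifier part: {part!r}")
    return ".".join((catalog, schema, table))


# Same defaults as the job parameters used in this post.
job_parameters = {"catalog_param": "main", "schema_param": "default"}

target = qualified_name(
    job_parameters["catalog_param"],
    job_parameters["schema_param"],
    "dim_store",
)
print(target)  # main.default.dim_store
```

Changing only the two parameter values points every INSERT in the file at a different catalog and schema, without touching the SQL itself.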

3.Query_Fact_Sales.sql

DECLARE OR REPLACE VARIABLE catalog_name STRING DEFAULT 'main';
DECLARE OR REPLACE VARIABLE schema_name STRING DEFAULT 'default';


SET VAR catalog_name = {{catalog_param}};
SET VAR schema_name = {{schema_param}};


-- Query the tables, joining the fact table to its dimensions
SELECT *
FROM IDENTIFIER(catalog_name || '.' || schema_name || '.' || 'fact_sales')
  INNER JOIN IDENTIFIER(catalog_name || '.' || schema_name || '.' || 'dim_product') USING (product_id)
  INNER JOIN IDENTIFIER(catalog_name || '.' || schema_name || '.' || 'dim_customer') USING (customer_id)
  INNER JOIN IDENTIFIER(catalog_name || '.' || schema_name || '.' || 'dim_store') USING (store_id);

This is our final task, which queries the fact and dimension tables dynamically depending on the job parameters. This is a common pattern when we want the same job code to run across environments such as DEV/TEST/QA/PROD.
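As a rough illustration of that pattern, the sketch below keeps one set of parameter values per environment and builds a run request that overrides the job's defaults. The environment names, catalog names, and job ID are hypothetical, and the `job_id` / `job_parameters` fields follow our understanding of the Jobs API run-now request, so check them against the API reference before relying on them.

```python
import json

# Hypothetical per-environment values for the two job parameters in this post.
ENVIRONMENTS = {
    "dev": {"catalog_param": "dev_catalog", "schema_param": "default"},
    "test": {"catalog_param": "test_catalog", "schema_param": "default"},
    "prod": {"catalog_param": "main", "schema_param": "default"},
}


def run_now_payload(job_id: int, env: str) -> str:
    """Build a JSON body that runs the same job against one environment."""
    if env not in ENVIRONMENTS:
        raise KeyError(f"unknown environment: {env}")
    body = {"job_id": job_id, "job_parameters": ENVIRONMENTS[env]}
    return json.dumps(body, indent=2)


# 123 is a placeholder job ID, not a real one.
payload = run_now_payload(123, "dev")
print(payload)
```

The SQL files stay identical in every environment; only the parameter values passed at run time differ.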

Step 3 (UI): Create a Databricks Workflow with the SQL task type

Select SQL file task under SQL task type

Creating a Databricks Workflow with the DBSQL Task Type

Note that this task runs on a Serverless DBSQL warehouse and reads its SQL from a file in an external Git repo.

Enter Git information

Enter the URL of your Git repository and a Git reference to execute against. The SQL file task automatically pulls the latest .sql file from the Git repository during each run, so you do not have to update anything in Databricks.
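The same Git information appears as the `git_source` block when the job is defined as JSON. The sketch below builds that block and enforces a single Git reference; the helper name is hypothetical, and the `git_tag` and `git_commit` fields reflect our understanding of the Jobs API rather than anything shown in this post, which only uses `git_branch`.

```python
from typing import Optional


def git_source(
    url: str,
    provider: str,
    *,
    branch: Optional[str] = None,
    tag: Optional[str] = None,
    commit: Optional[str] = None,
) -> dict:
    """Build a git_source block with exactly one Git reference."""
    refs = {"git_branch": branch, "git_tag": tag, "git_commit": commit}
    chosen = {key: value for key, value in refs.items() if value is not None}
    if len(chosen) != 1:
        raise ValueError("specify exactly one of branch, tag, or commit")
    return {"git_url": url, "git_provider": provider, **chosen}


# Values taken from the job definition used in this post.
source = git_source(
    "https://github.com/<GITUSERNAME>/dbsql_sme.git",
    "gitHub",
    branch="feature_branch_sqlfiles",
)
print(source)
```

Pointing at a branch means each run picks up the newest commit on that branch, whereas a tag or commit pins the job to a fixed version of the SQL files.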

Connect to external Git Repo (remember, you must authenticate in user settings first)

Specify file path and SQL warehouse (Pro or Serverless)

  • The file path is relative to the root of the Git repository, so it should NOT start with “/”.
  • This feature requires a Pro or Serverless warehouse.
  • Parameters can be any key-value pairs.
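The path rule is easy to get wrong when copying paths around, so a small check can catch it before the job is created. This is a minimal sketch with hypothetical helper names; the `sql/` folder in the usage lines is made up for illustration.

```python
def check_sql_file_path(path: str) -> str:
    """Return the path unchanged if it is relative to the repository root."""
    if not path:
        raise ValueError("file path is empty")
    if path.startswith("/"):
        raise ValueError(f"file path must not start with '/': {path!r}")
    return path


def to_repo_relative(path: str) -> str:
    """Repair a path by dropping any leading slashes."""
    return check_sql_file_path(path.lstrip("/"))


print(check_sql_file_path("sql/2.Load_Data.sql"))  # sql/2.Load_Data.sql
print(to_repo_relative("/sql/2.Load_Data.sql"))    # sql/2.Load_Data.sql
```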

Step 4 (CLI): Create a Databricks Workflow with the SQL task type

The Databricks CLI lets you create and manage jobs as code directly from your Databricks workspace.

We want to create a new job like this:

API Reference

API Command Example

With DBR 15.1 and above, we can create and edit jobs directly from our workspace with the CLI:

In a notebook, go to View > Developer to run the CLI in an embedded web terminal.

This opens up a terminal you can run the CLI in.

Creating Jobs directly from our Workspace Terminal

Edit and run the following CLI command in the Databricks web terminal or your IDE:

databricks jobs create --json '{
  "name": "Orchestrating_SQL_Files_on_DBSQL",
  "email_notifications": {
    "no_alert_for_skipped_runs": false
  },
  "webhook_notifications": {},
  "timeout_seconds": 0,
  "max_concurrent_runs": 1,
  "tasks": [
    {
      "task_key": "Create_Tables",
      "run_if": "ALL_SUCCESS",
      "sql_task": {
        "file": {
          "path": "<GIT PATH>/1.Create_Tables.sql",
          "source": "GIT"
        },
        "warehouse_id": "<WAREHOUSE ID>"
      },
      "timeout_seconds": 0,
      "email_notifications": {},
      "notification_settings": {
        "no_alert_for_skipped_runs": false,
        "no_alert_for_canceled_runs": false,
        "alert_on_last_attempt": false
      },
      "webhook_notifications": {}
    },
    {
      "task_key": "Load_Data",
      "depends_on": [
        {
          "task_key": "Create_Tables"
        }
      ],
      "run_if": "ALL_SUCCESS",
      "sql_task": {
        "file": {
          "path": "<GIT PATH>/2.Load_Data.sql",
          "source": "GIT"
        },
        "warehouse_id": "<WAREHOUSE ID>"
      },
      "timeout_seconds": 0,
      "email_notifications": {},
      "notification_settings": {
        "no_alert_for_skipped_runs": false,
        "no_alert_for_canceled_runs": false,
        "alert_on_last_attempt": false
      },
      "webhook_notifications": {}
    },
    {
      "task_key": "Query_Fact_Sales",
      "depends_on": [
        {
          "task_key": "Load_Data"
        }
      ],
      "run_if": "ALL_SUCCESS",
      "sql_task": {
        "file": {
          "path": "<GIT PATH>/3.Query_Fact_Sales.sql",
          "source": "GIT"
        },
        "warehouse_id": "<WAREHOUSE ID>"
      },
      "timeout_seconds": 0,
      "email_notifications": {},
      "notification_settings": {
        "no_alert_for_skipped_runs": false,
        "no_alert_for_canceled_runs": false,
        "alert_on_last_attempt": false
      },
      "webhook_notifications": {}
    }
  ],
  "git_source": {
    "git_url": "https://github.com/<GITUSERNAME>/dbsql_sme.git",
    "git_provider": "gitHub",
    "git_branch": "feature_branch_sqlfiles"
  },
  "queue": {
    "enabled": true
  },
  "parameters": [
    {
      "name": "catalog_param",
      "default": "main"
    },
    {
      "name": "schema_param",
      "default": "default"
    }
  ],
  "run_as": {
    "user_name": "<username@domain>.com"
  }
}'
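Because the three tasks differ only in file name and dependency, the JSON can also be generated rather than written by hand. The following is a minimal sketch under that assumption: it chains the files in order, reads every task from Git, omits the optional notification settings for brevity, and prints a command to paste into the terminal. The function names are hypothetical and the angle-bracket placeholders are kept from the command above.

```python
import json
import shlex

SQL_FILES = ["1.Create_Tables.sql", "2.Load_Data.sql", "3.Query_Fact_Sales.sql"]


def task_key_for(file_name: str) -> str:
    """Turn '1.Create_Tables.sql' into 'Create_Tables'."""
    return file_name.split(".", 1)[1].rsplit(".", 1)[0]


def build_job(git_path: str, warehouse_id: str) -> dict:
    tasks = []
    previous_key = None
    for file_name in SQL_FILES:
        task = {
            "task_key": task_key_for(file_name),
            "run_if": "ALL_SUCCESS",
            "sql_task": {
                "file": {"path": f"{git_path}/{file_name}", "source": "GIT"},
                "warehouse_id": warehouse_id,
            },
        }
        # Each task waits for the one before it, giving a linear chain.
        if previous_key is not None:
            task["depends_on"] = [{"task_key": previous_key}]
        tasks.append(task)
        previous_key = task["task_key"]
    return {
        "name": "Orchestrating_SQL_Files_on_DBSQL",
        "max_concurrent_runs": 1,
        "tasks": tasks,
        "git_source": {
            "git_url": "https://github.com/<GITUSERNAME>/dbsql_sme.git",
            "git_provider": "gitHub",
            "git_branch": "feature_branch_sqlfiles",
        },
        "parameters": [
            {"name": "catalog_param", "default": "main"},
            {"name": "schema_param", "default": "default"},
        ],
    }


job = build_job("<GIT PATH>", "<WAREHOUSE ID>")
command = "databricks jobs create --json " + shlex.quote(json.dumps(job))
print(command)
```

Adding a fourth SQL file then only requires appending its name to `SQL_FILES`.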

The job is now created, and you can run it directly in your workspace:

Final Job Output

References

  1. Use Databricks SQL in a Databricks job | Databricks on AWS
