Building Production Data Applications on Databricks SQL for AI-embedded Observability

Databricks SQL SME
DBSQL SME Engineering
19 min read · Jun 30, 2024
Data App Tagging Manager Architecture

Author: Cody Austin Davis

Introduction

For the past 6 months, I have been helping customers build boutique solutions for managing and scaling their operations on Databricks. But I have not just been writing code and building data pipelines; I have also focused on building Data Apps that add even more value: home-grown, custom analytical workflows that let data teams scale their operational efforts by empowering non-technical (or less technical) users to drive and manage the Lakehouse Platform on Databricks. At this year's Data and AI Summit, I got to present on building Production Data Apps for AI-embedded Observability, and the turnout made it clear that companies are excited about this space.

In this blog, we are going to walk through two use cases we have solved by building Data Apps with Plotly Dash on top of Databricks SQL, all governed by Unity Catalog.

Let's start with the use cases:

  1. Tagging Policy Management — A common challenge I hear when customers add users and use cases to Databricks is that they want to know their usage on Databricks is properly tagged so they can ensure that their spend is going to the right places and can track who is doing what across the organization. Databricks has tons of tools for this, including the new Budgets feature in the account admin console; however, the core challenge I see is the UX of knowing exactly what is NOT tagged, what SHOULD be tagged, and being able to easily fix it without poking and prodding teams across the organization. This is a perfect operational data app use case — we want to enable non-technical users to find improperly tagged usage, fix the improper tags, and ensure that future compliance is tracked accordingly.
  2. Automated Alerts from system tables — System tables are a fantastic feature in Databricks to keep track of what is going on in your environment. But we can make all this data even more valuable by creating a Data App that empowers less technical users to ask English questions about scoped and cleaned-up system table data, and to automatically define, create, and schedule alerts based on these questions using AI_QUERY on Databricks SQL. This allows operations teams to scale out their alerting efforts and create more self-serve applications. At a minimum, it allows engineers to skip a lot of boilerplate code and have a consistent framework for creating new operational alerts.

Before we jump into the examples, let's define a Production Data Application and describe the difference between these and regular old full-stack native apps.

The goal of a Production Data App is simple — enable data teams (teams that generally know Python, SQL, Scala) to generate insights as they already do, but to then connect those insights directly into actions, embed those actions into the workflow of end users, and allow these insights and actions to be shared with all necessary stakeholders.

Building full-stack applications generally requires a completely different tech stack and set of skills, which makes building analytically focused data applications particularly challenging: not only are they full-stack, but the logic and domain knowledge required to create them is generally more complex than basic CRUD applications. For Data Teams to be more valuable and effective, we need to bridge this gap.

There are 2 core components that allow us to bridge this gap today: Plotly Dash and Embedding AI directly in the Databricks Platform.

Plotly Dash solves the problem of unification of code. Data teams know Python and SQL, and Plotly Dash allows Python users to build everything from simple to fully customizable full-stack web applications without needing to learn a new tech stack for front-end web development.

Databricks solves the problem of unification of data, governance, and AI. By natively embedding AI into your data with Unity Catalog and Databricks Assistant, you can build apps that seamlessly leverage the intelligence in your environment (models, assistant, UC context, etc.).

When utilizing these platforms together, you can build full data applications that are self-contained and governed on Unity Catalog, and all deployed using Python + SQL. Plotly Dash can seamlessly integrate with all parts of the Databricks platform to run SQL workflows, call models, train models, run jobs, and everything else, creating a simple and familiar tech stack:
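
To make that concrete, here is a minimal sketch (an illustration, not the app from this post) of a Dash page that charts system.billing.usage straight off a Databricks SQL Warehouse using the databricks-sql-connector. The environment variable names mirror the config/.env file described in the prerequisites below.

import os
import pandas as pd
import plotly.express as px
from dash import Dash, dcc, html
from databricks import sql  # databricks-sql-connector

def get_usage_df() -> pd.DataFrame:
    """Pull recent DBU usage by product from the billing system table."""
    with sql.connect(
        server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
        http_path=os.getenv("DATABRICKS_HTTP_PATH"),
        access_token=os.getenv("DATABRICKS_TOKEN"),
    ) as conn, conn.cursor() as cursor:
        cursor.execute(
            """SELECT usage_date, billing_origin_product,
                      CAST(SUM(usage_quantity) AS DOUBLE) AS dbus
               FROM system.billing.usage
               WHERE usage_date >= current_date() - INTERVAL 30 DAYS
               GROUP BY usage_date, billing_origin_product
               ORDER BY usage_date"""
        )
        return pd.DataFrame(
            cursor.fetchall(), columns=["usage_date", "billing_origin_product", "dbus"]
        )

app = Dash(__name__)
app.layout = html.Div([
    html.H3("DBU Usage by Product"),
    dcc.Graph(figure=px.bar(get_usage_df(), x="usage_date", y="dbus", color="billing_origin_product")),
])

if __name__ == "__main__":
    app.run(debug=True)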

This unification is what will allow Data Teams to drastically improve their value to their business.

Now let's jump into the real use cases!

Prerequisites:

  1. A Databricks Account with Unity Catalog and Databricks SQL enabled.
  2. System tables enabled and accessible (you have permission to view them, mainly usage and pricing tables).
  3. Data App Code: Download the code for this app here.
  4. To get the app working on your machine, in the project directory, edit the config/.env file with your workspace info:
DATABRICKS_SERVER_HOSTNAME=<your host name here>
DATABRICKS_HTTP_PATH=<warehouse http path>
DATABRICKS_TOKEN=<user access token>
DATABRICKS_CATALOG=main
DATABRICKS_SCHEMA=dash_observability_advisor

5. Be sure your user has permission to create a schema (the default location the app uses is “main.dash_observability_advisor”) and to create tables in that schema, as well as access to a SQL Warehouse.

6. Install the dependencies for the app with pip using the following command:

pip3 install -r dependent_packages.txt

7. Run the following command to start the app (the app will take a while to start up the first time, since it will create the schema, tables, MVs, etc. You should see the SQL scripts running in the warehouse query history and in Unity Catalog, as well as in the app log):

python3 app.py

Use Case 1 — Tagging Policy Adherence Management

This use case is all about knowing where your Databricks Usage is going. To make things clear, we will start with a basic user story:

“As a business operations / financial analyst user, I want to be able to analyze usage of Databricks by business use case, as well as identify improperly/unidentified usage and categorize that usage appropriately without relying on external teams to make changes.”

We can utilize Plotly Dash and Databricks to solve this. If you download the code, you can fill out your workspace's ./config/.env variables and run the app in your own container!

Before we jump in, let's take a look at the architecture for the app we are going to walk through:

Data App Tagging Manager Architecture

Our Tagging Manager App will:

  1. Automatically define and deploy its own Schema, Tables, and ETL with Materialized Views
  2. Define the parameterized SQL Statements needed to perform the tagging policy adherence analysis
  3. Create and manage the state of all write-back tables where end users can add tags, policies, and other annotations.
  4. Provide a simple UI/UX to perform the analysis for end users.

When the app is deployed, it will create all its own state in Unity Catalog like so:

Deploying app tables, MVs, and functions when app starts up

Along with these tables, there are Materialized Views that automate the analysis and cleaning of the raw system tables:

Materialized View for clean usage analysis
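
The shipped MV definitions do more cleanup than this, but conceptually the clean_usage view just flattens system.billing.usage into the columns the adherence queries later in this post expect (clean_tags, clean_cluster_id, clean_job_or_pipeline_id, clean_warehouse_id). A simplified sketch, assuming the app's default main.dash_observability_advisor schema:

import os
from databricks import sql  # databricks-sql-connector

# Simplified sketch only -- the app ships its own, more involved MV definitions.
CLEAN_USAGE_MV = """
CREATE MATERIALIZED VIEW main.dash_observability_advisor.clean_usage AS
SELECT
  usage_date,
  usage_start_time,
  usage_end_time,
  sku_name,
  usage_quantity,
  billing_origin_product,
  custom_tags AS clean_tags,                      -- map of tag key -> value
  usage_metadata.cluster_id AS clean_cluster_id,
  COALESCE(usage_metadata.job_id, usage_metadata.dlt_pipeline_id) AS clean_job_or_pipeline_id,
  usage_metadata.warehouse_id AS clean_warehouse_id
FROM system.billing.usage
"""

with sql.connect(
    server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path=os.getenv("DATABRICKS_HTTP_PATH"),
    access_token=os.getenv("DATABRICKS_TOKEN"),
) as conn, conn.cursor() as cursor:
    cursor.execute(CLEAN_USAGE_MV)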

There are also self-contained write-back tables where users can store tagging policies, added tags, and other annotations, like so:

Now that we have seen how the app sets itself up, let's review the workflow.

First, we can define the concept of a tag policy. This is essentially just a spreadsheet that is automatically saved to Delta in Unity Catalog under a self-contained schema called dash_observability_advisor. Users can define the specific tag key/value pairs they want to enforce and audit compliance against, and save them. Then our analysis queries can use these tables in a live feedback loop. In our app, all write-back tables like this live in this self-contained schema — this ensures we isolate changes in a safe way (for example, when we want to add tags to compute resources without non-technical users editing real production jobs), so the users of this app can do what they need without editing others' code or configurations.
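
The repo contains the real DDL, but the shape of this write-back table can be inferred from the callback below; here is a rough sketch, with the identity column and types as assumptions:

from sqlalchemy import text

# Rough sketch of the tag policy write-back table -- the shipped DDL may differ.
app_tag_policies_ddl = text("""
CREATE TABLE IF NOT EXISTS main.dash_observability_advisor.app_tag_policies (
  tag_policy_id          BIGINT GENERATED ALWAYS AS IDENTITY,  -- assumed surrogate key
  tag_policy_name        STRING NOT NULL,
  tag_policy_description STRING,
  tag_key                STRING NOT NULL,
  tag_value              STRING,        -- NULL/empty means any value satisfies the key
  update_timestamp       TIMESTAMP
)
""")

# system_query_manager is the app's SQLAlchemy engine helper used in the callback below
with system_query_manager.get_engine().connect() as conn:
    conn.execute(app_tag_policies_ddl)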

Defining a Tag Policy

We won't cover all the callbacks here, but here is the Python callback that handles the full state of this policy table:

@app.callback(
    [Output('policy-changes-store', 'data'),
     Output('tag-policy-ag-grid', 'rowData'),
     Output('policy-change-indicator', 'style'),
     Output('loading-save-policies', 'children'),
     Output('loading-clear-policies', 'children'),
     Output('tag-policy-dropdown', 'options'),
     Output('tag-policy-key-dropdown', 'options'),
     Output('tag-policy-value-dropdown', 'options'),
     Output('adhoc-usage-ag-grid', 'columnDefs'),
     Output('tag-keys-store', 'data')],
    [Input('tag-policy-save-btn', 'n_clicks'),
     Input('tag-policy-clear-btn', 'n_clicks'),
     Input('tag-policy-ag-grid', 'cellValueChanged'),
     Input('add-policy-row-btn', 'n_clicks'),
     Input('remove-policy-row-btn', 'n_clicks')],
    [State('tag-policy-ag-grid', 'rowData'),
     State('policy-changes-store', 'data'),
     State('tag-policy-ag-grid', 'selectedRows')]  # Current state of stored changes
)
def handle_policy_changes(save_clicks, clear_clicks, cell_change, add_row_clicks, remove_row_clicks, row_data, changes, selected_rows):

    ### Synchronously figure out which action is happening and run the appropriate logic.
    ## This single-callback method ensures that no strange operation order can happen. Only one can happen at once.
    triggered_id = callback_context.triggered[0]['prop_id'].split('.')[0]

    ##### CREATE - Add new row in GRID
    if triggered_id == 'add-policy-row-btn' and add_row_clicks > 0:
        new_row = {
            'tag_policy_id': None,  # Will be generated by the database
            'tag_policy_name': '',
            'tag_key': '',
            'tag_value': '',
            'update_timestamp': datetime.now()
        }
        row_data.append(new_row)
        return dash.no_update, row_data, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update

    #### DELETE - Handle removing selected rows
    if triggered_id == 'remove-policy-row-btn' and remove_row_clicks > 0:
        ## Only attempt to delete rows that already exist in the database (i.e. have a tag_policy_id)
        ## TO DO: Be able to remove / delete rows by Row Id as well
        ids_to_remove = [row['tag_policy_id'] for row in selected_rows if row['tag_policy_id'] is not None]
        updated_row_data = [row for row in row_data if row['tag_policy_id'] not in ids_to_remove]

        if ids_to_remove:
            connection = system_query_manager.get_engine().connect()
            try:
                delete_query = text("""
                    DELETE FROM app_tag_policies
                    WHERE tag_policy_id IN :ids
                """).bindparams(bindparam('ids', expanding=True))
                connection.execute(delete_query, parameters={'ids': ids_to_remove})
                connection.commit()

            except Exception as e:
                print(f"Error during deletion: {e}")
                raise e
            finally:
                connection.close()

        if changes is not None:
            updated_changes = [change for change in changes if change['tag_policy_id'] not in ids_to_remove]
        else:
            updated_changes = []

        # Fetch distinct tag policy names within the context manager
        with QueryManager.session_scope(system_engine) as session:

            ## 1 query instead of 3
            tag_policy_result = session.query(TagPolicies.tag_policy_name, TagPolicies.tag_key, TagPolicies.tag_value).all()

            # Process the query result and store the distinct values in variables in your Dash app
            distinct_tag_policy_names = set()
            distinct_tag_keys = set()
            distinct_tag_values = set()

            for row in tag_policy_result:
                distinct_tag_policy_names.add(row.tag_policy_name)
                distinct_tag_keys.add(row.tag_key)
                if row.tag_value is not None:
                    distinct_tag_values.add(row.tag_value)

            tag_policy_filter = [{'label': name if name is not None else 'None', 'value': name if name is not None else 'None'} for name in distinct_tag_policy_names]
            tag_key_filter = [{'label': name if name is not None else 'None', 'value': name if name is not None else 'None'} for name in distinct_tag_keys]
            tag_value_filter = [{'label': name if name is not None else 'None', 'value': name if name is not None else 'None'} for name in distinct_tag_values]

        return updated_row_data, tag_advisor_manager.get_tag_policies_grid_data().to_dict('records'), dash.no_update, dash.no_update, dash.no_update, tag_policy_filter, tag_key_filter, tag_value_filter, tag_advisor_manager.get_adhoc_ag_grid_column_defs(tag_key_filter), tag_key_filter

    ##### Handle Clear Button Press
    elif triggered_id == 'tag-policy-clear-btn' and clear_clicks:
        clear_loading_content = html.Button('Clear Policy Changes', id='tag-policy-clear-btn', n_clicks=0, style={'margin-bottom': '10px'}, className='prettier-button')
        return [], tag_advisor_manager.get_tag_policies_grid_data().to_dict('records'), {'display': 'none'}, dash.no_update, clear_loading_content, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update  # Clear changes and reload data

    ##### Handle cell change
    elif triggered_id == 'tag-policy-ag-grid' and cell_change:
        if changes is None:
            changes = []
        change_data = cell_change[0]['data']
        row_index = cell_change[0]['rowIndex']
        # Ensure the change data includes the row index
        change_data['rowIndex'] = row_index
        changes.append(change_data)
        row_data = mark_changed_rows(row_data, changes, row_id='tag_policy_id')

        return changes, row_data, {'display': 'block', 'color': 'yellow', 'font-weight': 'bold'}, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update

    ##### SAVE CHANGES
    # Handle saving changes
    if triggered_id == 'tag-policy-save-btn' and save_clicks:

        # Combine changes by row index
        grouped_changes = []

        if changes:
            grouped_changes = group_changes_by_row(changes)  ## from data_functions.utils import *

        connection = system_query_manager.get_engine().connect()
        save_loading_content = html.Button('Save Policy Changes', id='tag-policy-save-btn', n_clicks=0, style={'margin-bottom': '10px'}, className='prettier-button')

        if changes:
            try:
                # Process grouped changes for both updates and inserts
                for change in grouped_changes:
                    # print("Combined change data:", change)  # Debug statement
                    record_id = change.get('tag_policy_id')
                    if record_id:
                        # Update existing record
                        update_query = text("""
                            UPDATE app_tag_policies
                            SET tag_policy_name = :tag_policy_name,
                                tag_policy_description = :tag_policy_description,
                                tag_key = :tag_key,
                                tag_value = :tag_value,
                                update_timestamp = NOW()
                            WHERE tag_policy_id = :tag_policy_id
                        """)
                        connection.execute(update_query, parameters={
                            'tag_policy_name': change['tag_policy_name'],
                            'tag_policy_description': change['tag_policy_description'],
                            'tag_key': change['tag_key'],
                            'tag_value': change['tag_value'],
                            'tag_policy_id': record_id
                        })

                        connection.commit()

                    else:
                        # Insert new record
                        if not change.get('tag_policy_name') or not change.get('tag_key'):
                            raise ValueError("Missing required fields: 'tag_policy_name' or 'tag_key'")

                        insert_params = {k: v for k, v in change.items() if k in ['tag_policy_name', 'tag_policy_description', 'tag_key', 'tag_value']}
                        print(f"INSERT PARAMS: {insert_params}")  # Debug statement

                        insert_query = text("""
                            INSERT INTO app_tag_policies (tag_policy_name, tag_policy_description, tag_key, tag_value, update_timestamp)
                            VALUES (:tag_policy_name, :tag_policy_description, :tag_key, :tag_value, NOW())
                        """)
                        connection.execute(insert_query,
                                           parameters={'tag_policy_name': insert_params['tag_policy_name'],
                                                       'tag_policy_description': insert_params['tag_policy_description'],
                                                       'tag_key': insert_params['tag_key'],
                                                       'tag_value': insert_params['tag_value']})

                        connection.commit()

            except Exception as e:
                print(f"Error during save with changes: {changes}")  # Debug error
                raise e
            finally:
                connection.close()

        else:
            pass

        with QueryManager.session_scope(system_engine) as session:

            ## 1 query instead of 3
            tag_policy_result = session.query(TagPolicies.tag_policy_name, TagPolicies.tag_key, TagPolicies.tag_value).all()

            # Process the query result and store the distinct values in variables in your Dash app
            distinct_tag_policy_names = set()
            distinct_tag_keys = set()
            distinct_tag_values = set()

            for row in tag_policy_result:
                distinct_tag_policy_names.add(row.tag_policy_name)
                distinct_tag_keys.add(row.tag_key)
                if row.tag_value is not None:
                    distinct_tag_values.add(row.tag_value)

            tag_policy_filter = [{'label': name if name is not None else 'None', 'value': name if name is not None else 'None'} for name in distinct_tag_policy_names]
            tag_key_filter = [{'label': name if name is not None else 'None', 'value': name if name is not None else 'None'} for name in distinct_tag_keys]
            tag_value_filter = [{'label': name if name is not None else 'None', 'value': name if name is not None else 'None'} for name in distinct_tag_values]

        return [], tag_advisor_manager.get_tag_policies_grid_data().to_dict('records'), {'display': 'none'}, save_loading_content, dash.no_update, tag_policy_filter, tag_key_filter, tag_value_filter, get_adhoc_ag_grid_column_defs(tag_key_filter), tag_key_filter

    return dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update  # No action taken

There is a lot going on, primarily because we want to connect our spreadsheet interface to all our filters and context around the rest of the app. The powerful thing here is that all we need to do is manage the app's inputs and outputs via callbacks, and we can simply write Python to perform the operations of our app. Write-backs are the hardest part, but also the most powerful!

Once we have defined a tag policy (let's say the “demo” policy), our app users can analyze adherence to this policy across their whole organization like so:

In the same app, we can select the “Demo Policy” in our filter, as well as select other usage filters from our system table materialized views such as date range, product type, and other tag key/value pairs. We see that in this demo example, only 15% of our ALL_PURPOSE usage is tagged with this policy. We can see total matched/unmatched usage, trend that over time, and even see the usage by value of the tag policy keys we defined. So now we know only 15% is tagged. We need to get to 100%. This is where combining this live compliance analysis with write-back tables can help us close the loop to identify non-compliance with our tag policy and FIX it in the app with the information we have available (like cluster name, cluster owner, usage, product type, age, etc.). This looks like the following:

Tagging Compute Resources In-app

Directly in our app, we can look at the top All Purpose, Jobs, and SQL warehouse usage, stack ranked by usage amount. We can see whether or not it fits our selected policies, and we can use Dash write-back capabilities to add tags to make them compliant directly in-app. Remember, we do not want to be editing production jobs; we really just need this for analysis, so these edits are saved separately in our own isolated dash_observability_advisor schema. This allows us to stack rank our compute by usage and fix tags quickly and easily with no coding for the end users!
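
Under the hood, “adding a tag” in the grid is just another write-back: the callback inserts into the app's own app_compute_tags table (its columns show up in the SQL further down) rather than touching the real cluster or job configuration. A hedged sketch of that insert, reusing the app's engine helper:

from sqlalchemy import text

def add_compute_tag(compute_asset_id: str, product: str, tag_key: str, tag_value: str):
    """Persist an app-managed tag for a compute asset without editing the real resource.

    Sketch of the idea only -- the shipped callback also handles batching, validation,
    and grid state updates.
    """
    insert_stmt = text("""
        INSERT INTO app_compute_tags (compute_asset_id, compute_asset_type, tag_key, tag_value)
        VALUES (:compute_asset_id, :compute_asset_type, :tag_key, :tag_value)
    """)
    with system_query_manager.get_engine().connect() as conn:
        conn.execute(insert_stmt, parameters={
            'compute_asset_id': compute_asset_id,
            'compute_asset_type': product,   # e.g. 'ALL_PURPOSE', 'JOBS', 'SQL'
            'tag_key': tag_key,
            'tag_value': tag_value,
        })
        conn.commit()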

Now let's show what happens when we fix a few clusters and add tags:

Updated Usage Matching after our changes

Now we have gone to 25% adherence, so we are getting closer to knowing where our usage is going, and we have an easy framework for managing stray usage in the future! Doing this compliance analysis is actually pretty tedious and requires complex SQL behind the scenes, so building an app interface around it is critical to making these analytical workflows more accessible. To show an example, here is the SQL statement the app dynamically generates to create these adherence visuals:

WITH filtered_usage AS (
SELECT
*
FROM
clean_usage -- Query Filters
WHERE
usage_start_time >= '2024-06-16' :: timestamp
AND usage_start_time <= '2024-06-30' :: timestamp
AND billing_origin_product IN ('ALL_PURPOSE')
),
active_tags AS (
SELECT
*,
1 AS IsPolicyTag
FROM
app_tag_policies
WHERE
1 = 1
AND tag_policy_name IN ('Demo Policy')
),
combined_cluster_tags AS (
SELECT
DISTINCT compute_asset_id,
billing_origin_product,
TagKey,
TagValue,
1 AS IsUsageTag
FROM
(
SELECT
COALESCE(
clean_job_or_pipeline_id,
clean_warehouse_id,
clean_cluster_id
) AS compute_asset_id,
billing_origin_product,
clean_tags
FROM
filtered_usage
) LATERAL VIEW EXPLODE(clean_tags) AS TagKey,
TagValue
WHERE
compute_asset_id IS NOT NULL
UNION
SELECT
compute_asset_id AS compute_asset_id,
compute_asset_type AS billing_origin_product,
tag_key AS TagKey,
tag_value AS TagValue,
1 AS IsUsageTag
FROM
app_compute_tags
),
tag_potential_matches AS (
SELECT
user_tags.compute_asset_id,
user_tags.billing_origin_product,
(
SELECT
COUNT(0)
FROM
active_tags
) AS TotalPolicyTags,
SUM(COALESCE(IsPolicyTag, 0)) AS NumberOfMatchedKeys,
COUNT(DISTINCT tag_value) AS NumberOfMatchedValues,
CASE
WHEN NumberOfMatchedKeys >= TotalPolicyTags THEN 'In Policy'
ELSE 'Not Matched To Tag Policy'
END AS IsTaggingMatch,
collect_set(
CASE
WHEN COALESCE(IsPolicyTag, 0) > 0 THEN CONCAT(TagKey, COALESCE(CONCAT(': ', TagValue), ''))
END
) AS TagCombos,
--TagCombo from tag policies
collect_set(
CASE
WHEN IsPolicyTag = 1 THEN TagKey
END
) AS MatchingTagKeys,
collect_set(
CASE
WHEN IsPolicyTag = 1 THEN TagValue
END
) AS MatchingTagValues,
collect_set(
CONCAT(TagKey, COALESCE(CONCAT(': ', TagValue), ''))
) AS updated_tags
FROM
combined_cluster_tags AS user_tags
LEFT JOIN (
SELECT
*,
CONCAT(tag_key, COALESCE(CONCAT(': ', tag_value), '')) AS TagCombo
FROM
active_tags
) p ON user_tags.TagKey = p.tag_key
AND (
p.tag_value IS NULL
OR p.tag_value = ""
OR user_tags.TagValue = p.tag_value
)
GROUP BY
user_tags.compute_asset_id,
user_tags.billing_origin_product
),
unmatched_policies AS (
SELECT
a.compute_asset_id,
p.tag_key AS UnmatchedPolicyKey
FROM
(
SELECT
DISTINCT compute_asset_id,
billing_origin_product
FROM
combined_cluster_tags
) a
CROSS JOIN active_tags p
LEFT JOIN combined_cluster_tags u ON a.compute_asset_id = u.compute_asset_id
AND p.tag_key = u.TagKey
AND (
p.tag_value IS NULL
OR p.tag_value = ""
OR p.tag_value = u.TagValue
)
WHERE
u.TagKey IS NULL
),
clean_tag_matches AS (
SELECT
tpm.*,
collect_set(up.UnmatchedPolicyKey) AS MissingPolicyKeys
FROM
tag_potential_matches tpm
LEFT JOIN unmatched_policies up ON tpm.compute_asset_id = up.compute_asset_id
GROUP BY
tpm.compute_asset_id,
tpm.billing_origin_product,
tpm.TotalPolicyTags,
tpm.NumberOfMatchedKeys,
tpm.NumberOfMatchedValues,
tpm.MatchingTagKeys,
tpm.MatchingTagValues,
tpm.IsTaggingMatch,
tpm.TagCombos,
tpm.updated_tags
),
px_all AS (
SELECT
DISTINCT sku_name,
pricing.default AS unit_price,
unit_price :: decimal(10, 3) AS sku_price
FROM
system.billing.list_prices QUALIFY ROW_NUMBER() OVER (
PARTITION BY sku_name
ORDER BY
price_start_time DESC
) = 1
),
final_parsed_query AS (
SELECT
u.*,
-- TO DO: Add Discounts Table Later
((1 - COALESCE(NULL, 0)) * sku_price) * usage_quantity AS Dollar_DBUs_List,
-- Combine system tags with App tags
u.clean_tags AS updated_tags,
(
SELECT
COUNT(0)
FROM
active_tags
) AS TotalPolicyTags,
COALESCE(ct.MatchingTagKeys, array()) AS MatchingTagKeys,
COALESCE(
ct.MissingPolicyKeys,
(
SELECT
collect_set(tag_key)
FROM
active_tags
)
) AS MissingTagKeys,
COALESCE(ct.NumberOfMatchedKeys, 0) AS NumberOfMatchedKeys,
COALESCE(ct.MatchingTagValues, array()) AS MatchedTagValues,
COALESCE(ct.MatchingTagKeys, array()) AS MatchedTagKeys,
COALESCE(ct.IsTaggingMatch, 'Not Matched To Tag Policy') AS IsTaggingMatch,
ct.TagCombos AS TagCombos
FROM
filtered_usage AS u
INNER JOIN px_all AS px ON px.sku_name = u.sku_name --- Join up tags persisted from the app
LEFT JOIN clean_tag_matches ct ON (
ct.compute_asset_id = u.clean_cluster_id
AND u.billing_origin_product = ct.billing_origin_product
)
OR (
ct.compute_asset_id = u.clean_job_or_pipeline_id
AND u.billing_origin_product = ct.billing_origin_product
)
OR (
ct.compute_asset_id = u.clean_warehouse_id
AND u.billing_origin_product = ct.billing_origin_product
)
),
filtered_result AS (
-- Final Query - Dynamic from Filters
SELECT
*
FROM
final_parsed_query AS f
WHERE
1 = 1
) -- TAG DATE AGG QUERY
SELECT
usage_date AS Usage_Date,
SUM(Dollar_DBUs_List) AS `Usage Amount`,
IsTaggingMatch AS `Tag Match`
FROM
filtered_result
GROUP BY
usage_date,
IsTaggingMatch

From a process perspective, this is a very basic read → analyze → edit → analyze → report workflow for internal problem solving, but the app completely abstracts all of this complex analysis away from the end users. This is what will make Data Teams 10x more valuable in the coming years: being able to scale analytical workflows like this to less technical users and move faster. This is a beginning example of a Data Product.

It's not the sexiest use case, but it's actually one of the most common challenges I see customers face! Now that we have a basic use case down, let's move on to use case 2.

Use Case 2 — LLM Generated Alerts on System Tables

This use case is all about making it easier to enable alerting on complex questions about our system tables. Once again, we will start with a user story:

“As a business user, I want to be able to easily create and manage alerts on my system table data for nuanced and specific questions I may have without having to do a bunch of SQL coding and learning how to use the APIs to create and schedule alerts”

Back to our architecture, we focus on the alert manager side of the app:

Alert Manager + Tag Manager App Architecture

Our alert manager will:

  1. Create and register a SQL function in Unity Catalog with pre-prompts engineered for the app that utilize AI_QUERY
  2. Create a chat window with Dash App callbacks to allow the user to interact with the model to create all the parts needed for an alert, such as the SQL query, schedule, subscribers, alert conditions, etc.
  3. Leverage a write-back table with a Dash App callback to register a query, create an alert, and create a Databricks Job to run the alert, given the parameters the LLM generated from the user's English question.
  4. Allow users to manage (edit/remove/view) the saved generated alerts in one place.

Here is what our app looks like:

Alert Manager App

It is a very simple UI: it provides a prompt input, a chat window, a place to edit any in-progress alerts, and a write-back (AG Grid) table to save the alerts with all the metadata generated from saving them (such as alert ID, query ID, and job ID).

We can submit a question via the prompt box:

“Please get me an alert that lets me know when my daily usage for all usage with the ‘demo’ tag goes above 100 dollars in a given day but always the previous days usage. I want the alert to run daily at 3 am. Can you send the alert to cody@email.com”

In this question, we are asking it to generate an alert on daily usage for a specific tag, and to apply a threshold of 100 dollars per day with the additional nuance of a 24-hour lag. We are also asking it to create a schedule for the alert and send it to a specific email. When we submit this, we get the following:

Alert question output:

We can continue to chat and have the LLM edit the pending alert it created, or we can edit the values directly in the textbox. This gives us easy flexibility depending on the output. Let's chat with DBRX some more to refine our requirement. We will submit another question:

“Can you change the schedule to daily at 6 PM? And can you also send it to my boss my_manager@email.com?”

We add the prompt and click submit and get the following:

Updated alert with our new asks!

Really powerful iteration! Now, when we feel like we are done, we can click “Save Alert”. This will leverage the databricks-sdk to parse the output from DBRX and create all the formal queries, alerts, and jobs required. Lastly, it will save all that info into our write-back table automatically like so:

Saving LLM Generated Alert Jobs

And if we look in our workspace and search for the output job id, we can see that it created our job!

Final output of our LLM Generated Alert!
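
Under the hood, “Save Alert” hands the parsed LLM output to the databricks-sdk. The app's exact helper is not shown in this post, and the SQL query/alert creation signatures vary across SDK versions, so treat this as a rough sketch of just the job-scheduling step (the alert ID, warehouse ID, and cron expression are assumed to come from the parsed alert spec):

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()  # resolves host/token from env vars or .databrickscfg

# Assume the SQL query and alert were already created from the parsed LLM spec
# (w.queries.create / w.alerts.create -- their signatures differ across SDK versions).
alert_id = "<alert id from the previous step>"
warehouse_id = "<sql warehouse id>"

job = w.jobs.create(
    name="llm_generated_alert_demo_tag_daily_usage",
    tasks=[
        jobs.Task(
            task_key="run_alert",
            sql_task=jobs.SqlTask(
                warehouse_id=warehouse_id,
                alert=jobs.SqlTaskAlert(alert_id=alert_id, subscriptions=[]),
            ),
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 18 * * ?",  # daily at 6 PM, per the chat request
        timezone_id="UTC",
    ),
)
print(f"Created job {job.job_id}")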

This is a basic but very powerful workflow to demonstrate the possibilities of building data apps on Databricks, and is enabled once again with Dash App callbacks on Databricks SQL. For example, here is the callback to populate and manage the chat feedback loop:


##### LLM Alert Manager #####

@app.callback(
    [Output('chat-history', 'data'),
     Output('chat-output-window', 'children'),
     Output('chat-input-box', 'value'),
     Output('in-progress-alert', 'data')],
    [Input('chat-submit-btn', 'n_clicks'),
     Input('clear-context-btn', 'n_clicks')],
    [State('chat-input-box', 'value'),
     State('chat-history', 'data')],
    prevent_initial_call=True
)
def submit_chat(chat_submit_n_clicks, clear_context_n_clicks, input_value, history):

    ctx = dash.callback_context
    if not ctx.triggered:
        return dash.no_update, dash.no_update, dash.no_update, dash.no_update

    button_id = ctx.triggered[0]['prop_id'].split('.')[0]

    if button_id == 'clear-context-btn':
        # Clear the chat history
        return {'messages': []}, "Chat history cleared.", "", {'data': {'alert': {}}}

    if button_id == 'chat-submit-btn' and input_value:
        new_history = history['messages'][-4:]  # keep only the last 4 messages on a rolling basis
        new_input = """**ME**: """ + input_value
        new_history.append(new_input)
        response_message = ""
        new_output = ""

        # Formulate the input for AI_QUERY
        ### each new prompt/response combo
        chat_updated_input = '\n\n'.join(new_history)
        query = text("SELECT result FROM generate_alert_info_from_prompt(:input_prompt)")

        try:
            # Assuming system_query_manager.get_engine() is predefined and correct
            engine = system_query_manager.get_engine()
            with engine.connect() as conn:
                result = conn.execute(query, parameters={'input_prompt': chat_updated_input})
                row = result.fetchone()
                if row:
                    new_output = row[0]
                    response_message = """**DBRX**: """ + new_output
                else:
                    response_message = """**DBRX**: """ + "No response generated."
                new_history.append(response_message)

        except Exception as e:
            response_message = """**DBRX**: """ + f"ERROR getting alert data: {str(e)}"
            new_history.append(response_message)

        # print(f"CONTEXT CHAIN: \n{new_history}")
        # print(f"CURRENT OUTPUT: \n {new_output}")

        parsed_updated_output = parse_query_result_json_from_string(new_output)
        print(parsed_updated_output)
        return {'messages': new_history}, '\n\n'.join(new_history), "", {'alert': parsed_updated_output}

    return dash.no_update, dash.no_update, dash.no_update, dash.no_update
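
The parse_query_result_json_from_string helper referenced in the callback is not shown in this post; conceptually it just extracts the structured alert spec from the model's free-text response. A minimal sketch of that idea (the real helper likely does more validation of fields like the alert SQL, schedule, and subscribers):

import json
import re

def parse_query_result_json_from_string(llm_output: str) -> dict:
    """Extract the first JSON object embedded in an LLM response (illustrative sketch)."""
    if not llm_output:
        return {}
    match = re.search(r"\{.*\}", llm_output, re.DOTALL)  # grab the outermost {...} block
    if not match:
        return {}
    try:
        return json.loads(match.group(0))
    except json.JSONDecodeError:
        return {}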


In the callback above, we can see that it calls the generate_alert_info_from_prompt SQL function to generate the response. This is a SQL function that the app ships with and deploys in Unity Catalog. We can go to the dash_observability_advisor schema and take a look under “Functions”:

We can see how registering this function in Unity Catalog enables us to encapsulate the complex prompt engineering so that apps and end users can leverage it out of the box:

Our wrapped prompt using AI_QUERY
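
The shipped pre-prompt is much more detailed than this, but the pattern is simply a SQL function registered in Unity Catalog that wraps AI_QUERY with an engineered prompt. A heavily simplified approximation (the endpoint name and prompt text are placeholders, not the app's real ones):

from sqlalchemy import text

# Heavily simplified approximation of the registered UC function.
create_alert_fn = text("""
CREATE OR REPLACE FUNCTION main.dash_observability_advisor.generate_alert_info_from_prompt(input_prompt STRING)
RETURNS TABLE (result STRING)
RETURN
  SELECT ai_query(
    'databricks-dbrx-instruct',   -- model serving endpoint (placeholder)
    CONCAT(
      'You help create Databricks SQL alerts over system tables. ',
      'Return a JSON object with the alert SQL, condition, schedule, and subscribers. ',
      'Conversation so far: ', input_prompt
    )
  ) AS result
""")

with system_query_manager.get_engine().connect() as conn:  # the app's engine helper
    conn.execute(create_alert_fn)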

Perhaps most importantly, we can natively govern all of these prompts and contexts with Unity Catalog, in addition to tables, raw data, materialized views, models, and any functions (AI and regular).
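
That governance is just ordinary Unity Catalog grants. For example, to let an analyst group use the app's schema and the wrapped prompt function without touching anything else (the group name is illustrative):

from sqlalchemy import text

# Illustrative UC grants; the principal name is a placeholder, and the group
# also needs USE CATALOG on the parent catalog.
grants = [
    "GRANT USE SCHEMA ON SCHEMA main.dash_observability_advisor TO `ops-analysts`",
    "GRANT SELECT ON SCHEMA main.dash_observability_advisor TO `ops-analysts`",
    "GRANT EXECUTE ON FUNCTION main.dash_observability_advisor.generate_alert_info_from_prompt TO `ops-analysts`",
]

with system_query_manager.get_engine().connect() as conn:
    for stmt in grants:
        conn.execute(text(stmt))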

These two app examples are just the beginning of the Data Products Databricks users can build, all on an open data layer with Delta UniForm, an open governance layer with Unity Catalog, and an open front-end framework with Plotly Dash!

Conclusion

All on Databricks in Python + SQL, we have created some powerful workflows that put non-technical users in the driver's seat. This concept of Production Data Apps will become even easier to leverage with embedded AI, and it will make Data Teams far more valuable.

We hope you liked the blog! If you are interested in a continuing series on the lower-level implementation best-practices and steps for creating these apps, please feel free to reach out or comment!
