Databricks SQL Governance Alerting Patterns — Warehousing Scaling + Data Access Auditing

Databricks SQL SME
DBSQL SME Engineering
12 min read, May 14, 2024
DBSQL Alert For Query Queuing

Author: Tomasz Bacewicz, Senior Solutions Architect @ Databricks

Using Databricks Workflows, the Databricks SDK, and Databricks SQL to Build an Observability Alerting System

Intro

As an administrator of a workspace, it’s vital to create a Databricks experience for your users that is seamless and performant while also mitigating any risky behavior such as not following the principle of least privilege with data access permissions. With the power of the Databricks API, system tables, and Databricks SQL (DBSQL), workspace administrators and data stewards can track virtually every aspect of usage on the Databricks platform. In this blog, we will be setting up alerts for an administrator to monitor warehouse performance as well as data access grants that are high-risk and potentially too permissive.

Use Case 1 — Monitoring Compute Performance

In Databricks SQL Warehouses, we can scale our system both horizontally and vertically. We can scale vertically through t-shirt sizing (x-small, small, medium, etc.) to add more CPU and memory for a given query. Warehouses also scale horizontally by adding more clusters to the warehouse to increase concurrency. This happens automatically if autoscaling is enabled by the warehouse creator. However, choosing the right t-shirt size and autoscaling range (min and max number of clusters in the warehouse) is often a trial-and-error exercise. Because the usage of a warehouse varies over time with changes in query complexity, queried table sizes, or the number of concurrent users, alerts on performance metrics can help a warehouse creator understand when to adjust the configuration to provide better query performance to the users.

In this first exercise, we will leverage the Query History API (soon to be a native system table — Private Preview) to create a table which we can use to alert warehouse administrators when either a large number of queries are being queued or when queries are spilling to disk.

Setting up a pipeline to ingest query history data

To set up alerts based on the usage and sizing of our warehouses, we first need to create a pipeline that ingests data from the Query History API. (Note: a query history system table, currently in Private Preview, is coming soon to the platform and will expose this information without needing the API.) This API includes data about every query that was run within the workspace, from basics such as the warehouse ID and the user identity to the performance metrics that can also be found in the query profiler UI. Make sure to set the “include_metrics” flag to True when calling the API so that it returns these metrics. With the Databricks Python SDK, we can create a notebook that pulls all the queries that have been run within the last hour.

#import and setup the SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import sql

w = WorkspaceClient()


#function which will return the current time and the start of the time window of interest
#(in this case we will get all the queries run within the last hour)
import datetime
import time

def get_current_time_and_minus_one_hour():
    current_time = datetime.datetime.now()
    current_time_sub_hour = current_time - datetime.timedelta(hours=1)

    #convert both timestamps to epoch milliseconds
    current_time_ms = int(time.mktime(current_time.timetuple()))*1000
    current_time_sub_hour_ms = int(time.mktime(current_time_sub_hour.timetuple()))*1000

    return current_time_ms, current_time_sub_hour_ms


#call the API to get all the query history records for queries in the last hour
import json
import pandas as pd
from pyspark.sql.functions import lit

current_time_ms, current_time_sub_hour_ms = get_current_time_and_minus_one_hour()

#use the SDK to call the API
queries = w.query_history.list(
    filter_by=sql.QueryFilter(
        query_start_time_range=sql.TimeRange(
            start_time_ms=current_time_sub_hour_ms, end_time_ms=current_time_ms
        ) #filtered by time range of the last hour
    ),
    include_metrics=True #including metrics of queries
)

#convert returned query histories to dicts
queries_as_dicts = [query.as_dict() for query in queries]

#create a pandas dataframe and then convert it to a pySpark dataframe
queries_pandas = pd.DataFrame(queries_as_dicts)
queries_df = spark.createDataFrame(queries_pandas)

#add the time window to which the query belongs for future analysis
queries_df_with_window = queries_df.withColumn("start_window", lit(current_time_sub_hour_ms)).withColumn("end_window", lit(current_time_ms))

#append the dataframe to a table to be used for alerts and analysis
queries_df_with_window.write.mode("append").saveAsTable("shared.tomasz_alerts.query_history")

With this notebook, we’ve set up an ingestion process that pulls all the query history records from the last hour in our workspace and appends them to a Delta table. This table can now be queried for monitoring and, more importantly, alerts can be set up to trigger when users are experiencing poor query performance. Let’s now look at how we can use this table to determine when our warehouses should be resized.
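The helper in the notebook above derives the time window from the local clock of the cluster. As a minimal sketch of an alternative, the function below computes the same kind of window from a timezone-aware UTC timestamp and makes the lookback configurable. The name `query_window_ms` and its parameters are hypothetical, and note that it returns the start of the window first, unlike the notebook helper.

```python
from datetime import datetime, timedelta, timezone


def query_window_ms(now=None, lookback=timedelta(hours=1)):
    """Return (start_ms, end_ms) in epoch milliseconds for the lookback window.

    Hypothetical helper: `now` defaults to the current UTC time.
    """
    # A timezone-aware UTC timestamp avoids depending on the cluster's
    # local timezone or daylight-saving transitions.
    end = now if now is not None else datetime.now(timezone.utc)
    end_ms = int(end.timestamp() * 1000)
    # Subtract the lookback in whole milliseconds so the window width is exact.
    start_ms = end_ms - int(lookback / timedelta(milliseconds=1))
    return start_ms, end_ms


start_ms, end_ms = query_window_ms()
print(start_ms, end_ms)
```

The two values can be passed as `start_time_ms` and `end_time_ms` in the same way as the values returned by the notebook helper.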

Identifying warehouses where queries are stuck in queue

As a warehouse is being shared across many users, too many concurrent queries can lead to queuing until the warehouse has available resources for processing. Though often these bottlenecks can be resolved automatically through autoscaling, admins need to find the healthy upper boundary for the number of allowable clusters to scale up to for their given SLA. Hence, it’s vital for an admin to right size the warehouses by both reducing frequently occurring upscaling through an increase in the minimum cluster setting and by removing queuing bottlenecks by increasing the maximum cluster setting.

The query below will call the table that we created in the previous step. It uses the most recent hourly window that was pulled and filters on any queries which were stuck in the “queue” stage for more than one second.

select
sum(
metrics.query_compilation_start_timestamp - metrics.overloading_queue_start_timestamp
) as waiting_for_compute_duration_ms,
count(*) as count_queries_waiting_for_compute,
warehouse_id
from
query_history
where
end_window = (
select
max(end_window)
from
query_history
)
and metrics.query_compilation_start_timestamp - metrics.overloading_queue_start_timestamp > 1000
group by
warehouse_id
order by
waiting_for_compute_duration_ms desc

Based on the results of this query, the warehouse administrator can inspect the SQL warehouse monitoring page for the warehouses where the queueing time has been very large. If a warehouse has had a lot of query queuing while it was scaled to the maximum number of clusters in its configuration, the administrator should increase this maximum limit. On the other hand, if the warehouse has not been reaching the maximum number of clusters that it has been set to, the queueing is most likely caused by frequent autoscaling. In this case, the administrator should adjust the minimum cluster setting for the warehouse to a higher value.
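The decision rule described above can be sketched as a small function. This is only an illustration: the function name and its inputs are hypothetical, and `peak_clusters` is assumed to be read off the warehouse monitoring page for the window in question rather than returned by the query.

```python
def recommend_cluster_change(queued_queries, peak_clusters, max_clusters):
    """Suggest which autoscaling bound to raise for a warehouse with queuing.

    queued_queries: count of queries that waited in the queue (from the alert query)
    peak_clusters:  highest number of running clusters observed (assumed input)
    max_clusters:   configured maximum number of clusters
    """
    if queued_queries == 0:
        return "no change"
    if peak_clusters >= max_clusters:
        # Queuing while already at the ceiling: the ceiling is too low.
        return "increase max clusters"
    # Queuing below the ceiling: likely waiting on clusters being added.
    return "increase min clusters"


print(recommend_cluster_change(queued_queries=12, peak_clusters=4, max_clusters=4))
print(recommend_cluster_change(queued_queries=12, peak_clusters=2, max_clusters=4))
```

The first call corresponds to a warehouse that queued while at its maximum, the second to one that queued while still scaling up.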

Identifying warehouses where queries have disk spill

Another common cause of poor query performance is that the warehouse being used is not sized properly in terms of vertical scaling (t-shirt sizing). This can be measured by comparing query runtimes to a desired SLA. A common first step in determining whether this is an issue, however, is to set up an alert which triggers when queries are spilling to disk frequently. Frequent spill means that the cluster is simply too small to handle the data in memory or that there is significant skew in the data.

The below query uses our ingested query history table to return the count of queries that are spilling to disk and by how much with average and max values.

select
avg(metrics.spill_to_disk_bytes) as avg_spill_to_disk_bytes,
max(metrics.spill_to_disk_bytes) as max_spill_to_disk_bytes,
count_if(metrics.spill_to_disk_bytes > 0) as count_queries_spilled,
warehouse_id
from
query_history
where
end_window = (
select
max(end_window)
from
query_history
)
group by
warehouse_id
having
avg_spill_to_disk_bytes > 0
order by
avg_spill_to_disk_bytes desc

Through the results of this query, a warehouse admin can determine which warehouses have a lot of spill to disk occurring. Based on this, the cluster of the warehouse can be configured to a larger size to add more memory capacity to the warehouse and in turn reduce or eliminate spill to disk.

Setting up the pipeline for ingest and alerting

We now have all the assets we need to set up a Databricks workflow that ingests the data and runs the queries powering the alerts, letting a warehouse administrator know when it is time to reconfigure the size. To do so, we first set up one alert for each of the two queries above. In the example here, the threshold is set to greater than zero distinct warehouses, so the alert triggers whenever there is even one warehouse with disk spill or with queuing over one second occurring.

DBSQL Alert For Query Queuing in Query History
DBSQL Alert for Disk Spill in Query History

The query thresholds (size of spill to disk and queuing times) as well as the alert thresholds can be adjusted based on the level of tolerance for slower-performing queries. You can also trigger alerts based on the proportion or frequency of queries spilling, which may be more indicative of a need to scale the warehouse or of problems in the queries themselves. Once these alerts have been set up, we can create a workflow which first runs the Query History API ingestion notebook and then triggers both of these alerts to refresh. To add alerts to your workflow, click the “Add Task” button and select the SQL task type with the Alert option. Then select the alerts created in the previous step to get the final Workflow Task Diagram.
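As an illustration of alerting on the proportion of queries spilling rather than on any spill at all, the sketch below computes a per-warehouse spill ratio from records shaped like the dictionaries written by the ingestion notebook. The function name and the sample records are hypothetical; only the `warehouse_id` and `metrics.spill_to_disk_bytes` fields used by the queries above are assumed.

```python
from collections import defaultdict


def spill_ratio_by_warehouse(queries):
    """Return {warehouse_id: fraction of queries with spill_to_disk_bytes > 0}."""
    totals = defaultdict(int)
    spilled = defaultdict(int)
    for q in queries:
        wid = q["warehouse_id"]
        totals[wid] += 1
        # Metrics (or the spill field) may be missing for some queries.
        spill_bytes = (q.get("metrics") or {}).get("spill_to_disk_bytes") or 0
        if spill_bytes > 0:
            spilled[wid] += 1
    return {wid: spilled[wid] / totals[wid] for wid in totals}


# Made-up sample records for illustration only.
sample = [
    {"warehouse_id": "wh-a", "metrics": {"spill_to_disk_bytes": 1024}},
    {"warehouse_id": "wh-a", "metrics": {"spill_to_disk_bytes": 0}},
    {"warehouse_id": "wh-b", "metrics": {}},
]
print(spill_ratio_by_warehouse(sample))  # {'wh-a': 0.5, 'wh-b': 0.0}
```

An alert threshold could then be expressed as a ratio (for example, more than a chosen fraction of queries spilling) instead of a simple non-zero count.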

Databricks Workflow with Ingestion + DBSQL Alerts

This workflow can be scheduled to run every hour or every day, again depending on the tolerance of slower performance on your warehouses. Once the Query History System Table is in Public Preview, you can remove the ingestion process and trigger these alerts to run directly on DBSQL only!
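For readers who prefer to define the workflow as code rather than through the UI, the sketch below builds a job definition as a plain Python dictionary. It is a hedged illustration: the field names follow the Jobs API 2.1 request format as I understand it and should be checked against the current documentation, the function name, job name, notebook path, warehouse ID, and alert IDs are all placeholders, and compute settings for the notebook task are omitted.

```python
import json


def build_alert_job_spec(notebook_path, warehouse_id, alert_ids, cron="0 0 * * * ?"):
    """Build a job payload: one ingestion notebook task, then one task per alert."""
    ingest_key = "ingest_query_history"
    tasks = [
        {"task_key": ingest_key, "notebook_task": {"notebook_path": notebook_path}}
    ]
    for i, alert_id in enumerate(alert_ids):
        tasks.append(
            {
                "task_key": f"refresh_alert_{i}",
                # Each alert refresh runs only after ingestion has finished.
                "depends_on": [{"task_key": ingest_key}],
                "sql_task": {
                    "warehouse_id": warehouse_id,
                    "alert": {"alert_id": alert_id},
                },
            }
        )
    return {
        "name": "warehouse-observability-alerts",  # placeholder job name
        "tasks": tasks,
        # Quartz cron syntax; the default here fires at the top of every hour.
        "schedule": {"quartz_cron_expression": cron, "timezone_id": "UTC"},
    }


spec = build_alert_job_spec(
    notebook_path="/Workspace/Shared/query_history_ingest",  # placeholder
    warehouse_id="<warehouse-id>",
    alert_ids=["<queuing-alert-id>", "<spill-alert-id>"],
)
print(json.dumps(spec, indent=2))
```

Keeping the alert tasks dependent on the ingestion task mirrors the task diagram described above, where the alerts refresh only after new query history has been appended.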

Relationship between disk spill and queueing

It’s important to note that warehouses which are scaled vertically may remove the need to scale horizontally and vice versa. For example, queries in a warehouse could be taking a very long time to complete, possibly indicated by the spill to disk alert above. As this is occurring, there is a possibility that new queries will be queued until these initial long running queries have completed. Hence, once the warehouse is vertically scaled from, for example, a medium to a large cluster, there’s a good likelihood that not only will the long running queries finish faster but the count of queued queries is also reduced.

In the same vein, scaling a warehouse horizontally by adding more clusters to it can allow for complex queries to have more dedicated resources allowing smaller queries to be redirected to other clusters. Hence scaling horizontally by adding more clusters to the warehouse can also reduce the amount of disk spill occurring during execution of dynamic queries.

Use Case 2 — Monitoring Access Controls

Another set of useful alerts that can be set up when using Databricks warehouses is related to access controls. A common point of focus of data stewards and governance admins is understanding when a user may have been granted access which is too permissive. To lay the groundwork for setting up alerts on grants which stray from best practices, organizations should have an established set of protocols defining how users should get access to data outside of their typical access. With these sets of practices defined, alerts can be used to understand when behaviors stray from standard operating procedures. In the examples below, alerts on audit system tables will be used for admins to monitor when users are granted access to sensitive PII data and when users have been given write access when they most likely should have been granted read only access instead.

Editor grants to an individual user

A common best practice when leveraging Unity Catalog is to govern access to data through user groups rather than by grants directly to individual users. Catalogs should be created for a team or a project and user groups of owners, editors, and readers specific to that catalog should be used for data access. If a colleague from another team needs ad-hoc access to such a catalog however, a governance admin might not want to put them into one of the existing groups as the grant can be temporary in nature. In this scenario, a guest user group can be pre-created for such users to be given temporary access. This makes governance more manageable in comparison to providing access directly through the user account.

With governance best practices established, alerts can be set up to monitor when grant behaviors stray from those guardrails. In accordance with the best practice of using a guest group for ad hoc user grants, we can set up a query to identify when user accounts (rather than user groups) are given direct access to a catalog, schema, or table. In this example, we will filter the grants to only MODIFY or ALL_PRIVILEGES permissions, but this can be expanded to include read (SELECT) grants depending on how strict the governance rules are within the organization.

-- Get all write permissions (MODIFY or ALL_PRIVILEGES) given to user accounts directly on table, schema, or catalog level
SELECT
event_time,
user_identity.email as granter,
request_params.changes as perm_changes,
audit.request_params.securable_type as securable_type,
request_params.securable_full_name as securable_full_name
FROM
system.access.audit
WHERE
audit.service_name = "unityCatalog"
AND audit.action_name = "updatePermissions"
AND audit.request_params.securable_type in ("catalog", "schema", "table")
AND audit.event_time > (current_timestamp() - INTERVAL 70 MINUTES)
AND audit.response.status_code = 200
AND request_params.changes:[*].principal like "%@databricks.com%" -- org email domain
AND (
array_contains(
flatten(
from_json(
request_params.changes:[*].add,
'array<array<string>>'
)
),
"MODIFY"
)
OR array_contains(
flatten(
from_json(
request_params.changes:[*].add,
'array<array<string>>'
)
),
"ALL_PRIVILEGES"
)
)

We use the domain of the organization (which in this case is “@databricks.com”) to identify that a user account has been granted direct write access to a data asset. For this example, an hourly schedule has been set up. Hence there is a filter to look back on the audit table for the last 70 minutes (as there is a delay in updates to the audit tables).
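To make the filter logic of the query easier to follow, here is a minimal Python sketch of the same check applied to a single `changes` value. It assumes the value is a JSON array of objects with `principal` and `add` keys, which is the shape implied by the JSON path expressions in the query; the function name and the sample payload are hypothetical. Unlike the SQL `like` filter, it matches the domain only at the end of the principal.

```python
import json

WRITE_PRIVILEGES = {"MODIFY", "ALL_PRIVILEGES"}


def direct_write_grants(changes_json, org_domain):
    """Return (principal, added write privileges) for user-account principals."""
    findings = []
    for change in json.loads(changes_json):
        principal = change.get("principal") or ""
        added = WRITE_PRIVILEGES.intersection(change.get("add") or [])
        # A principal ending in the org email domain is treated as a user account.
        if principal.endswith(org_domain) and added:
            findings.append((principal, sorted(added)))
    return findings


# Made-up payload for illustration only.
sample_changes = json.dumps(
    [
        {"principal": "alice@example.com", "add": ["MODIFY", "SELECT"]},
        {"principal": "data-readers", "add": ["SELECT"]},
    ]
)
print(direct_write_grants(sample_changes, "@example.com"))
# [('alice@example.com', ['MODIFY'])]
```

Only the grant to the individual user account with a write privilege is reported; the group grant and the read-only privilege are ignored.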

We can set up an alert that runs this query every hour and notifies us whenever it returns any records, for example by selecting the count of any column and triggering when that count is greater than 0.

DBSQL Alert Creation for Users Getting Write Grants

Grants to Sensitive PII tables

Another common Unity Catalog pattern is to tag tables or columns to specify that they contain sensitive PII and hence should be visible to only a restricted set of users. We can use the information schema system tables to find all the tables and columns which have a “pii” tag and then build a list of these tables together with the schemas and catalogs which contain them. We can then cross-reference this list with any permissions that were granted, using the audit table. By setting up an alert for this query, the administrator will be notified of any permissions that have been added to these sensitive assets, allowing them to validate that no erroneous grants were made.

-- Find all tables (and their catalogs and schemas) with column tags or table tags containing "pii"
WITH table_tags_union AS (
SELECT
explode(
array(
catalog_name,
CONCAT(catalog_name, ".", schema_name),
CONCAT(catalog_name, ".", schema_name, ".", table_name)
)
) as securable_full_name
FROM
system.information_schema.column_tags
WHERE
tag_name LIKE "%pii%"
OR tag_value LIKE "%pii%"
GROUP BY
all
UNION
DISTINCT
SELECT
explode(
array(
catalog_name,
CONCAT(catalog_name, ".", schema_name),
CONCAT(catalog_name, ".", schema_name, ".", table_name)
)
) as securable_full_name
FROM
system.information_schema.table_tags
WHERE
tag_name LIKE "%pii%"
OR tag_value LIKE "%pii%"
GROUP BY
all
)
-- Join the securables which contain "pii" tags with audit logs where there has been an addition of permissions
SELECT
event_time,
user_identity.email,
request_params.changes,
request_params.securable_type,
request_params.securable_full_name
FROM
system.access.audit
INNER JOIN table_tags_union ON audit.request_params.securable_full_name = table_tags_union.securable_full_name
WHERE
audit.service_name = 'unityCatalog'
AND audit.action_name = 'updatePermissions'
AND audit.event_time > (current_timestamp() - INTERVAL 70 MINUTES)
AND audit.response.status_code = 200
AND audit.request_params.changes:[*].add is not null

Just like in the last setup, we can create an alert for this query which triggers whenever any access has been granted to PII data.
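The query above expands each tagged table into its catalog, schema, and table names so that a grant at any of the three levels is caught. A minimal Python sketch of that expansion and join is shown below; the function names and the sample values are hypothetical and only illustrate the matching logic.

```python
def securable_hierarchy(catalog_name, schema_name, table_name):
    """Return the full names of the catalog, schema, and table for one table."""
    return [
        catalog_name,
        f"{catalog_name}.{schema_name}",
        f"{catalog_name}.{schema_name}.{table_name}",
    ]


def grants_on_pii(grant_events, tagged_tables):
    """Keep grant events whose securable is a tagged table or one of its parents."""
    pii_securables = set()
    for catalog, schema, table in tagged_tables:
        pii_securables.update(securable_hierarchy(catalog, schema, table))
    return [e for e in grant_events if e["securable_full_name"] in pii_securables]


# Made-up values for illustration only.
tagged = [("main", "hr", "employees")]
events = [
    {"securable_full_name": "main.hr", "granter": "admin@example.com"},
    {"securable_full_name": "main.sales", "granter": "admin@example.com"},
]
print(grants_on_pii(events, tagged))
```

Here the grant on the `main.hr` schema is returned because it contains a tagged table, while the grant on `main.sales` is not.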

DBSQL Alert for Users Getting PII Access
Scheduling Alert for PII Access

Conclusion

These are just some examples of the many ways that the Query History API, system tables, DBSQL, and Databricks Workflows can be leveraged by an administrator to monitor their workspace, with the goal of providing a seamless and secure experience for end users of the platform. These queries and alerts can be modified to fit your organization’s needs with regard to both performance and governance. Reach out directly if you have a specific governance or monitoring topic you would like to see!
