Technical and Business data alerts using Dataform

Alex Feldman
Google Cloud - Community
10 min read · Jan 31, 2024

I want to continue the topic of creating and tracking data alerts. Last year, I wrote a series of posts (1, 2, 3) on creating BigQuery data alerts using the Scheduled Queries service, Pub/Sub, and Cloud Functions. Since then, however, Google Cloud has introduced new tools that improve data pipeline development: Dataform and remote functions. So I've changed my approach, and in this article I'll discuss the newer way.

Let's see how to create and run data alerts using Dataform and send notifications to Slack.

We can categorize alerts into two types:

  • Technical alerts are notifications for the technical team about data errors, with no complex requirements for message content.
  • Business alerts notify other teams (marketing, product, finance) of data-driven events and their details.

Technical alerts

We will use Dataform's assertion features and the Cloud Logging service to create and track technical alerts.

Dataform lets us create built-in assertions in the config block or manual assertions with SQLX files.

Built-in assertion

We can add a built-in assertion to any table by defining the assertion conditions in the config block. Dataform compiles an additional query to check these conditions and produces an error if they are not met.

Built-in assertions can check the following conditions: nonNull, rowConditions, uniqueKey, and uniqueKeys. For example:

config {
  type: "table",
  schema: "my_dataset",
  assertions: {
    uniqueKey: ["id"],
    nonNull: ["user_id"]
  }
}

SELECT id, user_id
FROM ${ref('source_table')}

In this case, the assertion checks for duplicates in the ID field and null values in the user ID field. If either of these conditions is not met, the workflow execution will produce an error.
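
Besides uniqueKey and nonNull, the rowConditions option accepts arbitrary SQL expressions that every row must satisfy. A minimal sketch, assuming a hypothetical amount column:

config {
  type: "table",
  schema: "my_dataset",
  assertions: {
    rowConditions: ["amount >= 0"]
  }
}

SELECT id, user_id, amount
FROM ${ref('source_table')}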

Manual assertion

A manual assertion with SQLX files can be set up to check certain conditions in a query. If the query returns any results, an error will be produced. For instance, the following assertion checks whether the table has been re-created within a specific time period.

config {
  type: "assertion",
  schema: "my_dataset",
  tags: ["my_tag"]
}

SELECT creation_time, CURRENT_TIMESTAMP() as current_time
FROM `my_project.my_dataset.INFORMATION_SCHEMA.TABLES`
WHERE table_name = 'checked_table_name'
  AND (creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR)) IS FALSE

The query returns a row only if the table was created more than 3 hours ago, which makes the assertion fail.

To enable alerts, we need to create a Dataform workflow configuration, add SQLX files by tags or file names, and schedule the run frequency, for instance, every hour. Let’s name the workflow configuration alerts_hourly. After its execution, we can view the execution status in the Dataform workflow execution log.

Slack notification using Cloud Logging

To deliver an alert notification to Slack, we can configure Cloud Monitoring alerts for failed workflow invocations, using Slack as a notification channel.

A log query for the alert policy can look like this:

resource.type="dataform.googleapis.com/Repository"
severity=ERROR
jsonPayload.workflowConfigId="alerts_hourly"

Creating alerts this way catches failed assertions as errors in the logs and delivers minimal incident information to the technical team for deeper analysis in the next step.
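
If several alert workflow configurations share a naming convention, one policy can cover them all with a regular-expression filter (a sketch assuming hypothetical names like alerts_hourly and alerts_daily):

resource.type="dataform.googleapis.com/Repository"
severity=ERROR
jsonPayload.workflowConfigId=~"^alerts_"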

Business alerts

Business alerts have different requirements than technical alerts. The people who receive these alerts are often colleagues from other departments who may not be tech-savvy. Including too many technical details in the message can confuse and distract them. Instead, we should focus on providing specific information about the events in the message, as that is what the receivers are more likely to need.

To send the Slack notification, we will use a BigQuery remote function that invokes a Cloud Function, which delivers the text to the Slack channel (more details about using remote functions in BigQuery are in my article).

Preparations

To proceed, we must prepare by creating a Cloud Function, configuring Slack, setting up a BigQuery connection, and creating a remote function.

Let's create the send_to_slack Cloud Function with the following code:

import os
import json
import logging

import functions_framework
import google.cloud.logging
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

# Route standard logging calls to Cloud Logging
log_client = google.cloud.logging.Client()
log_client.setup_logging()


def send_slack_message(message_text, sender, channel):
    slack_token = os.environ['SLACK_TOKEN']
    client = WebClient(token=slack_token)
    message = f"{sender}: {message_text}"
    try:
        client.chat_postMessage(channel=channel, text=message)
        logging.info(f"Message sent to Slack: {message}")
        return 0
    except SlackApiError as e:
        logging.error(f"Error sending message: {e}")
        return 1


@functions_framework.http
def main(bq_request):
    # BigQuery remote functions pass arguments in the "calls" array
    request_json = bq_request.get_json()
    message_text = request_json['calls'][0][0]
    sender = request_json['calls'][0][1]
    channel = request_json['calls'][0][2]
    if message_text is not None and len(message_text) > 3:
        try:
            send_slack_message(message_text, sender, channel)
        except Exception as e:
            logging.error(e)
            exit(1)
    else:
        logging.info("No data to send to Slack.")
    resp = json.dumps({"replies": ["Job is finished"]})
    logging.info(resp)
    return resp

The function accepts three arguments: message_text, sender, and channel. It sends a message to a particular Slack channel, along with the sender's name and the message text. When you deploy the Cloud Function, add your Slack token as an environment variable (better yet, store it in Secret Manager) and include a requirements.txt file with dependencies.

slack-sdk>=3.20.0
google-cloud-logging~=3.6.0
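
Before wiring the function to BigQuery, you can smoke-test it locally. A sketch assuming the function runs via functions-framework on port 8080 with SLACK_TOKEN set, and that #test_channel is a placeholder channel:

import requests

# BigQuery remote functions POST a JSON body with a "calls" array,
# one inner list of arguments per row
payload = {"calls": [["Test alert text", "manual_test", "#test_channel"]]}

resp = requests.post("http://localhost:8080", json=payload)
print(resp.text)  # expect {"replies": ["Job is finished"]}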

We also need to configure Slack to receive messages from Google Cloud. One of my articles explains how to do it.

Then, we need to create a BigQuery connection of the BigLake and remote functions (Cloud Resource) type (please read my other article on how to do it in detail). The connection name is cloud_func_connection.

Next, let’s start developing code in Dataform. Initially, we create the remote function that invokes our send_to_slack Cloud function. The file name is send_to_slack_remote.sqlx.

config {
  type: "operations",
  schema: "my_alerts",
  tags: ["alert"],
  hasOutput: true
}

CREATE FUNCTION IF NOT EXISTS ${self()}(
  message_text STRING,
  sender STRING,
  channel STRING)
RETURNS STRING
REMOTE WITH CONNECTION `my_project.us-east1.cloud_func_connection`
OPTIONS (
  endpoint = 'https://us-east1-my_project.cloudfunctions.net/send_to_slack'
)

In the operations-type SQLX file, the code creates a remote function using the cloud_func_connection connection in the us-east1 region. The remote function is built on the endpoint of the send_to_slack Cloud Function. Thanks to the hasOutput: true option, it is saved in the my_alerts dataset under the name send_to_slack_remote.
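
Once the operation has run, the remote function can be called like any SQL function. A hypothetical smoke test from the BigQuery console (the message and channel values are placeholders):

SELECT `my_project.my_alerts.send_to_slack_remote`(
  'Test message', 'manual_test', '#test_channel') AS response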

We are now prepared to create business alerts using Dataform.

Alert query view

Let's consider an alert for the following case: an order without a payment. We must create a query that finds these incidents and produces an alert text about them. We will make a Dataform view in the order_without_payment.sqlx file.

config {
  type: "view",
  schema: "my_alerts",
  tags: ["alert"]
}

SELECT a.order_id, a.order_timestamp, a.user_id, b.payment_id
FROM ${ref('orders')} a
LEFT JOIN ${ref('payments')} b ON a.order_id = b.order_id
WHERE b.payment_id IS NULL
  AND a.status = 'delivered'
  AND DATE(a.order_timestamp) = CURRENT_DATE() - 1 -- date filter

The query produces the list of orders that were delivered but not paid on the previous day.

Let’s compose a message.

config {
  type: "view",
  schema: "my_alerts",
  tags: ["alert"]
}

WITH
orders_payments as (
  SELECT a.order_id, a.order_timestamp, a.user_id, b.payment_id
  FROM ${ref('orders')} a
  LEFT JOIN ${ref('payments')} b ON a.order_id = b.order_id
  WHERE b.payment_id IS NULL
    AND a.status = 'delivered'
    AND DATE(a.order_timestamp) = CURRENT_DATE() - 1 -- date filter
)

-- compose message
SELECT order_timestamp,
  FORMAT("Alert. Order #%d was delivered at %t for the user: %s without a payment",
    order_id, FORMAT_TIMESTAMP('%H:%M on %d %b %Y', order_timestamp),
    user_id) as message_text
FROM orders_payments

We composed the alert messages with the order ID, user ID, and order time using the FORMAT function. The message_text field contains a result like: Alert. Order #3000432 was delivered at 21:32 on 26 Jan 2024 for the user: A001234 without a payment. The text clearly describes what happened and when it occurred.
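
For reference, %d formats an INT64, %s a STRING, and %t prints the canonical text form of any value. A quick way to verify the output with made-up literals:

SELECT FORMAT("Alert. Order #%d was delivered at %t for the user: %s without a payment",
  3000432, FORMAT_TIMESTAMP('%H:%M on %d %b %Y', TIMESTAMP '2024-01-26 21:32:00'),
  'A001234') as message_text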

Since we want to send all the information in one alert notification, let's wrap all the messages into one, separated by newline characters.

config {
  type: "view",
  schema: "my_alerts",
  tags: ["alert"]
}

WITH
orders_payments as (
  SELECT a.order_id, a.order_timestamp, a.user_id, b.payment_id
  FROM ${ref('orders')} a
  LEFT JOIN ${ref('payments')} b ON a.order_id = b.order_id
  WHERE b.payment_id IS NULL
    AND a.status = 'delivered'
    AND DATE(a.order_timestamp) = CURRENT_DATE() - 1 -- date filter
),

compose_message as (
  SELECT order_timestamp,
    FORMAT("Alert. Order #%d was delivered at %t for the user: %s without a payment",
      order_id, FORMAT_TIMESTAMP('%H:%M on %d %b %Y', order_timestamp),
      user_id) as message_text
  FROM orders_payments
)

SELECT
  ARRAY_TO_STRING(ARRAY_AGG(message_text), '\n') as message_text
FROM compose_message

Now we have one message covering all three incidents.
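
One design note: ARRAY_AGG does not guarantee ordering, so if the combined message should list incidents chronologically, the sort can go inside the aggregation:

ARRAY_TO_STRING(ARRAY_AGG(message_text ORDER BY order_timestamp), '\n') as message_text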

Next, we will create the alert script and send this message to Slack.

Alert with assertion

We can create an alert using the assertion type in the config block and invoking the send_to_slack_remote function. Let's create the order_without_payment_alert.sqlx file as follows:

config {
  type: "assertion",
  schema: "my_alerts",
  tags: ["alert"]
}

WITH
order_without_payment as (
  SELECT message_text
  FROM ${ref('order_without_payment')}
  WHERE message_text IS NOT NULL
)

SELECT ${ref('send_to_slack_remote')} ( -- calling remote function
  message_text, -- the message text argument
  'order_without_payment_alert', -- the sender argument
  '#operational_alerts' -- the channel argument
) as send_message_job
FROM order_without_payment

In the config block, we used the assertion type. We added a filter on null message_text values to avoid sending empty messages.

In the last SELECT, we call the send_to_slack_remote function. It sends the message text to Slack if the preceding subquery has any output. The function takes three arguments: message_text (the composed alert text), sender (order_without_payment_alert), and channel (#operational_alerts, the Slack channel where we expect to receive the alert).

The query has returned the remote function's response.

Thus, if the alert is triggered, the script notifies Slack and logs an error in the execution log. The date filter we used in the view implies running the workflow every morning to check the previous day's data.

Great, we have achieved it!

Let's explore some other options. What if we want to avoid any errors appearing in the execution log? What if we need to run it more often and keep track of all alerts sent for future analysis? What if we use separate datasets and Slack channels for production and development?

Alert with incremental table

To prevent errors from appearing in the logs, we can modify the config block from an assertion type to a table type. Additionally, if we want to save all incidents and only check for new ones, we should use the incremental table type and activate the protected flag.

config {
  type: "incremental",
  schema: "my_alerts",
  tags: ["alert"],
  protected: true
}

We will consolidate the view and alert code into a single file.

config {
  type: "incremental",
  schema: "my_alerts",
  tags: ["alert"],
  protected: true
}

WITH
orders_payments as (
  SELECT a.order_id, a.order_timestamp, a.user_id, b.payment_id
  FROM ${ref('orders')} a
  LEFT JOIN ${ref('payments')} b ON a.order_id = b.order_id
  WHERE b.payment_id IS NULL
    AND a.status = 'delivered'
    -- new date filter, applied only on incremental runs
    ${when(incremental(),
      `AND a.order_timestamp > (SELECT MAX(last_timestamp) FROM ${self()})`
    )}
),

compose_message as (
  SELECT order_timestamp,
    FORMAT("Alert. Order #%d was delivered at %t for the user: %s without a payment",
      order_id, FORMAT_TIMESTAMP('%H:%M on %d %b %Y', order_timestamp),
      user_id) as message_text
  FROM orders_payments
),

wrap_up_message as (
  SELECT MAX(order_timestamp) as last_timestamp, -- new field added
    ARRAY_TO_STRING(ARRAY_AGG(message_text), '\n') as message_text
  FROM compose_message
)

SELECT '${name()}' as sender, -- new field with the file name value
  CURRENT_TIMESTAMP() as created_at, -- new field
  message_text, -- new field
  ${ref('send_to_slack_remote')} (
    message_text,
    '${name()}',
    '#operational_alerts'
  ) as send_message_job,
  last_timestamp -- new field
FROM wrap_up_message
WHERE message_text IS NOT NULL

We replaced the date filter in the orders_payments subquery with a latest-timestamp condition, which only applies in incremental execution mode.

${when(incremental(),
  `AND a.order_timestamp > (SELECT MAX(last_timestamp) FROM ${self()})`
)}
Using query conditions like date = CURRENT_DATE()-1 is not a best practice: if an execution fails one day, we miss that day's alert incidents. It is preferable for the code to pick up from the previous timestamp.

We added the MAX(order_timestamp) as last_timestamp field in the wrap_up_message subquery to make the incremental condition work.

In the last SELECT, we added the sender, created_at, message_text, and last_timestamp fields. Since the sender is the incremental table's file name, we can use the name() function for it.

Whenever we have an alert incident, a new notification will be sent to the Slack channel, and a new row will be inserted into the table in the following format.

Inserted row

If we have multiple alerts, we can use the first three columns to gather statistics from the various alert tables in a single place.
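
For example, a hypothetical stats query over one such alert table (assuming the incremental file was named order_without_payment_alert):

SELECT sender, DATE(created_at) as alert_date, COUNT(*) as alert_count
FROM `my_project.my_alerts.order_without_payment_alert`
GROUP BY sender, alert_date
ORDER BY alert_date DESC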

Alert in separate environments

It is helpful to have a development workspace separate from production when building a pipeline. Dataform lets us do this using workspace compilation overrides. We can use the schema suffix feature to add a _production suffix to the production dataset names. Also, using a test Slack channel while developing an alert is a good idea. The self() function can help define the channel where the script should send notifications.

To do this, we replace the #operational_alerts value with an expression:

IF('${self()}' LIKE '%_production%', '#operational_alerts', '#test_channel')

Here self() resolves to the fully qualified target name, which contains the dataset name, so the LIKE test detects the _production suffix once compilation overrides are applied.

The final alert code

config {
  type: "incremental",
  schema: "my_alerts",
  tags: ["alert"],
  protected: true
}

WITH
orders_payments as (
  SELECT a.order_id, a.order_timestamp, a.user_id, b.payment_id
  FROM ${ref('orders')} a
  LEFT JOIN ${ref('payments')} b ON a.order_id = b.order_id
  WHERE b.payment_id IS NULL
    AND a.status = 'delivered'
    -- date filter, applied only on incremental runs
    ${when(incremental(),
      `AND a.order_timestamp > (SELECT MAX(last_timestamp) FROM ${self()})`
    )}
),

compose_message as (
  SELECT order_timestamp,
    FORMAT("Alert. Order #%d was delivered at %t for the user: %s without a payment",
      order_id, FORMAT_TIMESTAMP('%H:%M on %d %b %Y', order_timestamp),
      user_id) as message_text
  FROM orders_payments
),

wrap_up_message as (
  SELECT MAX(order_timestamp) as last_timestamp,
    ARRAY_TO_STRING(ARRAY_AGG(message_text), '\n') as message_text
  FROM compose_message
)

SELECT '${name()}' as sender,
  CURRENT_TIMESTAMP() as created_at,
  message_text,
  ${ref('send_to_slack_remote')} ( -- calling remote function
    message_text,
    '${name()}',
    IF(
      '${self()}' LIKE '%_production%', -- calculating the Slack channel
      '#operational_alerts', -- production channel
      '#test_channel' -- testing channel
    )
  ) as send_message_job,
  last_timestamp
FROM wrap_up_message
WHERE message_text IS NOT NULL

When we develop the script in a Dataform workspace, it sends messages to #test_channel. After the code is compiled in the production release with the _production suffix, the dataset name is overridden to my_alerts_production, and messages are sent to the #operational_alerts channel.
