Sending BigQuery data alerts to the Slack channel

Alex Feldman
7 min read · Apr 20, 2023


This is the third part of the BigQuery data alert article series (see the first and the second parts). Here we look at how to process the notification messages and send them to a Slack channel.

In the first part, we created a Pub/Sub topic and, based on it, a subscription that receives alert (and other error) messages from the ASSERT statement query and writes them to a BigQuery table. Let’s continue processing the alert messages from this point with a Python script. To do so, we will prepare a Slack channel for receiving notifications, create a Pub/Sub-triggered Cloud Function, and write a Python script that sends messages to the Slack channel.

Prepare the Slack channel for receiving notifications.

To prepare the Slack channel for receiving messages from outside, we need to create a Slack app, generate a Slack token to access it, and add the new Slack app to a specific channel. This assumes that your organization uses Slack and that you have Slack admin permissions.

First, we need to create a new Slack app. To do so, sign in to the Slack workspace and go to the Slack API apps page at https://api.slack.com/apps. Click the “Create an App” button, give the app a name (for example, data-alert-service), and select the workspace where we want to use it. In the following steps, add the “Bots” feature and authorize the app to access the Slack workspace; the bot needs the chat:write scope to post messages. As a result, we get an OAuth access token. More details about creating Slack apps and generating tokens are in the Slack API documentation.

Now we can add the new Slack app to the channel where we want to get alert messages. To do so, open the channel in Slack and click the down arrow next to the channel name in the header.

In the window that opens, choose the Integrations tab, click the Add Apps button, and add the new app.

Creating Pub/Sub triggered Cloud function.

Let’s open our topic in Pub/Sub and attach a Cloud Function to it (we discussed how to create the Pub/Sub data alert topic in the first part). Click the ‘Trigger Cloud Function’ button in the top menu of the topic page. In the create-function panel that opens on the right, fill out the fields: enter the function name and choose a region (if necessary).

Choose the Python runtime version you prefer, and enter the name of the script’s entry function in the entry point field (for example, main).

Click on the REQUIREMENTS.TXT tab below and add dependencies.

# Function dependencies, for example:
# package>=version
google-cloud-pubsub>=2.14.1
slack-sdk>=3.20.0

Click on the MAIN.PY tab, clear the text area, and paste the following script, replacing the values of the slack_token and channel_id variables with your actual values.

import json
import base64
from google.cloud import pubsub_v1
from slack_sdk import WebClient

slack_token = 'YOUR-SLACK-TOKEN'
client = WebClient(token=slack_token)
channel_id = "#YOUR-SLACK-CHANNEL-NAME"

def extract_message(json_string):
    json_data = json.loads(json_string)
    state = json_data.get('state', '-')
    message_text = '-'
    error_status = json_data.get('errorStatus')
    if error_status:
        message_data = error_status.get('message', '-')
        # Drop the query position and job ID that follow the alert text
        message_data = message_data.split("at [")[0]
        message_text = message_data.replace('Query error: ', '')
    return state, message_text

def main(event, context):
    # The Pub/Sub payload arrives base64-encoded in event['data']
    text = base64.b64decode(event['data']).decode('utf-8')
    state, message_text = extract_message(text)
    if state == 'FAILED' and 'Syntax error' not in message_text:
        client.chat_postMessage(channel=channel_id, text=message_text)

Let’s look at the script. It has two functions. The main function reads the incoming raw message from the Pub/Sub topic (a JSON string) and saves it to the text variable. The raw JSON string can look like this:

{
  "dataSourceId": "scheduled_query",
  "destinationDatasetId": "",
  "emailPreferences": {
    "enableFailureEmail": true
  },
  "endTime": "2023-02-21T11:01:15.393507Z",
  "errorStatus": {
    "code": 3,
    "message": "Query error: Duplicated User IDs in the table at[5]; JobID: 366106435717:scheduled_query_63f34dca-0000-2f29-a60a"
  },
  "name": "projects/36610640000000/locations/us-east1/transferConfigs/63b00080-0000-28c1-ad37-24058882fefc/runs/63f34dca-0000-2f29-a60a-30fd3817324c",
  "notificationPubsubTopic": "projects/my-project/topics/bigquery_asserts",
  "params": {
    "query": "ASSERT NOT EXISTS (\nSELECT * FROM \n`my-project.my_dataset.my_table`\nWHERE ...) AS 'Query error:Duplicated User IDs in the table at[5]'"
  },
  "runTime": "2023-02-21T11:00:00Z",
  "schedule": "every day 11:00",
  "scheduleTime": "2023-02-21T11:00:00Z",
  "startTime": "2023-02-21T11:00:00.511108Z",
  "state": "FAILED",
  "updateTime": "2023-02-21T11:01:15.393525Z",
  "userId": "-7436756150212071123"
}
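
A Pub/Sub-triggered function does not receive this JSON directly: the payload arrives base64-encoded under the event's data key, which is why main decodes it first. The following is a minimal local sketch of that wrapping, useful for experimenting before deployment. The helper names make_event and decode_event are hypothetical, and the payload is a trimmed version of the JSON above.

```python
import base64
import json


def make_event(payload: dict) -> dict:
    """Wrap a payload the way a Pub/Sub-triggered function receives it."""
    raw = json.dumps(payload).encode("utf-8")
    return {"data": base64.b64encode(raw).decode("ascii")}


def decode_event(event: dict) -> dict:
    """Reverse the wrapping: base64-decode, then parse the JSON string."""
    text = base64.b64decode(event["data"]).decode("utf-8")
    return json.loads(text)


# Trimmed sample payload (only the keys the script actually reads).
sample = {
    "state": "FAILED",
    "errorStatus": {
        "code": 3,
        "message": "Query error: Duplicated User IDs in the table",
    },
}

event = make_event(sample)
print(decode_event(event)["state"])
```

The dictionary returned by make_event can be passed to main(event, None) in a local session to exercise the function without publishing a real message.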

Next, the extract_message function processes the JSON string. It extracts the values of the state and errorStatus.message keys and strips the parts of the message text we do not need: the ‘Query error: ’ prefix and everything from the query position marker (the part starting with ‘at [’) onward, including the job ID. Finally, the main function posts the text to the Slack channel only if the state is “FAILED” and the text does not contain the words ‘Syntax error’, because we want to send alert notifications only, not every query error.

Our function is ready to work. We can press the DEPLOY FUNCTION button now, or first improve the function code by adding error handling and comments.
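
Note that the script splits on ‘at [’ (with a space), while the sample message above shows ‘at[5]’ without one. If both spellings can occur, a slightly more tolerant variant may be useful. The sketch below is one possible approach, not the original function: the name extract_message_tolerant and the regular expression are assumptions, and an alert description that itself contains ‘ at [’ would be cut short.

```python
import json
import re

PREFIX = "Query error: "
# Matches the position marker with or without a space before the bracket.
POSITION_MARKER = re.compile(r"\s+at\s*\[")


def extract_message_tolerant(json_string):
    """Return (state, alert text) from a transfer-run JSON string."""
    data = json.loads(json_string)
    state = data.get("state", "-")
    error_status = data.get("errorStatus") or {}
    message = error_status.get("message")
    if not message:
        return state, "-"
    # Keep only the text before the position marker and job ID.
    message = POSITION_MARKER.split(message, maxsplit=1)[0]
    if message.startswith(PREFIX):
        message = message[len(PREFIX):]
    return state, message.strip()


sample_json = json.dumps({
    "state": "FAILED",
    "errorStatus": {
        "code": 3,
        "message": "Query error: Duplicated User IDs in the table at[5]; "
                   "JobID: 366106435717:scheduled_query_63f34dca-0000-2f29-a60a",
    },
})
print(extract_message_tolerant(sample_json))
```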

import json
import base64
import logging
from google.cloud import pubsub_v1
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

slack_token = 'YOUR-SLACK-TOKEN'
client = WebClient(token=slack_token)
channel_id = "#YOUR-SLACK-CHANNEL-NAME"

def extract_message(json_string):
    """
    Extracts the state and the alert message from a JSON string.
    Args:
        json_string (str): The JSON string to extract data from.
    Returns:
        A tuple containing the state (str) and the text message (str).
    """
    json_data = json.loads(json_string)
    state = json_data.get('state', '-')
    message_text = '-'
    error_status = json_data.get('errorStatus')
    if error_status:
        message_data = error_status.get('message', '-')
        # Drop the query position and job ID that follow the alert text
        message_data = message_data.split("at [")[0]
        message_text = message_data.replace('Query error: ', '')
    return state, message_text

def main(event, context):
    """
    Reads a Pub/Sub message JSON string, processes it,
    and posts the message to Slack.
    Args:
        event (dict): The Pub/Sub event.
        context (google.cloud.functions.Context): The Cloud Functions context.
    """
    try:
        text = base64.b64decode(event['data']).decode('utf-8')
    except (KeyError, TypeError, ValueError):
        # event.get avoids a second KeyError when 'data' is missing
        logging.error(f"Error decoding message: {event.get('data')}")
        return

    state, message_text = extract_message(text)

    if state == 'FAILED' and 'Syntax error' not in message_text:
        try:
            client.chat_postMessage(channel=channel_id, text=message_text)
        except SlackApiError as e:
            logging.error(f"Error sending message: {e}")
        else:
            logging.info(f"Message sent to Slack: {message_text}")
    else:
        logging.info(f"Ignoring message with state '{state}' and text '{message_text}'")

After deploying the function, we can test and edit it in the Cloud Functions section of the console.
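
The filtering rule can also be checked locally before deployment, without a real token. The sketch below is a hedged illustration: FakeSlackClient, should_notify, and dispatch are hypothetical names that only mimic the shape of the chat_postMessage call used in the script, and the channel name is a placeholder.

```python
class FakeSlackClient:
    """Stand-in for WebClient that records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    def chat_postMessage(self, channel, text):
        self.sent.append((channel, text))
        return {"ok": True}


def should_notify(state, message_text):
    """Same rule as in main: failed runs only, syntax errors excluded."""
    return state == "FAILED" and "Syntax error" not in message_text


def dispatch(client, channel, state, message_text):
    """Post the message if it qualifies; return whether it was posted."""
    if should_notify(state, message_text):
        client.chat_postMessage(channel=channel, text=message_text)
        return True
    return False


fake = FakeSlackClient()
dispatch(fake, "#data-alerts", "FAILED", "Duplicated User IDs in the table")
dispatch(fake, "#data-alerts", "FAILED", "Syntax error: Unexpected keyword")
dispatch(fake, "#data-alerts", "SUCCEEDED", "-")
print(fake.sent)
```

Only the first call should end up in fake.sent; the other two are filtered out by the rule.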

Suppose we have several channels for different groups of alerts (for example, marketing alerts, product alerts, and DevOps alerts). In that case, we can put the alert group name in the alert message (for example, ‘Marketing alert. The conversion dropped’) and filter on it in the main function.



def main(event, context):
    """
    Reads a Pub/Sub message JSON string, processes it,
    and posts the message to Slack.
    Args:
        event (dict): The Pub/Sub event.
        context (google.cloud.functions.Context): The Cloud Functions context.
    """
    try:
        text = base64.b64decode(event['data']).decode('utf-8')
    except (KeyError, TypeError, ValueError):
        # event.get avoids a second KeyError when 'data' is missing
        logging.error(f"Error decoding message: {event.get('data')}")
        return

    state, message_text = extract_message(text)

    # Only marketing alerts are forwarded to this channel
    if state == 'FAILED' and 'Marketing alert' in message_text and 'Syntax error' not in message_text:
        try:
            client.chat_postMessage(channel=channel_id, text=message_text)
        except SlackApiError as e:
            logging.error(f"Error sending message: {e}")
        else:
            logging.info(f"Message sent to Slack: {message_text}")
    else:
        logging.info(f"Ignoring message with state '{state}' and text '{message_text}'")
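
If one function should serve several channels instead of one function per group, the group name can be mapped to a channel. This is only a sketch under assumptions: the channel names and the pick_channel helper are hypothetical, and it assumes the group name leads the message, as in the ‘Marketing alert. The conversion dropped’ example.

```python
# Hypothetical mapping from alert-group prefix to Slack channel name.
CHANNEL_BY_GROUP = {
    "Marketing alert": "#marketing-alerts",
    "Product alert": "#product-alerts",
    "DevOps alert": "#devops-alerts",
}


def pick_channel(message_text, default=None):
    """Return the channel for the message's alert group, or the default."""
    for group, channel in CHANNEL_BY_GROUP.items():
        if message_text.startswith(group):
            return channel
    return default


print(pick_channel("Marketing alert. The conversion dropped"))
print(pick_channel("Duplicated User IDs in the table"))
```

In main, the result of pick_channel(message_text) could replace channel_id, skipping the post when it returns None.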

For security reasons, I recommend storing the Slack token in an environment variable rather than in the code. To do so, open the function in Cloud Functions, choose EDIT, click the ADD VARIABLE button in the Runtime environment variables section, and enter the variable name and value.

Next, click the Next button below and change the script. We need to import the os module and replace one line.

import os

# slack_token = 'YOUR-SLACK-TOKEN'
slack_token = os.environ['SLACK_TOKEN']

Click the DEPLOY button to save changes.
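
With os.environ['SLACK_TOKEN'], a missing variable surfaces as a bare KeyError when the function starts. A small wrapper can make that failure easier to read in the logs. This is an optional sketch; load_slack_token is a hypothetical helper, and the env parameter exists only so the function can be tried with a plain dictionary.

```python
import os


def load_slack_token(env=os.environ, name="SLACK_TOKEN"):
    """Read the token from the environment, failing with a clear message."""
    token = env.get(name, "").strip()
    if not token:
        raise RuntimeError(f"Environment variable {name} is not set or is empty")
    return token


# Demonstration with a dictionary standing in for the real environment.
print(load_slack_token({"SLACK_TOKEN": "xoxb-example"}))
```

In the function itself, slack_token = load_slack_token() would read the real environment.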

If we have sent a false alert to the Slack channel and want to delete it, we cannot do it directly, because only the author (the data-alert-service app) can remove its messages from the channel. To delete the message, we can run a Python script like this:

import logging
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

# Without this, INFO-level log records are not printed
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

slack_token = 'YOUR-SLACK-TOKEN'
client = WebClient(token=slack_token)
# chat.delete expects the channel ID (not the #channel-name)
channel_id = "YOUR-SLACK-CHANNEL-ID"
# The ts of the message you want to delete
message_id = "YOUR-MESSAGE-ID"

try:
    # Call the chat.delete method using the built-in WebClient
    result = client.chat_delete(
        channel=channel_id,
        ts=message_id
    )
    logger.info(result)

except SlackApiError as e:
    logger.error(f"Error deleting message: {e}")

There is no need to create a Cloud Function for this. We can run the script once from anywhere (for example, from Cloud Shell Editor).

We can extract the channel ID and message ID values from the Slack message URL.

A link to the Slack message

The message ID taken from the URL needs a dot inserted six characters from the right (the leading ‘p’ in the URL is not part of the ID). For example, the string 1681893536861619 should appear in the script as message_id = '1681893536.861619'.
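
The conversion can be sketched in a few lines. The sketch assumes the usual layout of a Slack message link, https://<workspace>.slack.com/archives/<channel ID>/p<digits>; the function name parse_slack_permalink and the example URL are hypothetical.

```python
from urllib.parse import urlparse


def parse_slack_permalink(url):
    """Return (channel_id, ts) extracted from a Slack message link."""
    parts = [p for p in urlparse(url).path.split("/") if p]
    # Expected path: /archives/<channel ID>/p<timestamp digits>
    if len(parts) < 3 or parts[0] != "archives" or not parts[2].startswith("p"):
        raise ValueError(f"Unexpected Slack message link: {url}")
    channel_id = parts[1]
    digits = parts[2][1:]  # drop the leading 'p'
    if not digits.isdigit() or len(digits) <= 6:
        raise ValueError(f"Unexpected message ID in link: {url}")
    # Insert the dot six characters from the right.
    return channel_id, f"{digits[:-6]}.{digits[-6:]}"


link = "https://example.slack.com/archives/C0123456789/p1681893536861619"
print(parse_slack_permalink(link))
```

The two returned values can be used as channel_id and message_id in the deletion script.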

So, we prepared a Slack app for receiving alert messages, and we created and deployed a Cloud Function that reads messages from the Pub/Sub topic and sends them to the Slack channel.

In the next part, we will talk about how to develop our data alerts further, how to avoid false alerts, and which automated follow-up steps after an alert can help the organization (for example, generating and delivering related mini-reports).
