Implementing BigQuery Custom Mail Trigger: Responding to Errors with Automated Mails

Abhik Saha
Google Cloud - Community
5 min read · Jun 20, 2023

This article builds on my earlier article on error handling in BigQuery. If you haven't already read it, please go through it for reference. Here I will discuss how to trigger a mail to the users/support team when an error is encountered during procedure execution in BigQuery.

The idea here is quite simple and straightforward. We will use the same error logging table from the previous article and trigger a mail whenever new entries appear in it.
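For reference, here is a minimal sketch of what such a log table could look like. The log_date column matches what the Cloud Function below queries; the other columns are assumptions, so adapt them to the schema from the previous article.

CREATE TABLE IF NOT EXISTS
  `plated-field-383807.manual_input.error_log_table` (
    log_date       DATETIME,  -- when the error occurred; the Cloud Function filters on this
    procedure_name STRING,    -- which procedure failed (assumed column)
    error_message  STRING     -- the captured error text (assumed column)
);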

Image credits: hevodata

For that we will need the following APIs enabled and service accounts in place:

  1. Cloud Scheduler: The Cloud Scheduler will trigger a Cloud Function every 10 minutes to check for errors in the error log table. The interval can be adjusted as per requirement.
  2. Cloud Function: The Cloud Function will connect to BigQuery and check for newer entries in the log table. If a new entry is present, it triggers a mail; otherwise it exits.
  3. SendGrid API: This API is responsible for sending the mails. It is available in the Google Cloud Marketplace; we just need to enable it, and the basic plan is free. Check here for the official Google documentation for setting up the service.
  4. Two service accounts: one with the Cloud Functions Invoker role, which the scheduler will use to call the function, and the App Engine default service account, which the function itself will run as.

Steps to Follow

Setting up the SendGrid API: Enable the API by following the Google documentation provided here. On the SendGrid website, make sure that Single Sender Verification is completed for a particular email address. This address will be used to send the mail.

SendGrid website
SendGrid Authentication

Also, in the API Keys section, make sure you generate an API key as per the documentation and make a note of it.

Setting up the Cloud Function: The Cloud Function will be a simple HTTP-triggered function. Create a 1st-generation Cloud Function with the below specification.

Memory: 1 GB; Max instances: 2; Min instances: 1

Also note down the HTTPS URL of the function and use the App Engine default service account.

Cloud Function Code

from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
from google.cloud import bigquery
# pandas and db-dtypes must be installed for RowIterator.to_dataframe()
import pandas as pd
import db_dtypes


def mail_sender(request):
    client = bigquery.Client()
    query_job = client.query(
        """
        SELECT *
        FROM `plated-field-383807.manual_input.error_log_table`
        WHERE log_date >= DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 10 MINUTE)
        """
    )
    results = query_job.result()

    if results.total_rows == 0:
        print("No errors")
    else:
        df = results.to_dataframe()
        # df.to_html() already emits a complete <table> element,
        # which the CSS below styles.
        html_content = f"""
        <!DOCTYPE html>
        <html>
        <head>
        <title>Error Log</title>
        <style>
        table {{ border-collapse: collapse; width: 100%; }}
        th, td {{ border: 1px solid black; padding: 8px; text-align: left; }}
        th {{ background-color: #4CAF50; color: white; }}
        tr:nth-child(even) {{ background-color: #f2f2f2; }}
        tr:hover {{ background-color: #ddd; }}
        </style>
        </head>
        <body>
        <h1>Logs</h1>
        {df.to_html(index=False)}
        </body>
        </html>
        """
        send_email_notification(html_content)

    return "OK"


def send_email_notification(html_content):
    message = Mail(
        from_email='xxxxx@gmail.com',  # use the SendGrid-verified sender here
        to_emails='xxxxx@gmail.com',   # recipient(s) you want to mail
        subject='Error Detected in the Log Table',
        html_content=html_content)
    try:
        sg = SendGridAPIClient('your SendGrid API key here')  # use the SendGrid API key here
        response = sg.send(message)
        print(response.status_code)
        print(response.body)
        print(response.headers)
    except Exception as e:
        print(e)

requirements.txt

# Function dependencies, for example:
# package>=version
sendgrid
google-cloud-bigquery
pandas==1.4.2
db-dtypes

Code Explanation

  1. Query the BigQuery table `plated-field-383807.manual_input.error_log_table` using the BigQuery client library and check for errors logged in the last 10 minutes.
  2. If no error logs were generated in the last 10 minutes, exit the process.
  3. If error logs are found, store the result in a pandas DataFrame, embed it within a simple HTML template, and trigger the send_email_notification function. The SendGrid mail helper is documented here.
  4. Store the required dependencies in a requirements.txt file.

Setting up the Cloud Scheduler

Set up a Cloud Scheduler job with the below configuration: it runs every 10 minutes (cron expression */10 * * * *), uses POST as the HTTP method, targets the Cloud Function URL, and authenticates with an OIDC token from a service account that has the Cloud Functions Invoker role. The configuration screenshots are given below.

Result

To test the entire process, we forcefully introduce an error into a stored procedure, as shown below, and trigger it.

Error Introduced
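For instance, a forced error of this kind can be reproduced with BigQuery's scripting EXCEPTION handler. This is only a sketch, assuming the log table columns shown earlier:

BEGIN
  -- Deliberate error: division by zero
  SELECT 1/0;
EXCEPTION WHEN ERROR THEN
  -- Log the failure so the Cloud Function picks it up on its next run
  INSERT INTO `plated-field-383807.manual_input.error_log_table`
    (log_date, procedure_name, error_message)
  VALUES (CURRENT_DATETIME(), 'demo_procedure', @@error.message);
END;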

It generates a new entry in the error log table as shown below.
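You can also confirm the new entry directly with a quick lookup:

SELECT *
FROM `plated-field-383807.manual_input.error_log_table`
ORDER BY log_date DESC
LIMIT 5;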

To check the end-to-end flow, we force-run the Cloud Scheduler job. We can see a success message.

When we check the recipient mailbox, we can see that the mail has been delivered.

Our workflow is successful!

Alternate Uses

  1. We can use the same process to notify on successful and failed jobs using control tables in BigQuery (see the sketch after this list).
  2. We can check for newer entries in a specific table and then trigger other dependent processes using the same workflow.
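For the first idea, a hypothetical control-table check could reuse the exact same polling query, just pointed at a different table (the table and column names below are assumptions; substitute your own):

SELECT *
FROM `plated-field-383807.manual_input.job_control_table`  -- assumed control table
WHERE job_status IN ('SUCCESS', 'FAILED')                  -- assumed status column
  AND updated_at >= DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 10 MINUTE);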

Alternatives

Check out this wonderful article written by Lak Lakshmanan to achieve similar functionality on Google Cloud Platform.

Link : https://cloud.google.com/blog/topics/developers-practitioners/how-trigger-cloud-run-actions-bigquery-events

Follow me on LinkedIn and Medium to get more content like this!
