The Slack Bot Every Data Engineer Wants

Vicky Kwan
7 min read · Apr 26, 2019


Your partner in (data) crime!

Update: I created a sample main.py with requirements that work in the Cloud Functions environment, plus a quick summary of this article, in the repo below. Feel free to check it out, fork it, and submit pull requests!

Recently my team experienced the bitter taste of system downtime, which delayed our major data pipelines by more than 50 hours. The causes included an insufficient alerting system that failed to inform us of the upstream outage (a business stakeholder had to ask us why a high-level dashboard wasn’t showing this week’s sales numbers), a four-hour outage on the vendor’s server side, and the wait for all historic data to be backfilled.

In retrospect, the first problem, the lack of an alerting system, is the most immediate one to solve. We want to send filtered logs from the vendor system directly to our Slack channel so that engineers are alerted right away. We also want the messages to be reader-friendly, rather than entire log entries with too much verbosity. I’m going to demonstrate how we achieved this by building a Pub/Sub topic that listens to StackDriver logs and pushes selected log entries to a Cloud Function, which then sends nicely formatted alerts to the Slack channel. While this solution applies to all categories of StackDriver logs, we use the data pipeline logs produced by Fivetran to reproduce the business context. This example was written in Python 3.7.

Before we start tweaking the logs, we need to connect Fivetran logging with GCP StackDriver. This step varies depending on your choice of vendor, but the essence is to transfer all of the vendor logs into your workspace on StackDriver.

Requirements:

* GCP Account

* Permission to create a service account for Fivetran with the required role

  1. In Fivetran, enter your GCP project id;
  2. Fivetran will give you a member email. Go to IAM in the GCP console and grant this member the Logs Writer role. See this page from Google for more details: https://cloud.google.com/iam/docs/quickstart
  3. Click Save and Test. Once the test succeeds, logs will start to appear in the Logs Viewer on GCP.
  4. If you have more than one project in Fivetran, you’ll need to set up logging once per project. Within each project, logs from all warehouses and connectors share the same member email, which serves as the service account on GCP. You can filter on different service accounts in the basic selector menus of the Logs Viewer:
Select the service account for your vendor logs. Each service account needs the Logs Writer role.

For more info on basic log filters, see: https://cloud.google.com/logging/docs/view/basic-filters

Now that we have the Fivetran logs nicely flowing into StackDriver, we can set up a Pub/Sub topic to bundle all Fivetran projects and warehouses into one log source. This provides a centralized input stream for our Cloud Function to process in the next step. The Pub/Sub topic can be thought of as a control center for the bidirectional data traffic:

  1. Pub/Sub will listen to any new logs coming in from Fivetran to StackDriver; this is called a Pull delivery type.
  2. Pub/Sub will then talk to the Cloud Function in the next step about this new log entry; this is called a Push delivery type.

We create the Pull delivery in this step, and leave the Push action to the next step, where we build the Cloud Function (triggered by this Pub/Sub topic). We are going to create this Pull from the Logs Viewer.
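If you prefer to script this rather than click through the console, a rough equivalent using the google-cloud-pubsub client library could look like the sketch below. The topic and subscription names here are hypothetical; the console walkthrough that follows achieves the same result.

```python
from google.cloud import pubsub_v1

project_id = "your-gcp-project"          # replace with your GCP project id
topic_id = "fivetran-logs"               # hypothetical topic name
subscription_id = "fivetran-logs-pull"   # hypothetical pull subscription name

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# The topic that the StackDriver export sink will publish Fivetran log entries to.
publisher.create_topic(name=topic_path)

# The Pull subscription. The Push subscription is created automatically later,
# when the Cloud Function is wired up to this topic.
subscriber.create_subscription(name=subscription_path, topic=topic_path)
```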

  1. Go to the Logs Viewer page and select your first Fivetran project as shown in the previous step. You can either select the log level that your team needs (e.g. ERROR or above) on the log level tab:
Log Levels

Or, create your own filter by switching to the Advanced Filter:

Convert to Advanced Filter

Feel free to customize based on your particular requirements. You can expand an example log entry and take a closer look at the log structures:

Logs Viewer’s advanced filter based on json.

You can create specific selectors such as jsonPayload.data.status="FAILURE" or jsonPayload.labels.levelValue="1001"; see the sketch below for an example.
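For instance, an advanced filter that only keeps failed syncs might look roughly like this (the lines are ANDed together; adjust the fields to whatever your vendor’s log entries actually contain):

```
severity>=ERROR
jsonPayload.data.status="FAILURE"
```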

2. Once you are done creating the query for the logs you’re interested in, hit Submit Filter (or cmd + enter for Mac users). Then go ahead and Create Export. Give your sink a meaningful name (e.g. fivetran_{warehouse_name}_{connector_name}_{log_level}). Choose Cloud Pub/Sub as your Sink Service, and select Create new Cloud Pub/Sub Topic under Sink Destination. You’re all set to create the first topic! Note that each sink automatically gets its own service account, which is granted Logs Writer on the topic you just created, so no extra steps are needed.

3. Go to Pub/Sub > Topics and open your newly created topic. If you don’t see it, keep refreshing the page for a minute or so. Then Create Subscription to pull the SD logs. Give it a name, make sure to set the Delivery Type to Pull, and keep everything else at the defaults.

Create a pull subscription

Voila! That completes the Pull side of this topic. Next, we are going to create the Push side from a Cloud Function.

Since my team is not using Cloud Functions for Firebase, we use our own GitHub repo and set up a corresponding Cloud Repo (https://source.cloud.google.com/): select Add a Repository -- Create external repository. Choose your GCP project id, select GitHub as your Git Provider, and authorize. Choose your GitHub repo on the next page and connect. This step gives us better version control and works natively with GitHub repos (no charge from Firebase). In your GitHub repo, set up a main.py that contains the Cloud Function we are about to create, and a requirements.txt for any dependencies. Now we have everything we need to create the Cloud Function.

  1. Go to Cloud Functions and hit Create Function.
  2. Name your cloud function, set Trigger to Cloud Pub/Sub and Topic to the topic you created in the previous steps. Select Cloud Source Repository for the Source Code and choose your runtime. Copy the Cloud Repo name (see step 4) into Repository and make sure Function to execute points to the correct function in your main.py. Keep everything else at the defaults, and Create.
  3. Wait a few moments for the function to spin up. Once it’s deployed successfully, you can test it on the Testing page:
Testing Pub/Sub trigger Cloud Function using base64 encoded log message

A Cloud Function that is triggered by a Pub/Sub topic takes an event payload of the form {"data":"base64_encoded_log_message"}. Here, the log message that you wish to test should be the base64-encoded version of one entire log entry.
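For example, you could generate a test payload with a small Python snippet like this (the sample entry below is an illustrative, trimmed-down log entry; in practice, copy a full entry from the Logs Viewer):

```python
import base64
import json

# A trimmed-down, illustrative log entry; copy a real one from the Logs Viewer.
sample_entry = {
    "severity": "ERROR",
    "jsonPayload": {"data": {"status": "FAILURE"}},
}

encoded = base64.b64encode(json.dumps(sample_entry).encode("utf-8")).decode("utf-8")

# Paste this JSON into the triggering event box on the Testing page.
print(json.dumps({"data": encoded}))
```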

4. My main.py looks like this, and you can see that the full Cloud Repo name appears in the upper left corner:

My Cloud Function of transforming StackDriver log entries to Slack Webhook

From here, you can filter the entries and create your own customized payload for the Slack webhook. Make sure to wrap the formatted message in Slack’s template: '{"text":"some_very_pretty_message"}'. I personally find jinja2 templates extremely handy for passing string parameters into a deeply nested dictionary like the Slack template.
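As a rough sketch (not our exact function), the deployed main.py could look something like the following. It assumes requests is listed in requirements.txt (add jinja2 there too if you use templates), that the Slack incoming webhook URL lives in a hypothetical SLACK_WEBHOOK_URL environment variable, and that notify_slack is the name you point Function to execute at:

```python
import base64
import json
import os

import requests  # listed in requirements.txt

# Hypothetical environment variable holding the Slack incoming webhook URL.
SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]


def notify_slack(event, context):
    """Background Cloud Function triggered by the Pub/Sub topic.

    event["data"] holds the base64-encoded log entry exported by the sink.
    """
    entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    payload = entry.get("jsonPayload", {})

    status = payload.get("data", {}).get("status", "UNKNOWN")

    # Only alert on the entries we care about.
    if status != "FAILURE":
        return

    # Build a reader-friendly message and wrap it in Slack's template.
    text = "Fivetran sync FAILURE\nseverity: {}\nentry: {}".format(
        entry.get("severity", "UNKNOWN"), json.dumps(payload)[:500]
    )
    requests.post(SLACK_WEBHOOK_URL, json={"text": text})
```

Since the sink delivers the entire log entry as the message body, whatever structure you see in the Logs Viewer is what you get back after decoding event["data"].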

5. Once you’re happy with your filters and formats, deploy this function.

6. No action is needed after this, but if you’re curious like me, you can go back to the Topic that you created for listening to the SD logs. Now that we have created a Cloud Function triggered by this Topic, you will find two Subscriptions: the Pull subscription that listens to incoming StackDriver logs, and the Push subscription that sends the filtered and formatted logs to your favorite Slack channel. The endpoint URL for the Push subscription was generated automatically by the Cloud Function.

At this point, we have attempted (and hopefully managed) to:

  1. Bring vendor logs into GCP StackDriver for centralized monitoring and alerting;
  2. Set up a Pub/Sub Pull subscription that listens to incoming StackDriver logs;
  3. Set up a Pub/Sub Push subscription that sends the filtered and formatted logs somewhere else (in this example, to Slack’s incoming webhook). This step is achieved by writing your own Cloud Function, and we control the deployments through the Cloud Repo.

There is a lot of flexibility in designing the alerting and monitoring system that best fits your company’s and teams’ needs. By connecting StackDriver logging directly with Slack, we get instant notifications for any notable logs we want to see, instead of the default StackDriver notifications.

I’ll be writing about our progress in following posts as I explore how to bring all Airflow logs in through this channel. I’ll also experiment with the best ways to:

  1. Deploy configurations for creating, maintaining, and versioning alert policies and user-defined metrics on SD;
  2. Create analytics-driven metrics by defining a set of SLIs that best serve the analytics engineering warehouse, such as uptime and latency for ETL pipelines.

This is my first Medium post, so I cannot wait to hear about your experiences with GCP, StackDriver, Pub/Sub, Cloud Functions, and, in general, anything that excites you in the cloud engineering world. Thanks!
