A Snowflake External Function to pull data from the Purple Air API to analyze air quality

In 2020 I worked with a group of data scientists who were looking at building a Clean Air solution on top of the Purple Air API. They wanted to poll the API and load that data into Snowflake, retaining a deep history of readings so they could analyze seasonality and correlate against notable regional events (e.g. wildfires). Snowflake is a great fit for storing large volumes of data for just this purpose, so we drafted an architectural diagram for the solution.

Draft Architecture to Proof of Concept

There were two initial ideas for gathering the data. The first was to provision a small AWS EC2 instance that would poll the API and dump the results to files, which would help with the initial seed. More interesting, though, was Snowflake's ability to use External Functions to grab the data on demand.

First we set up an API Gateway

Next, we configure a Lambda function

import json
import logging

import urllib3

logger = logging.getLogger()
logger.setLevel(logging.WARNING)


def lambda_handler(event, context):
    http = urllib3.PoolManager()

    json_return_message = []

    body = json.loads(event["body"])
    status_code = 200

    for row in body["data"]:
        row_number = row[0]  # the row number assigned by Snowflake
        url = row[1]         # the argument sent from Snowflake, a URL in this case
        try:
            res = http.request("GET", url)
            # status=200 means the request succeeded
            if res.status != 200:
                error_msg = {
                    'error': "URL: %s | HTTP error: %s" % (url, res.status)
                }
                logger.error(error_msg)
                row_to_return = [row_number, error_msg]
            else:
                data = res.data
                row_to_return = [row_number, data.decode("utf-8")]

        except Exception as e:
            logger.error(e)
            exception_msg = {
                'exception': "URL: %s | Exception: %s" % (url, str(e))
            }
            row_to_return = [row_number, exception_msg]

        json_return_message.append(row_to_return)

    # build the JSON string that we will send back to Snowflake
    json_compatible_string_to_return = json.dumps({"data": json_return_message})

    return {
        'statusCode': status_code,
        'body': json_compatible_string_to_return
    }
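To make the batch contract concrete, here is a minimal sketch of the request and response shapes the Lambda deals with. The sensor IDs are made up for illustration; the shapes follow Snowflake's external function convention of `{"data": [[row_number, value, ...], ...]}`.

```python
import json

# Snowflake sends a batch of rows; each row is [row_number, arg1, ...].
event = {
    "body": json.dumps({
        "data": [
            [0, "https://www.purpleair.com/json?show=12345"],
            [1, "https://www.purpleair.com/json?show=67890"],
        ]
    })
}

# The function must return exactly one result row per input row,
# echoing each row number back in the same order.
body = json.loads(event["body"])
response_rows = [[row_number, f"payload for {url}"]
                 for row_number, url in body["data"]]
response = {"statusCode": 200, "body": json.dumps({"data": response_rows})}

returned = json.loads(response["body"])["data"]
```

If the row numbers come back out of order or a row is missing, Snowflake rejects the whole batch, which is why the handler above builds `row_to_return` even on errors.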

From within Snowflake, I can now make a call to a REST endpoint, in this case the Purple Air API. Next, I set up the External Function in Snowflake. I would advise checking out the External Function setup in the Snowflake documentation for additional details, as I've redacted some values in the next section.

create or replace api integration fetch_http_data
  api_provider = aws_api_gateway
  api_aws_role_arn = 'arn:aws:iam::[redacted]:role/ExecuteLambdaFunction'
  enabled = true
  api_allowed_prefixes = ('https://[redacted]/fetchhttpdata');

-- create the function
create or replace external function fetch_http_data(v varchar)
  returns variant
  api_integration = fetch_http_data
  as 'https://[redacted]/fetchhttpdata';

show integrations;

-- set up the database objects
use role sysadmin;
use database AirQuality;
create schema PurpleAir;
use warehouse PA_VWH;
create or replace table devices(data variant, time datetime);

-- inspect and tune the external function
desc function citibike.utils.fetch_http_data(varchar);
alter function citibike.utils.fetch_http_data(varchar) set max_batch_rows = 1;

That’s the hard part. Once the scaffolding is in place, I can call the function directly from SQL. Here I also leverage Snowflake’s geospatial capability: instead of gathering all the data points, I defined a point in downtown LA and drew a 30 km circle around it, retrieving only the devices in that vicinity.

--HOW ABOUT JUST LA: USE GEOSPATIAL
insert into devices
select citibike.utils.fetch_http_data($1) payload,
       current_timestamp() row_inserted
from (
  select 'https://www.purpleair.com/json?show=' || device_id
  from device_ids
  where device_id in (
    select data:ID
    from device_flattened
    where st_dwithin(
            st_point(data:Lon, data:Lat),
            st_point(-118.249813, 34.043327),
            30000)
  )
);
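ST_DWITHIN on geography data measures great-circle distance in meters, which is why the radius above is written as 30000. As a hedged sketch of what that filter computes, here is the equivalent check in pure Python using the haversine formula (the Santa Monica and San Diego coordinates are illustrative test points, not part of the original query):

```python
from math import radians, sin, cos, asin, sqrt

def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters, using a mean Earth radius of 6371 km."""
    lon1, lat1, lon2, lat2 = map(radians, (lon1, lat1, lon2, lat2))
    dlon, dlat = lon2 - lon1, lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    return 2 * 6371000 * asin(sqrt(a))

# The anchor point used in the SQL query (downtown LA, lon/lat order)
LA = (-118.249813, 34.043327)

def within_30km(lon, lat):
    """Mirrors st_dwithin(st_point(lon, lat), st_point(LA), 30000)."""
    return haversine_m(LA[0], LA[1], lon, lat) <= 30000
```

Santa Monica (roughly 23 km from the anchor point) falls inside the circle, while San Diego does not, so only sensors in the LA vicinity end up in the `devices` table.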

Finally, we put it all together in a Tableau Dashboard where we can see a heatmap of the PM2.5 values as delivered by Purple Air.

Air Quality for Los Angeles displayed in Tableau
