Creating a serverless (almost free) event-driven data analytics pipeline
with AWS Athena, S3, SNS and Lambda
“Big Data” analysis is all the rage right now, but never has it been so easy to analyse huge swathes of data for such a negligible cost.
AWS Athena is Amazon’s “Serverless interactive query service” — using standard SQL, a data scientist can perform queries on large volumes of data stored in S3 (an incredibly cheap place to store such quantities of data).
There are dozens of articles on how to get started with Athena and Glue Tables (essentially the schema of your data) so I won’t dwell on those here. The focus of this article is to design and explain an event-driven, continuous analysis pipeline which scales with your data.
Aside: My preferred way of defining Glue Table schemas is to write Protocol Buffers and generate the Glue schemas from them. This is very useful for passing data between services/languages.
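As an illustration, here is a minimal Python sketch of that generation step, assuming a compiled protobuf message class (device_ping_pb2.DevicePing is a hypothetical name) and a hand-rolled mapping from protobuf scalar types to Glue/Athena column types:

```python
from google.protobuf.descriptor import FieldDescriptor

# Assumed mapping from protobuf scalar types to Glue/Athena column types;
# extend as needed for your own schemas.
PROTO_TO_GLUE = {
    FieldDescriptor.TYPE_STRING: "string",
    FieldDescriptor.TYPE_INT32: "int",
    FieldDescriptor.TYPE_INT64: "bigint",
    FieldDescriptor.TYPE_FLOAT: "float",
    FieldDescriptor.TYPE_DOUBLE: "double",
    FieldDescriptor.TYPE_BOOL: "boolean",
}

def glue_columns(message_class):
    """Derive Glue column definitions from a generated protobuf class."""
    return [
        {"Name": field.name, "Type": PROTO_TO_GLUE.get(field.type, "string")}
        for field in message_class.DESCRIPTOR.fields
    ]

# Usage: pass the result into a Glue table's StorageDescriptor, e.g.
#   glue_columns(device_ping_pb2.DevicePing)
```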
The architecture I shall be describing is outlined in the sections below.
Data Sources
In this example, the data sources I am using are an Elastic Beanstalk App and raw IoT MQTT data.
The important part to note here is that both services post their information to Firehose, which supports Data Transformation. This is incredibly handy as it allows different data sources to be pre-formatted into a standardised format which complies with your Glue schema. There are several other ways of achieving this, but I have found a Lambda attached to a Firehose to be the most cost-effective and the closest to real-time.
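As a rough sketch of such a transformation Lambda (the payload field names here are hypothetical): Firehose delivers a batch of base64-encoded records and expects each one back with its recordId, a result flag and the re-encoded data.

```python
import base64
import json

def handler(event, context):
    """Firehose data-transformation Lambda: normalise each incoming record
    into the standardised shape that matches the Glue schema."""
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Hypothetical normalisation: different sources use different key
        # names, but every record leaves here with the same three fields.
        standardised = {
            "device_id": payload.get("device_id") or payload.get("deviceId"),
            "timestamp": payload.get("timestamp"),
            "battery": payload.get("battery"),
        }
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            # Firehose expects base64 output; newline-delimit for Athena.
            "data": base64.b64encode(
                (json.dumps(standardised) + "\n").encode("utf-8")
            ).decode("utf-8"),
        })
    return {"records": output}
```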
Data Storage
After being converted into a standardised format, data is stored in S3. For small to medium data volumes, a compressed (GZIP) JSON file should be sufficient, but Firehose also supports outputting to Parquet if you prefer.
S3 pricing is laughably cheap (in my opinion) but Athena charges per GB of data scanned, so smaller (GZIPed) files are preferred. If your data has a large number of columns/JSON keys which are not often queried, it would make more sense to use a Parquet output format (since it is columnar and will limit the quantity of data scanned).
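For reference, a minimal boto3 sketch of a Firehose delivery stream configured for GZIPed output (the stream, role and bucket names are assumptions):

```python
import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="deviceping-to-s3",  # hypothetical name
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery",
        "BucketARN": "arn:aws:s3:::my-analytics-bucket",
        "Prefix": "raw/v0/deviceping/",
        # GZIP keeps objects small, so Athena scans (and bills) less data.
        "CompressionFormat": "GZIP",
        # Flush whichever comes first: 64 MB or 15 minutes of buffered data.
        "BufferingHints": {"SizeInMBs": 64, "IntervalInSeconds": 900},
    },
)
```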
Event-driven Data Analysis
S3 has a wonderful feature of being able to notify a number of other AWS services of various events, one of which is “Object Created”.
When Firehose “flushes” the data into S3, this event can directly trigger a Lambda, publish to an SNS topic, or notify various other targets. Directly invoking a Lambda can be a little limiting (since only one Lambda can be triggered), so I recommend an SNS topic to which a Lambda will subscribe.
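Wiring this up is a single configuration call on the bucket; a boto3 sketch (bucket name, topic ARN and prefix are hypothetical) might look like this:

```python
import boto3

s3 = boto3.client("s3")

s3.put_bucket_notification_configuration(
    Bucket="my-analytics-bucket",
    NotificationConfiguration={
        "TopicConfigurations": [
            {
                "TopicArn": "arn:aws:sns:eu-west-1:123456789012:new-raw-data",
                "Events": ["s3:ObjectCreated:*"],
                # Only objects under the raw data prefix fire the topic.
                "Filter": {
                    "Key": {
                        "FilterRules": [
                            {"Name": "prefix", "Value": "raw/v0/"},
                        ]
                    }
                },
            }
        ]
    },
)
```

Note that the topic’s access policy must also allow the S3 service principal to publish to it, otherwise this call will fail validation.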
The Lambda then invokes an Athena query. The SNS notification will contain information about which file was added, so this key prefix can be used to tell the Lambda which query/queries to run in Athena (a sketch of such a Lambda follows the example below).
Example:
- A file is added from Firehose to raw/v0/deviceping (a heartbeat message from an IoT device). Since data has been buffered in Firehose, this will not be a single device’s message but a concatenation of all new heartbeat messages received in the buffer interval (up to 15 minutes).
- The Lambda performs a lookup on that prefix and knows to execute a query which creates a report of the current battery levels of all IoT devices.
- The output from this query is written to another S3 location, e.g. processed/v0/devicebattery
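Here is that prefix-lookup Lambda sketched out (the database name, table name, query and output location are all assumptions): it unwraps the S3 event from the SNS message, matches the key prefix against a lookup table and starts the corresponding Athena query.

```python
import json
import boto3

athena = boto3.client("athena")

# Hypothetical prefix-to-query lookup table.
QUERIES = {
    "raw/v0/deviceping": (
        "SELECT device_id, max_by(battery, timestamp) AS battery "
        "FROM devicepings GROUP BY device_id"
    ),
}

def handler(event, context):
    # The SNS message body is the original S3 event notification (JSON).
    s3_event = json.loads(event["Records"][0]["Sns"]["Message"])
    key = s3_event["Records"][0]["s3"]["object"]["key"]
    prefix = "/".join(key.split("/")[:3])  # e.g. "raw/v0/deviceping"

    query = QUERIES.get(prefix)
    if query is None:
        return  # No analysis registered for this prefix.

    athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": "iot_analytics"},
        ResultConfiguration={
            "OutputLocation": "s3://my-analytics-bucket/processed/v0/devicebattery/"
        },
    )
```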
Second-tier Analysis
This is where the event-driven architecture really begins to excel. Using another “Object Created” S3 event, the report output above can be used for further analysis.
Annoyingly, Athena outputs its results as a CSV file, which I have found to be somewhat limiting. As an intermediate step, I invoke another Lambda to convert the CSV to a GZIPed JSON, since CSVs do not play nicely with Athena/Glue’s schemas.
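A minimal sketch of that conversion Lambda, assuming it is triggered directly by the “Object Created” event for the CSV result:

```python
import csv
import gzip
import io
import json
import boto3

s3 = boto3.client("s3")

def handler(event, context):
    """Convert an Athena CSV result into GZIPed, newline-delimited JSON."""
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = record["object"]["key"]

    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")

    buffer = io.BytesIO()
    with gzip.GzipFile(fileobj=buffer, mode="wb") as gz:
        for row in csv.DictReader(io.StringIO(body)):
            gz.write((json.dumps(row) + "\n").encode("utf-8"))

    # Hypothetical destination: same bucket, swapped extension.
    s3.put_object(
        Bucket=bucket,
        Key=key.replace(".csv", ".json.gz"),
        Body=buffer.getvalue(),
    )
```

In practice you would filter the trigger to keys ending in .csv, since Athena writes a .csv.metadata file alongside each result.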
Athena Part Deux
Let’s assume that the higher-ups like graphs and want to see a time series plot of the change in battery levels of all IoT devices over the last week.
To make that graph as close to real-time as possible, it can be regenerated whenever a new device battery report is output from Athena. So, instead of recalculating the device battery graph every hour manually (or with a cron), the chart is regenerated if (and only if) new data becomes available! This maintains the real-time correctness of the data being displayed and prevents needless requerying when the underlying data has not changed.
Give me Python Please!
Athena is a stupendous tool, but it is limited to a small subset of the data sources you might require (although the release of federated queries may address that). There may be times when another tool, e.g. Python, is required for further analysis.
Luckily, this event-driven architecture can easily handle that. Instead of the second-tier Lambda invoking another SQL query through Athena, it can instead invoke a Python Lambda to perform any required processing.
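The change is small: instead of calling Athena’s start_query_execution, the second-tier Lambda asynchronously invokes the analysis function (the function name below is hypothetical):

```python
import json
import boto3

lambda_client = boto3.client("lambda")

def dispatch(bucket, key):
    """Hand the freshly converted report to a Python analysis Lambda."""
    lambda_client.invoke(
        FunctionName="battery-trend-analysis",  # hypothetical function name
        InvocationType="Event",  # asynchronous; the event chain continues
        Payload=json.dumps({"bucket": bucket, "key": key}).encode("utf-8"),
    )
```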
Wrapping Up
This article has only outlined a proposed architecture which I have found to be exceptionally useful and cheap. For those intrigued by real code examples, I will be happy to share an AWS CDK project which generates the CloudFormation template for the above architecture.
Thanks for reading!