SERVERLESS DATA LAKE

Serverless Data Lake: Storing and Analysing Streaming Data using AWS

Shafiqa Iqbal
Apr 11, 2020 · 8 min read

Making an Amazon S3 Data Lake on Streaming Data using Kinesis, S3, Lambda, Glue, Athena and Quicksight

This article will cover the following:

  • Write a Python producer that sends records to a Kinesis Data Stream using the KPL aggregator
  • Preprocess records using a Kinesis Data Analytics preprocessor Lambda
  • Run real-time aggregation on streaming data using Kinesis Data Analytics
  • Store data in S3 and create a catalog in Glue
  • Run queries in Athena and create views
  • Import datasets into QuickSight and build charts

Note: In this article, I will walk step by step through how I built this pipeline, so that anyone interested in replicating a similar workflow can use it as a resource.

For the next part of this series, a step-by-step guide on ETL data processing, querying and visualization in a serverless data lake, check my article here.

High-Level Solution Overview

In this post, we will use the Chicago Crimes dataset, which contains about 2 GB of data ranging from 2001 to 2017. The dataset has around 6 million records, which makes it a good fit for our scenario. We will use the Python Kinesis Aggregation module for efficient transmission of records to a Kinesis Data Stream. A preprocessing Lambda will transform the records (in our case, KPL-aggregated records) into a format that Kinesis Data Analytics can understand. We will execute SQL code over a tumbling window and analyze our aggregations. The aggregated analytics are sent to Kinesis Firehose, which then stores them on S3. Once the data is available on S3, AWS Glue tables are updated and the data is made available for running queries with Amazon Athena. Finally, our chart is updated in QuickSight for visualization.

Below is the Architecture Diagram.

Create a Python Producer and Write Records

By this point, we have downloaded and preprocessed the dataset. Since the files contain millions of records, you can split them into multiple files by year. I’m using the Python Kinesis Aggregation module. Here are some of the benefits of using KPL aggregation:

  • Automatic and configurable retry mechanism
  • Aggregates user records to increase payload size and improve throughput

I’m using callback-based aggregation; for this, you must register a callback via the on_record_complete method. As you add individual records to the RecordAggregator object, you will receive a callback (on a separate thread) whenever a new fully packed aggregated record is available.

import glob
import boto3
import aws_kinesis_agg.aggregator
import time
import uuid

myStreamName = "Your-Stream-Name"

kinesis_client = boto3.client('kinesis', region_name="us-east-1")
kinesis_agg = aws_kinesis_agg.aggregator.RecordAggregator()


def send_record(agg_record):
    # Publish a fully packed aggregated record to the Kinesis Data Stream
    pk, _, data = agg_record.get_contents()
    kinesis_client.put_record(StreamName=myStreamName, Data=data, PartitionKey=pk)


# Called (on a separate thread) whenever an aggregated record is full
kinesis_agg.on_record_complete(send_record)


def main():
    path = 'path-to-your-files/*.csv'
    filenames = glob.glob(path)
    for filename in filenames:
        with open(filename, 'r', encoding='utf-8') as data:
            pk = str(uuid.uuid4())
            for record in data:
                kinesis_agg.add_user_record(pk, record)
            # Flush whatever is left in the aggregator for this file
            remaining = kinesis_agg.clear_and_get()
            if remaining:
                send_record(remaining)
            time.sleep(1)


main()

Setup Your Kinesis Data Analytics Application

Kinesis Data Analytics applications continuously read and process streaming data in real time. You write the application code in SQL to process the incoming streaming data and produce output. Kinesis Data Analytics then writes the output to a configured destination. For this, let’s log in to the AWS Console and head over to the Kinesis service.

Step 1: Go to Kinesis Data Analytics and create a new Application.

Step 2: In source, choose your Kinesis Data Stream.

Step 3: In the record preprocessing with Lambda section, enable preprocessing and create a new Lambda function. Kinesis Data Analytics accepts only csv and json records, so we need to convert our KPL-aggregated records into csv so that Kinesis Data Analytics can process them. The service manages how each batch of records the application receives gets passed to your Lambda function. You can learn about Lambda preprocessing here. Every record passed to your Lambda function must be returned with recordId, result and data properties, otherwise the application rejects it. For each incoming record, you deaggregate the multiple csv rows it contains, join them, and send them back to the application with the same recordId. Here is the Lambda function's code:

import base64
from aws_kinesis_agg.deaggregator import deaggregate_records


def lambda_handler(event, context):
    output = []
    kinesis_records = event['records']
    for record in kinesis_records:
        record_id = record['recordId']
        # Unpack the KPL-aggregated record into the individual user records
        records_deaggregated = deaggregate_records(record)
        decoded_data = []
        for user_record in records_deaggregated:
            data = base64.b64decode(user_record['kinesis']['data']).decode('utf-8')
            decoded_data.append(data)
        # Join the csv rows and return them under the original recordId
        output_record = {
            'recordId': record_id,
            'result': 'Ok',
            'data': base64.b64encode("".join(decoded_data).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    print("Successfully processed {} records.".format(len(event['records'])))
    return {'records': output}

Step 4: Choose an IAM role that gives the application the right permissions to do the following:

  • Read data from the input data stream
  • Write output to the application output stream
  • Read from S3 if you are using reference data from S3

Kinesis Data Analytics Source Config

Step 5: Start your application and discover the schema. You can filter the columns and edit the schema as you like.

Step 6: We will use the TOP_K_ITEMS_TUMBLING function, which returns the most frequently occurring values in the specified in-application stream column over a tumbling window. We will display the trending crime locations over a tumbling window of 60 seconds. Save and run your SQL, then run your producer, and you will see the real-time query results in the results window.
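For reference, here is a minimal sketch of what the application SQL could look like. The column name 'LOCATION_DESCRIPTION' is an assumption based on the Chicago Crimes dataset, so adjust it to whatever your discovered schema calls that column; SOURCE_SQL_STREAM_001 is the default name of the in-application input stream.

-- A minimal sketch of the application SQL; adjust the column name to your schema
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "ITEM"       VARCHAR(1024),  -- the trending location
    "ITEM_COUNT" DOUBLE          -- how often it occurred in the window
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM *
    FROM TABLE (TOP_K_ITEMS_TUMBLING(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
        'LOCATION_DESCRIPTION',  -- column to rank by frequency
        10,                      -- number of most frequent values to return
        60                       -- tumbling window size in seconds
    ));

The pump continuously inserts the top ten locations of each 60-second window into DESTINATION_SQL_STREAM, which is the stream we will wire to Firehose in the next step.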

Kinesis Data Analytics Query

Step 7: Configure Kinesis Firehose as your application output stream. We will use a buffer size of 5 MB and a buffer interval of 300 seconds. You can configure the following Amazon S3 object key prefix for the data delivered by Kinesis Data Firehose.

year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/

Here is the final configuration of our output stream.

You can run your producer; once either of the conditions is met (5 MB or 300 seconds), a new partition will be created on S3. Here is the prefix structure.

Glue Cataloging

Athena uses the Glue Data Catalog for querying data on S3. To register the schema, we will run our crawler only the first time, since the schema needs to be registered only once. After that, we just need to register the partitions that Firehose creates on S3 every hour. Recrawling is an expensive and time-consuming operation; since our schema stays the same, we only need to register the newly created partitions, using the batch_create_partition() Glue API.

Step 1: Configure and run a crawler to register the table schema (you only need to rerun it when the schema changes). Here are the crawler configurations to get you started:

crawler configuration

Step 2: Once the table’s schema is registered, you can edit the column names in your table. Make sure column names don’t contain any spaces.

Step 3: Now that the table’s schema is registered, create a new Lambda function that will register the new partitions. Since Firehose is creating hourly partitions in our case, we need to run the Lambda function on an hourly basis so that each newly created partition is registered in the Glue Catalog. Attach a CloudWatch Events schedule to trigger the Lambda function every hour. Here’s the function code:

import boto3
from datetime import datetime


def check_if_prefix_exists(prefix):
    # Return True if at least one object exists on S3 under the given prefix
    client = boto3.client('s3', region_name="Region")
    result = client.list_objects(Bucket="S3-Bucket", Prefix=prefix)
    return "Contents" in result


def lambda_handler(event, context):
    l_client = boto3.client('glue', region_name="Region")

    # Fetch table information from the Glue catalog
    print("Fetching table info for {}.{}".format("Database-Name", "Table-Name"))
    try:
        response = l_client.get_table(
            CatalogId="AWS-Account-ID",
            DatabaseName="Database-Name",
            Name="Table-Name"
        )
    except Exception:
        print("cannot fetch table")
        return

    # Parse the table info required to create partitions
    storage_descriptor = response['Table']['StorageDescriptor']
    input_format = storage_descriptor['InputFormat']
    output_format = storage_descriptor['OutputFormat']
    table_location = storage_descriptor['Location']
    serde_info = storage_descriptor['SerdeInfo']
    partition_keys = response['Table']['PartitionKeys']
    print(input_format, output_format, table_location, serde_info, partition_keys)

    # Firehose creates partitions in UTC, so build a two-digit UTC timestamp
    now = datetime.utcnow()
    year = str(now.year)
    month = '%02d' % now.month
    day = '%02d' % now.day
    hour = '%02d' % now.hour

    # If the prefix doesn't exist on S3, there is no partition to register
    prefix = "year={}/month={}/day={}/hour={}".format(year, month, day, hour)
    if not check_if_prefix_exists(prefix):
        print("prefix doesn't exist")
        return

    # Table location is expected to end with a trailing slash
    part_location = "{}year={}/month={}/day={}/hour={}".format(
        table_location, year, month, day, hour)
    input_dict = {
        'Values': [year, month, day, hour],
        'StorageDescriptor': {
            'Location': part_location,
            'InputFormat': input_format,
            'OutputFormat': output_format,
            'SerdeInfo': serde_info
        }
    }
    print(input_dict)

    # Register the new hourly partition in the Glue catalog
    create_partition_response = l_client.batch_create_partition(
        CatalogId="AWS-Account-ID",
        DatabaseName="Database-Name",
        TableName="Table-Name",
        PartitionInputList=[input_dict]
    )
    print(create_partition_response)

Analyze your Data with Athena

Now that the Glue Catalog is updated, we can query our data on S3 using Athena. We will create a view in Athena and import it into QuickSight.

Step 1: Create a new query that displays the trending crime locations grouped by time, and save it as a view. Here is the query structure:
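As an illustration, here is a hedged sketch of what the view could look like. The table name chicago_crimes_streaming is the streaming table registered by the crawler in this walkthrough, while the column names item and item_count are assumptions based on the Kinesis Data Analytics output; adjust them to match your registered schema.

-- A sketch of the view; column and table names are assumptions
CREATE OR REPLACE VIEW trending_crime_locations AS
SELECT year,
       month,
       day,
       hour,
       item            AS location,
       SUM(item_count) AS occurrences
FROM chicago_crimes_streaming
GROUP BY year, month, day, hour, item;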

Step 2: Run your query; it will return results like the following:

Now that the view is saved, we can import it into QuickSight and move on to the final step, where we create a chart.

Final Step: Visualize the data

Step 1: Open the Amazon QuickSight console.

Step 2: Set up Amazon QuickSight account settings to access Athena and your S3 bucket. First, select the Amazon Athena checkbox. Then select the Amazon S3 checkbox to edit Amazon QuickSight's access to your S3 buckets. Choose the buckets that you want to make available, and then choose Select buckets.

Step 3: Choose Manage data.

Step 4: Choose NEW DATASET. In the list of data sources, choose Athena.

Step 5: Enter your data source name.

Step 6: Choose the view that you created, and choose Select.

Step 7: Then you can choose to use either SPICE (cache) or direct query access.

Step 8: Add a new visual, add your fields to the field wells, and select the visual type. Here I have created a combo bar chart.

Quicksight Bar Graph

Summary

In this tutorial, we created a serverless data lake from streaming data. We wrote a Python producer to simulate our stream and used Kinesis Data Analytics to aggregate the data over a tumbling window. Then we used the Glue Catalog to store our schema for Athena queries. Finally, we created a sample real-time chart in QuickSight, which updates with our incoming streaming data.

Some Optimizations

  • We can run an ETL job to convert our csv data into Parquet format (see the sketch after this list).
  • We can merge our files within partitions for better query performance. More on how we can optimize file sizes and increase Athena’s performance can be found here.
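As a quick illustration of the first point, here is a hedged sketch that uses an Athena CTAS query instead of a full Glue ETL job; the target table name and bucket path are placeholders.

-- Rewrite the streaming table as Parquet in a new S3 location (placeholders)
CREATE TABLE chicago_crimes_parquet
WITH (
      format = 'PARQUET',
      external_location = 's3://your-bucket/chicago-crimes-parquet/'
) AS
SELECT *
FROM chicago_crimes_streaming;

Parquet's columnar layout reduces the amount of data Athena scans per query, which lowers both latency and cost.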

For a complete step-by-step guide on ETL Data Processing, Querying and Visualization in a Serverless Data Lake using AWS Glue, Athena and QuickSight, check my article here.

That’s it!

For upcoming stories, you should follow my profile Shafiqa Iqbal.

That’s it, guys! Have fun, keep learning & always coding!
