Making sense of Stream Data with AWS
This post is a continuation of Making sense out of Stream data locally.
In this blog, I will show you, step by step, how to build analytical dashboards on top of Kafka data using AWS Glue and QuickSight.
AWS
Amazon Web Services (AWS) is a cloud platform from Amazon offering many services. In case you do not have an AWS account, you can create one here. I am only using features from the AWS free tier.
To follow this blog, you will need an AWS access key and secret key. Please check the following steps on how to get them.
Architecture
I will be using the data from my personal google timeline, you can check out the code here on how to publish google timeline data (via kml files) to Kafka.
Kafka
Running Kafka locally
You can use the Confluent Platform to run Kafka locally. Once you download the tar file, unzip it and run the following commands:
> export AWS_ACCESS_KEY=*********
> export AWS_SECRET_KEY=*********
> cd confluent-5.2.1/bin
> ./confluent start
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
Please export the AWS_ACCESS_KEY and AWS_SECRET_KEY generated earlier, as shown in the commands above, so that the S3 sink connector can authenticate with AWS.
Kafka Connect
Kafka Connect allows us to pull data into, or push data out of, Kafka. There are plenty of Kafka connectors available, which you can browse at https://www.confluent.io/hub/. For this tutorial, we will push Kafka stream data to S3 using the S3 sink connector.
Kafka Connect S3
We will be using io.confluent.connect.s3.S3SinkConnector. Using the command below, we can start the S3 sink connector by sending an HTTP POST request to our Kafka Connect cluster:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '
{
  "name": "google_timeline_connector_s3",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "google_history_trip_s3",
    "s3.region": "eu-central-1",
    "s3.bucket.name": "kafkaoffload",
    "s3.part.size": "26214400",
    "flush.size": "100",
    "enhanced.avro.schema.support": "true",
    "partition.duration.ms": "60000",
    "locale": "UTC",
    "timezone": "UTC",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.field.name": "start_time",
    "schema.compatibility": "FULL_TRANSITIVE",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "start_time",
    "path.format": "YYYY-MM-dd",
    "rotate.schedule.interval.ms": "60000",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081"
  }
}'
Important properties:
- s3.bucket.name: kafkaoffload, my S3 bucket name (create an S3 bucket in the AWS web console)
- partitioner.class: TimeBasedPartitioner, to partition data based on date-time
- timestamp.extractor: RecordField, to use one of the fields inside the record value
- timestamp.field: start_time, to use the start_time field to partition the data
Please modify the above fields in case your use case or data is different. You can check the complete configuration here.
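To make the partitioning settings concrete, here is a small sketch of how the TimeBasedPartitioner derives the date prefix of an S3 object key from the record's start_time field. This is simplified (the real connector adds further path components and uses Joda-style patterns, where YYYY-MM-dd corresponds to Python's %Y-%m-%d); the topic name comes from the config above.

```python
from datetime import datetime, timezone

def s3_date_prefix(topic: str, start_time_ms: int, path_format: str = "%Y-%m-%d") -> str:
    """Roughly mimic the TimeBasedPartitioner: the start_time field is read
    from the record value (RecordField extractor) and formatted into the
    date portion of the S3 object key."""
    ts = datetime.fromtimestamp(start_time_ms / 1000, tz=timezone.utc)
    return f"topics/{topic}/{ts.strftime(path_format)}"

# A record whose start_time is 2019-06-01T10:30:00Z lands under:
print(s3_date_prefix("google_history_trip_s3", 1559385000000))
# → topics/google_history_trip_s3/2019-06-01
```

This date-based layout is what later makes partition filtering in Athena and QuickSight cheap.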
This should start a Kafka Connect worker task to offload the data to S3. You can check the status of your connector using: http://localhost:8083/connectors/google_timeline_connector_s3/status
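The status endpoint returns JSON describing the connector and its tasks. A minimal sketch of checking it programmatically (the response below is an illustrative example of the shape the Kafka Connect REST API returns; worker IDs are placeholders):

```python
import json

# Example response from
# GET http://localhost:8083/connectors/google_timeline_connector_s3/status
status_response = """
{
  "name": "google_timeline_connector_s3",
  "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
  "type": "sink"
}
"""

def is_healthy(status: dict) -> bool:
    """Healthy only if the connector itself and every task are RUNNING."""
    states = [status["connector"]["state"]] + [t["state"] for t in status["tasks"]]
    return all(s == "RUNNING" for s in states)

print(is_healthy(json.loads(status_response)))  # → True
```

If a task shows FAILED, its "trace" field in the same response usually contains the stack trace explaining why.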
AWS Glue
AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easier for customers to prepare and load their data for analysis. It can crawl data in S3 and create tables that store the associated metadata in the Glue Data Catalog. Once cataloged, your data is immediately searchable and queryable.
Creating a Crawler
Using the AWS web console, search for AWS Glue
- You can add a crawler by clicking Add crawler
- Specify the S3 path
- Create an IAM role
- Add a database name
- Once the crawler is created, we can run it using Run crawler
This will start crawling the S3 bucket. It might take a few minutes to index the data.
You can also schedule the crawler to run at specific intervals.
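The same console steps can also be scripted. Below is a hedged sketch using boto3's Glue client; the crawler name, IAM role ARN, and database name are placeholders you would substitute with your own, and the S3 path matches the bucket from the connector config above.

```python
def crawler_config(name: str, s3_path: str, role_arn: str, database: str) -> dict:
    """Build the parameters for glue.create_crawler. All names here are
    illustrative — replace them with your own resources."""
    return {
        "Name": name,
        "Role": role_arn,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
        # Optional schedule: run every day at 01:00 UTC instead of on demand
        "Schedule": "cron(0 1 * * ? *)",
    }

def create_and_run(cfg: dict) -> None:
    import boto3  # deferred import: the config builder itself needs no AWS deps
    glue = boto3.client("glue", region_name="eu-central-1")
    glue.create_crawler(**cfg)
    glue.start_crawler(Name=cfg["Name"])

cfg = crawler_config(
    "google-timeline-crawler",                                   # placeholder
    "s3://kafkaoffload/topics/google_history_trip_s3/",
    "arn:aws:iam::123456789012:role/AWSGlueServiceRole-timeline",  # placeholder
    "timeline_db",                                               # placeholder
)
print(cfg["Targets"]["S3Targets"][0]["Path"])
```

Calling create_and_run(cfg) with valid credentials would create and immediately start the crawler.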
Glue Tables
- Once the crawler finishes, you can see a table in the AWS Glue Tables section
- You can edit the schema, if required, as shown below
AWS QuickSight
Once you have finished with AWS Glue, you can use AWS QuickSight to get insights into your data.
Setup
Go to QuickSight in the AWS web console
- Fill up Create your QuickSight account
- Create an Athena Data Set
- Select the Glue table
Analysis
- You can check the distribution of name
- You can filter out different values; in this example I have filtered out Driving and Walking
- You can plot latitude and longitude on a graph
- Filtering out based on partition_0
Role of partition
While reading from S3 using Athena or QuickSight, filtering by partition plays a crucial role.
If you filter on the partition key, data is read only from the respective folders. Otherwise, the query performs a complete scan of the S3 bucket, which is slow and expensive.
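As an illustration, here are two hypothetical Athena queries over the crawled table (the database and table names are assumptions based on this setup; Glue names Hive-style partition columns partition_0, partition_1, and so on, which is why the date partition appears as partition_0):

```python
# Assumed names: "timeline_db" database and a table named after the topic.
table = "timeline_db.google_history_trip_s3"

# Without a partition predicate, Athena must scan every object in the bucket.
full_scan = f"SELECT name, count(*) FROM {table} GROUP BY name"

# Restricting on the partition column lets Athena skip every S3 prefix
# except 2019-06-01 — only that day's objects are read and billed.
pruned = (
    f"SELECT name, count(*) FROM {table} "
    "WHERE partition_0 = '2019-06-01' "
    "GROUP BY name"
)
print(pruned)
```

Since Athena bills by data scanned, the pruned query is cheaper as well as faster.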
Column-based file formats
Columnar file formats have advantages over row-based file formats. The basic idea is that instead of storing all the values from one row together, they store all the values from one column together. This speeds up filtering queries per column, allows better compression, and works better for analytical tasks.
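A toy illustration of the difference, using a few made-up records shaped like the timeline data:

```python
# Row layout: each record's values stored together (as in Avro).
rows = [
    {"name": "Driving", "lat": 52.52, "lon": 13.40},
    {"name": "Walking", "lat": 52.50, "lon": 13.39},
    {"name": "Driving", "lat": 48.14, "lon": 11.58},
]

# Columnar layout: all values of one column stored together (as in Parquet).
columns = {key: [r[key] for r in rows] for key in rows[0]}

# A query like SELECT avg(lat) only touches one contiguous column instead of
# reading every record, and runs of repeated values ("Driving") compress well.
print(columns["name"])  # → ['Driving', 'Walking', 'Driving']
print(sum(columns["lat"]) / len(columns["lat"]))
```

Real columnar formats add encoding, statistics, and row groups on top of this idea, but the access pattern is the same.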
Currently, we have written our data in Avro format. If you want faster query times, you might be interested in using Parquet as the file format. You can follow the Parquet Kafka Connect update here, or use Glue ETL to convert the Avro files to Parquet.