Making sense of Stream Data with AWS

Ankush Khanna
Published in Analytics Vidhya
Dec 11, 2019 · 5 min read

This post is a continuation of Making sense out of Stream data locally.

In this blog, I will walk you through a step-by-step guide on building analytical dashboards on top of Kafka data using AWS Glue and QuickSight.

AWS

Amazon Web Services (AWS) is a cloud platform from Amazon offering many services. In case you do not have an AWS account, you can create one here. I am only using free-tier features of AWS.

To follow this blog, you will need an AWS access key and secret key. Please check the following steps on how to get them.

Architecture


I will be using data from my personal Google Timeline; you can check out the code here on how to publish Google Timeline data (via KML files) to Kafka.

Kafka

Running Kafka locally

You can use the Confluent Platform to run Kafka locally. Once you download the tar file, unzip it and run the following commands:

> export AWS_ACCESS_KEY=*********
> export AWS_SECRET_KEY=*********
> cd confluent-5.2.1/bin
> ./confluent start
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Please export the AWS_ACCESS_KEY and AWS_SECRET_KEY generated above, so that the S3 sink connector can authenticate with AWS.

Kafka Connect

Kafka Connect allows us to pull data into or push data out of Kafka. There are plenty of Kafka connectors available, which you can browse at https://www.confluent.io/hub/. For this tutorial, we will push Kafka stream data to S3 using the S3 sink connector.
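
Once the Confluent stack is running, you can verify that the S3 sink connector plugin is available on your local Connect worker by listing the installed plugins via the Connect REST API (the S3 sink ships with the Confluent Platform):

# List the connector plugins installed on the local Kafka Connect worker;
# io.confluent.connect.s3.S3SinkConnector should appear in the output
curl -s http://localhost:8083/connector-plugins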


Kafka Connect S3

We will be using io.confluent.connect.s3.S3SinkConnector. Using the command below, we can create the Kafka-to-S3 connector by sending an HTTP POST request to our Kafka Connect cluster:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '
{
  "name": "google_timeline_connector_s3",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "google_history_trip_s3",
    "s3.region": "eu-central-1",
    "s3.bucket.name": "kafkaoffload",
    "s3.part.size": "26214400",
    "flush.size": "100",
    "enhanced.avro.schema.support": "true",
    "partition.duration.ms": "60000",
    "locale": "UTC",
    "timezone": "UTC",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.field.name": "start_time",
    "schema.compatibility": "FULL_TRANSITIVE",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "start_time",
    "path.format": "YYYY-MM-dd",
    "rotate.schedule.interval.ms": "60000",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081"
  }
}'

Important properties:

  • s3.bucket.name: kafkaoffload, my S3 bucket name (create an S3 bucket in AWS web console)
  • partitioner.class: TimeBasedPartitioner, to partition data based on date-time
  • timestamp.extractor: RecordField, to use one of the fields inside the record value
  • timestamp.field: start_time, to use the start_time field to partition the data

Please modify the above fields in case your use case or data is different. You can check the complete set of configuration options here.

This should start a Kafka Connect task that offloads the data to S3. You can check the status of your connector at: http://localhost:8083/connectors/google_timeline_connector_s3/status
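
For example, from the command line (a RUNNING state for both the connector and its task means data is flowing to S3):

# Query the Kafka Connect REST API for the connector status
curl -s http://localhost:8083/connectors/google_timeline_connector_s3/status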

AWS Glue

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easier to prepare and load data for analysis. It can crawl data in S3 and create tables that store the associated metadata in the Glue Data Catalog. Once cataloged, your data is immediately searchable and queryable.

Creating a Crawler

Using the AWS web console, search for AWS Glue

  • You can add a crawler by clicking Add crawler
  • Specify the S3 path
  • Create an IAM role
  • Add Database name
  • Once crawler is created, we can run the crawler using Run crawler

This will start crawling the S3 bucket. It might take a few minutes to index the data.

You can also schedule the crawler to run at specific intervals.
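
If you prefer the command line over the web console, the same crawler can be created and started with the AWS CLI. This is only a sketch: the crawler name, IAM role, database name and S3 path below are assumptions based on my setup, so adjust them to yours.

# Sketch: create a crawler over the S3 prefix written by the connector
# (names, role and path are assumptions -- adjust to your setup)
aws glue create-crawler \
  --name timeline-crawler \
  --role AWSGlueServiceRole-timeline \
  --database-name kafka_offload_db \
  --targets '{"S3Targets": [{"Path": "s3://kafkaoffload/topics/google_history_trip_s3/"}]}' \
  --schedule "cron(0 * * * ? *)"

# Kick off a run immediately instead of waiting for the schedule
aws glue start-crawler --name timeline-crawler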

Glue Tables

  • Once the crawler finishes, you can see a table in the AWS Glue Tables section
  • You can edit the schema, if required
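
You can also inspect the schema the crawler inferred from the CLI. The table name below is an assumption (crawlers usually name the table after the S3 folder, here the topic name):

# Sketch: print the table definition, including columns and partition keys
aws glue get-table --database-name kafka_offload_db --name google_history_trip_s3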

AWS QuickSight

Once you have finished with AWS Glue, you can use AWS QuickSight to get insights into your data.

Setup

Go to QuickSight in the AWS web console

  • Fill in Create your QuickSight account
  • Create an Athena Data Set
  • Select the Glue table
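
Before building visuals, it can be useful to sanity-check the Glue table with a quick Athena query. The sketch below assumes the database and table names from the crawler step and an S3 location of your choice for query results:

# Sketch: run a small Athena query against the crawled table
aws athena start-query-execution \
  --query-string "SELECT * FROM google_history_trip_s3 LIMIT 10" \
  --query-execution-context Database=kafka_offload_db \
  --result-configuration OutputLocation=s3://kafkaoffload/athena-results/

# Fetch the results once the query finishes, using the QueryExecutionId returned above
aws athena get-query-results --query-execution-id <query-execution-id>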

Analysis

  • You can check the distribution of the name field
  • You can filter out different values; in this example, I have filtered out Driving and Walking
  • You can plot latitude and longitude on a graph
  • You can filter based on partition_0

Role of partition

While reading from S3 using Athena or QuickSight, filtering by partition plays a crucial role.

If you use the partition key to filter, the data is read only from the corresponding folders. Otherwise, Athena performs a complete scan of the S3 bucket, which is slow and expensive.
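
As an illustration, here is what a partition-aware Athena query might look like for this data set. partition_0 is the column the crawler assigns to the date folders created by the time-based partitioner (path.format YYYY-MM-dd); the date value, database and table names are assumptions:

# Sketch: filtering on the partition column so Athena only reads one date prefix
aws athena start-query-execution \
  --query-string "SELECT name, COUNT(*) AS trips FROM google_history_trip_s3 WHERE partition_0 = '2019-12-01' GROUP BY name" \
  --query-execution-context Database=kafka_offload_db \
  --result-configuration OutputLocation=s3://kafkaoffload/athena-results/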

Column based file format

Column-oriented file formats have advantages over row-based file formats. The basic idea is that instead of storing all the values of one row together, they store all the values of one column together. This speeds up queries that filter or aggregate on a few columns, compresses better, and works well for analytical workloads.

Currently, we have written our data out in Avro format. If you want faster query times, you might be interested in using Parquet as the file format. You can follow the Parquet Kafka Connect update here, or you can use a Glue ETL job to convert the Avro files to Parquet.
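
If you go the Glue ETL route, the job can also be created and run from the CLI. This is only a sketch: the job name, IAM role and script location are assumptions, and the actual Avro-to-Parquet conversion logic lives in the script you provide (the Glue console can generate one for a simple format change):

# Sketch: register and run a Glue ETL job that rewrites the Avro files as Parquet
aws glue create-job \
  --name avro-to-parquet \
  --role AWSGlueServiceRole-timeline \
  --command Name=glueetl,ScriptLocation=s3://kafkaoffload/scripts/avro_to_parquet.py

aws glue start-job-run --job-name avro-to-parquet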
