The Road of Data in AWS

Part 1— Create a Delivery Stream to S3 using Kinesis

Bruno Borges
9 min read · Jul 14, 2023

In today’s data-driven world, the ability to harness and leverage vast amounts of information has become paramount. Enterprises across industries are realizing the immense value of data in driving innovation, gaining insights, and making informed decisions. With the advent of cloud computing, storing, processing, and analyzing data has transcended traditional boundaries, paving the way for the creation of robust and scalable data infrastructures.

At the heart of this data revolution lies the concept of a datalake — a centralized repository that empowers organizations to store, manage, and analyze massive volumes of structured and unstructured data. And when it comes to building a cutting-edge datalake, Amazon Web Services (AWS) emerges as an undisputed leader, offering a comprehensive suite of services designed to unleash the full potential of data in the cloud.

In this series of articles, we will guide you through the process of creating a powerful datalake in a simple yet effective way, leveraging the capabilities of some AWS services. We will explore key components that play a vital role in the end-to-end data lifecycle, empowering organizations to unlock the full potential of their data assets.

  1. Collecting Data with Amazon Kinesis: The journey of data in AWS begins with the collection of real-time streaming data. Amazon Kinesis provides a scalable and fully managed service for ingesting, processing, and analyzing streaming data in real-time.
  2. Managing the Data with AWS Lake Formation: Once the data is ingested into your datalake, it becomes crucial to have a robust management system in place. AWS Lake Formation offers a comprehensive solution for managing data lakes at scale. We will explore the capabilities of Lake Formation, including data cataloging, data access control, and data transformation. You will learn how to organize your data lake, define access policies, and ensure data governance and compliance.
  3. Running Extract, Transform and Load (ETL) with AWS Glue: all ETL processes will be performed using AWS Glue, further enhancing the power and capabilities of your datalake. You will gain a comprehensive understanding of how to harness Glue for your data transformation and integration processes, ultimately enabling you to derive valuable insights and make data-driven decisions.
  4. Implementing Governance with Iceberg, Hudi and Delta Lake Tables: Data governance is a critical aspect of managing a datalake effectively. Open table formats such as Apache Iceberg, Apache Hudi and Delta Lake provide advanced governance features for large-scale datasets in the cloud. We will delve into the concept of the data lakehouse, exploring how these formats enhance data quality, enable schema evolution, and facilitate efficient query performance. You will learn how to integrate Iceberg, Hudi and Delta Lake tables into your datalake architecture, ensuring data consistency, reliability, and compliance.
  5. Training and Deploying Models with Amazon SageMaker: One of the key benefits of a datalake is the ability to leverage data for advanced analytics and machine learning. Amazon SageMaker is a powerful service that simplifies the process of building, training, and deploying machine learning models at scale. We will explore how SageMaker can be seamlessly integrated with your datalake, enabling you to leverage the rich data within it to train and deploy models. You will gain insights into SageMaker's capabilities, such as automatic model tuning, deployment automation, and real-time inference.

Create a Delivery Stream to S3 using Kinesis

The first step is to collect data. Streaming data can be generated by various sources such as IoT (Internet of Things) sensors, mobile applications, server logs, social media feeds and financial transactions, among others. This data is transmitted in real-time, usually in small packets called events. To handle streaming data, specific technologies and platforms are used, such as AWS Kinesis, Apache Kafka, Apache Flink and Apache Spark Streaming, among others. These tools provide capabilities for ingesting, processing, storing and analyzing streaming data, making it possible to handle large volumes of data in real-time in a scalable and efficient manner.

In this tutorial we will use Kinesis, an AWS service that enables real-time data processing and analysis at scale. Basically, there are three main services within Kinesis: Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose, and Amazon Kinesis Data Analytics.

Amazon Kinesis Data Streams: It is the core part of the Kinesis service. It allows the ingestion of large volumes of data in real-time and the ability to process them in a timely manner. The data is divided into shards (partitions) that can be read and written to simultaneously. Applications can consume the data directly from Kinesis streams or process them using Amazon Kinesis Data Analytics or any other real-time data processing tool.
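As an illustration of the consumer side, here is a hedged sketch using boto3 that reads a few records back from the stream we will create later in this article. The profile and stream names are the ones used further down; adjust them to your own setup.

import boto3
import json

session = boto3.Session(profile_name='bruno_personal')
kinesis = session.client('kinesis')

# Look up the single shard of the stream
stream = kinesis.describe_stream(StreamName='video_games_review_amazon')
shard_id = stream['StreamDescription']['Shards'][0]['ShardId']

# Start reading from the oldest record still retained in the stream
iterator = kinesis.get_shard_iterator(
    StreamName='video_games_review_amazon',
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON',
)['ShardIterator']

response = kinesis.get_records(ShardIterator=iterator, Limit=10)
for record in response['Records']:
    print(json.loads(record['Data']))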

Amazon Kinesis Data Firehose: It is a service that enables direct delivery of data into various storage destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service (now Amazon OpenSearch Service) and Splunk. Kinesis Data Firehose automatically manages the scaling and delivery of data to the specified destinations, relieving the developer’s workload.

Amazon Kinesis Data Analytics: It is a tool that enables real-time analysis of streaming data using standard SQL queries. You can run real-time queries to filter, transform, and aggregate data, generating immediate insights. Kinesis Data Analytics allows you to build advanced analytics applications and apply machine learning to streaming data.

We can simulate a real-time data delivery service using Kinesis. For this, we will use the Amazon video game reviews dataset.

import boto3
import json
import pandas as pd
import time
import uuid
import matplotlib.pyplot as plt


# Specify the name of the Kinesis Data Stream
stream_name = 'video_games_review_amazon'

# Create a Kinesis client
session = boto3.Session(profile_name='bruno_personal')
kinesis_client = session.client("kinesis")

# Load the Amazon video game reviews dataset
books_df = pd.read_json('http://jmcauley.ucsd.edu/data/amazon_v2/categoryFilesSmall/Video_Games_5.json.gz', lines=True)

# Plot the number of reviews per rating
rating_counts = books_df.overall.value_counts().sort_index()
plt.bar(rating_counts.index, rating_counts.values)
plt.xlabel("Rating")
plt.ylabel("No. of reviews")
plt.title("Number of reviews per rating")
plt.show()

We will do the following: we will use Kinesis Data Streams to create a data stream; then we will use Kinesis Data Firehose to send the data to the Source Zone of the Data Lake, located in a bucket that we will create in S3. A Lambda function can be used to transform the incoming records, for example into the Parquet format; a minimal sketch of such a transformation handler is shown below.
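A Firehose transformation Lambda receives a batch of base64-encoded records and must return each record with its recordId, a result status and the (possibly modified) data. The following is only a minimal pass-through sketch of that contract: it appends a newline to each JSON record so that downstream tools see one object per line. Producing actual Parquet output would require a library such as pyarrow or, more commonly, Firehose's built-in record format conversion.

import base64
import json

def lambda_handler(event, context):
    # Minimal Kinesis Data Firehose transformation handler (pass-through sketch)
    output = []
    for record in event['records']:
        # Firehose delivers each record base64-encoded
        payload = base64.b64decode(record['data']).decode('utf-8')
        # Append a newline so the delivered S3 objects contain one JSON object per line
        transformed = payload.rstrip('\n') + '\n'
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed.encode('utf-8')).decode('utf-8'),
        })
    return {'records': output}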

Amazon Kinesis Data Streams

Let’s first create a data stream. Provide a name for your data stream and define the desired number of shards. The shards determine the capacity and performance of the stream: more shards allow for a greater volume of data but also increase costs, so consider the needs of your use case carefully when choosing the number of shards. It is also important to configure data retention, which determines how long records remain in the stream before being automatically deleted; retention ranges from 24 hours (the default) up to 365 days.

We will use boto3 to create a data stream with 1 shard and the default retention period of one day. A single shard supports writes of up to 1 MiB or 1,000 records per second and reads of up to 2 MiB per second.

# Create the stream with a single shard
shard_count = 1
response = kinesis_client.create_stream(
    StreamName=stream_name,
    ShardCount=shard_count
)

# The stream starts in the CREATING state and becomes ACTIVE shortly afterwards
response = kinesis_client.describe_stream(StreamName=stream_name)
stream_status = response['StreamDescription']['StreamStatus']
print("Stream status:", stream_status)

Amazon Kinesis Data Firehose

Let’s now create a delivery stream that sends the data to an S3 bucket. The destination bucket is where our data lake is located; in particular, I have created one called “bbs-datalake”. Open the Kinesis Data Firehose service: on the Kinesis dashboard, click “Create delivery stream” under Amazon Kinesis Data Firehose and configure the delivery stream:

  • a. Select the data source, which will be the data stream you previously created in Amazon Kinesis Data Streams.
  • b. Select “Amazon S3” as the destination.
  • c. Click on “Create new” to create a new Amazon S3 bucket or select an existing bucket.
  • d. Choose a name for your delivery stream.
  • e. Add a prefix to the bucket.

In the bucket containing the datalake, the raw data is delivered to the Source Zone (or Bronze Data). Create a prefix to locate the video_games_review_amazon table in the Datalake Source Zone.

For Buffer hints, compression, and encryption, only change the buffer interval to the minimum, which is 60 seconds.

Transformation options (optional): if you wish to transform or format the data before sending it to the S3 bucket, you can add transformations using AWS Lambda. However, since we want the data to arrive in the Source Zone without any transformations, we will deliver it directly to S3 without additional steps. I advise only compressing the data (GZIP is efficient) to save storage space. Note that you could also configure additional processing options if you need to send the data on to services like Amazon Elasticsearch Service or Amazon Redshift.
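For reference, the same delivery stream can also be created programmatically with boto3. This is a hedged sketch: the delivery stream name, IAM role ARN and S3 prefix below are placeholders for illustration, while the bucket name is the one mentioned above.

firehose_client = session.client('firehose')

# The source is the data stream created earlier
stream_arn = kinesis_client.describe_stream(StreamName=stream_name)['StreamDescription']['StreamARN']

firehose_client.create_delivery_stream(
    DeliveryStreamName='video_games_review_amazon_to_s3',          # illustrative name
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': stream_arn,
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-role',  # placeholder role
    },
    ExtendedS3DestinationConfiguration={
        'BucketARN': 'arn:aws:s3:::bbs-datalake',
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-role',  # placeholder role
        'Prefix': 'source-zone/video_games_review_amazon/',         # illustrative prefix
        'BufferingHints': {'IntervalInSeconds': 60, 'SizeInMBs': 5},
        'CompressionFormat': 'GZIP',
    },
)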

Regarding the data producer, Kinesis has three producers:

  • SDK: It is a collection of libraries and tools available in various programming languages such as Java, Python, .NET, among others. The SDK allows you to develop custom applications to produce and consume data from Amazon Kinesis. It offers high-level APIs, using PutRecord (one) or PutRecords (many), which facilitate sending records to Kinesis Streams or Kinesis Data Firehose, automatically managing authentication and connection details. It is targeted for simpler applications that do not require low latency or high throughput.
  • Kinesis Producer Library (KPL): The Kinesis Producer Library is a high-performance library designed to facilitate the efficient sending of large volumes of data to Amazon Kinesis Streams. The KPL is designed to handle record partitioning and aggregation, as well as handle high-throughput streaming use cases. It also provides buffering and automatic retry mechanisms in case of temporary connectivity failures.
  • Kinesis Agent: The Kinesis Agent is a command-line tool designed to facilitate the continuous sending of logs and files to Amazon Kinesis Data Streams or Kinesis Data Firehose. It is especially useful for sending server or application log data directly to Kinesis. The Kinesis Agent monitors specified log files or directories and sends the data to Kinesis in real-time, taking care of aspects such as record partitioning and streaming configuration.

Here, for simplicity, we will simulate streaming using the Kinesis SDK with boto3 as the producer. Essentially, we will send 100 rows from our dataset per second to the data stream we created: each row is serialized as a JSON record, and each put_records call delivers a batch of 100 such records.

import json
import time
import pandas as pd
import uuid

# Split the DataFrame into chunks of 100 rows each
chunk_size = 100
df_chunks = [books_df[i:i + chunk_size] for i in range(0, books_df.shape[0], chunk_size)]

# Process and send each chunk
i = 0
partition_key = str(uuid.uuid4())
for chunk in df_chunks:
    records = []
    print(f'delivery {i}')
    for _, record in chunk.iterrows():
        # Convert the columns that contain dictionaries into strings,
        # so that they can be displayed in Athena later.
        record['style'] = str(record['style'])
        record['image'] = str(record['image'])
        data = json.dumps(record.to_dict())
        records.append({'Data': data, 'PartitionKey': partition_key})

    # Send the batch of records to Kinesis
    kinesis_client.put_records(StreamName=stream_name, Records=records)

    # Wait for 1 second before sending the next chunk
    time.sleep(1)
    i += 1

If everything worked, after a few minutes we can already see files arriving in S3. Each file contains about 5 MB of data (the default Firehose buffer size).
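To check the delivery from code rather than from the console, a quick sketch can list the delivered objects. The bucket name comes from this article; the prefix is whichever one you configured in Firehose, shown here only as an assumption.

# List the objects Firehose has delivered so far
s3_client = session.client('s3')
response = s3_client.list_objects_v2(
    Bucket='bbs-datalake',
    Prefix='source-zone/video_games_review_amazon/',  # illustrative prefix
)
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])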

Cool! We now have data coming into the Source Zone of the datalake. In the next tutorial we will talk about the datalake structure and what each of the data zones are. We’ll also talk about Glue and Lake Formation.


Bruno Borges

Data scientist with a degree in production engineering and applied mathematics.