Consume Kinesis Data Streams Using Python and Boto3
This blog post walks you through consuming data from a Kinesis data stream using Boto3, the AWS SDK for Python. The goal is to explain the core concepts of Kinesis streams and show you how to write a consumer program that reads from multiple shards.
A common requirement when working with messaging systems or streaming technologies is reading messages from the event stream. This usually means writing a client program to consume the stream, whether for development, testing, or real-time event processing.
Kafka and Kinesis are two popular technologies in the streaming domain. Both offer dedicated client libraries for consumption, but the task is not straightforward, and the details differ because of their architectures. For instance, consuming data from Kinesis requires an understanding of shards and sequence numbers, whereas Kafka demands knowledge of partitions and offsets.
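To make the role of sequence numbers a little more concrete, here is a minimal sketch of how a consumer might resume a shard from the last record it processed, roughly the way a Kafka consumer resumes from an offset. The helper names `iterator_request` and `latest_sequence_number` are hypothetical and not part of Boto3; the keys and iterator types they produce (`ShardIteratorType`, `StartingSequenceNumber`, `TRIM_HORIZON`, `AFTER_SEQUENCE_NUMBER`) are the ones the Kinesis `get_shard_iterator` call accepts. Where the checkpoint is stored is left out and is up to you.

```python
def iterator_request(stream_name, shard_id, last_sequence_number=None):
    """Build keyword arguments for a Kinesis get_shard_iterator call (hypothetical helper)."""
    request = {"StreamName": stream_name, "ShardId": shard_id}
    if last_sequence_number is None:
        # No checkpoint yet: start from the oldest record still retained in the shard.
        request["ShardIteratorType"] = "TRIM_HORIZON"
    else:
        # Resume just after the last record that was already processed.
        request["ShardIteratorType"] = "AFTER_SEQUENCE_NUMBER"
        request["StartingSequenceNumber"] = last_sequence_number
    return request


def latest_sequence_number(records, previous=None):
    """Return the sequence number of the last record in a batch, or `previous` if the batch is empty."""
    return records[-1]["SequenceNumber"] if records else previous
```

With a real client, the request would be passed along as `kinesis_client.get_shard_iterator(**iterator_request(stream_name, shard_id, checkpoint))`, and `latest_sequence_number(response['Records'], checkpoint)` would give the value to save after each `get_records` call.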
Python Libraries:
Boto3
import boto3
import json
import time


def get_all_shards(kinesis_client, stream_name):
    # Get the list of shards in the stream.
    # Note: describe_stream returns shards in pages (HasMoreShards signals more),
    # so this reads only the first page of a stream with many shards.
    response = kinesis_client.describe_stream(StreamName=stream_name)
    return response['StreamDescription']['Shards']


def get_shard_iterator(kinesis_client, stream_name, shard_id, iterator_type='LATEST'):
    # Get the shard iterator. 'LATEST' reads only records written after this call;
    # use 'TRIM_HORIZON' to start from the oldest retained record instead.
    response = kinesis_client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType=iterator_type
    )
    return response['ShardIterator']


def get_kinesis_records(kinesis_client, shard_iterator):
    # Get up to 100 records from the shard
    response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=100)

    # Print the records; Data arrives as bytes and is assumed to contain JSON
    for record in response['Records']:
        print(json.loads(record['Data']))

    # Return the next shard iterator (None once a closed shard has been fully read)
    return response.get('NextShardIterator')


def consume_stream(stream_name):
    # Create a single Kinesis client and reuse it for every call
    kinesis_client = boto3.client('kinesis')

    # Get all shards
    shards = get_all_shards(kinesis_client, stream_name)

    # Initialize a dictionary of shard iterators, one per shard
    shard_iterators = {}
    for shard in shards:
        shard_id = shard['ShardId']
        shard_iterators[shard_id] = get_shard_iterator(kinesis_client, stream_name, shard_id)

    # Continuously read records from all shards that are still open
    while shard_iterators:
        # Iterate over a copy so finished shards can be removed inside the loop
        for shard_id, shard_iterator in list(shard_iterators.items()):
            try:
                next_iterator = get_kinesis_records(kinesis_client, shard_iterator)
            except Exception as e:
                print(f"Error reading from shard {shard_id}: {e}")
                continue
            if next_iterator is None:
                # The shard was closed (for example after a reshard) and is exhausted
                del shard_iterators[shard_id]
            else:
                shard_iterators[shard_id] = next_iterator

        # Sleep for a short interval to avoid excessive API calls
        time.sleep(1)


if __name__ == '__main__':
    # Replace with your Kinesis stream name
    stream_name = 'your_stream_name'
    consume_stream(stream_name)
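The script above discovers shards with `describe_stream`, which returns its shard list in pages. For streams with many shards, one possible alternative is the `list_shards` call, following `NextToken` until no more pages remain. The sketch below is an illustration under that assumption rather than a drop-in replacement; `list_all_shard_ids` is a hypothetical helper name, and the client is passed in so the function works with any object exposing a `list_shards` method.

```python
def list_all_shard_ids(kinesis_client, stream_name):
    """Collect every shard ID in a stream by following NextToken pages (hypothetical helper)."""
    shard_ids = []
    kwargs = {"StreamName": stream_name}
    while True:
        response = kinesis_client.list_shards(**kwargs)
        shard_ids.extend(shard["ShardId"] for shard in response["Shards"])
        next_token = response.get("NextToken")
        if not next_token:
            return shard_ids
        # Follow-up pages are requested with NextToken only, without StreamName.
        kwargs = {"NextToken": next_token}
```

In practice you would call it as `list_all_shard_ids(boto3.client('kinesis'), 'your_stream_name')` and create one shard iterator per returned ID.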
Author: Santosh Kotagiri