OmniSci — Importing Data From Kinesis Stream

Veda Shankar
8 min read · May 21, 2019


Apache Kafka is a very popular open-source, distributed messaging solution that companies use to build real-time data pipelines and streaming apps. However, implementing a scalable, fault-tolerant, and performant Kafka deployment is quite involved, with many resource considerations. Amazon Kinesis is a managed platform for streaming data on Amazon Web Services (AWS). Kinesis provides built-in features such as scaling, automatic replication for high availability, and encryption that are a must in a production data pipeline. This makes it a very compelling alternative to Kafka, especially if you are already running your workloads on AWS.

You can use Amazon Kinesis Data Streams to collect and process large streams of data records in real time. A typical use case is real-time aggregation of data followed by loading the aggregates into a data warehouse for analysis. In this tutorial, I provide a step-by-step procedure for creating a simple Kinesis Data Stream application, reading data records from a Kinesis stream, and writing them to an OmniSci database table for querying and visualization. This tutorial uses an Amazon Linux 2 AMI instance for testing, with the IAM policies AmazonKinesisFullAccess, AmazonDynamoDBFullAccess, and CloudWatchFullAccess attached to it.

Kinesis Streams Concepts

Here is a brief description of the key concepts in Amazon Kinesis Data Streams; for more details, refer to the AWS documentation.

Producer

Producers put records into Amazon Kinesis Data Streams. For example, a web server sending log data to a stream is a producer.
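
For illustration, here is a minimal producer sketch using the boto3 put_record call (the stream name and record format mirror the example used later in this tutorial; the region and partition key are assumptions):

import boto3

# Assumes the gas-production stream already exists in us-east-1
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# Put a single CSV record onto the stream; the partition key determines the target shard
record = '2018-02-08 16:00:00, 10.008, Pennsylvania, Warren, Northeast, National Fuel Gas Supply, 41.837835, -79.148063\n'
kinesis_client.put_record(
    StreamName='gas-production',
    Data=record.encode('utf-8'),
    PartitionKey='gas-production-key'
)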

Consumer

Consumers get records from Amazon Kinesis Data Streams and process them. These consumers are known as Amazon Kinesis Data Streams applications.

Kinesis Data Stream

A Kinesis data stream is a set of shards. Each shard has a sequence of data records, and each data record has a sequence number that is assigned by Kinesis Data Streams. Each shard can support up to 5 transactions per second for reads, up to a maximum total data read rate of 2 MB per second, and up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second.

Data Record

A data record is the unit of data stored in a Kinesis data stream. Data records are composed of a sequence number, a partition key, and a data blob, which is an immutable sequence of bytes. A data blob can be up to 1 MB.

Retention Period

The retention period is the length of time that data records remain accessible after they are added to the stream. A stream's retention period defaults to 24 hours at creation and can be increased to 168 hours (7 days).
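
If 24 hours is not enough, the retention period can be raised programmatically; a minimal sketch with boto3 (the region is an assumption):

import boto3

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# Raise the retention period from the default 24 hours to the 168-hour maximum
kinesis_client.increase_stream_retention_period(
    StreamName='gas-production',
    RetentionPeriodHours=168
)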

The following diagram depicts the components of the Kinesis Data Stream application discussed in this tutorial. In this example, the producer writes mock natural gas production data records to the Kinesis stream.

Create a Kinesis Stream

The first step is to create a stream and verify that it was successfully created. Use the following command to create a stream named gas-production with a single shard.

aws kinesis create-stream --stream-name gas-production --shard-count 1

The stream name gas-production identifies the stream. The name is scoped to the AWS account used and is also scoped by the AWS Region. That is, two streams in two different accounts can have the same name, and two streams in the same account, but in two different Regions, can have the same name.

Check if the stream was created:

aws kinesis list-streams

{
    "StreamNames": [
        "gas-production"
    ]
}

Check the status of the stream using:

aws kinesis describe-stream --stream-name gas-production

{
    "StreamDescription": {
        "KeyId": null,
        "EncryptionType": "NONE",
        "StreamStatus": "ACTIVE",
        "StreamName": "gas-production",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49595724303092122779582799139640537568816759595765596162"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:041129453290:stream/gas-production",
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "StreamCreationTimestamp": 1557871137.0,
        "RetentionPeriodHours": 24
    }
}

You can also use the AWS Console Kinesis Dashboard to monitor the status of the service.

The StreamStatus is initially CREATING and transitions to ACTIVE after a short time. Read and write operations can only be performed on an ACTIVE stream.
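
Instead of re-running describe-stream by hand, a script can simply block until the stream becomes ACTIVE; a small sketch using the boto3 stream_exists waiter, which polls DescribeStream until the status is ACTIVE:

import boto3

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# Blocks until describe_stream reports StreamStatus == ACTIVE (or the waiter times out)
kinesis_client.get_waiter('stream_exists').wait(StreamName='gas-production')
print('gas-production is ACTIVE')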

Stream Data to Kinesis Using the Kinesis Agent

Kinesis Agent is a stand-alone Java software application that offers an easy way to collect and send data to Kinesis Data Streams. The agent continuously monitors a set of files and sends new data to a stream. It handles file rotation, checkpointing, and retry on failure, delivering all of your data in a reliable, timely, and simple manner. It also emits Amazon CloudWatch metrics to help you monitor and troubleshoot the streaming process. For more details, refer to the AWS documentation.

Use the following command to download and install the agent:

sudo yum install -y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm

As superuser, edit the configuration file /etc/aws-kinesis/agent.json. In this configuration file, specify the files ("filePattern") from which the agent collects data and the name of the stream ("kinesisStream") to which the agent sends data. Note that the file name is a pattern, and the agent recognizes file rotations. You can rotate files or create new files no more than once per second. The agent uses the file creation timestamp to determine which files to track and tail into your stream; creating new files or rotating files more frequently than once per second does not allow the agent to differentiate properly between them.

{
    "cloudwatch.emitMetrics": true,
    "kinesis.endpoint": "",
    "firehose.endpoint": "",
    "flows": [
        {
            "filePattern": "/tmp/kinesis/production-data*",
            "kinesisStream": "gas-production",
            "partitionKeyOption": "RANDOM"
        }
    ]
}

Start the agent manually:

sudo service aws-kinesis-agent start

Configure the agent to start on system startup:

sudo chkconfig aws-kinesis-agent on

The agent now runs as a system service in the background, continuously monitoring the directory /tmp/kinesis for changes to files that match the pattern production-data*. New records added to these files are sent to the stream gas-production. The agent's activity is logged in /var/log/aws-kinesis-agent/aws-kinesis-agent.log.

To simulate the production of real-time data, I ran a script in a loop that adds records with the following format:

flow_date, flow_value, state, county, region, display_name, latitude, longitude

2018-02-08 16:00:00, 10.008, Pennsylvania, Warren, Northeast, National Fuel Gas Supply, 41.837835, -79.148063
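
The generator script itself is not shown in the article; a minimal sketch of the idea, assuming a hypothetical file name under /tmp/kinesis and randomly generated flow values, could look like this:

import random
import time
from datetime import datetime

# Hypothetical output file matching the agent's filePattern /tmp/kinesis/production-data*
OUTPUT_FILE = '/tmp/kinesis/production-data.log'

while True:
    flow_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    flow_value = round(random.uniform(5.0, 15.0), 3)
    record = (f'{flow_date}, {flow_value}, Pennsylvania, Warren, Northeast, '
              f'National Fuel Gas Supply, 41.837835, -79.148063\n')
    # Append the record; the Kinesis agent tails this file and ships new lines to the stream
    with open(OUTPUT_FILE, 'a') as f:
        f.write(record)
    time.sleep(1)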

You can use AWS CloudWatch -> Kinesis Data Streams to visually monitor the records that are written to and read from the stream.

Creating a Kinesis Consumer

I used the boto3 Python library to implement the Kinesis data stream consumer application. By setting the AWS keys as environment variables in your terminal, you can use the boto3 library without having to specify the AWS credentials in the Python script.

export AWS_ACCESS_KEY_ID=XXXXXX

export AWS_SECRET_ACCESS_KEY=XXXXXX

Get a low-level client handle representing Amazon Kinesis for a specific region:

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

Use describe_stream to get the details for the stream gas-production. The returned information includes the stream name, Amazon Resource Name (ARN), creation time, enhanced metric configuration, and shard map.

my_stream_name = 'gas-production'
response = kinesis_client.describe_stream(StreamName=my_stream_name)

You can see the description of the returned JSON structure in boto documentation.

Get the shard id which uniquely identifies the shard in the stream. As gas-production was created with only one shard, use index 0 in the Shards dictionary structure to get the id.

my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

Use get_shard_iterator to get the shard iterator for the gas-production stream. The shard iterator specifies the shard position from which to start reading data records sequentially; the position is expressed as the sequence number of a data record in the shard. I use LATEST for the ShardIteratorType to start reading just after the most recent record in the shard, so that you always read the most recent data.

shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name, ShardId=my_shard_id, ShardIteratorType='LATEST')
my_shard_iterator = shard_iterator['ShardIterator']

Use the shard iterator with the get_records API to read data records sequentially, and call get_records in a loop to read from the stream continually. The following loop reads one record at a time from the stream; the Data element of each entry in the Records list holds the payload as a blob of bytes (base64-encoded on the wire, returned as raw bytes by boto3). The blob is decoded to a string, split into fields, and appended to a Pandas dataframe.

import pandas as pd

# Dataframe columns match the fields of the CSV records written by the producer
df = pd.DataFrame(columns=['flow_date', 'flow_value', 'state', 'county', 'region', 'display_name', 'latitude', 'longitude'])
row = 0
while True:
    # Read one record at a time from the shard
    record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator, Limit=1)
    for item in record_response["Records"]:
        # Decode the data blob and split the CSV payload into fields
        record_data = item["Data"].decode("utf-8").rstrip('\n')
        fields = record_data.split(',')
        df.loc[row] = fields
        row = row + 1
    # Use the iterator returned by get_records for the next call
    my_shard_iterator = record_response['NextShardIterator']

When you read repeatedly from a stream, use a get_shard_iterator request to obtain the shard iterator for your first get_records request. Every get_records response returns a new shard iterator in NextShardIterator, which you pass as the ShardIterator parameter of the next get_records request.
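
Two practical details are worth handling in a long-running consumer: each shard allows at most 5 read transactions per second, and a shard iterator expires roughly 5 minutes after it is issued. The sketch below is not part of the original notebook (process() is a hypothetical placeholder for your own record handling), but shows one way to cope with both:

import time

while True:
    try:
        record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator, Limit=100)
    except kinesis_client.exceptions.ExpiredIteratorException:
        # The iterator expired; request a fresh one and resume reading new data
        my_shard_iterator = kinesis_client.get_shard_iterator(
            StreamName=my_stream_name,
            ShardId=my_shard_id,
            ShardIteratorType='LATEST')['ShardIterator']
        continue
    for item in record_response['Records']:
        process(item['Data'].decode('utf-8'))  # process() is a hypothetical placeholder
    my_shard_iterator = record_response['NextShardIterator']
    time.sleep(0.5)  # stay well under the 5 reads per second per shard limit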

Using pymapd to Insert Data into OmniSci

The pymapd client interface provides a Python DB API 2.0-compliant interface to OmniSci. Using pymapd, you can perform typical database operations like creating a table, appending data, and running SQL queries on OmniSci. In addition, pymapd can return results in the Apache Arrow-based GDF format for efficient data interchange as part of a GPU-based machine learning workflow.

You can install pymapd from conda-forge:

conda install -c conda-forge pymapd

Or with pip:

pip install pymapd

In the sample application, we use OmniSci Cloud to store the gas production data read from the Kinesis data stream as discussed in the above section.

First, create a connection to the OmniSci cloud instance. Refer to Using pymapd to Load Data to OmniSci Cloud for details on generating API keys needed for the connection.

from pymapd import connect

user_str = 'API Key Name'
password_str = 'API Key Secret'
host_str = 'use2-api.mapd.cloud'
dbname_str = 'mapd'
connection = connect(user=user_str, password=password_str, host=host_str, dbname=dbname_str, port=443, protocol='https')

Use the database connection handle with the load_table API to load the Pandas dataframe into a database table. The default behavior of the pymapd load_table API is to create the database table if it does not exist, and then load the data into the table.

table_name = 'natural_gas_production'
connection.load_table(table_name, df, preserve_index=False)

Using the OmniSci Immerse interface, confirm that the table was created and the records were loaded.
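
Besides Immerse, you can also sanity-check the load from the same pymapd connection; a quick sketch using the standard DB API cursor (the table name matches the one created above):

# Count the rows loaded so far from the Kinesis stream
cursor = connection.cursor()
cursor.execute('SELECT COUNT(*) FROM natural_gas_production')
print(cursor.fetchone())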

You can find the Jupyter Notebook with the sample code for the Kinesis data stream application here. The last code block in the sample notebook loops continuously to read records from the Kinesis Data Stream, processes them, and loads them into the OmniSci table as a Pandas dataframe.

Happy Streaming!


Veda Shankar

Developer Advocate at OmniSci, working actively to help the user community take advantage of OmniSci's open source, GPU-based analytics platform.