Stream Data from Kinesis to Databricks with PySpark

Streaming with AWS Kinesis and Databricks

Streaming data is fresh data, and it plays a big role in the actionable decisions that can be made with it. Let’s look at the chart below on how data loses its value quickly over time.

(Chart: how data loses value over time, by Mike Gualtieri, Forrester)

As you can see in the chart above, data loses its value quickly over time. On the right is historical data, which is normally stored in a data warehouse or a data lake. The further left we move along the graph, the more actionable the data becomes. This is data that is captured in real time, within seconds or minutes. This is where all the interesting data lives, and it is a good motivation to start learning how to work with streaming data.

In this article we will learn how to ingest streaming data into AWS Kinesis Data Streams and bring that data into Databricks to apply a schema and perform transformations on it.

To follow along you need the prerequisites below:

  • An AWS free tier account.
  • A Databricks community edition account.

Navigate to the Kinesis service in your AWS account and click the “Create data stream” button.

Provide a name for the data stream. For this use case we are going to keep the number of shards at 1; you can use the shard calculator to size this for your own use case. Once you enter the name and number of shards, click on Create.
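If you prefer to script this step instead of using the console, here is a minimal sketch with boto3 (the stream name below is a hypothetical placeholder, and one shard matches what we set up above):

import boto3

# Create the data stream programmatically instead of through the console.
kinesis = boto3.client('kinesis', region_name='<INSERT_YOUR_REGION>')
kinesis.create_stream(StreamName='my-data-stream', ShardCount=1)

# Wait until the stream becomes ACTIVE before writing to it.
waiter = kinesis.get_waiter('stream_exists')
waiter.wait(StreamName='my-data-stream')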

Here I will be using a Python script, running on an EC2 instance, to generate data from an API. You can use your preferred method to ingest data into the Kinesis data stream, or use the Python script below to pull data from an API of your choice.

import requests
import boto3
import uuid
import time
import random
import json

# Kinesis client and a partition key for the records we push.
client = boto3.client('kinesis', region_name='<INSERT_YOUR_REGION>')
partition_key = str(uuid.uuid4())

# Pull a batch of results from the API in a single call.
number_of_results = 500
r = requests.get('<Your API URL>' + str(number_of_results))
data = r.json()["results"]

while True:
    # Choose a random user from the 500 results pulled in the single API call above.
    random_user_index = int(random.uniform(0, (number_of_results - 1)))
    random_user = json.dumps(data[random_user_index])
    # Push the record onto the Kinesis data stream.
    client.put_record(
        StreamName='<INSERT_YOUR_STREAM_NAME>',
        Data=random_user,
        PartitionKey=partition_key)
    # Wait up to a second before sending the next record.
    time.sleep(random.uniform(0, 1))

Keep in mind that you need to set up an IAM role that allows the EC2 instance to push records onto the Kinesis data stream.
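As a rough sketch (the role and policy names here are hypothetical placeholders), an inline policy like the following attached to the instance role is enough for the producer script:

import json
import boto3

iam = boto3.client('iam')

# Minimal policy allowing the producer to push records to one stream.
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStream"],
        "Resource": "arn:aws:kinesis:<INSERT_YOUR_REGION>:<ACCOUNT_ID>:stream/<INSERT_YOUR_STREAM_NAME>"
    }]
}

# Attach the inline policy to the EC2 instance role (hypothetical role name).
iam.put_role_policy(
    RoleName='my-ec2-kinesis-producer-role',
    PolicyName='kinesis-put-records',
    PolicyDocument=json.dumps(policy))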

Our data stream is now created and is receiving streaming records, which you can verify in the Monitoring section of the stream.

Once you see data coming into your Kinesis data stream, log in to your Databricks Community Edition workspace. Create a cluster and attach a blank notebook to it.

Use the awsAccessKeyId and awsSecretKey to read the streaming data from the Kinesis data stream as shown below. It is not a best practice to expose your AWS credentials in code; you can use AWS Secrets Manager to keep them out of the notebook for better security and compliance.
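Here is a minimal sketch of that read using the Databricks Kinesis connector (the option names follow the connector’s documented settings; double-check them against your Databricks runtime):

# Read the stream with the Databricks Kinesis connector.
# Replace the placeholders with your own stream name and region, and load the
# credentials from a secret store rather than hard-coding them.
kinesisDF = (spark.readStream
    .format("kinesis")
    .option("streamName", "<INSERT_YOUR_STREAM_NAME>")
    .option("region", "<INSERT_YOUR_REGION>")
    .option("initialPosition", "latest")
    .option("awsAccessKey", awsAccessKeyId)
    .option("awsSecretKey", awsSecretKey)
    .load())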

You can see the streaming data by applying the display method on the kinesisDF DataFrame.
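For example, running the following in a notebook cell renders the incoming records as a continuously refreshing table:

# Renders the incoming micro-batches as a live table in the notebook.
display(kinesisDF)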

The data arrives in the default schema of the Kinesis source, which is shown below.
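Printing the schema should show something along these lines (this is a sketch of the Kinesis source’s standard record schema, so confirm it against your own output):

kinesisDF.printSchema()

# Expected output (sketch):
# root
#  |-- partitionKey: string (nullable = true)
#  |-- data: binary (nullable = true)
#  |-- stream: string (nullable = true)
#  |-- shardId: string (nullable = true)
#  |-- sequenceNumber: string (nullable = true)
#  |-- approximateArrivalTimestamp: timestamp (nullable = true)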

You need to use DataFrame operations (cast("string"), UDFs) to explicitly deserialize the data column.
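A minimal sketch of that deserialization, casting the binary data column to a JSON string:

from pyspark.sql.functions import col

# Cast the binary payload to a readable JSON string.
jsonDF = kinesisDF.select(col("data").cast("string").alias("jsonData"))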

Once the data column is deserialized, you can apply a custom schema and transformations to it.
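For example, assuming the user payload carries fields such as name, email and country (hypothetical field names, adjust them to whatever your API actually returns), applying a schema with from_json looks roughly like this:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema -- replace the fields with the ones your API actually returns.
userSchema = StructType([
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("country", StringType(), True),
])

# Parse the JSON string into typed columns and flatten the struct.
usersDF = (jsonDF
    .select(from_json(col("jsonData"), userSchema).alias("user"))
    .select("user.*"))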

Woohoo!!! We made it to the end: we got the streaming data into the Databricks workspace and transformed the semi-structured records into a structured DataFrame. Now we can push this data to S3 or a data warehouse like Snowflake or Redshift, where it can be used by our data scientist and data analyst friends.
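As a sketch of that last hop, a streaming write to Parquet could look like this (the output path and checkpoint location below are hypothetical placeholders, so point them at your own S3 bucket or DBFS path):

# Continuously write the structured records out as Parquet files.
query = (usersDF.writeStream
    .format("parquet")
    .option("path", "dbfs:/tmp/random_users/")
    .option("checkpointLocation", "dbfs:/tmp/random_users_checkpoint/")
    .outputMode("append")
    .start())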

Hope you enjoyed this tutorial!! Happy learning!! Please follow and clap if you learned something new today. See you in another tutorial.
