Stream Data from Kinesis to Databricks with Pyspark
Streaming with AWS Kinesis and Databricks
Streaming data is fresh data and it plays a big role in actionable decision that can be made with that data.Let’s look at the below chart on how data loses it’s value quickly over time.
As you can see in the above chart data loses it’s value quickly over time. On the right there is historical data which is normally stored in a data warehouse or a data lake. The further left we move along this graph we start to see more actionable data. This is data that is captured in real-time in seconds or within minutes. This is where all the interesting data lives. This is a good motivation to start learning how to work with streaming data.
In this article we will learn how to ingest streaming data into AWS Kinesis Data Streams and bring that data into Databricks to apply schema and perform transformations on it.
To follow along you need to have the below perquisites:
- An AWS free tier account.
- A Databricks community edition account.
Navigate to the Kinesis service in your AWS account and click on “Create data stream” button.
Provide a name to the Data stream. For this use case we are going to keep the number of shards at 1. You can use the shard calculator according to your use case. Once you enter the name and number of shards click on create.
Here I will be using a python script to generate data from an API, which will run on a EC2 instance. You can use your preferred method to ingest data into Kinesis Data Stream. You can use the below Python script to ingest data from a API you wish to pull data from.
import jsonclient = boto3.client('kinesis', region_name='<INSERT_YOUR_REGION>')
partition_key = str(uuid.uuid4())number_of_results = 500
r = requests.get('<Your API URL>' + str(number_of_results))
data = r.json()["results"]while True:
# The following chooses a random user from the 500 random users pulled from the API in a single API call.
random_user_index = int(random.uniform(0, (number_of_results - 1)))
random_user = data[random_user_index]
random_user = json.dumps(data[random_user_index])
Keep in mind that you need to setup your IAM role which allows the EC2 instance access to push records onto the Kinesis data stream.
We can see that our data stream is now created and it is getting the streaming records which can be seen in the monitoring section as shown below.
Once you see data coming into your Kinesis data stream, login to you Databricks Community edition. Create a Cluster and attach a blank notebook to the cluster.
Use the awsAccessKeyId and awsSecretKey to read the streaming data from the Kinesis data stream as shown below. It is not a best practice to expose your API credentials in the code. You can use AWS Secrets manager to hide it for better security and compliance.
You can see the streaming data by applying the display method on the kinesisDF data frame.
The data will come in the default schema of Kinesis which is as shown below.
You need to use the DataFrame operations (
cast("string"), udfs) to explicitly de-serialize the
Once the data column is de-serialized you can apply custom schema and transformations on it.
Woohoo!!! We made it to the end where we finally got the streaming data into Databricks workspace and transformed the semi-structured data into structured dataframe. Now we can push this data to S3 or a Data warehouse like Snowflake or Redshift which will then be used by our Data scientist and data analyst friends.
Hope you enjoyed this tutorial!! Happy learning!! Please follow and clap if you learned something new today. See you in another tutorial.
Follow the below Link for Big Data Engineering Contents.