Delta Live Tables with AWS Kinesis

Muhammad Umar Amanat
6 min read · Jul 7, 2023


AWS Kinesis is a managed streaming data service offered by AWS. Amazon Kinesis Data Streams is a scalable and easy-to-use service that lets you collect, process, and retain streaming data without managing servers. Learn more about AWS Kinesis at https://aws.amazon.com/kinesis/.

DLT with AWS Kinesis

Delta Live Tables (DLT) stands out as a pioneering ETL pipeline tool that prioritizes defining what should happen to your data over the intricacies of configuring the platform for ETL development. It offers automated data testing, comprehensive monitoring, and recovery capabilities, resulting in simpler and more efficient ETL workflows.

In this article, we will walk through bronze-layer ingestion on the Databricks platform using Delta Live Tables, reading data from an AWS Kinesis stream.

Requirements

  1. AWS credentials and sufficient rights for creating a Kinesis data stream
  2. Databricks Premium Tier account
  3. Databricks permissions for running Delta Live Tables
  4. YouTube data from Kaggle (https://www.kaggle.com/datasets/datasnaek/youtube-new)

Getting Started: DLT with AWS Kinesis

1. Upload Data to DBFS

Before getting into the actual coding, we first need to upload the YouTube dataset to Databricks DBFS. From DBFS we will generate data for AWS Kinesis, and then read this data back from AWS Kinesis using DLT.

Download the YouTube dataset (https://www.kaggle.com/datasets/datasnaek/youtube-new) and extract it. Then upload the data from the Databricks UI as shown in the image below.

upload data to DBFS
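
Once the files are uploaded, it can help to confirm they landed in DBFS. A minimal check, assuming a hypothetical upload location of /FileStore/tables/youtube (replace it with whatever path you chose in the upload dialog):

# List the uploaded CSV files to confirm the upload.
# NOTE: /FileStore/tables/youtube is an assumed location, not from the original article.
files = dbutils.fs.ls("/FileStore/tables/youtube")
for f in files:
    print(f.name, f.size)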

2. Set up Databricks Secrets

We are not going to hardcode AWS credentials in Databricks notebooks. Instead, we will create Databricks secrets and store the AWS credentials there.

First, install the Databricks CLI on your local machine and connect it to your Databricks workspace (https://learn.microsoft.com/en-us/azure/databricks/archive/dev-tools/cli/).

Once the Databricks CLI is configured on your local machine, the next step is to create a Databricks-backed secret scope.

databricks secrets create-scope --scope dlt_aws_scope

In the secret scope we are going to store our credentials; the following bash command creates a secret in our scope.

databricks secrets put --scope dlt_aws_scope --key aws_access_key_id

After running the above command, an editor opens and asks for input. Paste your "aws_access_key_id" value, then save and close the editor.

Do the same for the AWS secret access key:

databricks secrets put --scope dlt_aws_scope --key aws_secret_access_key

To confirm our newly created secrets, run this command:

databricks secrets list --scope dlt_aws_scope

This will list all the secrets in the dlt_aws_scope scope.
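
Inside a Databricks notebook, these secrets are read with dbutils.secrets.get, which is exactly how the code in the later steps retrieves the credentials. A quick sketch, assuming the scope name is kept in a SCOPE_NAME variable:

# Name of the secret scope created above
SCOPE_NAME = "dlt_aws_scope"

# Retrieve the AWS credentials from Databricks secrets;
# the values are redacted if you try to print them in notebook output.
aws_access_key_id = dbutils.secrets.get(SCOPE_NAME, "aws_access_key_id")
aws_secret_access_key = dbutils.secrets.get(SCOPE_NAME, "aws_secret_access_key")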

3. Clone Repo

Clone the code repo (https://github.com/MUmarAmanat/dlt_aws.git) into Databricks Repos using the following steps:

  1. Click Repos
  2. Click your username
  3. In the newly opened window, click the arrow beside your username
  4. Add the repo URL (https://github.com/MUmarAmanat/dlt_aws.git) and select GitHub as the Git provider
  5. Click the Create button
Clone code into Databricks repos

4. AWS Kinesis Stream Setup

We are going to use the boto3 SDK for Python to set up the AWS Kinesis stream programmatically. The following code creates a Kinesis data stream for us:

import boto3

# Assumes SCOPE_NAME, KINESIS_REGION, and KINESIS_DATA_STREAM are defined earlier in the notebook
kinesis_client = boto3.client("kinesis",
                              aws_access_key_id=dbutils.secrets.get(SCOPE_NAME, "aws_access_key_id"),
                              aws_secret_access_key=dbutils.secrets.get(SCOPE_NAME, "aws_secret_access_key"),
                              region_name=KINESIS_REGION)

stream_list = [KINESIS_DATA_STREAM]

for s_name in stream_list:
    try:
        kinesis_client.create_stream(StreamName=s_name,
                                     StreamModeDetails={'StreamMode': 'ON_DEMAND'})
        print(f"[INFO] {s_name} created successfully")
    except Exception as e:
        print(f"[ERROR] While creating stream={s_name}, the following error occurred: {e}")

This code produces the following Kinesis stream. You can view it by navigating to the Kinesis page in your AWS console.

AWS Kinesis stream
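
You can also verify the stream programmatically instead of through the console. A small check using boto3's describe_stream_summary, reusing the client and stream name from the creation snippet above:

# Check that the stream exists and is ACTIVE before producing data to it.
summary = kinesis_client.describe_stream_summary(StreamName=KINESIS_DATA_STREAM)
status = summary["StreamDescriptionSummary"]["StreamStatus"]
print(f"[INFO] Stream {KINESIS_DATA_STREAM} status: {status}")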

5. Generate Data for Kinesis

Once the data is in DBFS, the next step is to read a random sample of it, convert the records into JSON, and upload them to the Kinesis stream using boto3.

import json
import random as rnd

cc = rnd.choice(["MX", "RU"])  ## country code

## randomly select 10 records
df = (spark
      .read
      .option("header", True)
      .option("multiline", True)
      .csv(path=f"{CSV_PATH}/{cc}videos.csv")
      .select("video_id", "trending_date", "title", "channel_title", "category_id",
              "publish_time", "views", "likes", "dislikes", "comment_count")
      .sample(withReplacement=False, fraction=1.0)
      .limit(10)
      ).toPandas()

## push each record to Kinesis as a JSON payload
for ind, row in df.iterrows():
    try:
        response = kinesis_client.put_record(StreamName=KINESIS_DATA_STREAM,
                                             Data=json.dumps(row.to_dict()),
                                             PartitionKey="category_id")
    except Exception as e:
        print(f"[ERROR] While putting record, the following error occurred: {e}")

Steps 4 and 5 are included in the StreamDataProducer notebook. Running this notebook interactively or as a job executes both steps.
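
If you want to confirm the records actually landed in the stream before touching Spark, you can read a shard back with boto3. A rough sketch, reusing the client and stream name from above; note that records may be spread across shards, so an empty result from the first shard doesn't necessarily mean the put failed:

import json

# Grab the first shard of the stream and read records from the beginning.
shards = kinesis_client.list_shards(StreamName=KINESIS_DATA_STREAM)["Shards"]
iterator = kinesis_client.get_shard_iterator(
    StreamName=KINESIS_DATA_STREAM,
    ShardId=shards[0]["ShardId"],
    ShardIteratorType="TRIM_HORIZON",  # start from the oldest record in the shard
)["ShardIterator"]

records = kinesis_client.get_records(ShardIterator=iterator, Limit=10)["Records"]
for rec in records:
    # boto3 returns the Data payload as raw bytes
    print(json.loads(rec["Data"]))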

6. Bronze Data Ingestion

Bronze data is raw data in Databricks lingo. Now it's time to read data from the AWS Kinesis stream. Reading from AWS Kinesis in Databricks is straightforward, as shown below:

(spark
 .readStream
 .format("kinesis")
 .option("streamName", KINESIS_DATA_STREAM)
 .option("region", KINESIS_REGION)
 .option("initialPosition", 'earliest')
 .option("awsAccessKey", dbutils.secrets.get(SCOPE_NAME, "aws_access_key_id"))
 .option("awsSecretKey", dbutils.secrets.get(SCOPE_NAME, "aws_secret_access_key"))
 .load()
 .display()
)

This code will output streaming data from Kinesis.

NOTE: The record payload arrives base64-encoded (exposed as a binary column); we need to decode it to recover the JSON record.
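
As a rough sketch of that decoding step, assuming the Kinesis source exposes the payload in a binary "data" column and using the ten YouTube fields we produced in step 5, you can cast the payload to a string and parse it with from_json (kinesis_df is a placeholder name for the streaming DataFrame read above):

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Schema of the JSON payload produced in step 5 (all fields kept as strings here)
payload_schema = StructType([
    StructField(f, StringType()) for f in
    ["video_id", "trending_date", "title", "channel_title", "category_id",
     "publish_time", "views", "likes", "dislikes", "comment_count"]
])

decoded_df = (kinesis_df  # hypothetical: the streaming DataFrame read from Kinesis above
              .withColumn("json_str", col("data").cast("string"))
              .withColumn("payload", from_json(col("json_str"), payload_schema))
              .select("payload.*"))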

But our goal is not just to display the stream; we want to read it with Delta Live Tables and store it in our database. Reading stream data in DLT is simple: use Spark Structured Streaming to read the stream, encapsulate that code in a function, and decorate the function with the @dlt.table decorator.

import dlt

@dlt.table(table_properties={"pipelines.reset.allowed": "false"})
def kinesis_bronze():
    return (spark
            .readStream
            .format("kinesis")
            .option("streamName", KINESIS_DATA_STREAM)
            .option("region", KINESIS_REGION)
            .option("initialPosition", 'earliest')
            .option("awsAccessKey", dbutils.secrets.get(SCOPE_NAME, "aws_access_key_id"))
            .option("awsSecretKey", dbutils.secrets.get(SCOPE_NAME, "aws_secret_access_key"))
            .load()
            )

The DLT pipeline creates a table named "kinesis_bronze" during execution; in other words, the function name becomes the table name in the DLT pipeline.

NOTE: This code cannot be executed interactively; instead, the notebook must be run as part of a DLT pipeline.

We can create a DLT pipeline using the "kinesis_read" notebook to ingest Kinesis data into a bronze-level table.

Now it's time to create the DLT pipeline.

Click Workflows, navigate to Delta Live Tables, and click "Create Pipeline".

Fill in the highlighted information

NOTE: Make sure the pipeline mode is set to Continuous; otherwise you may run into an error.

Under Source code, provide the path to the "kinesis_read" notebook in your repo.

Provide "dlt_aws" as the target schema; otherwise, DLT will automatically create a schema/database for you.

The "StreamDataProducer" notebook has already created the "dlt_aws" schema/database for us.

Output

After creating the DLT pipeline, select "Development" as the environment and click the "Start" button to execute it. The first run takes a few minutes, as it has to set up the DLT tables and resources.

NOTE: Make sure you have enough Azure vCPU quota for execution. A successful run of the DLT pipeline produces a result like the following.

DLT pipeline execution

The following image shows that the DLT pipeline successfully wrote 10 records into the table.

Data Quality by DLT

NOTE: Make sure to stop the pipeline manually, since it runs in continuous mode.

After the DLT pipeline has run, we can see that records have been inserted into the "dlt_aws.kinesis_bronze" table.

Data written by DLT
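
To double-check from a regular notebook, you can query the target table directly. A quick sanity check, assuming the pipeline's target schema is dlt_aws as configured above:

# Read the bronze table written by the DLT pipeline and show a few rows.
bronze_df = spark.table("dlt_aws.kinesis_bronze")
print(f"Row count: {bronze_df.count()}")
bronze_df.limit(10).display()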

Limitations

There are a few limitations associated with Kinesis when working in the Databricks platform. Some of them are highlighted below.

  1. We cannot write to Kinesis directly via spark.writeStream (a common workaround is sketched after this list).
  2. The Kinesis stream source doesn't support .trigger(availableNow=True) in DBR < 13.
  3. The DLT pipeline can't use the "Triggered" option in "Pipeline mode". This is why we have to stop the DLT pipeline manually.
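
For the first limitation, one common workaround (not part of this article's pipeline, just a sketch) is to push each micro-batch to Kinesis yourself with foreachBatch and boto3; the client and stream name below reuse the ones defined earlier, while streaming_df and the checkpoint path are placeholders:

def write_batch_to_kinesis(batch_df, batch_id):
    # Convert each micro-batch to JSON strings and push them with boto3,
    # since Databricks has no native Kinesis sink for spark.writeStream.
    # collect() is fine here only for small demo batches.
    for row in batch_df.toJSON().collect():
        kinesis_client.put_record(StreamName=KINESIS_DATA_STREAM,
                                  Data=row.encode("utf-8"),
                                  PartitionKey="category_id")

(streaming_df  # hypothetical streaming DataFrame you want to send to Kinesis
 .writeStream
 .option("checkpointLocation", "/tmp/kinesis_sink_checkpoint")  # placeholder path
 .foreachBatch(write_batch_to_kinesis)
 .start())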

Conclusion

We often work on use cases where we need to consume a Kinesis data stream from Databricks. The Databricks platform offers several ways to read data from Kinesis, and the DLT approach is the best one for streaming data: DLT automatically manages checkpoints for the stream, so we don't have to worry about checkpointing ourselves.

About Author

I am Muhammad Umar Amanat, working as a Sr. Data & AI Consultant. I have more than 5 years of experience in the Data & AI domain. I am a Databricks-certified Data Engineer and have completed a number of projects across various industries, including retail, finance, apparel, manufacturing, and the energy sector.

Follow me on Medium to stay up to date with new articles. You can also find me on LinkedIn.

Need advice on Databricks? You can book a 1:1 call with me.
