Streaming Data from AWS Kinesis to an Iceberg Table

A robust data integration approach

Sudhakar Pandhare
Globant
8 min read · Jul 1, 2023

In today’s data-driven world, organizations generate and process vast amounts of data in real time. To effectively harness this data and derive valuable insights, it is crucial to have a robust and scalable streaming data integration solution. AWS Kinesis and Iceberg provide an excellent combination of streaming data ingestion and storage. In this blog post, we will explore how to stream data from AWS Kinesis to an Iceberg table, enabling organizations to build a reliable and efficient data pipeline.

The Architecture of Streaming Data Pipeline

The pipeline combines the real-time processing capabilities of AWS Kinesis, the storage efficiency of Amazon S3 with the Iceberg table format, and the flexibility of the many data analysis and visualization tools that are compatible with Iceberg. Together, these components provide a scalable, cost-effective solution for streaming data ingestion, transformation, storage, analysis, and visualization.

Streaming Data Pipeline

Let's see how the Streaming Data Pipeline works:

  1. A data producer (typically an application) emits data records to a Kinesis data stream as they are generated. Producers assign a partition key to each record, and the partition key ultimately determines which shard of the stream ingests that record.
  2. Streaming data is generated in JSON format and inserted into Kinesis Data Streams.
  3. A Spark streaming job is connected to Kinesis Data Streams to process the data.
  4. The streaming job output is stored in Amazon S3 in Iceberg table format.
  5. Athena uses the AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data in Iceberg format.
  6. Athena interacts with the Data Catalog tables in Iceberg format for transactional queries.
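
For example, once the Iceberg table's metadata is registered in the Glue Data Catalog (steps 5 and 6), Athena can query it with standard SQL. Below is a minimal boto3 sketch of submitting such a query; the Glue database my_database and the results bucket are hypothetical placeholders.

import boto3

athena_client = boto3.client("athena", region_name="us-east-1")

# Submit a SQL query against the Iceberg table registered in the Glue Data Catalog
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "my_database"."message" LIMIT 10',
    QueryExecutionContext={"Database": "my_database"},  # hypothetical Glue database
    ResultConfiguration={"OutputLocation": "s3://bucket_name/athena-results/"},
)
print(response["QueryExecutionId"])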

Let’s see each component in detail…

AWS Kinesis

AWS Kinesis, a fully managed service offered by Amazon Web Services (AWS), provides a powerful solution for streaming data ingestion, processing, and analysis. AWS Kinesis is a suite of services that enables organizations to collect, process, and analyze streaming data in real time. It is designed to handle large volumes of data generated by diverse sources such as website clickstreams, IoT devices, social media feeds, logs, etc. Kinesis offers three primary services: Kinesis Data Streams, Kinesis Data Firehose, and Kinesis Data Analytics.

  • Kinesis Data Streams is the core service of AWS Kinesis, providing a highly scalable and durable platform for real-time data streaming. It allows you to continuously collect and store large volumes of data in “streams,” which are divided into “shards” to enable parallel processing and scalability. Each shard can ingest up to 1 MB/s (or 1,000 records per second) of data and serve reads of up to 2 MB/s, supporting multiple concurrent consumers for real-time analytics.
  • Kinesis Data Firehose simplifies ingesting and delivering streaming data to destinations such as Amazon S3 data lakes, Amazon Redshift, and Amazon OpenSearch Service (formerly Amazon Elasticsearch Service). It automatically handles data buffering, compression, and transformation before delivering it to the specified destination. Data Firehose is a fully managed service, alleviating the operational overhead of managing the underlying infrastructure.
  • Kinesis Data Analytics enables real-time processing and analytics on streaming data. It allows you to run SQL queries on the incoming data to perform aggregations, filtering, and transformations, and even to join the stream with reference data. Data Analytics leverages Apache Flink, a powerful stream processing framework, under the hood to provide scalable and fault-tolerant stream processing capabilities.
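
As a concrete illustration of the Data Streams model, here is a minimal boto3 sketch of a consumer reading records directly from one shard. The stream name is a placeholder, and production consumers would normally use the Kinesis Client Library, Kinesis Data Firehose, or a Spark/Flink connector rather than polling shards by hand.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")
stream_name = "your-kinesis-stream-name"  # placeholder

# List the shards of the stream and open an iterator on the first one
shards = kinesis.list_shards(StreamName=stream_name)["Shards"]
shard_iterator = kinesis.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shards[0]["ShardId"],
    ShardIteratorType="LATEST",  # only read records written after this point
)["ShardIterator"]

# Fetch a batch of records from the shard
records = kinesis.get_records(ShardIterator=shard_iterator, Limit=100)["Records"]
for record in records:
    print(record["PartitionKey"], record["Data"])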

Spark Structured Streaming

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You express a streaming computation the same way you would express a batch computation on static data, and the Spark SQL engine runs it incrementally and continuously, updating the final result as streaming data continues to arrive.

Internally, Structured Streaming queries are by default processed with a micro-batch engine, which treats a data stream as a series of small batch jobs, achieving end-to-end latencies as low as 100 milliseconds with exactly-once fault-tolerance guarantees. Since Spark 2.3, a low-latency mode called Continuous Processing can achieve end-to-end latencies as low as one millisecond with at-least-once guarantees. We can choose the mode that fits our application's requirements without changing the Dataset/DataFrame operations in our queries, as the sketch below shows.
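
Here is a small, self-contained sketch of switching between the two modes using Spark's built-in rate source and console sink (chosen purely for demonstration); only the trigger changes, and in practice you would run one query or the other.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trigger-modes-demo").getOrCreate()

# A demo stream that generates rows at a fixed rate
rate_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# Default micro-batch mode: process newly arrived data every 10 seconds
micro_batch_query = (
    rate_df.writeStream.format("console")
    .trigger(processingTime="10 seconds")
    .start()
)

# Continuous Processing mode (Spark 2.3+): ~1 ms latency, at-least-once guarantees
continuous_query = (
    rate_df.writeStream.format("console")
    .trigger(continuous="1 second")  # checkpoint interval, not a batch interval
    .start()
)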

Understanding Iceberg S3 Table

Apache Iceberg is an open table format for large analytic datasets. In the cloud, it combines the power of Amazon S3 (Simple Storage Service) for storage with compute engines like AWS Athena, Presto, or Spark for queries, offering a highly scalable, cost-effective, and efficient platform for storing, managing, and analyzing large volumes of data.

Let’s see Iceberg's features.

  • Separation of Storage and Compute: Iceberg allows for independent scaling of storage and computing resources. You can store your data in Amazon S3 and then leverage compute engines like AWS Athena or Presto for querying and analysis. This separation enables efficient resource allocation and cost optimization based on workload demands.
  • Columnar Storage: Iceberg tables store their data in columnar file formats such as Parquet and ORC, where data is organized and stored column-wise. This provides benefits like efficient compression, faster query performance, and reduced storage costs.
  • Metadata Management: Iceberg incorporates a robust metadata management system that optimizes query planning and execution. It maintains column statistics and data file locations, enabling efficient data access and query optimization.
  • Pay-as-You-Go Pricing: With Iceberg, you only pay for the storage consumed in Amazon S3 and the compute resources used during query execution. This pay-as-you-go pricing model eliminates upfront infrastructure costs and provides cost efficiency.
  • Query Performance Optimization: Iceberg’s columnar storage format and metadata management enhance query performance. Selective column access, efficient compression, and metadata-driven optimizations improve query execution speed and reduce resource consumption.
  • Data Format Flexibility: Iceberg supports multiple underlying data file formats, including Parquet, ORC, and Avro. This flexibility allows organizations to ingest and process data in the format that best suits their needs.
  • Data Lake Integration: Iceberg seamlessly integrates with data lakes built on Amazon S3. It can query data stored in the data lake directly, eliminating the need for data duplication or ETL processes.
  • Query Compatibility: Iceberg tables can be queried with ANSI SQL through engines such as Spark, Trino/Presto, and Athena, making them compatible with a wide range of SQL-based analytics tools and applications. This compatibility simplifies integration with existing data workflows and enables seamless adoption.
  • Serverless Querying: When paired with a serverless engine such as Athena, Iceberg tables can be queried without managing any infrastructure. The engine handles scaling and resource allocation on demand, allowing users to focus on data analysis rather than infrastructure management.
  • Data Catalog Integration: Iceberg integrates with AWS Glue Data Catalog, providing a centralized metadata repository for easy data discovery, management, and governance.
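
To make features such as schema evolution, time travel, and fine-grained metadata concrete, here is a short Spark sketch against the iceberg.message table created later in this post; the snapshot id is a hypothetical placeholder you would copy from the snapshots metadata table.

# Schema evolution: add a column without rewriting existing data files
spark.sql("ALTER TABLE iceberg.message ADD COLUMN key4 string")

# Fine-grained metadata: inspect snapshots and data files through metadata tables
spark.sql("SELECT snapshot_id, committed_at, operation FROM iceberg.message.snapshots").show(truncate=False)
spark.sql("SELECT file_path, record_count FROM iceberg.message.files").show(truncate=False)

# Time travel (Spark 3.3+): query the table as of an earlier snapshot id (placeholder value)
spark.sql("SELECT * FROM iceberg.message VERSION AS OF 1234567890123456789").show()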

A hands-on example of Streaming Data from AWS Kinesis to an Iceberg Table

  1. Import the necessary libraries: In your PySpark script, import the required libraries.
import json

import boto3
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

2. Set up AWS credentials and region: Before interacting with AWS services, you need to set up your AWS credentials and specify the AWS region you want to work with. Replace the placeholders 'your-access-key-id', 'your-secret-access-key', and 'your-aws-region' with your actual AWS access key ID, secret access key, and desired AWS region.

aws_access_key_id = 'your-access-key-id'
aws_secret_access_key = 'your-secret-access-key'
region_name = 'your-aws-region'

3. Create a Kinesis client: Instantiate a boto3 Kinesis client by providing the necessary credentials and region.

kinesis_client = boto3.client(
    "kinesis",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name,
)

4. Create the Kinesis stream: Specify the name and shard count for your Kinesis stream.

  • Replace 'your-kinesis-stream-name' with the desired name for your stream
  • Adjust the shard_count as per your requirements.
  • Use the create_stream() method of the Kinesis client to create the stream by providing the stream name and shard count.
  • Capture the response from the API call.
stream_name = "your-kinesis-stream-name"
shard_count = 1 # Specify the number of shards for the stream
response = kinesis_client.create_stream(StreamName=stream_name, ShardCount=shard_count)
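
Stream creation is asynchronous, so it is worth waiting until the stream reaches the ACTIVE state before writing to it. A small sketch using boto3's built-in waiter:

# Block until the stream is ACTIVE (the waiter polls DescribeStream under the hood)
kinesis_client.get_waiter("stream_exists").wait(StreamName=stream_name)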

5. Write the JSON message to the stream: The put_record() method is used to write a single data record to a Kinesis stream.

  • StreamName specifies the name of the Kinesis stream to which you want to write the record.
  • Replace 'your-kinesis-stream-name' with the actual name of your Kinesis stream.
  • Data contains the actual data of the record; in this case, encoded_data, which is the serialized JSON message encoded as bytes.
  • PartitionKey determines the shard to which the record is assigned. Kinesis uses it to distribute the data across shards. You can provide any string value as the partition key. Replace 'partition-key' with your desired partition key.
message_data = {"key1": "value1", "key2": "value2", "key3": "value3"}
encoded_data = json.dumps(message_data).encode("utf-8")  # serialize to JSON and encode as bytes

response = kinesis_client.put_record(
    StreamName=stream_name, Data=encoded_data, PartitionKey="partition-key"
)
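
If you need to send many messages, put_records writes a batch of up to 500 records in a single call. A minimal sketch:

# Write a small batch of records in one API call (up to 500 records per request)
records_batch = [
    {
        "Data": json.dumps({"key1": f"value{i}", "key2": "value2", "key3": "value3"}).encode("utf-8"),
        "PartitionKey": f"partition-{i}",
    }
    for i in range(10)
]
batch_response = kinesis_client.put_records(StreamName=stream_name, Records=records_batch)
print("Failed records:", batch_response["FailedRecordCount"])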

6. Create a SparkSession: Initialize a SparkSession and provide the Iceberg configuration used by the rest of this example.

  • spark.jars.packages pulls in the Iceberg Spark runtime and the AWS SDK bundles needed at runtime.
  • spark.sql.extensions registers IcebergSparkSessionExtensions, which adds Iceberg's SQL extensions to Spark.
  • spark.sql.catalog.iceberg defines a catalog named iceberg backed by org.apache.iceberg.spark.SparkCatalog.
  • spark.sql.catalog.iceberg.type set to hadoop configures it as a Hadoop catalog, which tracks tables with metadata files under a warehouse directory.
  • spark.sql.catalog.iceberg.warehouse sets that warehouse location. Replace 'iceberg-warehouse' with the directory (for example, an S3 path) where table data and metadata should live.
conf = (
    pyspark.SparkConf()
    .setAppName("app_name")
    # packages
    .set(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178",
    )
    # SQL Extensions
    .set(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    # Configuring Catalog
    .set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.iceberg.type", "hadoop")
    .set("spark.sql.catalog.iceberg.warehouse", "iceberg-warehouse")
)

# Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
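
The example above uses a Hadoop catalog for simplicity. To match the architecture described earlier, in which Athena reads table metadata from the AWS Glue Data Catalog, you can instead configure an Iceberg Glue catalog. A hedged sketch is shown below; the warehouse bucket is a placeholder, and the AWS SDK bundle from spark.jars.packages plus appropriate IAM permissions for Glue and S3 are assumed.

# Alternative: register tables in the AWS Glue Data Catalog so Athena can query them
glue_conf = (
    pyspark.SparkConf()
    .setAppName("app_name")
    .set(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .set("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.glue.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .set("spark.sql.catalog.glue.warehouse", "s3://your-bucket/iceberg-warehouse/")  # placeholder bucket
    .set("spark.sql.catalog.glue.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
)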

7. Read the Kinesis stream as a DataFrame: The readStream API is used to read streaming data from a Kinesis stream as a DataFrame in PySpark. Note that a Kinesis source is not bundled with open-source Spark itself; a Kinesis connector for Structured Streaming (for example, the spark-sql-kinesis connector or the one provided by your platform) must be on the classpath, and option names can vary slightly between connectors.

  • .format("kinesis") specifies the data source format as Kinesis.
  • .option("streamName", stream_name) sets the name of the Kinesis stream to read from.
  • Replace 'your-kinesis-stream-name' with the actual name of your Kinesis stream.
  • .option("regionName", region_name) sets the AWS region where the Kinesis stream is located.
  • Replace 'your-aws-region' with the appropriate region.
  • .option("startingPosition", "latest") determines the starting position in the stream. In this example, it is set to "latest" to read from the latest available data in the stream.
  • .load() triggers the loading of the Kinesis stream data into the DataFrame df.
df = (
    spark.readStream.format("kinesis")
    .option("streamName", stream_name)
    .option("regionName", region_name)
    .option("startingPosition", "latest")
    .load()
)

8. Parse the data column as JSON using the defined schema: We will parse the JSON to read the actual columns.

schema = (
    StructType()
    .add("key1", StringType())
    .add("key2", StringType())
    .add("key3", StringType())
)
parsed_df = df.withColumn("data", from_json(df["data"].cast("string"), schema))

# Extract the individual fields from the parsed data:
result_df = parsed_df.select("data.key1", "data.key2", "data.key3")
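
Before wiring the stream into Iceberg, it can be useful to sanity-check the parsed columns with the console sink. This is a throwaway debugging query, not part of the final pipeline.

# Print each micro-batch of parsed records to the console for debugging
debug_query = (
    result_df.writeStream.format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)
# debug_query.stop()  # stop the debug stream once the output looks correct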

9. Write the DataFrame to the Iceberg table: Before writing the streaming DataFrame in Iceberg format, we first need to create the Iceberg table. Because result_df is a streaming DataFrame, we write it with writeStream rather than write.

  • .format("iceberg") specifies Iceberg as the sink format.
  • .outputMode("append") appends the new rows from each micro-batch to the existing table. You can choose the appropriate mode based on your requirements.
  • .option("checkpointLocation", ...) sets the path where Structured Streaming stores its progress and is required for streaming writes.
  • .toTable(table_name) starts the streaming query and writes the results into the named Iceberg table.
# create the Iceberg table
spark.sql(
    "CREATE TABLE IF NOT EXISTS iceberg.message (key1 string, key2 string, key3 string) USING iceberg"
)
# With a Hive or Glue catalog you could add LOCATION 's3://bucket_name/message' to the
# CREATE TABLE statement to control where the table data is stored in S3; with the Hadoop
# catalog used here, the table lives under the catalog's configured warehouse directory.

table_name = "iceberg.message"

# The source is unbounded, so use writeStream (not write) to stream the parsed records into the table
query = (
    result_df.writeStream.format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "checkpoints/message")  # required; use a durable location such as an S3 path
    .toTable(table_name)
)

## Run a Query to get data
spark.sql("SELECT * FROM iceberg.message").show(truncate=False)

##output
+------+------+------+
| key1| key2| key3|
+------+------+------+
|value1|value2|value3|
+------+------+------+

Conclusion

By utilizing Iceberg with PySpark, we can effectively structure and manage streaming data in a data lake. In this example, we created an Iceberg table, ingested streaming data into it, and performed real-time analytics using PySpark’s querying capabilities. Iceberg’s features, such as schema evolution, time travel, and fine-grained metadata, simplify the management of streaming data, enabling organizations to derive valuable insights from their data lake architecture.
