Streaming Iceberg Table, an Alternative to Kafka?

Spark Structured Streaming supports a Kafka source and a file source, meaning it can treat a folder as a source of streaming messages. Can a solution, entirely based on files, really compare to a streaming platform such as Kafka?

Jean-Claude Cote
Towards Data Science


In this article, we explore using an Iceberg table as a source of streaming messages. To do this, we create a Java program that writes messages to an Iceberg table and a pySpark Structured Streaming job that reads these messages.

The Motivation

Azure Event Hubs is a big data streaming platform capable of processing millions of events per second. However, it lacks message compression and messages are limited to 1 MiB. Event Hubs can become an expensive option when processing very high throughput data flows. Apache Kafka supports message compression, and it is also possible to write messages larger than 1 MiB, although this is not recommended since very large messages are considered inefficient. However, running a Kafka streaming platform on VMs can be costly, both in human and physical resources. Is there an alternative? Can Azure Blob Storage be leveraged as a queuing system?

The FileSource

Spark Structured Streaming is capable of reading streaming data from various sources: Kafka, Event Hubs and files. However, the built-in FileSource does not scale well since, at every micro-batch, it lists all the files under the source directory. It does this to determine which files are newly added and need to be processed in the next micro-batch. As the number of files grows, the listing becomes more costly.

Despite this drawback, the FileSource has a trick up its sleeve: it also checks for the presence of a _spark_metadata folder. When Spark Structured Streaming writes a directory of files using writeStream, the file sink creates a _spark_metadata folder containing "transaction logs" that record which files are added to the directory at every micro-batch. Here's an example of writing to a directory using writeStream:

(eventsDf.writeStream
    .outputMode("append")
    .format("json")  # any file format supported by Spark
    .trigger(processingTime='5 seconds')
    .option("checkpointLocation", "/tmp/stream_eval/checkpoint")
    .option("path", "/tmp/stream_eval/output/")
    .start()
)

When reading such a directory, the FileSource's readStream takes advantage of the _spark_metadata folder and thus avoids costly listings.

However, this is not the only table format with metadata support. Apache Iceberg is also a table format, one that supports time travel and rollback. It keeps track of added and deleted files via a history of commits: in a nutshell, every commit records the data files that are added to or removed from the table. Thus Iceberg's readStream takes advantage of its metadata to avoid costly listings.

Apache Iceberg can be leveraged within many big data engines like Spark, Trino, Flink, Presto and Dremio. Interestingly, Iceberg's Java API can also be used in a standalone Java program.
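As a quick illustration of this bookkeeping, the commit history can be inspected with Iceberg's Java API. Here is a minimal sketch, assuming a hypothetical tableLocation pointing at an existing table (the same HadoopTables API is used to create the table in the next section):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

// Load the table and walk its commit history. Each snapshot summarizes
// the data files added or removed by that commit.
Configuration hadoopConf = new Configuration();
Table table = new HadoopTables(hadoopConf).load(tableLocation);
for (Snapshot snapshot : table.snapshots()) {
    System.out.printf(
        "snapshot=%d op=%s added-data-files=%s deleted-data-files=%s%n",
        snapshot.snapshotId(),
        snapshot.operation(),
        snapshot.summary().get("added-data-files"),
        snapshot.summary().get("deleted-data-files"));
}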

The Writers

We will use the Iceberg Java API to create and write to an Iceberg table. We illustrate the concept using a schema similar to a Kafka topic: a timestamp, a key and a value.

Schema schema =
    new Schema(
        Types.NestedField.optional(1, "timestamp", Types.TimestampType.withZone()),
        Types.NestedField.required(2, "key", Types.LongType.get()),
        Types.NestedField.optional(3, "value", Types.BinaryType.get()));

Creating an Iceberg table is as simple as:

PartitionSpec spec = PartitionSpec.builderFor(schema).build();
HadoopTables tables = new HadoopTables(hadoopConf);
Map<String, String> properties = new HashMap<>();
properties.put("commit.retry.num-retries", "200");
Table table = tables.create(schema, spec, properties, tableLocation);

To create Parquet files for our table, we use Iceberg's DataWriter:

OutputFile file = HadoopOutputFile.fromLocation(fullPath, hadoopConf);
DataWriter<Record> dataWriter =
    Parquet.writeData(file)
        .forTable(table)
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .build();

try {
    for (Record record : createRecords()) {
        dataWriter.write(record);
    }
} finally {
    dataWriter.close();
}

This code creates physical Parquet files in the data lake. However, these new files are not yet visible to clients of the Iceberg table. The last table commit does not include any reference to these new data files. In order to make these files available to clients, we must append the files to the table and commit the transaction.

DataFile dataFile = dataWriter.toDataFile();
AppendFiles append = table.newFastAppend();
append.appendFile(dataFile);
append.commit();

The DataFile object is a moniker of the real Parquet file. The DataFile object captures information such as the file size, upper/lower bounds of the columns, the Parquet file path and the number of records. The appendFile method can be called multiple times to add multiple DataFiles to the table in a single commit.
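As a hedged sketch of what this moniker carries, continuing the writer example above (method names taken from Iceberg's DataFile interface):

// Inspect the metadata Iceberg records about the Parquet file.
// None of these calls read the file itself.
DataFile dataFile = dataWriter.toDataFile();
System.out.println("path:         " + dataFile.path());
System.out.println("records:      " + dataFile.recordCount());
System.out.println("size (bytes): " + dataFile.fileSizeInBytes());
// column bounds are keyed by field id and stored as raw ByteBuffers
System.out.println("lower bounds: " + dataFile.lowerBounds());
System.out.println("upper bounds: " + dataFile.upperBounds());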

The Concurrency

Iceberg uses an optimistic concurrency strategy. A Writer assumes that no other Writer is simultaneously operating on the table. During the commit operation, the Writer creates and writes out a new metadata file. The Writer then attempts to rename this new metadata file to the next available versioned metadata file. If the rename fails because another Writer has simultaneously committed, the failed Writer rebuilds a new metadata file and retries the commit. This process is explained in more detail here.
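The retry budget is the commit.retry.num-retries table property we set at table creation time. As a small sketch, it can also be adjusted later on an existing table:

// Raise the commit retry budget on an existing table; the property
// name is the same one passed to tables.create() earlier.
table.updateProperties()
    .set("commit.retry.num-retries", "200")
    .commit();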

This strategy works well when Writers create large Parquet files and commit occasionally. Under these conditions, Writers spend most of their time creating data files and only a fraction of their time committing. Contention during the commit phase is low.

However, if many Writers frequently commit small amounts of data, the optimistic locking mechanism can become a bottleneck. Under these conditions, Writers spend most of their time trying to commit.

The Bookkeeper

To achieve frequent commits, it is necessary to centralize the commit phase in a single committer, which we will call the Bookkeeper. The Writers still create Parquet files, but do not append them to the table. This task is delegated to the Bookkeeper. Only the Bookkeeper appends data files to the table.

This, however, requires that the Writers somehow inform the Bookkeeper which data files it needs to register. There are several ways to do this. A simple approach is to leverage the data lake.

When a Writer creates one or more Parquet files, it serializes the DataFile objects to a “transaction log” in a location known to the Writer and the Bookkeeper.

// create parquet file
...

// obtain DataFile moniker objects
List<DataFile> dataFiles = ...
dataFiles.add(dataWriter.toDataFile());

// serialize DataFile monikers
Path txLogDir = new Path("abfss://.../transactionLogs/");
Path txLogTemp = new Path(txLogDir, uuid);
Path txLog = new Path(txLogDir, uuid + ".txlog.ser");
try (FSDataOutputStream fout = fs.create(txLogTemp);
     ObjectOutputStream out = new ObjectOutputStream(fout)) {
    out.writeObject(dataFiles);
}

// make transaction log visible to the single committer
fs.rename(txLogTemp, txLog);

The Bookkeeper continually lists the “transaction logs” folder, discovering the new DataFiles that need to be appended to the table.

List<DataFile> dataFiles = ...
Path txLogDir = new Path("abfss://.../transactionLogs/");
FileStatus[] txLogFiles = listTransactionLogs(txLogDir);
for (FileStatus txLog : txLogFiles) {
    try (FSDataInputStream fin = getFileSystem().open(txLog.getPath());
         ObjectInputStream in = new ObjectInputStream(fin)) {
        dataFiles.addAll((List<DataFile>) in.readObject());
    }
}

The Bookkeeper then appends the list of DataFile objects to the table and commits.

AppendFiles append = table.newFastAppend();
for (DataFile dataFile : dataFiles) {
    append.appendFile(dataFile);
}
append.commit();

The Bookkeeper can then safely discard the transaction logs.

for (FileStatus txLog : txLogFiles) {
    getFileSystem().delete(txLog.getPath(), false);
}

To implement a finite queue with a certain retention, the Bookkeeper marks the old data files for deletion.

import org.apache.iceberg.expressions.Expressions;

// keep 7 days
long nowMicros = System.currentTimeMillis() * 1000;
// use long arithmetic to avoid int overflow
long watermarkMicros = nowMicros - (7L * 24 * 60 * 60 * 1000 * 1000);
table
    .newDelete()
    .deleteFromRowFilter(
        Expressions.lessThan("timestamp", watermarkMicros))
    .commit();

Once a data file is marked to be deleted, it is no longer available to clients of the table. However, it is not yet physically deleted.

The Reaper

The last piece of the puzzle is physically removing unreferenced metadata and data files. To do this we expire old snapshots. When Iceberg removes a snapshot, it also physically deletes the metadata files and data files that were marked to-be-deleted by the Bookkeeper.

long nowMillis = System.currentTimeMillis();
long watermarkMillis = nowMillis - (10 * 60 * 1000); // 10 minutes
table
    .expireSnapshots()
    .expireOlderThan(watermarkMillis)
    .retainLast(100)
    .commit();

The Writers are completely independent; we can run as many of them as we want. The Bookkeeper's job is lightweight: it reads transaction logs and commits append/delete requests to the Iceberg table.
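Putting the pieces together, here is a minimal sketch of what the Bookkeeper's main loop could look like; it reuses the hypothetical listTransactionLogs and getFileSystem helpers from the snippets above, and the polling interval is arbitrary:

// A minimal Bookkeeper loop: discover transaction logs, commit the
// DataFiles they reference in a single append, then discard the logs.
// Retention, error handling and shutdown are omitted for brevity.
void runBookkeeper(Table table, Path txLogDir) throws Exception {
    while (true) {
        List<DataFile> dataFiles = new ArrayList<>();
        FileStatus[] txLogFiles = listTransactionLogs(txLogDir);

        // deserialize the DataFile monikers written by the Writers
        for (FileStatus txLog : txLogFiles) {
            try (FSDataInputStream fin = getFileSystem().open(txLog.getPath());
                 ObjectInputStream in = new ObjectInputStream(fin)) {
                dataFiles.addAll((List<DataFile>) in.readObject());
            }
        }

        // one commit per pass, no matter how many Writers contributed
        if (!dataFiles.isEmpty()) {
            AppendFiles append = table.newFastAppend();
            for (DataFile dataFile : dataFiles) {
                append.appendFile(dataFile);
            }
            append.commit();
        }

        // the transaction logs are now safe to discard
        for (FileStatus txLog : txLogFiles) {
            getFileSystem().delete(txLog.getPath(), false);
        }

        Thread.sleep(5000); // arbitrary polling interval
    }
}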

In the background, the Reaper periodically expires snapshots and physically deletes old data files.

The Spark WriteStream

Of course, if using Spark is an option, creating a streaming Iceberg table is very easy. All you have to do is run a Spark writeStream query with the iceberg format. Iceberg is integrated into Spark and handles the coordination between Writers and the Bookkeeper.

query = (
    df.writeStream
    .outputMode("append")
    .format("iceberg")
    .trigger(processingTime="5 seconds")
    .option("checkpointLocation", "abfss://.../checkpoint/")
    .toTable("icebergcatalog.dev.events_table")
)

However, you still need to run a Reaper process. Iceberg surfaces this via an SQL procedure. More details are available here.

spark.sql("""
CALL icebergcatalog.system.expire_snapshots(
'dev.events_table', TIMESTAMP '2021-06-30 00:00:00.000', 100)
""")

The Reader

Iceberg tables are deeply integrated with Spark Structured Streaming and as such support both the readStream and writeStream functions. Thanks to Iceberg’s commit mechanism, readStream can efficiently discover what files are new at every micro-batch. Here’s an example pySpark streaming query.

# current time in milliseconds
ts = int(time.time() * 1000)

# create a streaming dataframe for an iceberg table
streamingDf = (
    spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", ts)
    .option("streaming-skip-delete-snapshots", True)
    .load("icebergcatalog.dev.events_table")
)

# start a streaming query printing results to the console
query = (
    streamingDf.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="15 seconds")
    .start()
)

The stream-from-timestamp option is used to position the first micro-batch at a particular point in time. After each micro-batch, the offsets are stored in the data lake during checkpointing (for fault tolerance).

Iceberg lacks the ability to limit the number of rows or the number of files per micro-batch. However, a pull request adding this functionality is currently open.

The Results

In our first experiment, we simulate a queue with a retention of 1 hour. We use 5 Writers, each continually creating Parquet files of 20k records. Each record is approximately 1700 bytes, resulting in Parquet files of 20 MiB. A Writer takes on average 7 seconds to create a Parquet data file.

We use one Bookkeeper, which continually reads transaction logs and commits append/remove file requests. It takes on average 8 seconds to list the contents of the transaction logs folder and 2 seconds to read the logs and commit the appended files. Marking files to be deleted takes less than 2 seconds. The Bookkeeper spends most of its time listing files in the data lake. Removing the need to list files would reduce latency. For example, Writer-1 could serialize its DataFile objects to a well-known file, txlog1.ser. The Bookkeeper would then check for the existence of this file and load it. Checking the existence of a file only takes about 300 milliseconds.
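For instance, the check for Writer-1's hypothetical txlog1.ser could look like the following sketch, using the Hadoop FileSystem API:

// Existence-check alternative: each Writer owns a well-known transaction
// log, so the Bookkeeper can skip the full directory listing.
Path txLog1 = new Path(txLogDir, "txlog1.ser"); // Writer-1's well-known log
if (getFileSystem().exists(txLog1)) {
    try (FSDataInputStream fin = getFileSystem().open(txLog1);
         ObjectInputStream in = new ObjectInputStream(fin)) {
        dataFiles.addAll((List<DataFile>) in.readObject());
    }
}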

We schedule the Reaper to run every 2 minutes. It takes between 10 and 70 seconds to physically remove snapshots and old data files. Fortunately, this is a background job and does not impact the availability of data for the Readers.

We read the queue using a pySpark Structured Streaming job configured to use 5 threads (5 CPUs). The average time to process a micro-batch of 200k records is 11 seconds.

Looking at the breakdown, we see that Iceberg takes less than 500 milliseconds to determine which files to consume in the micro-batch. The micro-batch execution time is relatively stable, with most of the time spent processing records.

We also tried using an Iceberg table of Avro files rather than Parquet files. Results were comparable, with the exception that Writers took 6 seconds to create an Avro file.

In our second experiment we use a retention of 24 hours. Iceberg thus keeps track of 24 times the number of data files. We see no degradation of performance in either the Bookkeeper or the Reader.

The Conclusion

If sub-second latencies are not a requirement, using Iceberg tables on an Azure data lake might be a good option. In this article, we showed that latencies of less than 15 seconds are achievable. This is perfectly acceptable for Spark Structured Streaming queries based on the micro-batch architecture. It is a good fit for bulk processing of high-volume streams. However, when latencies are critical, as in high-frequency trading, this option is not ideal.

This design is simple. Streaming jobs use the same technology as batch jobs: the data lake and Iceberg. There is no additional infrastructure to maintain and no additional SaaS to pay for. In a follow-up article, we contrast this solution with using Azure Event Grid.

There is no need for an external schema management system. The schema is built into the Iceberg table. The schema is also open-ended, allowing you to make it as rich as you want; you are not limited to Kafka's timestamp, key and value. An added benefit is that clients don't necessarily need to read all the columns. Streaming reads exploit the Parquet columnar format the same way batch reads do. This can potentially reduce the I/O a streaming job requires.
