
Apache Druid on Ice: Ingesting Apache Iceberg tables into Apache Druid

5 min read · Dec 26, 2023


Introduction:

Apache Iceberg is a high-performance format for huge analytic tables and plays a critical role in a majority of data lake use cases. Iceberg is often paired with query engines such as Trino, which allow querying data stored in files written in the Iceberg format.

Apache Druid is a high-performance, real-time analytics data store which is geared towards sub-second query responses for streaming and batch data.

The rising prominence of Druid in data lake ecosystems makes it important for Druid to be able to read data stored in the Iceberg format and ingest it, enabling low-latency use cases.

Apache Druid 27.0 introduces the ability to read and ingest data written in the Iceberg format. This functionality is provided in the form of an extension: druid-iceberg-extensions.

This article discusses setting up a simple Iceberg catalog followed by a walkthrough on how to ingest tables from this catalog into Druid.

Iceberg table setup:

Apache Iceberg, being a table format, requires a catalog to create and manage tables. Additionally, there is an associated warehouse where the data files are stored.

There are several Iceberg catalog implementations, but for this tutorial a Hadoop catalog will be used along with a local filesystem-based warehouse. This catalog can be used by including the iceberg-core dependency.

A Hadoop catalog can be initialized using the following Java code snippet:

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;

// Initialize a Hadoop catalog with /tmp/warehouse/ on the local filesystem as the warehouse.
HadoopCatalog hadoopCatalog = new HadoopCatalog();
hadoopCatalog.setConf(new Configuration());
hadoopCatalog.initialize("hadoop", Collections.singletonMap("warehouse", "/tmp/warehouse/"));
return hadoopCatalog;

It initializes a Hadoop catalog object and defines the local path /tmp/warehouse/ as the warehouse. This catalog object is now ready for creating a new Iceberg table and subsequently inserting data into it. Following the Iceberg quick start tutorial, we can create an Iceberg table named logs with the following schema (a minimal Java sketch of the table creation appears after the schema):

  • level: StringType
  • event_time: TimestampType
  • message: StringType
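
As a concrete illustration, here is a minimal Java sketch of how this table could be created with the Iceberg API. The webapp namespace matches the namespace referenced in the Druid ingestion spec later, while the variable names (schema, logsTable) are introduced here purely for illustration:

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

// Define the logs schema and create the table under the "webapp" namespace,
// reusing the hadoopCatalog object initialized earlier.
Schema schema = new Schema(
    Types.NestedField.optional(1, "level", Types.StringType.get()),
    Types.NestedField.optional(2, "event_time", Types.TimestampType.withZone()),
    Types.NestedField.optional(3, "message", Types.StringType.get())
);
TableIdentifier tableId = TableIdentifier.of("webapp", "logs");
Table logsTable = hadoopCatalog.createTable(tableId, schema);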

After inserting three rows, the table contents are as follows:

+---------------------+---------+----------------------------------+
| event_time          | level   | message                          |
+---------------------+---------+----------------------------------+
| 2023-04-14 00:00:00 | info    | Starting Server…                 |
| 2023-04-12 07:00:00 | warning | You probably should not do this! |
| 2023-04-11 00:00:00 | error   | NoSuchElementException           |
+---------------------+---------+----------------------------------+
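
The rows themselves can be appended with Iceberg’s generic data and Parquet writer APIs. The following is a rough sketch for the first row, assuming the schema and logsTable objects from the creation sketch above; the other rows follow the same pattern, and the data file path under the table location is an arbitrary choice for illustration:

import java.time.OffsetDateTime;
import java.util.UUID;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;

// Build a single row matching the logs schema.
GenericRecord row = GenericRecord.create(schema);
row.setField("level", "info");
row.setField("event_time", OffsetDateTime.parse("2023-04-14T00:00:00Z"));
row.setField("message", "Starting Server...");

// Write the row into a new Parquet data file inside the table location.
OutputFile outputFile = logsTable.io().newOutputFile(
    logsTable.location() + "/data/" + UUID.randomUUID() + ".parquet");
DataWriter<GenericRecord> dataWriter = Parquet.writeData(outputFile)
    .schema(schema)
    .createWriterFunc(GenericParquetWriter::buildWriter)
    .overwrite()
    .withSpec(PartitionSpec.unpartitioned())
    .build();
dataWriter.write(row);
dataWriter.close();

// Commit the data file to the table so it becomes visible to readers.
DataFile dataFile = dataWriter.toDataFile();
logsTable.newAppend().appendFile(dataFile).commit();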

This Iceberg table is now ready to be ingested into Druid.

Druid ingestion:

If you don’t have a Druid cluster set up already, the quickstart tutorial provides an easy way to bring up a cluster in a few minutes. Ensure that the druid-iceberg-extensions extension is installed before starting up the Druid cluster.

Once the cluster is up, you can use SQL-based ingestion to ingest the data into Druid. Alternatively, a JSON-based batch ingestion spec can be used. The ingestion SQL for the above table is as follows:

INSERT INTO iceberg_ds
SELECT
  MILLIS_TO_TIMESTAMP("event_time") AS __time,
  level,
  message
FROM TABLE(
  EXTERN(
    '{
      "type": "iceberg",
      "tableName": "logs",
      "namespace": "webapp",
      "icebergCatalog": {
        "type": "local",
        "warehousePath": "/tmp/warehouse",
        "catalogProperties": {
          "spark.sql.catalog.demo.warehouse": "/tmp/warehouse"
        }
      },
      "icebergFilter": {
        "type": "and",
        "filters": [
          {
            "type": "equals",
            "filterColumn": "level",
            "filterValue": "warning"
          },
          {
            "type": "interval",
            "filterColumn": "event_time",
            "intervals": [
              "2023-04-12T00:00:00.000Z/2023-04-13T00:00:00.000Z"
            ]
          }
        ]
      },
      "warehouseSource": {
        "type": "local"
      }
    }',
    '{"type":"parquet"}',
    '[{"name":"event_time","type":"long"},{"name":"level","type":"string"},{"name":"message","type":"string"}]'
  )
)
PARTITIONED BY HOUR

The JSON-based batch ingestion spec would be:

{
  "type": "index_parallel",
  "spec": {
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "iceberg",
        "tableName": "logs",
        "namespace": "webapp",
        "icebergCatalog": {
          "type": "local",
          "warehousePath": "/tmp/warehouse",
          "catalogProperties": {
            "spark.sql.catalog.demo.warehouse": "/tmp/warehouse"
          }
        },
        "icebergFilter": {
          "type": "and",
          "filters": [
            {
              "type": "equals",
              "filterColumn": "level",
              "filterValue": "warning"
            },
            {
              "type": "interval",
              "filterColumn": "event_time",
              "intervals": [
                "2023-04-12T00:00:00.000Z/2023-04-13T00:00:00.000Z"
              ]
            }
          ]
        },
        "warehouseSource": {
          "type": "local"
        }
      },
      "inputFormat": {
        "type": "parquet"
      }
    },
    "tuningConfig": {
      "type": "index_parallel",
      "partitionsSpec": {
        "type": "dynamic"
      }
    },
    "dataSchema": {
      "dataSource": "iceberg_test",
      "granularitySpec": {
        "type": "uniform",
        "queryGranularity": "HOUR",
        "rollup": true,
        "segmentGranularity": "DAY",
        "intervals": [
          "2022-01-16T00:00:00/2023-10-18T00:00:00"
        ]
      },
      "timestampSpec": {
        "column": "event_time",
        "format": "millis"
      },
      "dimensionsSpec": {
        "dimensions": [
          "level",
          "message"
        ]
      },
      "metricsSpec": [
        {
          "name": "record_count",
          "type": "count"
        }
      ]
    }
  }
}

This ingestion spec scans the logs table from the webapp namespace and reads only those records with level set to warning and an event time between 2023-04-12 and 2023-04-13.

icebergFilter: This describes the filter applied to the table before the Iceberg table scan is initiated, which helps reduce the input data size.

warehouseSource: This describes the location of the warehouse, which can be local, s3, or hdfs. For s3 and hdfs warehouses, ensure that the relevant extension is installed in the Druid cluster.

icebergCatalog: This specifies the catalog type and configuration. As of the latest release, Druid supports the following catalog types:

  • Hadoop catalog
  • Hive metastore catalog

More information regarding the spec can be found in the druid-iceberg-extensions documentation.

To load this data into Druid, open the Druid web console and navigate to the Query tab.

Druid web console displaying the Query tab

Hit Run after pasting the ingestion SQL. After a few magical seconds, the ingestion should complete and the data will be loaded into Druid.

Druid ingestion status tab

This Druid data source is now ready to be queried. As can be seen below, the Druid data source contains the data from the Iceberg table created earlier, and only the row matching the Iceberg filter was ingested.

Druid query console tab displaying the query results.

This was a simple example of how to use one of Druid’s latest features to ingest Iceberg data. Please note that other catalog and warehouse implementations may have slightly different configurations, so it’s important to follow the extension documentation whenever in doubt.
