Catalog Iceberg data with AWS Glue Catalog from a Flink Job

Zhe Wang
4 min read · Mar 17, 2024

Flink is a modern streaming engine for big data, while Iceberg is a higher-order table format for big data (e.g., layered on top of Parquet files). That raises the question of where to publish the catalog (the table schemas).

Traditionally, big data was strongly associated with Hadoop, so Hadoop Catalog was the natural choice. However, it is rather cumbersome to configure the HDFS that hosts Hadoop Catalog and its conf files.

In 2023, AWS Athena started to support querying Iceberg data, and the mechanism behind the scenes is Glue Catalog.

The integration between Iceberg and Glue Catalog is fairly new and therefore lacks documentation for many how-tos, especially for less common setups. For example, my team runs Flink on EKS instead of alternatives like Flink on AWS Kinesis Data Analytics or even Spark on AWS Glue.

I hope the working (Java) snippets here help others in the same situation.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkCatalog;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.ConfigProperties;
...

public class MyFlinkJob {
    public static void main(String[] args) throws Exception {
        /* Grab Params from Args */
        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        /* Setup Job Environment */
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String database = params.get("database-name");
        String catalogName = params.get("catalog-name");
        String tableName = params.get("table-name");
        String warehousePath = params.get("warehouse-path");
        String catalogImpl = "org.apache.iceberg.aws.glue.GlueCatalog";
        String ioImpl = "org.apache.iceberg.aws.s3.S3FileIO";

        // Load GlueCatalog through the generic custom loader
        Configuration hadoopConf = new Configuration(false);
        Map<String, String> catalogProperties = new HashMap<>();
        catalogProperties.put("warehouse", warehousePath);
        catalogProperties.put("catalog-impl", catalogImpl);
        catalogProperties.put("io-impl", ioImpl);

        CatalogLoader catalogLoader = CatalogLoader.custom(catalogName, catalogProperties, hadoopConf, catalogImpl);
        Catalog flinkCatalog = new FlinkCatalog(catalogName, database, Namespace.empty(), catalogLoader, true, -1);

        // Register the catalog with Flink and make it the current catalog
        tableEnv.registerCatalog(catalogName, flinkCatalog);
        tableEnv.useCatalog(catalogName);

        // Create the database in Glue if it does not exist yet
        String createDatabaseSql = String.format("CREATE DATABASE IF NOT EXISTS %s;", database);
        tableEnv.executeSql(createDatabaseSql);

        ...

        // The table loader resolves the target table through the same catalog loader
        TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(database, tableName));

        ...

        // Write the stream into the Iceberg table in upsert mode
        FlinkSink.forRowData(...)
            .tableLoader(tableLoader)
            .upsert(true)
            .append();
    }
}

The steps to set up StreamTableEnvironment are fairly standard; the interesting part comes right after. FlinkSink.Builder expects a TableLoader instance before it can upsert into the table. TableIdentifier.of(database, tableName) returns the table identifier, but what is catalogLoader doing?

Here is the background on why catalogLoader is initialized this way.

  • CatalogLoader has native factory methods such as CatalogLoader.hadoop(…) and CatalogLoader.hive(…), but no Glue counterpart. You can’t write CatalogLoader.glue(…), because the API is not designed that way. Instead, you must use the generic form CatalogLoader.custom(catalogName, catalogProperties, hadoopConf, catalogImpl).
  • hadoopConf is only a placeholder argument, because there is no real dependency on the Configuration, which comes from the legacy hadoop-common package.
  • There is a nested hierarchy of catalog -> database -> table, but the catalog name is not really visible in the Athena UI. The catalog name is mainly used for identifying the S3 prefix path for the catalog.
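To make the generic form a little more concrete, here is a minimal sketch that gathers the three catalog properties in one place and also renders them as a Flink SQL CREATE CATALOG statement, which the Iceberg documentation describes as an alternative way to register a Glue-backed catalog. The class GlueCatalogSettings, its method names, the bucket path, and the catalog name are hypothetical and exist only for illustration; the property keys and class names are the ones used in the job above. It uses only the JDK so it can be tried without Flink or Iceberg on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of Iceberg or Flink) that collects the Glue catalog settings. */
final class GlueCatalogSettings {
    static final String CATALOG_IMPL = "org.apache.iceberg.aws.glue.GlueCatalog";
    static final String IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";

    private GlueCatalogSettings() {}

    /** Builds the same property map that the job passes to CatalogLoader.custom(...). */
    static Map<String, String> properties(String warehousePath) {
        if (warehousePath == null || warehousePath.trim().isEmpty()) {
            throw new IllegalArgumentException("warehouse path must not be empty");
        }
        Map<String, String> props = new LinkedHashMap<>();
        props.put("warehouse", warehousePath);
        props.put("catalog-impl", CATALOG_IMPL);
        props.put("io-impl", IO_IMPL);
        return props;
    }

    /** Renders the properties as a Flink SQL CREATE CATALOG statement (values are not escaped). */
    static String toCreateCatalogSql(String catalogName, Map<String, String> props) {
        String options = props.entrySet().stream()
            .map(e -> "  '" + e.getKey() + "'='" + e.getValue() + "'")
            .collect(Collectors.joining(",\n"));
        return "CREATE CATALOG " + catalogName + " WITH (\n  'type'='iceberg',\n" + options + "\n)";
    }

    public static void main(String[] args) {
        // Placeholder bucket and catalog name, for illustration only.
        Map<String, String> props = properties("s3://my-bucket/warehouse");
        System.out.println(toCreateCatalogSql("glue_catalog", props));
    }
}
```

In the job, the map returned by properties(...) would take the place of the hand-built catalogProperties. The generated statement could presumably be passed to tableEnv.executeSql(...) instead of constructing FlinkCatalog by hand, but the TableLoader for the sink still needs a CatalogLoader, so the programmatic route shown earlier remains necessary for FlinkSink.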

Another related recipe is the package dependencies. The three implementation dependencies are easy to understand. The compileOnly ones deserve a bit more explanation. In short, both flink-s3-fs-hadoop and flink-s3-fs-presto need to be available at runtime in the environment rather than bundled with the job. How to set that up depends on your choice of DevOps tooling. In my case, Jib builds an image and publishes it to a local private Docker registry for dev purposes. The related lines of code are shown below.

// build.gradle
plugins {
    id 'java'
    id 'application'
    id 'de.undercouch.download' version '5.3.0'
    id 'com.google.cloud.tools.jib' version '3.3.1'
}

ext {
    ...
    flinkVersionShort = '1.16'
    flinkVersion = "${flinkVersionShort}.2"
    hadoopVersion = '3.3.6'
    ...
}

dependencies {
    ...
    implementation "org.apache.hadoop:hadoop-common:${hadoopVersion}"
    implementation "org.apache.iceberg:iceberg-flink-runtime-${flinkVersionShort}:${icebergVersion}"
    implementation 'org.apache.iceberg:iceberg-aws-bundle:1.5.0'
    compileOnly "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"
    compileOnly "org.apache.flink:flink-s3-fs-presto:${flinkVersion}"
    ...
}

task downloadFiles(type: Download) {
    src([
        "https://repo.maven.apache.org/maven2/org/apache/flink/flink-s3-fs-hadoop/${flinkVersion}/flink-s3-fs-hadoop-${flinkVersion}.jar",
        "https://repo.maven.apache.org/maven2/org/apache/flink/flink-s3-fs-presto/${flinkVersion}/flink-s3-fs-presto-${flinkVersion}.jar"
    ])
    dest buildDir
    overwrite false
}

tasks.jib.dependsOn downloadFiles

jib {
    ...
    allowInsecureRegistries = true
    container {
        appRoot = '/opt/flink/usrlib/'
        ...
    }
    extraDirectories {
        paths {
            [path {
                from = project.provider { buildDir }
                into = project.provider { '/opt/flink/plugins/s3-fs-hadoop' }
                includes = project.provider { ["flink-s3-fs-hadoop-${flinkVersion}.jar"] }
            },
            path {
                from = project.provider { buildDir }
                into = project.provider { '/opt/flink/plugins/s3-fs-presto' }
                includes = project.provider { ["flink-s3-fs-presto-${flinkVersion}.jar"] }
            }]
        }
    }
    containerizingMode = 'packaged'
    ...
}

When I run gradle jib, it automatically downloads both jars and puts them into the right paths in the built Docker image.
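Since Flink loads each subfolder of its plugins directory as a separate plugin, it can be worth confirming that both jars really landed where expected. Below is a minimal sketch of such a check, assuming the layout from the Jib configuration (/opt/flink/plugins/s3-fs-hadoop and /opt/flink/plugins/s3-fs-presto) and Flink 1.16.2 as defaults. The class PluginLayoutCheck and its method are hypothetical, not something provided by Flink or Jib.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sanity check (not part of Flink or Jib) for the S3 plugin layout in the image. */
final class PluginLayoutCheck {
    private PluginLayoutCheck() {}

    /** Returns the expected plugin jar paths that do not exist under the given plugins directory. */
    static List<Path> missingPluginJars(Path pluginsDir, String flinkVersion) {
        String[] plugins = {"s3-fs-hadoop", "s3-fs-presto"};
        List<Path> missing = new ArrayList<>();
        for (String plugin : plugins) {
            // Expected layout: <pluginsDir>/<plugin>/flink-<plugin>-<flinkVersion>.jar
            Path jar = pluginsDir.resolve(plugin)
                .resolve("flink-" + plugin + "-" + flinkVersion + ".jar");
            if (!Files.isRegularFile(jar)) {
                missing.add(jar);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Defaults mirror the Jib configuration; both can be overridden from the command line.
        Path pluginsDir = Paths.get(args.length > 0 ? args[0] : "/opt/flink/plugins");
        String flinkVersion = args.length > 1 ? args[1] : "1.16.2";
        List<Path> missing = missingPluginJars(pluginsDir, flinkVersion);
        if (missing.isEmpty()) {
            System.out.println("Both S3 plugin jars are in place.");
        } else {
            missing.forEach(p -> System.err.println("Missing plugin jar: " + p));
            System.exit(1);
        }
    }
}
```

Running this inside the container (or against an unpacked image layer) gives a quick signal before the job fails at startup with a missing S3 file system.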

Admittedly, a lot of assumptions are made here, and only the essential code snippets are shared to save space. Please see the references if you are keen to delve deeper.

Originally posted:

https://aliz-in-wonderland.com/#/story/a2gN4KwVhUi8GLBtzkLM

References:
