Registering S3 files into Apache Iceberg tables, without the rewrites

Erick Enriquez
Published in inquery-data
7 min read · Feb 22, 2024

It’s no secret that table formats like Apache Iceberg, Delta Lake, and Hudi have taken the Big Data world by storm over the past couple of years. Bringing warehouse-like functionality to the data lake didn’t just unlock a catchy new marketing term (the data lakehouse); it also unlocked atomic transactions and simplified metadata management on file-based storage platforms like S3, GCS, and Azure Blob Storage. Through client-side libraries that manage file versioning and tracking, data processing engines like Spark, Trino, PyArrow, and DuckDB can read directly from file storage as efficiently as from a relational table.

At InQuery, we’ve been setting up our own large-scale data infrastructure to identify implementation challenges and inform our product direction. Even during this early building phase, it felt like a no-brainer to leverage a table format so that we could easily manage changes to our data, and we chose Iceberg primarily for its robust community support.

Still, for as many capabilities as these table formats unlock, we found it surprisingly awkward to register our pre-existing files on S3 into Iceberg tables efficiently. Through engagement in the Apache Iceberg Slack community, we discovered several unresolved feature requests related to our ingestion issues. Specifically, appending raw Parquet files on S3 to Iceberg tables without performing full rewrites was a significant challenge, which seemed silly to us considering Iceberg’s fundamental role as a pointer-management framework. To fix this problem, we hacked together a solution using Iceberg’s Java API, manually updating the Iceberg metadata to avoid wasting resources during file registration.

In this article, we will highlight some of Iceberg’s non-obvious capabilities, limitations, and implementation details as we dive into our solution. We will use S3 for storage, Parquet as our file format, the AWS Glue Catalog as our metastore, Spark for writing, and Trino, Spark, DuckDB, and PyArrow for confirming that the tables can be read across engines.

To start, we identified 3 main workflows that we found surprisingly tricky to perform as metadata-only operations (we wanted to avoid the cost of making full copies of our data):

  1. Creating a new Iceberg table from existing Parquet files
  2. Registering existing Parquet files to an existing Iceberg table
  3. Creating an Iceberg table with files living in multiple locations

For the 3 use cases above, it seemed that the only way to associate Parquet files with Iceberg tables would be to set up Spark, read the data from the existing files, and write the data out to new files with Iceberg in mind. This workflow requires full data rewrites that result in unnecessary compute usage at best and duplicate storage costs at worst (assuming you don’t proactively delete the original copy of each file). It felt unnecessarily wasteful.

To fix this, we set out to implement a function that would take in a list of directory or file paths and a target table and automatically update the Iceberg metadata to reference the relevant files without doing full data rewrites.

appendToIceberg(List<String> sourcePaths, String targetTable)

For the purposes of this article, we will discuss a base case with a single path referencing a single parquet file landing into a previously non-existent Iceberg table.
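Before touching any Iceberg APIs, the two arguments of appendToIceberg need a little normalization. The sketch below is only an illustration of that step, not the authors' code: the class and method names (AppendInputs, parquetFiles, splitTarget) and the bucket names are hypothetical, it assumes targetTable is written as "database.table", and it leaves directory expansion out entirely. It drops duplicate paths because registering the same file twice would likely surface its rows twice.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

final class AppendInputs {
    private AppendInputs() {}

    /** Keeps paths ending in ".parquet", dropping duplicates while preserving order. */
    static List<String> parquetFiles(List<String> sourcePaths) {
        Set<String> unique = new LinkedHashSet<>();
        for (String path : sourcePaths) {
            if (path.toLowerCase(Locale.ROOT).endsWith(".parquet")) {
                unique.add(path);
            }
        }
        return new ArrayList<>(unique);
    }

    /** Splits an assumed "database.table" string into {database, table}. */
    static String[] splitTarget(String targetTable) {
        String[] parts = targetTable.split("\\.");
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("expected database.table, got: " + targetTable);
        }
        return parts;
    }

    public static void main(String[] args) {
        // Hypothetical inputs: one real file listed twice plus a marker file.
        List<String> files = parquetFiles(List.of(
                "s3://bucket-a/raw/iris.parquet",
                "s3://bucket-a/raw/_SUCCESS",
                "s3://bucket-a/raw/iris.parquet"));
        String[] target = splitTarget("analytics.iris");
        System.out.println(files + " -> " + target[0] + "." + target[1]);
    }
}
```

The two parts returned by splitTarget line up with the databaseName and tableName values that TableIdentifier.of expects later in the article.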

Let’s begin by getting some overhead out of the way. The first thing we need to do is connect to our AWS S3 File System.

import org.apache.hadoop.conf.Configuration;

private static Configuration getAWSConfig() {
    Configuration conf = new Configuration();

    // Use the default AWS credentials provider chain for authentication
    conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");

    // Ensure fs.s3a.impl is set to use S3AFileSystem
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    return conf;
}

This connection enables us to retrieve files from S3 so that we can read the file metadata that we’ll register with Iceberg later on. Luckily for us, Parquet files store metadata like their schema and row counts directly in the file footer.
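To make "in the file footer" concrete: a plain (unencrypted) Parquet file ends with the serialized file metadata, followed by a 4-byte little-endian length of that metadata, followed by the 4-byte magic PAR1. The sketch below decodes just that 8-byte tail. ParquetTail and footerLength are illustrative names, not part of any Parquet library, and the sample bytes are made up.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

final class ParquetTail {
    private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    private ParquetTail() {}

    /** Reads the footer length from a buffer whose last 8 bytes are the end of a Parquet file. */
    static int footerLength(byte[] tail) {
        if (tail.length < 8) {
            throw new IllegalArgumentException("need at least the last 8 bytes of the file");
        }
        int start = tail.length - 8;
        // The final 4 bytes must be the PAR1 magic.
        for (int i = 0; i < MAGIC.length; i++) {
            if (tail[start + 4 + i] != MAGIC[i]) {
                throw new IllegalArgumentException("file does not end with PAR1");
            }
        }
        // The 4 bytes before the magic hold the footer length, little-endian.
        return ByteBuffer.wrap(tail, start, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }

    public static void main(String[] args) {
        // Made-up tail: length bytes 10 27 00 00 (0x00002710), then "PAR1".
        byte[] tail = {0x10, 0x27, 0x00, 0x00, 'P', 'A', 'R', '1'};
        System.out.println(footerLength(tail)); // prints 10000
    }
}
```

A reader therefore only needs a small ranged read at the end of the object to locate the schema and row-group metadata, which is why extracting this information does not require scanning the data itself.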

Using Hadoop standard libraries we can pretty easily extract this information and save it for later:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

// Some boilerplate...
Configuration awsConf = getAWSConfig();

// Get file size
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(awsConf);
long fileSize = fs.getFileStatus(path).getLen();

// Get file row count by summing the row counts of every row group in the footer
HadoopInputFile inputFile = HadoopInputFile.fromPath(path, awsConf);
ParquetMetadata metadata = ParquetFileReader.readFooter(
    inputFile, ParquetMetadataConverter.NO_FILTER);
long rowCount = metadata.getBlocks().stream().mapToLong(block -> block.getRowCount()).sum();

// Get file schema
MessageType parquetSchema = metadata.getFileMetaData().getSchema();

Now that we have all of the file metadata, we need to turn the file schema into an Iceberg schema to respect its structure and data types. This is where things start to get funky.

Because we couldn’t find an explicit mapping from parquet data types to Iceberg data types and didn’t want to write our own, we took advantage of existing schema conversion functions written by Spark to bridge the two.

private static Schema parquetToIceSchema(ParquetMetadata metadata) {
    try {
        MessageType parquetSchema = metadata.getFileMetaData().getSchema();

        // Initialize the converter. The boolean flags below are assumed to be
        // defined elsewhere; adjust them based on your requirements.
        ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(
            assumeBinaryIsString,
            assumeInt96IsTimestamp,
            caseSensitive,
            inferTimestampNTZ,
            nanosAsLong
        );

        // Convert the MessageType (Parquet schema) to StructType (Spark schema)
        StructType sparkSchema = converter.convert(parquetSchema);

        // Convert the Spark schema to an Iceberg schema
        Schema icebergSchema = SparkSchemaUtil.convert(sparkSchema);
        return icebergSchema;
    } catch (Exception e) {
        // Surface the failure so the method never falls through without a schema
        throw new IllegalStateException("Failed to convert Parquet schema to Iceberg schema", e);
    }
}
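For a sense of what a hand-written mapping would involve, here is a deliberately tiny sketch covering only a handful of Parquet primitive types. It is not the conversion used in this article: TypeMappingSketch and its enum are hypothetical, Iceberg types are represented as plain strings rather than Iceberg Type objects, and every logical annotation other than "string" (decimals, dates, timestamps, nested types) is ignored, which is exactly the part that makes a complete mapping tedious.

```java
final class TypeMappingSketch {
    /** A subset of Parquet physical types, named as in parquet-mr. */
    enum ParquetPrimitive { BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BINARY }

    private TypeMappingSketch() {}

    /**
     * Returns the Iceberg type name for a Parquet primitive.
     * annotatedAsString is only consulted for BINARY columns.
     */
    static String icebergType(ParquetPrimitive type, boolean annotatedAsString) {
        switch (type) {
            case BOOLEAN: return "boolean";
            case INT32:   return "int";
            case INT64:   return "long";
            case FLOAT:   return "float";
            case DOUBLE:  return "double";
            case BINARY:  return annotatedAsString ? "string" : "binary";
            default:
                throw new IllegalArgumentException("unsupported type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(icebergType(ParquetPrimitive.INT64, false));  // prints long
        System.out.println(icebergType(ParquetPrimitive.BINARY, true));  // prints string
    }
}
```

Going through Spark's converter trades this kind of case analysis for an extra dependency and an intermediate StructType.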

With a table schema and file metadata ready to go, we can build an Iceberg DataFile, which acts as a container for file-level metadata like location, file size, record counts, column statistics, and partitioning. Query engines can use this metadata for query plan optimizations like predicate pushdown, data skipping, and partition management. The DataFile abstraction is also core to Iceberg’s ability to support granular metadata management like snapshotting, time travel, rollbacks, and upserts. Here, we explicitly skip metric extraction from the file as it warrants its own post.

ImmutableList.Builder<DataFile> dataFilesBuilder = ImmutableList.builder();
// DataFiles (plural) is the factory class; DataFile is the resulting interface
DataFile dataFile = DataFiles.builder(spec)
    .withPath(filePath)
    .withFormat(format)
    .withFileSizeInBytes(fileSize)
    .withRecordCount(rowCount)
    // .withMetrics(metrics)
    .withPartition(partition)
    .build();
dataFilesBuilder.add(dataFile);
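The cost of leaving withMetrics commented out is easiest to see with a toy model of data skipping. The sketch below is not Iceberg code: FileBounds and mightMatchGreaterThan are hypothetical stand-ins for per-file column bounds and for the check a planner performs, assuming a single long column and a "value > threshold" predicate. A file with known bounds can be pruned; a file registered without bounds always has to be read.

```java
import java.util.ArrayList;
import java.util.List;

final class DataSkippingSketch {
    /** Hypothetical per-file bounds for one long column; null means "not collected". */
    static final class FileBounds {
        final String path;
        final Long lower;
        final Long upper;

        FileBounds(String path, Long lower, Long upper) {
            this.path = path;
            this.lower = lower;
            this.upper = upper;
        }
    }

    private DataSkippingSketch() {}

    /** Returns the files that might contain a row with value > threshold. */
    static List<String> mightMatchGreaterThan(List<FileBounds> files, long threshold) {
        List<String> selected = new ArrayList<>();
        for (FileBounds file : files) {
            // Without an upper bound the file can never be ruled out.
            if (file.upper == null || file.upper > threshold) {
                selected.add(file.path);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<FileBounds> files = List.of(
                new FileBounds("a.parquet", 0L, 10L),      // pruned for threshold 15
                new FileBounds("b.parquet", 20L, 30L),     // kept: upper bound exceeds 15
                new FileBounds("c.parquet", null, null));  // kept: no metrics registered
        System.out.println(mightMatchGreaterThan(files, 15)); // prints [b.parquet, c.parquet]
    }
}
```

Files appended the way this article describes behave like c.parquet: they are queryable, but engines cannot skip them based on column statistics.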

Next, we connect to our metastore by instantiating our Glue client and initializing our Glue catalog with the relevant properties. Here, warehouseLocation refers to the S3 location where you want to create and manage your Iceberg table:

try (GlueClient glueClient = GlueClient.builder()
        .region(software.amazon.awssdk.regions.Region.of(awsRegion))
        .build()) {

    Map<String, String> catalogProperties = new HashMap<>();
    catalogProperties.put("warehouse", warehouseLocation);
    catalogProperties.put("client", "glue");
    catalogProperties.put("region", awsRegion);

    GlueCatalog catalog = new GlueCatalog();
    catalog.setConf(new org.apache.hadoop.conf.Configuration());
    // If GlueCatalog required direct GlueClient, it should be managed here
    catalog.initialize("glue_catalog", catalogProperties);

    // Some code here...
}

With our connection set up, we can now initialize a map of table properties. We used only a subset of properties that you might be interested in setting:

import static org.apache.iceberg.TableProperties.FORMAT_VERSION;


private static Map<String, String> icebergTableProperties(String fileFormat) {
    Map<String, String> icebergTableProperties = new HashMap<>();

    // Keys
    String PROVIDER_PROPERTY_KEY = "provider";
    // Not set here: the name mapping is added after the table has been created
    String DEFAULT_NAME_MAPPING = "schema.name-mapping.default";
    String DEFAULT_FILE_FORMAT = "write.format.default";
    String COMPRESSION_CODEC_PROP = "write.parquet.compression-codec";

    // Values (fileFormat comes from the method parameter, e.g. "parquet")
    String PROVIDER_PROPERTY_VALUE = "iceberg";
    String formatVersion = "2";
    String COMPRESSION_CODEC_VALUE = "zstd";

    icebergTableProperties.put(COMPRESSION_CODEC_PROP, COMPRESSION_CODEC_VALUE);

    // New table properties
    icebergTableProperties.put(PROVIDER_PROPERTY_KEY, PROVIDER_PROPERTY_VALUE);
    icebergTableProperties.put(DEFAULT_FILE_FORMAT, fileFormat);
    icebergTableProperties.put(FORMAT_VERSION, formatVersion);

    return ImmutableMap.copyOf(icebergTableProperties);
}

We then use these table properties to build an Iceberg Table and create a Transaction object, which is useful for committing multiple operations atomically in a single commit.

// Create table properties
Map<String, String> tableProperties = icebergTableProperties(storageFormat);
TableIdentifier identifier = TableIdentifier.of(databaseName, tableName);
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();

// Build and create the table
Table table = catalog.buildTable(identifier, schema)
    .withPartitionSpec(partitionSpec)
    .withProperties(tableProperties)
    .withLocation(tableLocation)
    .create();

// Update the name mapping based on the table's re-indexed schema
Schema updatedSchema = table.schema();
NameMapping nameMapping = MappingUtil.create(updatedSchema);
table.updateProperties()
    .set("schema.name-mapping.default", NameMappingParser.toJson(nameMapping))
    .commit();

// Create a transaction, stage the append inside it, and commit
Transaction transaction = table.newTransaction();
AppendFiles appendFiles = transaction.newAppend();
appendFiles.appendFile(dataFile);
appendFiles.commit();
transaction.commitTransaction();

You’ll notice in the code above that we didn’t add the “schema.name-mapping.default” property to the table until after table creation. This is because of a subtle difference between the way the table schema is indexed by our conversion method and the way it is indexed by the catalog when we build the table. Since the NameMapping is derived directly from the Table’s schema, we need to wait until after the schema has been re-indexed to create a proper NameMapping object.

The NameMapping object maps a Field ID to a corresponding column in the underlying file so that readers can efficiently find the columns they’re seeking. Since our original file was not created with Iceberg in mind, columns weren’t assigned Field IDs on creation, so icebergConverter(sparkConverter(parquetSchema)) inferred a zero-indexed mapping from the column names to the Field IDs. When we passed this 0-indexed schema to the catalog.buildTable() method, however, the table was built with a 1-indexed Schema (and corresponding NameMapping). Because the Glue code is proprietary we weren’t able to get to the bottom of this conversion, so we worked around it instead. This fix helps us avoid corrupted file reads at query time.

Example 1-indexed NameMapping Object:

[
  {
    "field-id": 1,
    "names": [
      "first_name"
    ]
  },
  {
    "field-id": 2,
    "names": [
      "variety"
    ]
  },
  {
    "field-id": 3,
    "names": [
      "sepal.length"
    ]
  }
]
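The off-by-one described above is easy to reproduce with a toy generator that emits the same JSON shape for a flat list of columns. This is only a sketch: NameMappingSketch and toJson are hypothetical names rather than Iceberg APIs, nested fields are not handled, and column names are assumed to need no JSON escaping.

```java
import java.util.List;
import java.util.StringJoiner;

final class NameMappingSketch {
    private NameMappingSketch() {}

    /** Builds a compact name-mapping JSON array, numbering columns from firstId. */
    static String toJson(List<String> columns, int firstId) {
        StringJoiner out = new StringJoiner(",", "[", "]");
        for (int i = 0; i < columns.size(); i++) {
            out.add("{\"field-id\":" + (firstId + i)
                    + ",\"names\":[\"" + columns.get(i) + "\"]}");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<String> columns = List.of("first_name", "variety", "sepal.length");
        // Numbering inferred by the schema conversion in this article (starts at 0).
        System.out.println(toJson(columns, 0));
        // Numbering observed on the created table (starts at 1).
        System.out.println(toJson(columns, 1));
    }
}
```

With the 0-based mapping, field ID 1 points at "variety", while the table's 1-based schema expects ID 1 to be "first_name". Every column would resolve to its neighbor, which is the kind of mismatch that deriving the mapping from table.schema() avoids.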

Finally, with all the table properties in place, we can generate an Iceberg Transaction and commit the new file to the table without doing a full rewrite of the data as required by other methods.
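Why this can be a metadata-only operation is worth spelling out. An Iceberg commit publishes new metadata and atomically swaps the table's pointer to it; the data files themselves are only referenced by path. The sketch below models that idea with an in-memory pointer. It is a simplification under stated assumptions, not Iceberg's implementation: PointerTable and Snapshot are hypothetical names, manifests and manifest lists are collapsed into a plain list of paths, and the bucket names are made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class PointerTable {
    /** Immutable snapshot: an id plus every data file path the table references. */
    static final class Snapshot {
        final long id;
        final List<String> files;

        Snapshot(long id, List<String> files) {
            this.id = id;
            this.files = Collections.unmodifiableList(new ArrayList<>(files));
        }
    }

    // Stands in for the catalog's "current metadata" pointer.
    private final AtomicReference<Snapshot> pointer =
            new AtomicReference<>(new Snapshot(0, List.of()));

    /** Registers files by publishing a new snapshot; no data bytes are read or copied. */
    Snapshot append(List<String> newFiles) {
        while (true) {
            Snapshot base = pointer.get();
            List<String> merged = new ArrayList<>(base.files);
            merged.addAll(newFiles);
            Snapshot next = new Snapshot(base.id + 1, merged);
            // Atomic swap; if another writer committed first, rebuild on the new base.
            if (pointer.compareAndSet(base, next)) {
                return next;
            }
        }
    }

    Snapshot head() {
        return pointer.get();
    }

    public static void main(String[] args) {
        PointerTable table = new PointerTable();
        Snapshot first = table.append(List.of("s3://bucket-a/raw/iris.parquet"));
        table.append(List.of("s3://bucket-b/landing/iris_2.parquet"));
        System.out.println(first.files.size());        // prints 1 (older snapshot is unchanged)
        System.out.println(table.head().files.size()); // prints 2
    }
}
```

Because snapshots only hold paths, files in different buckets can sit side by side in one table, and older snapshots stay readable after later appends.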

Our first draft was simple and really just meant to demonstrate the technical feasibility of our goal; it is by no means production ready. We did not perform explicit schema validation or conflict resolution, and we did not collect metrics from the newly appended Parquet files, which can be used to optimize performance at read time. That said, we plan to use the learnings from this experiment to extend the application and make it easier and more cost-effective to integrate your existing datasets into table formats at scale.

If you’d like to follow our startup journey through Y Combinator and beyond, you can subscribe to our newsletter at https://inquery-data.com.

Github repository: https://github.com/inquery-team-git/Iceberg-File-Integration

Erick Enriquez
inquery-data

Data Engineer | Writer | Founder @ InQuery | Stanford MCS