Scaling Ancestry.com: How to Optimize Updates for Iceberg Tables with 100 Billion Rows

Thomas Cardenas
Ancestry Product & Technology
6 min read · Feb 23, 2023


One of the most interesting datasets at Ancestry is the Hints database. This is used to alert users that potential new information is available. The database has multiple shards, and there are currently 100 billion rows being used by machine learning models and analysts. Not only is the dataset large, it also changes rapidly. There are roughly 7 million row changes per hour.

Previously, teams that needed hints data would build similar processes and tables all with the same goal. They did this by calling the hint service, listening for hint changes on Apache Kafka topics, or calling other data warehouses. However, these practices only produced partial data. To solve for this and offload pressure from the database and services, data engineers created a one-stop shop for this dataset using Apache Iceberg. Every hour, there is an Apache Spark job that performs a merge function of the recently changed rows into the Iceberg table.

Doing row-level updates quickly while controlling query performance and cost was a concern the engineers needed to address. This was achieved through partitioning strategies, table configurations, Iceberg procedures, and dividing the updates into parts.

Photo by Hubert Neufeld on Unsplash

Partitions

Partitioning data lets reads and merges bypass files and directories that cannot contain matching rows.

The first step is to partition the data optimally, so that queries will be fast. To do this, one must first identify the most frequently used WHERE clauses. For hints, users usually filter on which hints are still in pending status. This makes status a particularly good partition column. After that, type is the next best partition column.

Iceberg supports multiple partition strategies. Among the most common are: column value, year, month, day, hour, bucket, and truncate. The type of the column determines which strategies apply, and strategies on different columns can be combined. For example, a timestamp column can be partitioned with one of the time strategies (year, month, day, or hour), chosen to match the granularity that queries filter on.
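As a minimal sketch of how these strategies are declared in Spark SQL, the statement below combines a time transform, a bucket transform, and a truncate transform. The table name, columns, bucket count, and truncate width are all hypothetical and are not taken from the hints table.

```sql
-- Hypothetical table, used only to illustrate Iceberg partition transforms.
CREATE TABLE datalake.db.events_example (
  event_id   BIGINT,
  user_id    BIGINT,
  name       STRING,
  created_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (
  days(created_at),     -- one partition per calendar day
  bucket(16, user_id),  -- hash user_id into 16 buckets
  truncate(4, name)     -- group by the first 4 characters of name
)
```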

There are only a handful of statuses that a hint can have, which limits the number of top-level directories created by the column value partition. The next partition column, type, is an enum as well and has just a few variations. It is also one of the most common filters.

This helps queries because when providing type and/or status, Iceberg will do a partition scan on those columns to know which folders to look in. This will bypass the need to read all the files and directories. Being able to partition data in this way is one of the key reasons Iceberg was chosen for the table.
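The layout described above could look roughly like the following. This is a sketch under assumptions: the article does not show the real schema, so the table name, the columns other than status and type, and the 'record' type value are hypothetical.

```sql
-- Hypothetical hints table partitioned by the two most common filter columns.
CREATE TABLE datalake.db.hints_example (
  hint_id    BIGINT,
  status     STRING,
  type       STRING,
  created_at TIMESTAMP,
  updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (status, type);

-- Filtering on the partition columns lets Iceberg plan the scan
-- against only the matching partitions.
SELECT count(*)
FROM datalake.db.hints_example
WHERE status = 'pending'
  AND type = 'record';
```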

One should watch out for both over-partitioning and under-partitioning. Too few partitions lead to reading more data per query. Too many partitions lead to an overabundance of small files, which can increase storage as well as hurt query performance.

Configuration

One of the configurations that made a big difference in update time was switching to merge-on-read for merge operations. The default is copy-on-write, which rewrites an entire data file whenever a row in it needs to be updated, leading to longer writes and more memory consumption. With large hourly merges, the cost and the write time add up quickly. The downside of merge-on-read is that the row-level merges now have to happen on every read. With the current usage of this table, that has not been a problem, and it can be mitigated by more frequent rewrites.

ALTER TABLE datalake.db.table SET TBLPROPERTIES (
'write.merge.mode'='merge-on-read'
)
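Merge-on-read writes row-level delete files, which Iceberg supports only from table format version 2 onward. The variant below is a sketch that assumes the table may still be on version 1 and that UPDATE and DELETE statements should behave the same way as MERGE; neither assumption comes from the article.

```sql
ALTER TABLE datalake.db.table SET TBLPROPERTIES (
  'format-version' = '2',                  -- row-level deletes require v2
  'write.merge.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',   -- optional: same behavior for UPDATE
  'write.delete.mode' = 'merge-on-read'    -- optional: same behavior for DELETE
)
```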

Procedures

Compacting files
Due to the nature of the hints table, it’s necessary to constantly rewrite files in order to reduce the number of files that need to be opened and read. The way to do this in Iceberg is to run the rewrite_data_files procedure. By default, the procedure rewrites only one partition at a time, which is extremely inefficient when working with thousands of partitions. To improve this, it’s best to set the max-concurrent-file-group-rewrites option to increase concurrency.

CALL datalake.system.rewrite_data_files(
  table => 'database.table',
  strategy => 'binpack',
  options => map(
    'max-concurrent-file-group-rewrites', '100',
    'partial-progress.enabled', 'true',
    'rewrite-all', 'true'
  )
)

The next important option on this procedure is partial-progress.enabled. Because the dataset is so large, failures are computationally expensive: compacting a large amount of data only to find out 60 minutes later that the job failed is costly. Instead of a single commit once all rewriting is done, setting this option allows multiple commits throughout a rewrite, so a failure does not lose all the work.

Due to the table’s size, the approach that has shown the best performance so far is to use the where argument of the rewrite procedure, which allows compacting one partition at a time. Different partitions have varying sizes and throughput, resulting in different compaction times. Some partitions cannot use concurrent rewrites, because the cluster quickly runs out of memory.

Unless rewrite-all is set to true, the positional deletes produced by merge-on-read are not rewritten. Leaving them in place progressively slows down queries and updates, since that data has to be read in order to perform the merges.
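The per-partition compaction described above can be sketched as follows. The filter value and the lower concurrency setting are assumptions for illustration; in practice each partition would get its own call with settings tuned to its size.

```sql
-- Compact only the files in the 'pending' status partition.
CALL datalake.system.rewrite_data_files(
  table => 'database.table',
  strategy => 'binpack',
  where => 'status = "pending"',
  options => map(
    'max-concurrent-file-group-rewrites', '20',  -- lower for memory-heavy partitions
    'partial-progress.enabled', 'true',
    'rewrite-all', 'true'
  )
)
```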

Cleaning up files
Constantly rewriting files leaves behind files that are no longer used, which increases storage costs. This requires running the expire_snapshots procedure occasionally. It deletes files and the pointers to those files. Expiring snapshots also removes the ability to time travel to those snapshots. As with rewrite_data_files, the default concurrency is 1. Fortunately, this can be increased. Increasing concurrent deletes decreased the time to expire by tens of minutes.

CALL datalake.system.expire_snapshots(
table => '`database`.table',
retain_last => 1,
max_concurrent_deletes => 20)

The one gotcha with the above procedure: we might think only one snapshot will be left after running this command because retain_last is set to 1. That is not always the case. The procedure also looks at a table property called history.expire.max-snapshot-age-ms, which defaults to 5 days. As a result, every snapshot newer than 5 days is kept as well.

-- Set the maximum snapshot age to 1 day (86,400,000 ms)
ALTER TABLE datalake.`database`.table SET TBLPROPERTIES (
'history.expire.max-snapshot-age-ms'='86400000'
)
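The cutoff can also be passed per call through the procedure's older_than argument instead of changing the table property. This is a sketch; the timestamp below is an arbitrary example value.

```sql
CALL datalake.system.expire_snapshots(
  table => '`database`.table',
  older_than => TIMESTAMP '2023-02-22 00:00:00.000',  -- expire snapshots older than this
  retain_last => 1,                                   -- but always keep at least one
  max_concurrent_deletes => 20)
```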

Multiple Uploads

Doing one large MERGE operation didn’t prove to be efficient. While analyzing the incoming data, a few ideas arose. The table is updated hourly, so any hint created since the last run cannot already exist in the table; those rows can skip the MERGE operation and be written with a plain append. The downside is handling backfills, where the MERGE operation would still be needed.
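A minimal sketch of the append path, assuming a staging view named hint_changes that holds the hour's changed rows with the same columns as the target, a created_at column, and a known last-run timestamp. All of these names and values are hypothetical.

```sql
-- Rows created since the last run cannot exist in the table yet,
-- so they are appended without a join against the target.
INSERT INTO datalake.db.hints_example
SELECT *
FROM hint_changes
WHERE created_at >= TIMESTAMP '2023-02-23 10:00:00';  -- time of the last run
```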

The next idea came from the fact that 85% of the incoming data was hints created recently, within the past two months. The second upload application can therefore MERGE only the recently created hints and filter out hints older than two months.

The last upload handles the remaining 15% of hints, which are much older. Although they span many more years, the data is very small: less than a million rows.
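The split of the hourly batch can be sketched with two temporary views, one per MERGE upload. The staging view hint_changes, the created_at column, and the last-run timestamp are hypothetical; the two-month boundary comes from the text.

```sql
-- Hints created within the past two months, excluding rows new since the last run.
CREATE OR REPLACE TEMPORARY VIEW recent_hint_changes AS
SELECT *
FROM hint_changes
WHERE created_at >= add_months(current_date(), -2)
  AND created_at < TIMESTAMP '2023-02-23 10:00:00';  -- time of the last run

-- Everything older than two months: many years of history, but few rows.
CREATE OR REPLACE TEMPORARY VIEW old_hint_changes AS
SELECT *
FROM hint_changes
WHERE created_at < add_months(current_date(), -2);
```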

Doing this reduced the amount of data that had to be shuffled during any operation. The hourly data was 250MB; if each upload batch could be brought down to 25MB, it would be easier to use a broadcast join by setting --conf spark.sql.autoBroadcastJoinThreshold=26214400. However, a full table scan still happens prior to filtering, and reducing that is part of future improvements.
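The threshold is expressed in bytes, so 25MB is 25 × 1024 × 1024 = 26,214,400. As a sketch, the same value can be set for a single Spark SQL session rather than on the command line:

```sql
-- 25 * 1024 * 1024 = 26214400 bytes
SET spark.sql.autoBroadcastJoinThreshold=26214400;
```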

One of the interesting learning outcomes was that when WHEN MATCHED and WHEN NOT MATCHED clauses are used in the same MERGE operation, the join couldn’t be broadcast. When both clauses are present, Iceberg uses a full outer join, which broadcast joins don’t support. WHEN MATCHED alone is an inner join, which can be broadcast. WHEN NOT MATCHED by itself is a left anti join, which broadcast does support, but the Iceberg table is the side that would need to be broadcast, and that table is too large.
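The two MERGE shapes discussed above look like this in Spark SQL. The table, the staging view hint_changes, and the hint_id key are hypothetical, and the join strategy Spark actually picks depends on the Spark and Iceberg versions, so the comments only restate the behavior reported in this article.

```sql
-- Matched-only MERGE: reported above as an inner join, so a small
-- source batch can be broadcast.
MERGE INTO datalake.db.hints_example AS t
USING hint_changes AS s
ON t.hint_id = s.hint_id
WHEN MATCHED THEN UPDATE SET *;

-- MERGE with both clauses (for example, a backfill): reported above
-- as a full outer join, which could not be broadcast.
MERGE INTO datalake.db.hints_example AS t
USING hint_changes AS s
ON t.hint_id = s.hint_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
```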

Going forward

Hints was one of the first Iceberg tables created on the Data Platform team at Ancestry, and it’s 12x larger than the previous tables. Given the importance of the dataset, the team needed to adapt quickly. Going forward, the team will experiment with partitioning strategies to find the right number of partitions for optimal performance. The team will also explore more frequent updates and additional Spark and table properties.

If you’re interested in joining Ancestry, we’re hiring! Feel free to check out our careers page for more info. Also, please see our medium page to see what Ancestry is up to and read more articles like this.
