Unlocking the Power of Iceberg within PostgreSQL

Majid Azimi
DataReply
Published in
7 min readApr 22, 2024

Introduction

In the complex world of data management, the quest for efficiency, scalability, and interoperability is never-ending. Standard table formats like Iceberg have emerged as beacons of innovation in this realm, offering a unified approach to handling massive datasets across various platforms and systems. Iceberg tables, in particular, stand out for their advanced features such as schema evolution, hidden partitioning, and compatibility with extensive data processing ecosystems. These capabilities are not just enhancements; they are essential in navigating the challenges of modern data storage and analysis. The integration of these standard formats with powerful database systems, such as PostgreSQL, marks a significant leap forward in data management strategies. But the integration of Iceberg tables with PostgreSQL offers more than just improved data handling; it represents a strategic alignment with standard table formats that are becoming the backbone of the data industry.

As we delve into various approaches, we’ll evaluate the crucial aspects of integrating Iceberg tables with PostgreSQL. Whether you are navigating the intricacies of database management, engineering data solutions, or simply keen on the evolution of data technologies, understanding the significance of standard table formats and their integration with robust systems like PostgreSQL is key to unlocking new dimensions of data infrastructure capabilities.

What are the possible options?

To facilitate reading from and writing to Iceberg tables, we’re presented with a trio of viable options:

  1. Use Airflow operators which enables us seamlessly transfer data to and from Iceberg tables and PostgreSQL. Namely, DuckDB and PostgreSQL operators are the ones we will need.
  2. Adopting PL/Python procedures offers a direct avenue for interacting with Iceberg tables.
  3. Opting for an external tool, akin to Apache Flink in combination with Apache Kafka, presents another strategic pathway. Given its complexity and the breadth of its application, we’ll dedicate a separate blog post to fully explore how tools like Flink can be leveraged for interacting with Iceberg tables, ensuring a thorough examination of its capabilities and integration possibilities. For a similar architecture take a look into lakehouse post.

For the sake of examples, we will use a sample table person with the following structure on both AWS S3 and PostgreSQL:

CREATE TABLE person (
id INT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
age INT NOT NULL
);

For example, below is the same table definition on AWS Athena:

CREATE TABLE IF NOT EXISTS person (
id int,
name string,
age int
)
LOCATION 's3://<bucket>/person'
TBLPROPERTIES (
'table_type' = 'ICEBERG',
'format'='parquet',
'write_compression'='snappy',
'optimize_rewrite_delete_file_threshold'='10'
)

Additionally, for the sake of simplicity, we assume you have already setup an Iceberg table, PostgreSQL and Airflow.

Using Airflow Operators

In order to load an Iceberg table (on S3 for example), we can utilize DuckDB Operator. The following shows a snippet for a sample person table:


with dag:
@task
def load_duckdb_into_postgresql():
conn = duckdb.connect()
conn.sql("INSTALL httpfs")
conn.sql("INSTALL iceberg")
conn.sql("INSTALL postgres")
conn.sql("LOAD icerberg")
conn.sql("LOAD httpfs")
conn.sql("LOAD postgres")
conn.sql("SET s3_region='eu-central-1'")
conn.sql("INSERT INTO postgres_db.public.person SELECT * FROM iceberg_scan('s3://bucket/person/path/to/metadata.json');")
conn.close()

Realize how I utilized INSERT INTO tbl SELECT * FROM other_tbl pattern to read from Iceberg table and load it into PostgreSQL.

The reverse direction doesn’t work out of the box (read from PostgreSQL and write to Iceberg table. At the time of writing this post, DuckDB doesn’t support writing to Iceberg tables [doc].

The problem with this approach is that data flows into Airflow tasks (depends on the execution environment). The following diagrams depicts this scenario.

There are two possible issues with this approach:

  1. In case you are moving large amounts of data in and out of PostgreSQL, it has to flow through your Airflow environment which might be resource intensive.
  2. In some security sensitive environments, data may not move out of the restricted zone. In other words, all readers, writers and data lakes are in restricted zone. Other clients (including Airflow) can only connect to submit administrative commands/queries. Once raw data is aggregated (or masked/anonymized), it’s fine to move out of restricted zone.

Adopting PL/Python Procedures

PL/Python is a procedural language supported by PostgreSQL, enabling database developers and administrators to write user-defined and trigger functions in Python. It provides a seamless bridge between the SQL commands of PostgreSQL and the Python programming environment, allowing for sophisticated data manipulation, processing, and external system interactions directly within PostgreSQL.

To use PL/Python, it must first be installed and enabled within your PostgreSQL database. This typically involves installing the PL/Python extension package on your database server and then creating the extension within your database using the CREATE EXTENSION plpythonu; command. In case the extension is not installed already, you will see an error:

ERROR:  extension "plpython3u" is not available
DETAIL: Could not open extension control file "/usr/share/postgresql/16/extension/plpython3u.control": No such file or directory.
HINT: The extension must first be installed on the system where PostgreSQL is running.

PyIceberg is a Python library that offers programmatic access to Iceberg tables, making it possible to read from and write to Iceberg tables directly from Python. By leveraging PyIceberg within PL/Python functions, we can effectively bridge PostgreSQL and Iceberg tables, enabling direct data exchange between these two systems without the need for intermediary data movement or external workflow orchestration tools.

This integration harnesses the power of Iceberg’s table format with the robustness and reliability of PostgreSQL, providing a comprehensive solution for managing and analyzing large-scale datasets.

First, ensure PyIceberg is available in your PostgreSQL server’s Python environment. This might require installing PyIceberg in the global Python environment of the server or using a virtual environment accessible to PostgreSQL.

pip install "pyiceberg[hive,glue,pyarrow,pandas,duckdb,ray,s3fs,snappy]"

Since PostgreSQL runs under a system user (for example postgres), you need to configure AWS credentials for this user. In other words, run aws configure with postgres user and configure your AWS credentials.

Below is an example of PL/Python procedure that loads Iceberg table into a PostgreSQL table.

CREATE OR REPLACE PROCEDURE load_iceberg_table() 
LANGUAGE plpython3u
AS $$
from pyiceberg.catalog.glue import GlueCatalog

catalog = GlueCatalog(
"default",
**{
"region_name": "eu-central-1",
}
)

namespace: str = "pg-iceberg-loader"

person_tbl = catalog.load_table(("pg-iceberg-loader", "person"))

pd = person_tbl.scan().to_pandas()

plan = plpy.prepare("INSERT INTO person (id, name, age) VALUES ($1, $2, $3)", ["INT", "VARCHAR", "INT"])
for index, row in pd.iterrows():
plpy.execute(plan, [row['id'], row['name'], row['age']])
$$;

Explanation:

  • For the sake of this example, we use GlueCatalog. But other catalogs are also fine.
  • Once table is loaded, it’s possible to scan/filter the table and convert to panda data frames. DuckDB and Ray integrations also exist.
  • We use plpy.prepare to prepare a SQL statement and run against our DB instance. You have to call plpy.execute to actually execute it. Check PostgreSQL docs for more database access functions.

The reverse direction (PostgreSQL to Iceberg) works similarly.

CREATE OR REPLACE PROCEDURE unload_postgresql_table() 
LANGUAGE plpython3u
AS $$
from pyiceberg.catalog.glue import GlueCatalog
import pyarrow as pa

record_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('age', pa.int32()),
])

catalog = GlueCatalog(
"default",
**{
"region_name": "eu-central-1",
}
)

namespace: str = "pg-iceberg-loader"

person_tbl = catalog.load_table(("pg-iceberg-loader", "person"))

rs = []

for row in plpy.cursor("SELECT * FROM person"):
rs.append({"id": row['id'], "name": row['name'], "age": row['age']})

df = pa.Table.from_pylist(rs, record_schema)

person_tbl.append(df)
$$;

Explanation:

  • PyIceberg uses Arrow under the hood, so we have to compile Arrow table to push to S3.
  • Note that we used from_pylist to build our Arrow table. In case a large quantities of data is being exported, we have to build the table on a local file and then commit to Iceberg table. Checkout Arrow docs for more info.
  • Finally, we use append function to commit our changes.

Now that we can interact with Iceberg tables, our Airflow tasks looks like the following:

# Define the task to call the stored procedure
call_load_iceberg_proc = PostgresOperator(
task_id='call_load_iceberg_table',
postgres_conn_id='your_postgres_connection_id',
sql='CALL load_iceberg_table();',
)

Note that we only use CALL load_iceberg_table() to invoke our function. As a consequence, data wouldn’t flow into Airflow tasks.

What About Other Databases?

Iceberg is supported on all major programming languages. According to my research, majority of databases allow writing user defined procedures in other languages than SQL:

What About Cloud Databases?

AWS RDS has a PostgreSQL extension named aws_s3 that allows importing and exporting queries to AWS S3. First, install the extension:

CREATE EXTENSION aws_s3 CASCADE;
NOTICE: installing required extension "aws_commons"
CREATE EXTENSION

Then you can import/export directly to S3:

SELECT aws_s3.table_import_from_s3(
't', '', '(format csv)',
:'s3_uri',
aws_commons.create_aws_credentials('sample_access_key', 'sample_secret_key', '')
);

However, this is a raw import/export. It doesn’t allow interacting with Iceberg table format.

At the time of writing this post, no cloud provider enables you directly interact to Iceberg tables within PostgreSQL.

Conclusion

In conclusion, integrating Iceberg tables with PostgreSQL presents a powerful combination for managing large-scale datasets with efficiency, reliability, and scalability. We explored two distinct approaches to achieve this integration: leveraging Airflow in conjunction with DuckDB Operators for a flexible, externalized workflow, and employing PL/Python in combination with the PyIceberg library for direct, in-database operations. Each method offers unique advantages, whether it’s the broad orchestration capabilities and extensibility of Airflow or the seamless, efficient data handling afforded by executing Python code within PostgreSQL.

The choice between these approaches depends on various factors, including your specific data infrastructure, performance requirements, and operational preferences. For workflows that prioritize flexibility and the integration of multiple data sources and destinations, the Airflow method with DuckDB Operators is exceptionally suited. On the other hand, for scenarios that demand tight integration, minimal data movement, and leveraging the computational resources of the PostgreSQL environment itself, the PL/Python and PyIceberg combination provides an elegant solution.

As we wrap up this exploration, it’s clear that the journey toward optimal data management is ongoing, with new tools, techniques, and best practices continually emerging. Staying informed and adaptable is key to navigating this landscape successfully, ensuring that your data infrastructure remains robust, responsive, and aligned with your organizational goals. Whether you choose to externalize your data processes with Airflow or integrate directly using PL/Python and PyIceberg, the path to enhanced data capabilities and insights is well within reach.

--

--