Python to Apache Iceberg(s)

Tim Spann
Published in Cloudera
Feb 27, 2024

Apache Iceberg, Python, Open Data Lakehouse, LLM, GenAI, OLLAMA, Apache Parquet, Apache Arrow, JSON, CSV, MinIO, S3

I needed a place to store my data in a modern format that is flexible and high performance, with time travel, partitioning, full SQL, and multi-engine support, among much more. It is also possible to run it in the small with a Python library, a small SQLite database, and MinIO to store the files.

The first step is to install the library for querying and appending data to Iceberg tables.

Install Libraries

pip3 install "pyiceberg[s3fs,hive,pyarrow]"

For Easy Local Demos, Use SqlCatalog with SQLite

We then set up our MinIO buckets.
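One way to do this from Python is with the MinIO client library. This is a minimal sketch, assuming a local MinIO at localhost:9000 with the same placeholder credentials used in the catalog configuration below and a bucket named pyiceberg; the MinIO console or mc would work just as well.

from minio import Minio

# Local MinIO endpoint and placeholder credentials (match the catalog config below)
client = Minio("localhost:9000", access_key="key", secret_key="secretkey", secure=False)

# Create the bucket PyIceberg will write table data and metadata into
if not client.bucket_exists("pyiceberg"):
    client.make_bucket("pyiceberg")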

We already had the Apache Kafka library for Python installed, so we can send our finance data to both Kafka and Iceberg.

We read live stock prices from Yahoo Finance with the yahoo_fin library, as shown in the script below.
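yahoo_fin comes from PyPI as well; if it is not already installed, one more pip command covers it.

pip3 install yahoo_fin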

Stock Feed to Apache Iceberg and Apache Kafka via Python

# Imports used by the stock feed script
import json
import os
import random
import time
import uuid
from time import gmtime, strftime

import pyarrow as pa
from kafka import KafkaProducer
from kafka.errors import KafkaError
from pyiceberg.catalog.sql import SqlCatalog
from yahoo_fin import stock_info as si

StockNames = ["ORCL", "SAP", "CSCO", "GOOG", "ETH-USD", "NVDA", "AMZN", "IBM", "NFLX", "MARA",
              "TSLA", "PLTR", "NCLH", "RIVN", "AMD", "PFE", "BAC", "GOOGL"]

producer = KafkaProducer(key_serializer=str.encode,
                         value_serializer=lambda v: json.dumps(v).encode('ascii'),
                         bootstrap_servers='kafka:9092',
                         retries=3)

tablename = "yfinstocks"
schemaname = "docs_example"
s3location = "s3://pyiceberg"
local_data_dir = "/tmp/stocks/"

warehouse_path = "/tmp/warehouse"
os.makedirs(warehouse_path, exist_ok=True)

catalog = SqlCatalog(
    "docs",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": s3location,
        "s3.endpoint": "http://localhost:9000",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": "key",
        "s3.secret-access-key": "secretkey",
    },
)

# Create the target namespace on the first run; ignore if it already exists
try:
    catalog.create_namespace(schemaname)
except Exception:
    pass

rowCounter = 0
isList = []

while rowCounter >= 0:
    stockname = random.choice(StockNames)
    ts = time.time()
    uuid_key = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())
    try:
        row = {'uuid': uuid_key,
               'stockname': stockname,
               'ts': float(int(ts * 1000)),
               'currentts': float(strftime("%Y%m%d%H%M%S", gmtime())),
               'stockvalue': float(si.get_live_price(stockname))}
        producer.send(tablename, key=uuid_key, value=row)
        producer.flush()
    except Exception:
        # Skip symbols that fail to price instead of re-appending the previous row
        print("Bad stockname " + stockname)
        continue

    print(str(rowCounter) + " " + stockname)
    isList.append(row)
    rowCounter = rowCounter + 1

    # Every 1,000 rows, flush the batch to Iceberg and reset the counter
    if rowCounter >= 1000:
        rowCounter = 0

        ## build PyArrow table from python list
        df = pa.Table.from_pylist(isList)

        #### Write to Apache Iceberg on MinIO (S3)
        ### - only create it for a new table
        table = None
        try:
            table = catalog.create_table(
                f'{schemaname}.{tablename}',
                schema=df.schema,
                location=s3location,
            )
        except Exception:
            print("Table exists, append " + tablename)
            table = catalog.load_table(f'{schemaname}.{tablename}')

        ### Write table to Iceberg/MinIO
        table.append(df)
        isList = []
        df = None

    time.sleep(0.05)


producer.close()

Source Code:

Apache Iceberg data files in MinIO
Apache Iceberg metadata in MinIO
Kafka topic data
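To confirm the appends landed, the table can be read back with PyIceberg and pulled into pandas. This is a minimal sketch that reuses the same local catalog settings as the writer script; the NVDA filter is just an example.

from pyiceberg.catalog.sql import SqlCatalog

# Same local SQLite catalog and MinIO settings as the writer script above
catalog = SqlCatalog(
    "docs",
    **{
        "uri": "sqlite:////tmp/warehouse/pyiceberg_catalog.db",
        "s3.endpoint": "http://localhost:9000",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": "key",
        "s3.secret-access-key": "secretkey",
    },
)

table = catalog.load_table("docs_example.yfinstocks")

# Scan the table, optionally filtered to one symbol, into a pandas DataFrame
df = table.scan(row_filter="stockname == 'NVDA'").to_pandas()
print(df.head(10))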

In production you would use Cloudera’s Hive Metastore, which does all of this for you automagically. Don’t run Iceberg in production without an enterprise catalog, security, governance, management, multicloud coordination, backups, replication, and support.
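For reference, pointing PyIceberg at a Hive Metastore instead of the local SQLite catalog is just a different load_catalog configuration. The URIs and credentials below are placeholders for your own environment.

from pyiceberg.catalog import load_catalog

# Placeholder URIs and credentials: point these at your own Hive Metastore
# and S3-compatible object store
catalog = load_catalog(
    "prod",
    **{
        "type": "hive",
        "uri": "thrift://hive-metastore:9083",
        "s3.endpoint": "https://s3.example.com",
        "s3.access-key-id": "key",
        "s3.secret-access-key": "secretkey",
    },
)

table = catalog.load_table("docs_example.yfinstocks")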

Suggestion from Ollama with Mistral

I’d be happy to help you design an Apache NiFi flow for storing data into Apache Iceberg tables. Here is a high-level overview of the steps involved:

1. Fetch Data: First, we need to fetch data from its source and load it into NiFi. You can use one of the many available processors like `GetFile`, `ExecuteStreamCommand`, or `EvaluateJsonPath` for this purpose based on your data source.

2. Parse Transform Data: After fetching the data, you might need to parse and transform it into the desired schema before storing it in Iceberg tables. You can use processors like `EvaluateJsonPath`, `UpdateAttribute`, `SetProperty` or custom processors written in Java or Python for this purpose.

3. Write Data to Parquet Files: Before writing data to Iceberg tables, you need to write it to parquet files. Use the `PutHdfs` processor along with a `ListFile` and `UpdateSessionFlowFile` processor chain to write the transformed data into HDFS/S3/Minio in Apache Parquet format.

4. Create or Update Iceberg Tables: Once the parquet files are written, you can use Apache Iceberg’s `IcebergPutHdfsWriter` to create or update the corresponding Iceberg tables using the NiFi processor `InvokeScript`. You will need to provide a script (in Python, Java, or Scala) that calls the Iceberg CLI commands. Here is an example script in Python:

# python3.10+
import sys
from iceberg import Iceberg, Table

# Replace this with your project-specific code
project = "your_iceberg_project"
namespace = "your_iceberg_namespace"
table = "your_iceberg_table"
filepath = "/opt/demo/data/parquet/file.parquet"

iceberg = Iceberg()
iceberg.init(project=project, namespace=namespace)

# Create a new table if it doesn't exist or update the existing one
if not iceberg.tables.exists(table):
    table_spec = Table.from_path(filepath).schema("my_schema")
    iceberg.create_table(table, table_spec)
else:
    table = iceberg.get_table(table)

# Read the parquet file and update the metadata of the existing table
data = iceberg.tables.read(table, read_data=False)
new_metadata = data[-1].metadata + {"some_key": "some_value"}
iceberg.update_table(table, new_metadata)

sys.exit()

5. Make sure to replace the placeholders in the script with your specific project details and schema definition.

6. Triggering the Flow: Finally, you can schedule your NiFi flow using the NiFi Web UI or call it through an external trigger like Apache Airflow or a REST API call.

With this setup, data is fetched, transformed, written to parquet files, and stored in Iceberg tables using Apache NiFi.

How to get that suggestion from Ollama

curl http://localhost:11434/api/generate -d '{
  "model": "mistral",
  "prompt": "Write me a Apache NiFi flow to store to Apache Iceberg tables",
  "stream": false
}'
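The same request can be made from Python with requests; with "stream": false the full answer comes back as a single JSON object whose response field holds the generated text.

import requests

resp = requests.post(
    "http://localhost:11434/api/generate",
    json={
        "model": "mistral",
        "prompt": "Write me an Apache NiFi flow to store to Apache Iceberg tables",
        "stream": False,
    },
)

# The non-streaming reply is one JSON object; the generated text is in "response"
print(resp.json()["response"])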

TOOLS

brew install parquet-cli

parquet schema 00000-0-07decdd6-ec51-44d0-b5bc-ce8bed54e174.parquet
{
  "type" : "record",
  "name" : "schema",
  "fields" : [ {
    "name" : "uuid",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "stockname",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "ts",
    "type" : [ "null", "double" ],
    "default" : null
  }, {
    "name" : "currentts",
    "type" : [ "null", "double" ],
    "default" : null
  }, {
    "name" : "stockvalue",
    "type" : [ "null", "double" ],
    "default" : null
  } ]
}

parquet meta 00000-0-de9eb90a-0c71-4565-a38f-31ec930a6bea.parquet

File path: 00000-0-de9eb90a-0c71-4565-a38f-31ec930a6bea.parquet
Created by: parquet-cpp-arrow version 11.0.0
Properties:
ARROW:schema: /////3ACAAAQAAAAAAAKAA4ABgAFAAgACgAAAAABBAAQAAAAAAAKAAwAAAAEAAgACgAAAAwAAAAEAAAAAAAAAAUAAADEAQAARAEAANwAAABwAAAABAAAAGL+//8AAAEDFAAAAFQAAAAIAAAAGAAAAAAAAAAKAAAAc3RvY2t2YWx1ZQAAAQAAAAQAAABU/v//EAAAAAQAAAABAAAANQAAABAAAABQQVJRVUVUOmZpZWxkX2lkAAAAADr///8AAAIAyv7//wAAAQMUAAAAVAAAAAgAAAAYAAAAAAAAAAkAAABjdXJyZW50dHMAAAABAAAABAAAALz+//8QAAAABAAAAAEAAAA0AAAAEAAAAFBBUlFVRVQ6ZmllbGRfaWQAAAAAov///wAAAgAy////AAABAxQAAABQAAAACAAAABAAAAAAAAAAAgAAAHRzAAABAAAABAAAABz///8QAAAABAAAAAEAAAAzAAAAEAAAAFBBUlFVRVQ6ZmllbGRfaWQAAAYACAAGAAYAAAAAAAIAlv///wAAAQUUAAAAWAAAAAgAAAAYAAAAAAAAAAkAAABzdG9ja25hbWUAAAABAAAABAAAAIj///8QAAAABAAAAAEAAAAyAAAAEAAAAFBBUlFVRVQ6ZmllbGRfaWQAAAAABAAGAAQAAAAAABIAGAAIAAYABwAMAAAAEAAUABIAAAAAAAEFFAAAAFwAAAAIAAAAFAAAAAAAAAAEAAAAdXVpZAAAAAABAAAADAAAAAgADAAEAAgACAAAABAAAAAEAAAAAQAAADEAAAAQAAAAUEFSUVVFVDpmaWVsZF9pZAAAAAAEAAQABAAAAAAAAAA=
Schema:
message schema {
  optional binary uuid (STRING) = 1;
  optional binary stockname (STRING) = 2;
  optional double ts = 3;
  optional double currentts = 4;
  optional double stockvalue = 5;
}


Row group 0: count: 1000 31.44 B records start: 4 total(compressed): 30.707 kB total(uncompressed): 73.043 kB
--------------------------------------------------------------------------------
type encodings count avg size nulls min / max
uuid BINARY Z _ R 1000 22.79 B 0 "20240227165804_5dde6202-d..." / "20240227170241_4b3e7fe4-b..."
stockname BINARY Z _ R 1000 0.81 B 0 "AMD" / "TSLA"
ts DOUBLE Z _ R 1000 3.36 B 0 "1.709053084942E12" / "1.709053361178E12"
currentts DOUBLE Z _ R 1000 1.56 B 0 "2.0240227165804E13" / "2.0240227170241E13"
stockvalue DOUBLE Z _ R 1000 2.93 B 0 "11.0600004196167" / "3237.353271484375"

parquet head 00000-0-07decdd6-ec51-44d0-b5bc-ce8bed54e174.parquet
{"uuid": "20240227154750_f6a195ef-47be-4e6a-aadd-a34d92f098b7", "stockname": "CSCO", "ts": 1.709048870183E12, "currentts": 2.024022715475E13, "stockvalue": 48.224998474121094}
{"uuid": "20240227154750_f9f3792d-8eb7-4228-bbce-3057f5dbb9e9", "stockname": "RIVN", "ts": 1.709048870473E12, "currentts": 2.024022715475E13, "stockvalue": 10.893899917602539}
{"uuid": "20240227154750_f9701ebd-4b2b-4563-950c-8c50d37e4317", "stockname": "BAC", "ts": 1.709048870636E12, "currentts": 2.024022715475E13, "stockvalue": 34.17499923706055}
{"uuid": "20240227154751_bc35e094-2265-4454-b443-d9d44cdf76f1", "stockname": "GOOGL", "ts": 1.709048871059E12, "currentts": 2.0240227154751E13, "stockvalue": 138.375}
{"uuid": "20240227154751_d67e3936-94af-4363-8181-e626479a4bdd", "stockname": "ORCL", "ts": 1.709048871269E12, "currentts": 2.0240227154751E13, "stockvalue": 109.97000122070312}
{"uuid": "20240227154751_242a0e9e-012f-4028-b4a5-9847d6899305", "stockname": "PFE", "ts": 1.709048871569E12, "currentts": 2.0240227154751E13, "stockvalue": 26.950199127197266}
{"uuid": "20240227154751_315b6985-2c18-4504-a0ec-8cc805437d53", "stockname": "NVDA", "ts": 1.709048871999E12, "currentts": 2.0240227154751E13, "stockvalue": 786.8400268554688}
{"uuid": "20240227154752_1632a927-d807-4874-81fd-e3aa7a20b580", "stockname": "RIVN", "ts": 1.70904887225E12, "currentts": 2.0240227154752E13, "stockvalue": 10.893899917602539}
{"uuid": "20240227154752_e4d88f5f-54ba-4d38-917b-9702ab70eef8", "stockname": "AMZN", "ts": 1.709048872399E12, "currentts": 2.0240227154752E13, "stockvalue": 174.24000549316406}
{"uuid": "20240227154752_b50a8d0c-8f3b-4522-bec6-69e9db5c9fc2", "stockname": "TSLA", "ts": 1.709048872643E12, "currentts": 2.0240227154752E13, "stockvalue": 203.27000427246094}

RESOURCES

