Apache Iceberg, Nessie REST Catalog, MinIO, Spark, Trino and DuckDB — Part 1

Arnab Neogi
8 min read · Jun 24, 2024


This will be a three-part series in which my objective is to spin up a distributed storage and compute system along with a data warehouse in DuckDB. All of the technologies are open source. Minimal knowledge of Docker, Docker Compose, Python, SQL and CI/CD in Git (optional) is required. You can follow the Dockerfiles; they are self-explanatory. The internal architecture of Iceberg, Trino or Spark is out of scope for this article. My objective is to create the architecture using open-source technologies. Shout out in the comments if you need any help.

In this article, I’ll create an on-prem lakehouse using Apache Iceberg and MinIO. We’ll use Nessie as the REST catalog for Iceberg, but you can use any REST catalog. The reason I’ve used the REST catalog instead of the native Nessie catalog is that pyiceberg cannot connect to the native Nessie catalog, while it can connect to a REST catalog, and we had already implemented the Nessie catalog. By exposing Nessie through its REST interface, I can move away from the native Nessie catalog without losing the existing catalogs in Nessie.
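To make that concrete, here is roughly what the pyiceberg side looks like once the stack from this article is up. This is a minimal sketch, assuming Nessie exposes its Iceberg REST API under /iceberg (as the 0.90.x image used later in this article does) and using the placeholder MinIO credentials from the compose files below:

# Minimal sketch: connecting pyiceberg to the Nessie REST catalog.
# Assumptions: Nessie serves the Iceberg REST API under /iceberg and the
# MinIO endpoint/credentials match the docker-compose files shown later.
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "nessie",
    **{
        "type": "rest",
        "uri": "http://nessie:19120/iceberg",   # Iceberg REST endpoint exposed by Nessie
        "s3.endpoint": "http://minio:9000",     # MinIO API endpoint
        "s3.access-key-id": "minioadmin",       # placeholder credentials
        "s3.secret-access-key": "minioadmin",
        "s3.region": "us-east-1",
    },
)

print(catalog.list_namespaces())

The same properties can also be kept in a .pyiceberg.yaml file; the spark-submit image later in this article copies one to /root/.pyiceberg.yaml for exactly this purpose.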

This is a very high-level architecture:

Several containers depend on a database container being up before they can start.

This is an issue with Docker Swarm, because the depends_on key is not honored when deploying with docker stack. Hence I created a wait-for-it.sh script that blocks until the database is reachable.

#!/usr/bin/env bash
# Use this script to test if a given TCP host/port are available

WAITFORIT_cmdname=${0##*/}

echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi }

usage()
{
    cat << USAGE >&2
Usage:
    $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args]
    -h HOST | --host=HOST       Host or IP under test
    -p PORT | --port=PORT       TCP port under test
                                Alternatively, you specify the host and port as host:port
    -s | --strict               Only execute subcommand if the test succeeds
    -q | --quiet                Don't output any status messages
    -t TIMEOUT | --timeout=TIMEOUT
                                Timeout in seconds, zero for no timeout
    -- COMMAND ARGS             Execute command with args after the test finishes
USAGE
    exit 1
}

wait_for()
{
    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
        echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
    else
        echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout"
    fi
    WAITFORIT_start_ts=$(date +%s)
    while :
    do
        if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then
            nc -z $WAITFORIT_HOST $WAITFORIT_PORT
            WAITFORIT_result=$?
        else
            (echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1
            WAITFORIT_result=$?
        fi
        if [[ $WAITFORIT_result -eq 0 ]]; then
            WAITFORIT_end_ts=$(date +%s)
            echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds"
            break
        fi
        sleep 1
    done
    return $WAITFORIT_result
}

wait_for_wrapper()
{
    # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692
    if [[ $WAITFORIT_QUIET -eq 1 ]]; then
        timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
    else
        timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
    fi
    WAITFORIT_PID=$!
    trap "kill -INT -$WAITFORIT_PID" INT
    wait $WAITFORIT_PID
    WAITFORIT_RESULT=$?
    if [[ $WAITFORIT_RESULT -ne 0 ]]; then
        echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
    fi
    return $WAITFORIT_RESULT
}

# process arguments
while [[ $# -gt 0 ]]
do
    case "$1" in
        *:* )
        WAITFORIT_hostport=(${1//:/ })
        WAITFORIT_HOST=${WAITFORIT_hostport[0]}
        WAITFORIT_PORT=${WAITFORIT_hostport[1]}
        shift 1
        ;;
        --child)
        WAITFORIT_CHILD=1
        shift 1
        ;;
        -q | --quiet)
        WAITFORIT_QUIET=1
        shift 1
        ;;
        -s | --strict)
        WAITFORIT_STRICT=1
        shift 1
        ;;
        -h)
        WAITFORIT_HOST="$2"
        if [[ $WAITFORIT_HOST == "" ]]; then break; fi
        shift 2
        ;;
        --host=*)
        WAITFORIT_HOST="${1#*=}"
        shift 1
        ;;
        -p)
        WAITFORIT_PORT="$2"
        if [[ $WAITFORIT_PORT == "" ]]; then break; fi
        shift 2
        ;;
        --port=*)
        WAITFORIT_PORT="${1#*=}"
        shift 1
        ;;
        -t)
        WAITFORIT_TIMEOUT="$2"
        if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi
        shift 2
        ;;
        --timeout=*)
        WAITFORIT_TIMEOUT="${1#*=}"
        shift 1
        ;;
        --)
        shift
        WAITFORIT_CLI=("$@")
        break
        ;;
        --help)
        usage
        ;;
        *)
        echoerr "Unknown argument: $1"
        usage
        ;;
    esac
done

if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then
    echoerr "Error: you need to provide a host and port to test."
    usage
fi

WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15}
WAITFORIT_STRICT=${WAITFORIT_STRICT:-0}
WAITFORIT_CHILD=${WAITFORIT_CHILD:-0}
WAITFORIT_QUIET=${WAITFORIT_QUIET:-0}

# Check to see if timeout is from busybox?
WAITFORIT_TIMEOUT_PATH=$(type -p timeout)
WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH)

WAITFORIT_BUSYTIMEFLAG=""
if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then
    WAITFORIT_ISBUSY=1
    # Check if busybox timeout uses -t flag
    # (recent Alpine versions don't support -t anymore)
    if timeout &>/dev/stdout | grep -q -e '-t '; then
        WAITFORIT_BUSYTIMEFLAG="-t"
    fi
else
    WAITFORIT_ISBUSY=0
fi

if [[ $WAITFORIT_CHILD -gt 0 ]]; then
    wait_for
    WAITFORIT_RESULT=$?
    exit $WAITFORIT_RESULT
else
    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
        wait_for_wrapper
        WAITFORIT_RESULT=$?
    else
        wait_for
        WAITFORIT_RESULT=$?
    fi
fi

if [[ $WAITFORIT_CLI != "" ]]; then
    if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then
        echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess"
        exit $WAITFORIT_RESULT
    fi
    exec "${WAITFORIT_CLI[@]}"
else
    exit $WAITFORIT_RESULT
fi

Set up MinIO in Docker:

version: '3'
services:

  minio:
    image: minio/minio
    hostname: minio
    ports:
      - 9003:9000
      - 9004:9001
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    container_name: minio
    command: server /data --console-address ":9001"
    volumes:
      - lakehouse/minio/data:/data
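Nessie's default warehouse (configured in the Dockerfile below) points at s3://warehouse/, so that bucket has to exist in MinIO. Here is a sketch of one way to create it with boto3, assuming the 9003 -> 9000 host port mapping above and the default minioadmin credentials; the MinIO console or the mc client work just as well:

# Sketch: create the "warehouse" bucket that the Nessie warehouse location points to.
# Assumes MinIO is reachable on localhost:9003 (mapped from 9000 above)
# and the root credentials from the compose file.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9003",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    region_name="us-east-1",
)

existing = [b["Name"] for b in s3.list_buckets().get("Buckets", [])]
if "warehouse" not in existing:
    s3.create_bucket(Bucket="warehouse")
print(s3.list_buckets()["Buckets"])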

Set up the Nessie Dockerfile

This setup is specific to using the Nessie REST catalog. Refer to: https://raw.githubusercontent.com/projectnessie/nessie/main/docker/catalog-auth-s3-otel/docker-compose.yml

# Use the base image for Nessie
FROM ghcr.io/projectnessie/nessie:0.90.4

USER root
COPY ../wait-for-it.sh /wait-for-it.sh
RUN chmod +x /wait-for-it.sh

ENV nessie.server.authentication.enabled=false
# This example uses MinIO as the object store.
ENV nessie.catalog.default-warehouse=warehouse
ENV nessie.catalog.warehouses.warehouse.location=s3://warehouse/
ENV nessie.catalog.service.s3.region=us-east-1
# Use path-style access instead of virtual-host style (https://<bucket>.<domain>).
# If unspecified, the default depends on the cloud provider.
ENV nessie.catalog.service.s3.path-style-access=true
ENV nessie.catalog.service.s3.access-key.name=xxxxxx
ENV nessie.catalog.service.s3.access-key.secret=xxxxx
# MinIO endpoint for the Nessie server
ENV nessie.catalog.service.s3.endpoint=http://minio:9000/
# Nessie server port
EXPOSE 19120

Nessie Docker-compose:

version: '3'
services:

  nessie:
    image: $DOCKER_NESSIE_IMAGE
    hostname: nessie
    container_name: nessie
    ports:
      - "19120:19120"
    volumes:
      - lakehouse/nessie/data:/var/lib/nessie/data

  nessie-db:
    image: postgres:14.1-alpine
    hostname: nessie-db
    container_name: nessie-db
    environment:
      POSTGRES_DB: nessie
      POSTGRES_USER: nessie_user
      POSTGRES_PASSWORD: nessie_password
    ports:
      - 5440:5432
    volumes:
      - lakehouse/nessie/db_data:/var/lib/postgresql/data
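With the Nessie service up, a quick sanity check is to hit the Iceberg REST config endpoint it exposes. This is a sketch and assumes the 0.90.x image serves the Iceberg REST API under /iceberg on the published port 19120:

# Sketch: verify the Nessie Iceberg REST endpoint is reachable.
# Assumes the /iceberg base path and the 19120:19120 port mapping above.
import requests

resp = requests.get("http://localhost:19120/iceberg/v1/config", timeout=5)
resp.raise_for_status()
# Prints the catalog defaults/overrides advertised by the REST catalog
print(resp.json())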

Spark Dockerfile

ARG my_arg
# Use the official Python 3.12 image from the Docker Hub
FROM python:3.12-slim as base


FROM base AS dev-version
# Define SPARK_HOME/HIVE_HOME before the COPY below so the config lands in the right place
ENV SPARK_HOME=/opt/spark
ENV HIVE_HOME=/opt/hive
COPY spark/resources/hive-site-dev.xml $HIVE_HOME/conf/hive-site.xml
COPY spark/resources/hive-site-dev.xml $SPARK_HOME/conf/hive-site.xml
# COPY spark/resources/log4j.properties $SPARK_HOME/conf
RUN echo "DEV Image"
ENV VAR=TRUE

FROM base AS prod-version
ENV SPARK_HOME=/opt/spark
ENV HIVE_HOME=/opt/hive
COPY spark/resources/hive-site-prod.xml $HIVE_HOME/conf/hive-site.xml
COPY spark/resources/hive-site-prod.xml $SPARK_HOME/conf/hive-site.xml
# COPY spark/resources/log4j.properties $SPARK_HOME/conf/
RUN echo "Prod Image"
ENV VAR=FALSE

FROM ${my_arg}-version AS builder
RUN echo "VAR is equal to ${VAR}"
ENV SPARK_HOME=/opt/spark \
HADOOP_HOME=/opt/hadoop \
SCALA_HOME=/opt/scala \
PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin \
PYSPARK_PYTHON=/usr/bin/python3
ENV SPARK_OPTS --driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info


# Download and install OpenJDK 11
RUN apt-get update && \
apt-get install -y wget gnupg2 procps && \
wget -q https://download.java.net/openjdk/jdk11/ri/openjdk-11+28_linux-x64_bin.tar.gz \
&& tar xzf openjdk-11+28_linux-x64_bin.tar.gz \
&& mv jdk-11 /opt/jdk-11 \
&& ln -s /opt/jdk-11/bin/java /usr/bin/java \
&& rm openjdk-11+28_linux-x64_bin.tar.gz \
&& apt-get clean

# Set environment variables for Java
ENV JAVA_HOME=/opt/jdk-11
ENV PATH=$JAVA_HOME/bin:$PATH

# Install Spark
ARG SPARK_VERSION=3.3.2
ARG HADOOP_VERSION=3

RUN mkdir -p "$SPARK_HOME" "$HADOOP_HOME" "$SCALA_HOME" "$HIVE_HOME" && \
wget -qO - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz | tar -xz -C $SPARK_HOME --strip-components=1 && \
wget -qO - https://archive.apache.org/dist/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar -xz -C $HADOOP_HOME --strip-components=1


ENV SPARK_LOG_DIR=/opt/spark/logs \
SPARK_MASTER_LOG=/opt/spark/logs/spark-master.out \
SPARK_WORKER_LOG=/opt/spark/logs/spark-worker.out
RUN mkdir -p $SPARK_LOG_DIR && \
touch $SPARK_MASTER_LOG && \
touch $SPARK_WORKER_LOG && \
ln -sf /dev/stdout $SPARK_MASTER_LOG && \
ln -sf /dev/stdout $SPARK_WORKER_LOG
RUN rm $HADOOP_HOME/share/hadoop/common/lib/slf4j*
# RUN rm ${SPARK_HOME}/jars/log4j*
# COPY spark/jars/log4j-slf4j-impl-2.23.1.jar ${SPARK_HOME}/jars/
FROM builder as apache-spark
COPY spark/resources/spark-defaults.conf $SPARK_HOME/conf/

# COPY jars/postgresql-42.2.22.jar $SPARK_HOME/jars/
WORKDIR $SPARK_HOME

# Copy entrypoint script
COPY spark/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh

COPY ../wait-for-it.sh /wait-for-it.sh
RUN chmod +x /wait-for-it.sh


# ENTRYPOINT ["sh", "-c", "/entrypoint.sh"]
ENTRYPOINT ["/wait-for-it.sh", "spark-meta-postgres:5432", "-s","-t","120", "--", "sh", "-c","/entrypoint.sh"]

hive-site-dev.xml — change the metastore connection URL based on dev or prod

<configuration>
  <!-- Hive Execution Parameters -->
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <!-- <value>jdbc:mysql://spark-meta-mysql:3306/metastore_db?createDatabaseIfNotExist=true&amp;useSSL=FALSE&amp;autoReconnect=true&amp;nullCatalogMeansCurrent=true</value> -->
    <value>jdbc:postgresql://spark-meta-postgres:5432/metastore_db?createDatabaseIfNotExist=true&amp;useSSL=FALSE&amp;autoReconnect=true&amp;nullCatalogMeansCurrent=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>user</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>password</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <!-- <value>com.mysql.cj.jdbc.Driver</value> -->
    <value>org.postgresql.Driver</value>
  </property>
  <property>
    <name>hive.server2.transport.mode</name>
    <value>http</value>
  </property>
  <property>
    <name>hive.server2.thrift.http.port</name>
    <value>10000</value>
  </property>
  <property>
    <name>hive.server2.http.endpoint</name>
    <value>cliservice</value>
  </property>

  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
    <description/>
  </property>
  <property>
    <name>datanucleus.autoCreateSchema</name>
    <value>true</value>
  </property>
  <property>
    <name>datanucleus.fixedDatastore</name>
    <value>true</value>
  </property>
  <property>
    <name>datanucleus.autoCreateTables</name>
    <value>true</value>
  </property>
  <property>
    <name>datanucleus.schema.autoCreateTables</name>
    <value>true</value>
  </property>

</configuration>

Spark submit container Dockerfile

ARG IMAGE_NAME
FROM ${IMAGE_NAME} as base

COPY spark/resources/check_and_update_schema.sh $HIVE_HOME/check_and_update_schema.sh
RUN wget -qO - https://dlcdn.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz | tar -xz -C $HIVE_HOME --strip-components=1
RUN apt-get update
RUN wget -O $SPARK_HOME/jars/postgresql-42.5.4.jar https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.4/postgresql-42.5.4.jar
RUN wget -O $HIVE_HOME/lib/postgresql-42.5.4.jar https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.4/postgresql-42.5.4.jar
RUN chmod +x $HIVE_HOME/check_and_update_schema.sh


ENV PATH=$PATH:$HIVE_HOME/bin
# RUN $HIVE_HOME/check_and_update_schema.sh

RUN pip install pyspark==3.3.2 \
jupyterlab==4.2.1 findspark==2.0.1 \
thrift==0.20.0 numpy pandas cffi
RUN pip install duckdb --upgrade
RUN pip install mmh3
RUN apt-get install gcc -y
RUN apt-get install g++ -y
RUN pip install "pyiceberg[s3fs,hive,snappy,pyarrow,pandas]"
RUN apt-get install -y iputils-ping
COPY spark/resources/.pyiceberg.yaml /root/.pyiceberg.yaml

# Expose the port for JupyterLab
EXPOSE 8888

WORKDIR /opt

ENTRYPOINT ["/wait-for-it.sh", "spark-meta-postgres:5432", "-s","-t","120", "--","sh", "-c", "/entrypoint.sh"]

Docker compose for the metastore, Spark master and worker

version: '3'
services:
  spark-master:
    image: $DOCKER_SPARK_IMAGE
    container_name: spark-master
    hostname: spark-master
    env_file: .env
    ports:
      - "7077:7077"
      - "8080:8080"
    volumes:
      - lakehouse/spark/apps:/opt/spark-apps
      - lakehouse/spark/data:/opt/spark-data
    environment:
      # - SPARK_LOCAL_IP=spark-master
      - SPARK_LOCAL_HOSTNAME=spark-master
      # - SPARK_PUBLIC_DNS=spark-master
      - SPARK_WORKLOAD=master
      - SPARK_MASTER_PORT=7077
      - SPARK_MASTER_WEBUI_PORT=8080
      - SPARK_MODE=master

  spark-worker:
    image: $DOCKER_SPARK_IMAGE
    hostname: spark-worker-a
    container_name: spark-worker-a
    env_file: .env
    # networks:
    #   - default
    ports:
      - "9091:9091"
      - "7000:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_LOCAL_HOSTNAME=spark-worker-a
      - SPARK_WORKER_WEBUI_PORT=9091
      - SPARK_PUBLIC_DNS=localhost
      - SPARK_WORKER_CORES=4
      - SPARK_WORKER_MEMORY=15G
      - SPARK_DRIVER_MEMORY=2G
      - SPARK_EXECUTOR_MEMORY=8G
      - SPARK_WORKLOAD=worker
      - SPARK_MODE=worker
      - SPARK_WORKER_PORT=7000
    volumes:
      - spark-apps:/opt/spark-apps
      - spark-data:/opt/spark-data

  spark-meta-postgres:
    image: postgres:14.1-alpine
    hostname: spark-meta-postgres
    container_name: spark-meta-postgres
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=metastore_db
    ports:
      - 5431:5432
    volumes:
      - lakehouse/spark/db_data:/var/lib/postgresql/data

volumes:
  spark-apps:
  spark-data:

.env file containing the MinIO endpoint and access keys:

# Fill in Details

# AWS_REGION is used by Spark
AWS_REGION=us-east-1
# This must match AWS_REGION when using MinIO
MINIO_REGION=us-east-1
# Used by pyiceberg
AWS_DEFAULT_REGION=us-east-1
# AWS credentials (use the MinIO credentials here)
AWS_ACCESS_KEY_ID=xxxx
AWS_SECRET_ACCESS_KEY=xxxxx
# If using MinIO, this should be the API address of the MinIO server
AWS_S3_ENDPOINT=http://minio:9000
# Location where files will be written when creating new tables
WAREHOUSE=s3a://warehouse/
# URI of Nessie Catalog
NESSIE_URI=http://nessie:19120/api/v1

Docker compose for spark-submit:

version: '3'
services:

  spark-submit:
    image: $DOCKER_SPARK_SUBMIT_IMAGE
    env_file: .env
    container_name: spark-submit
    # command: ["/wait", "&&", "entrypoint.sh"]
    hostname: spark-submit
    # build: .
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_LOCAL_HOSTNAME=spark-submit
      - SPARK_PUBLIC_DNS=spark-submit
      - SPARK_WORKER_WEBUI_PORT=4040
      - SPARK_WORKER_CORES=1
      - SPARK_MODE=submit
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=submit
    volumes:
      - lakehouse/spark/apps:/opt/spark-apps
      - lakehouse/spark/data:/opt/spark-data
    ports:
      - "4040:4040"
      - "8888:8888"
      - "18080:18080"
      - "8889:8889"
      - "10000:10000"
      - "10001:10001"
      - "9083:9083"


Some more Spark configs in spark-defaults.conf:

spark.master             spark://spark-master:7077
spark.executor.memory    6G
spark.executor.cores     3
spark.driver.memory      5G
spark.jars.ivy           /opt/spark-apps
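The defaults above only cover cluster sizing; the Iceberg, Nessie and MinIO wiring still has to be supplied per session or job. The sketch below shows one way to do that from PySpark. The runtime JAR versions, the /iceberg/main REST URI and the credential values are assumptions to adapt to your own build, not something fixed by this setup:

# Sketch: a SparkSession wired to the Nessie REST catalog and MinIO.
# Assumptions: iceberg-spark-runtime 1.5.x for Spark 3.3 / Scala 2.12,
# Nessie's Iceberg REST API under /iceberg/main, placeholder MinIO credentials.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("lakehouse-demo")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2,"
            "org.apache.iceberg:iceberg-aws-bundle:1.5.2")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.nessie.type", "rest")
    .config("spark.sql.catalog.nessie.uri", "http://nessie:19120/iceberg/main")
    .config("spark.sql.catalog.nessie.warehouse", "s3://warehouse/")
    .config("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.nessie.s3.endpoint", "http://minio:9000")
    .config("spark.sql.catalog.nessie.s3.path-style-access", "true")
    .config("spark.sql.catalog.nessie.s3.access-key-id", "minioadmin")      # placeholder
    .config("spark.sql.catalog.nessie.s3.secret-access-key", "minioadmin")  # placeholder
    .getOrCreate()
)

spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.demo")
spark.sql("CREATE TABLE IF NOT EXISTS nessie.demo.events (id BIGINT, ts TIMESTAMP) USING iceberg")
spark.sql("SHOW TABLES IN nessie.demo").show()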

entrypoint.sh in the Spark image decides whether to run the notebook or the Spark master/worker based on the SPARK_MODE variable:

#!/bin/bash

if [ "$SPARK_MODE" == "master" ]; then
    $SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master
elif [ "$SPARK_MODE" == "worker" ]; then
    $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
elif [ "$SPARK_MODE" == "submit" ]; then
    # update schema
    $HIVE_HOME/check_and_update_schema.sh
    # Start JupyterLab
    jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root --NotebookApp.token=''
else
    echo "Invalid SPARK_MODE specified: $SPARK_MODE"
    exit 1
fi

Trino configuration and the DuckDB connection using pyiceberg will follow in the next parts.
