Interoperability Presto + Iceberg + Spark

Prashant Sharma
Presto + Spark: A Lakehouse story.
8 min readJul 6, 2023

First, setup prestodb/presto server + iceberg catalog backed by a HMS service.

Steps to Run Presto(0.282+) + Hive(3.0) + Iceberg(1.2.1 & 1.3.0) and interoperability with Apache Spark 3.3.x and 3.4.x

Setup Hive + Hadoop (Hadoop is optional, it can be replaced with S3, If s3 is available then hadoop installation can be skipped. Please note spark depends on hadoop for accessing S3. To solve this, one can use — packages option in spark to enable relevant hadoop-aws package.):
1. Download and extract hadoop 3.3.6 from hadoop’s releases page.
2. Download and extract hive, since we only need hive metastore service and not full hive: HMS(i.e. Hive Metastore Service) 3.0.0 Download link.
3. Setup hadoop standalone following the guide here: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html
4. export HADOOP_HOME=/path/hadoop-3.3.6
5. Similarly extract the file download in step 2 and point the extracted hive location as:export HIVE_HOME=/path/apache-hive-metastore-3.0.0-bin

Start hive metastore service in the foreground as:

bin/schematool -dbType derby -initSchema # command will create and init the derby
# Start the hive metadata service
bin/start-metastore
2023-07-04 12:34:15: Starting Metastore Server
...

Setup Presto server:

# create the directories under presto installation dir.
$ mkdir -p $PRESTO_HOME/etc/catalog
# /etc/node.properties
node.environment=production
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/home/user/presto/data
# Please note replace /home/user with a path that this process can write to.
# /etc/config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=5GB
query.max-memory-per-node=1GB
discovery-server.enabled=true
discovery.uri=http://localhost:8080
# /etc/jvm.config
-Djdk.attach.allowAttachSelf=true
-server
-Xmx16G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError

# /etc/catalog/iceberg.properties
connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083
iceberg.catalog.type=hive

Run presto with above configuration in the foreground as follows:

bin/launcher run

Now that we have presto + hive are working and the config above takes care of setting up iceberg as a catalog.

Please note: presto-cli can be installed by following the official guide at https://prestodb.io/docs/current/installation/cli.html

Set up Spark + Iceberg with same HMS as used by presto.

Note that: the instructions were tested with both Apache Spark 3.4.1 and 3.3.2 and iceberg version 1.2.1 and iceberg version 1.3.0.

  1. Download the spark binary from : https://spark.apache.org/downloads.html
  2. Spark 3.3.2 or 3.4.1 with hadoop3 i.e. link was chosen for this setup.
  3. Extract the downloaded file and let us call it SPARK_HOME e.g. export SPARK_HOME=/path/spark-3.3.2-bin-hadoop3 where /path is the path to Spark.
  4. Copy the following configuration to $SPARK_HOME/conf/spark-defaults.conf
spark.jars.packages              org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.hive_prod org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type hive
spark.sql.catalog.hive_prod.uri thrift://localhost:9083
spark.sql.defaultCatalog hive_prod

Start spark as:
bin/spark-sql

bin/spark-sql

Now spark + Iceberg and presto + Iceberg backed by a common HMS service is ready to use.

Test scenario 1: A table created in iceberg catalog by spark with data inserted by spark, is read by presto.

Step1. Start presto-cli and check out the existing schemas in iceberg catalog.

presto-cli --server localhost:8080 --catalog iceberg
presto> show catalogs
-> ;
Catalog
---------
hive
iceberg
jmx
system
(4 rows)

Query 20230601_112030_00000_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]


presto> show schemas;
Schema
--------------------
default
information_schema
(2 rows)

Query 20230601_112047_00002_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
355ms [2 rows, 35B] [5 rows/s, 98B/s]

Step 2. Start spark sql and create a table in iceberg catalog and insert some data to it.

spark-sql> create schema nyc;
Time taken: 0.151 seconds
spark-sql> show schemas;
default
nyc
Time taken: 0.352 seconds, Fetched 2 row(s)
spark-sql>
> CREATE TABLE nyc.taxis
> (
> vendor_id bigint,
> trip_id bigint,
> trip_distance float,
> fare_amount double,
> store_and_fwd_flag string
> )
> PARTITIONED BY (vendor_id);
Time taken: 0.716 seconds
spark-sql>
> INSERT INTO nyc.taxis
> VALUES (1, 1000371, 1.8, 15.32, 'N'), (2, 1000372, 2.5, 22.15, 'N'), (2, 1000373, 0.9, 9.01, 'N'), (1, 1000374, 8.4, 42.13, 'Y');
23/06/01 16:54:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Time taken: 3.048 seconds
spark-sql>
>
> select * from nyc.taxis;
1 1000371 1.8 15.32 N
1 1000374 8.4 42.13 Y
2 1000372 2.5 22.15 N
2 1000373 0.9 9.01 N
Time taken: 0.668 seconds, Fetched 4 row(s)
spark-sql>

Step 3. Go back to presto-cli and see if the data added by spark, can be queried in presto.

presto> show schemas;
Schema
--------------------
default
information_schema
nyc
(3 rows)

Query 20230601_112710_00003_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
180ms [3 rows, 43B] [16 rows/s, 238B/s]

presto> use nyc;
USE
presto:nyc> show tables;
Table
-------
taxis
(1 row)

Query 20230601_112717_00005_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
323ms [1 rows, 18B] [3 rows/s, 55B/s]

presto:nyc> select *from taxis;
vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag
-----------+---------+---------------+-------------+--------------------
1 | 1000371 | 1.8 | 15.32 | N
1 | 1000374 | 8.4 | 42.13 | Y
2 | 1000372 | 2.5 | 22.15 | N
2 | 1000373 | 0.9 | 9.01 | N
(4 rows)

Query 20230601_112723_00006_epxj4, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:02 [4 rows, 3.56KB] [2 rows/s, 2.09KB/s]

presto:nyc>

Test scenario 1 is complete, and success!!!

Test scenario 2:

  1. Table created by spark,
  2. Rows added by spark
  3. A row added by presto.
  4. Select from inserted table in both presto and spark.

(Please note, step 1 and 2 is already completed by Test scenario 1. so proceeding with step 3)

3. Insert rows from presto as follows and select *from taxis;

  presto:nyc> insert into taxis values (10,1233222,2.2,12.22,'Y')
-> ;
INSERT: 1 row

Query 20230601_115227_00007_epxj4, FINISHED, 1 node
Splits: 35 total, 35 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]

presto:nyc> select *from taxis;
vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag
-----------+---------+---------------+-------------+--------------------
1 | 1000371 | 1.8 | 15.32 | N
2 | 1000372 | 2.5 | 22.15 | N
1 | 1000374 | 8.4 | 42.13 | Y
2 | 1000373 | 0.9 | 9.01 | N
10 | 1233222 | 2.2 | 12.22 | Y
(5 rows)

Query 20230601_115237_00008_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
283ms [5 rows, 4.49KB] [17 rows/s, 15.9KB/s]

4. Select *from taxis; in spark-sql.

spark-sql> select * from nyc.taxis;
23/06/01 17:22:43 ERROR BaseReader: Error reading file(s): file:/Users/prashantsharma/work/temp/spark-3.3.2-bin-hadoop3/spark-warehouse/nyc.db/taxis/data/vendor_id=10/fb14f225-3782-449c-a542-3a1c80f46d82.parquet
java.lang.UnsupportedOperationException: Cannot support vectorized reads for column [trip_id] optional int64 trip_id = 2 with encoding DELTA_BINARY_PACKED. Disable vectorized reads to read this table/file
at org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator.initDataReader(VectorizedPageIterator.java:100)
at org.apache.iceberg.parquet.BasePageIterator.initFromPage(BasePageIterator.java:140)
at org.apache.iceberg.parquet.BasePageIterator$1.visit(BasePageIterator.java:105)
at org.apache.iceberg.parquet.BasePageIterator$1.visit(BasePageIterator.java:96)
at org.apache.iceberg.shaded.org.apache.parquet.column.page.DataPageV2.accept(DataPageV2.java:192)
at org.apache.iceberg.parquet.BasePageIterator.setPage(BasePageIterator.java:95)
at org.apache.iceberg.parquet.BaseColumnIterator.advance(BaseColumnIterator.java:61)
  • select only those rows added by spark and not by presto using a where clause as follows:

spark-sql> select * from nyc.taxis where vendor_id != 10;
1 1000371 1.8 15.32 N
1 1000374 8.4 42.13 Y
2 1000372 2.5 22.15 N
2 1000373 0.9 9.01 N
Time taken: 0.081 seconds, Fetched 4 row(s)
spark-sql>

Why did we get a java.lang.UnsupportedOperationException: Cannot support vectorized reads for column error?
Short answer is: Presto does not support vectorized format as yet and since spark uses vectorized read by default it runs into error. There is a reason Spark has vectorized read turn on by default and the reason is obviously performance. In some benchmarks, vectorized reads have shown 4x to 6x performance improvements. Ref: spark-hive-benchmarks

How to fix it:

SET spark.sql.iceberg.vectorization.enabled=false;

spark-sql (nyc)> SET spark.sql.iceberg.vectorization.enabled=false;
spark.sql.iceberg.vectorization.enabled false
Time taken: 0.008 seconds, Fetched 1 row(s)
spark-sql (nyc)> select *from taxis;
10 1233222 2.2 12.22 Y
1 1000371 1.8 15.32 N
1 1000374 8.4 42.13 Y
2 1000372 2.5 22.15 N
2 1000373 0.9 9.01 N
Time taken: 0.08 seconds, Fetched 5 row(s)
spark-sql (nyc)>

Test scenario complete, inserting by presto and read by Apache Spark does not work, it could only read the rows inserted by spark.

Test scenario 3: Table created by presto and rows inserted by spark.

a. Create a test table using presto-cli as follows:

presto:nyc> create table test (name varchar);
CREATE TABLE
presto:nyc>

b. Insert into the test table only using Apache Spark.

spark-sql> select *from nyc.test;
Time taken: 0.066 seconds
spark-sql> insert into nyc.test values ("check");
Time taken: 0.274 seconds
spark-sql> select *from nyc.test;
check
Time taken: 0.061 seconds, Fetched 1 row(s)
spark-sql> insert into nyc.test values ("check2");
Time taken: 0.257 seconds
spark-sql> select *from nyc.test;
check
check2
Time taken: 0.056 seconds, Fetched 2 row(s)
spark-sql>

Test scenario 4: Table created in presto and data inserted by both spark and presto in order.

(continuing from the above test scenario, we insert a row in presto and observe the result as follows:)

presto:nyc> select *from test;
name
--------
check2
check
(2 rows)
Query 20230601_120145_00010_epxj4, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
294ms [2 rows, 966B] [6 rows/s, 3.21KB/s]
presto:nyc> insert into test values ('inserted_via_presto');
INSERT: 1 row
Query 20230601_120238_00011_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]
presto:nyc> select *from test;
name
---------------------
check2
inserted_via_presto
check
(3 rows)
Query 20230601_120242_00012_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
248ms [3 rows, 1.25KB] [12 rows/s, 5.04KB/s]
presto:nyc>

b. Read the above in spark.

spark-sql (nyc)> select *from nyc.test;
inserted_via_presto
check
check2
Time taken: 0.057 seconds, Fetched 3 row(s)
spark-sql (nyc)>

c. Read specific rows created by spark [NOTE: Following error is fixed by switching to jdk11]

spark-sql> select *from nyc.test where name like "check%";
Time taken: 0.064 seconds
spark-sql> select *from nyc.test where name like "check";
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x000000010f03b9da, pid=26137, tid=0x000000000001da03
#
# JRE version: OpenJDK Runtime Environment (8.0_362) (build 1.8.0_362-bre_2023_01_22_03_32-b00)
# Java VM: OpenJDK 64-Bit Server VM (25.362-b00 mixed mode bsd-amd64 compressed oops)
# Problematic frame:
# V [libjvm.dylib+0x5409da] Unsafe_GetByte+0x5c
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /Users/prashantsharma/work/temp/spark-3.3.2-bin-hadoop3/hs_err_pid26137.log
#
# If you would like to submit a bug report, please visit:
# https://github.com/Homebrew/homebrew-core/issues
#
Abort trap: 6
(base) Prashants-MacBook-Pro:spark-3.3.2-bin-hadoop3 prashantsharma$

Area of interest in above generated heap dump, looks like related java version mismatch.

Classes redefined (0 events):
No events
Internal exceptions (10 events):
Event: 2706.672 Thread 0x00007ff4af896800 Exception <a 'java/lang/NoSuchMethodError': <clinit>> (0x00000007b605c1b0) thrown at [/private/tmp/openjdkA8-20230122-5912-1drjnge/jdk8u-jdk8u362-ga/hotspot/src/share/vm/prims/jni.cpp, line 1614]
Event: 3025.902 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$collectProjectsAndFilters$3$adapted(Lscala/collection/Seq;)Ljava/lang/Object;> (0x00000007ba0d3ab8) thrown at [/private/tmp/openjdkA8-20230122-5912-1drjnge/jdk8u-jdk8u362-ga/hotspot/src
Event: 3025.942 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$isNullIntolerant$1$adapted(Lorg/apache/spark/sql/catalyst/expressions/PredicateHelper;Lorg/apache/spark/sql/catalyst/expressions/Expression;)Ljava/lang/Object;> (0x00000007bb060918) thr
Event: 3025.943 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$outputWithNullability$1(Lscala/collection/Seq;Lorg/apache/spark/sql/catalyst/expressions/Attribute;)Lorg/apache/spark/sql/catalyst/expressions/Attribute;> (0x00000007bb0705c0) thrown at
Event: 3025.951 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$generatePredicateCode$4(Lorg/apache/spark/sql/execution/GeneratePredicateHelper;Lscala/collection/Seq;[ZLscala/collection/Seq;Lscala/collection/Seq;Lscala/collection/Seq;Lscala/collecti
Event: 3025.952 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$generatePredicateCode$5(Lorg/apache/spark/sql/execution/GeneratePredicateHelper;Lscala/collection/Seq;[ZLscala/collection/Seq;Lscala/collection/Seq;Lscala/collection/Seq;Lscala/collecti
Event: 3025.952 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$generatePredicateCode$6$adapted(Lorg/apache/spark/sql/catalyst/expressions/Attribute;Lorg/apache/spark/sql/catalyst/expressions/Expression;)Ljava/lang/Object;> (0x00000007bb1793e0) thro
Event: 3025.955 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$generatePredicateCode$7(Lorg/apache/spark/sql/execution/GeneratePredicateHelper;[ZLscala/collection/Seq;Lscala/collection/Seq;Lorg/apache/spark/sql/catalyst/expressions/codegen/CodegenC
Event: 3025.987 Thread 0x00007ff4af868800 Exception <a 'java/lang/NoSuchMethodError': <clinit>> (0x00000007ac04d088) thrown at [/private/tmp/openjdkA8-20230122-5912-1drjnge/jdk8u-jdk8u362-ga/hotspot/src/share/vm/prims/jni.cpp, line 1614]
Event: 3059.588 Thread 0x00007ff540b0d000 Implicit null exception at 0x00000001113dbcb8 to 0x00000001113dbd3d
Events (10 events):

Spark claims to support java version 8u362 and above. However it is failing above with java version 8u362 (link https://spark.apache.org/docs/latest/).

How to fix: switch to JDK 11.

spark-sql (nyc)> select *from nyc.test where name like "check%";
check
check2
Time taken: 0.078 seconds, Fetched 2 row(s)
spark-sql (nyc)> select *from nyc.test where name like "check";
check
Time taken: 0.044 seconds, Fetched 1 row(s)
spark-sql (nyc)>

Related issue:

  1. https://github.com/apache/iceberg/issues/7162
  2. https://github.com/apache/spark/pull/35262

--

--

Prashant Sharma
Presto + Spark: A Lakehouse story.

Committee member, maintainer, and contributor for Apache Spark, Kubeflow, and Tekton CD