Interoperability: Presto + Iceberg + Spark
First, set up a prestodb/presto server plus an iceberg catalog backed by an HMS service.
Steps to run Presto (0.282+) + Hive (3.0) + Iceberg (1.2.1 & 1.3.0) and test interoperability with Apache Spark 3.3.x and 3.4.x.
Setup Hive + Hadoop
(Hadoop is optional and can be replaced with S3; if S3 is available, the Hadoop installation can be skipped. Please note that Spark depends on Hadoop for accessing S3; to cover this, one can use the --packages option in Spark to pull in the relevant hadoop-aws package, as sketched after the steps below.):
1. Download and extract Hadoop 3.3.6 from Hadoop's releases page.
2. Download and extract Hive. Since we only need the Hive metastore service and not full Hive, use HMS (i.e. Hive Metastore Service) 3.0.0: Download link.
3. Set up Hadoop standalone following the guide here: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html
4. export HADOOP_HOME=/path/hadoop-3.3.6
5. Similarly, extract the file downloaded in step 2 and point HIVE_HOME at the extracted location: export HIVE_HOME=/path/apache-hive-metastore-3.0.0-bin
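If you go the S3 route, Spark still needs the hadoop-aws bindings at runtime. A minimal sketch of the --packages approach mentioned above (the hadoop-aws version shown is an assumption; match it to the Hadoop version bundled with your Spark build, and the credential values are placeholders):
# launch spark-sql with S3A filesystem support pulled in at runtime
bin/spark-sql --packages org.apache.hadoop:hadoop-aws:3.3.4 \
  --conf spark.hadoop.fs.s3a.access.key=<ACCESS_KEY> \
  --conf spark.hadoop.fs.s3a.secret.key=<SECRET_KEY>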
Start the Hive metastore service in the foreground as:
bin/schematool -dbType derby -initSchema # creates and initializes the Derby metastore schema
# Start the hive metadata service
bin/start-metastore
2023-07-04 12:34:15: Starting Metastore Server
...
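By default the metastore listens on thrift://localhost:9083 (the URI used in the Presto and Spark configs below). A quick connectivity check from another terminal, assuming nc is available:
# verify the metastore thrift port is accepting connections
nc -vz localhost 9083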
Set up the Presto server:
- Download version 0.282 (version 0.281 and earlier are incompatible with Spark; more on: https://github.com/prestodb/presto/pull/19542)
- Extract the above tar file (presto-server-0.282.tar.gz) and call the installation location PRESTO_HOME: export PRESTO_HOME=~/presto-server-0.282
- Instructions to setup are adapted from: https://prestodb.io/docs/current/installation/deployment.html
- In addition to the above setup, add configuration as follows.
# create the directories under presto installation dir.
$ mkdir -p $PRESTO_HOME/etc/catalog
# $PRESTO_HOME/etc/node.properties
node.environment=production
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/home/user/presto/data
# Note: replace /home/user with a path that this process can write to.
# $PRESTO_HOME/etc/config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=5GB
query.max-memory-per-node=1GB
discovery-server.enabled=true
discovery.uri=http://localhost:8080
# $PRESTO_HOME/etc/jvm.config
-Djdk.attach.allowAttachSelf=true
-server
-Xmx16G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
# $PRESTO_HOME/etc/catalog/iceberg.properties
connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083
iceberg.catalog.type=hive
Run Presto with the above configuration in the foreground as follows:
bin/launcher run
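Once the launcher logs show the server is up, a quick sanity check against the coordinator's HTTP API:
# returns node version, environment and uptime as JSON
curl http://localhost:8080/v1/info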
Now Presto + Hive are working, and the config above takes care of setting up iceberg as a catalog.
Please note: presto-cli can be installed by following the official guide at https://prestodb.io/docs/current/installation/cli.html
Set up Spark + Iceberg with the same HMS as used by Presto.
Note: the instructions were tested with both Apache Spark 3.4.1 and 3.3.2, and with Iceberg versions 1.2.1 and 1.3.0.
- Download the Spark binary from https://spark.apache.org/downloads.html. Spark 3.3.2 or 3.4.1 with hadoop3 was chosen for this setup.
- Extract the downloaded file and let us call its location SPARK_HOME, e.g. export SPARK_HOME=/path/spark-3.3.2-bin-hadoop3, where /path is the path to Spark.
- Copy the following configuration to $SPARK_HOME/conf/spark-defaults.conf
spark.jars.packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1
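# Note: for Spark 3.4.x, use org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0 instead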
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.hive_prod org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type hive
spark.sql.catalog.hive_prod.uri thrift://localhost:9083
spark.sql.defaultCatalog hive_prod
Start Spark as: bin/spark-sql
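Alternatively, the same settings can be passed per invocation instead of editing spark-defaults.conf; a sketch mirroring the config above:
bin/spark-sql \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hive_prod.type=hive \
  --conf spark.sql.catalog.hive_prod.uri=thrift://localhost:9083 \
  --conf spark.sql.defaultCatalog=hive_prod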
Now Spark + Iceberg and Presto + Iceberg, backed by a common HMS service, are ready to use.
Test scenario 1: a table created in the iceberg catalog by Spark, with data inserted by Spark, is read by Presto.
Step 1. Start presto-cli and check the existing schemas in the iceberg catalog.
presto-cli --server localhost:8080 --catalog iceberg
presto> show catalogs
-> ;
Catalog
---------
hive
iceberg
jmx
system
(4 rows)
Query 20230601_112030_00000_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]
presto> show schemas;
Schema
--------------------
default
information_schema
(2 rows)
Query 20230601_112047_00002_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
355ms [2 rows, 35B] [5 rows/s, 98B/s]
Step 2. Start spark-sql, create a table in the iceberg catalog, and insert some data into it.
spark-sql> create schema nyc;
Time taken: 0.151 seconds
spark-sql> show schemas;
default
nyc
Time taken: 0.352 seconds, Fetched 2 row(s)
spark-sql>
> CREATE TABLE nyc.taxis
> (
> vendor_id bigint,
> trip_id bigint,
> trip_distance float,
> fare_amount double,
> store_and_fwd_flag string
> )
> PARTITIONED BY (vendor_id);
Time taken: 0.716 seconds
spark-sql>
> INSERT INTO nyc.taxis
> VALUES (1, 1000371, 1.8, 15.32, 'N'), (2, 1000372, 2.5, 22.15, 'N'), (2, 1000373, 0.9, 9.01, 'N'), (1, 1000374, 8.4, 42.13, 'Y');
23/06/01 16:54:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Time taken: 3.048 seconds
spark-sql>
>
> select * from nyc.taxis;
1 1000371 1.8 15.32 N
1 1000374 8.4 42.13 Y
2 1000372 2.5 22.15 N
2 1000373 0.9 9.01 N
Time taken: 0.668 seconds, Fetched 4 row(s)
spark-sql>
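A note on the DDL above: because hive_prod (an org.apache.iceberg.spark.SparkCatalog) is the default catalog, the table is created in Iceberg format even without an explicit USING clause. The equivalent explicit form would be:
CREATE TABLE nyc.taxis (
  vendor_id bigint,
  trip_id bigint,
  trip_distance float,
  fare_amount double,
  store_and_fwd_flag string
)
USING iceberg
PARTITIONED BY (vendor_id);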
Step 3. Go back to presto-cli and see if the data added by Spark can be queried in Presto.
presto> show schemas;
Schema
--------------------
default
information_schema
nyc
(3 rows)
Query 20230601_112710_00003_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
180ms [3 rows, 43B] [16 rows/s, 238B/s]
presto> use nyc;
USE
presto:nyc> show tables;
Table
-------
taxis
(1 row)
Query 20230601_112717_00005_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
323ms [1 rows, 18B] [3 rows/s, 55B/s]
presto:nyc> select *from taxis;
vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag
-----------+---------+---------------+-------------+--------------------
1 | 1000371 | 1.8 | 15.32 | N
1 | 1000374 | 8.4 | 42.13 | Y
2 | 1000372 | 2.5 | 22.15 | N
2 | 1000373 | 0.9 | 9.01 | N
(4 rows)
Query 20230601_112723_00006_epxj4, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:02 [4 rows, 3.56KB] [2 rows/s, 2.09KB/s]
presto:nyc>
Test scenario 1 is complete, and it is a success!
Test scenario 2:
- Table created by Spark.
- Rows added by Spark.
- A row added by Presto.
- Select from the table in both Presto and Spark.
(Please note, steps 1 and 2 are already completed by test scenario 1, so we proceed with step 3.)
3. Insert a row from Presto as follows and run select * from taxis;
presto:nyc> insert into taxis values (10,1233222,2.2,12.22,'Y')
-> ;
INSERT: 1 row
Query 20230601_115227_00007_epxj4, FINISHED, 1 node
Splits: 35 total, 35 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]
presto:nyc> select *from taxis;
vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag
-----------+---------+---------------+-------------+--------------------
1 | 1000371 | 1.8 | 15.32 | N
2 | 1000372 | 2.5 | 22.15 | N
1 | 1000374 | 8.4 | 42.13 | Y
2 | 1000373 | 0.9 | 9.01 | N
10 | 1233222 | 2.2 | 12.22 | Y
(5 rows)
Query 20230601_115237_00008_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
283ms [5 rows, 4.49KB] [17 rows/s, 15.9KB/s]
4. Run select * from taxis; in spark-sql.
spark-sql> select * from nyc.taxis;
23/06/01 17:22:43 ERROR BaseReader: Error reading file(s): file:/Users/prashantsharma/work/temp/spark-3.3.2-bin-hadoop3/spark-warehouse/nyc.db/taxis/data/vendor_id=10/fb14f225-3782-449c-a542-3a1c80f46d82.parquet
java.lang.UnsupportedOperationException: Cannot support vectorized reads for column [trip_id] optional int64 trip_id = 2 with encoding DELTA_BINARY_PACKED. Disable vectorized reads to read this table/file
at org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator.initDataReader(VectorizedPageIterator.java:100)
at org.apache.iceberg.parquet.BasePageIterator.initFromPage(BasePageIterator.java:140)
at org.apache.iceberg.parquet.BasePageIterator$1.visit(BasePageIterator.java:105)
at org.apache.iceberg.parquet.BasePageIterator$1.visit(BasePageIterator.java:96)
at org.apache.iceberg.shaded.org.apache.parquet.column.page.DataPageV2.accept(DataPageV2.java:192)
at org.apache.iceberg.parquet.BasePageIterator.setPage(BasePageIterator.java:95)
at org.apache.iceberg.parquet.BaseColumnIterator.advance(BaseColumnIterator.java:61)
- Select only those rows added by Spark and not by Presto, using a where clause as follows:
spark-sql> select * from nyc.taxis where vendor_id != 10;
1 1000371 1.8 15.32 N
1 1000374 8.4 42.13 Y
2 1000372 2.5 22.15 N
2 1000373 0.9 9.01 N
Time taken: 0.081 seconds, Fetched 4 row(s)
spark-sql>
Why did we get a java.lang.UnsupportedOperationException: Cannot support vectorized reads for column error?
Short answer: the Parquet file written by Presto uses the DELTA_BINARY_PACKED encoding, which Iceberg's vectorized (Arrow-based) Parquet reader does not yet support, and Spark enables vectorized reads by default, so the read fails. There is a reason Spark turns vectorized reads on by default, and that reason is performance: in some benchmarks, vectorized reads have shown 4x to 6x improvements. Ref: spark-hive-benchmarks
How to fix it:
SET spark.sql.iceberg.vectorization.enabled=false;
spark-sql (nyc)> SET spark.sql.iceberg.vectorization.enabled=false;
spark.sql.iceberg.vectorization.enabled false
Time taken: 0.008 seconds, Fetched 1 row(s)
spark-sql (nyc)> select *from taxis;
10 1233222 2.2 12.22 Y
1 1000371 1.8 15.32 N
1 1000374 8.4 42.13 Y
2 1000372 2.5 22.15 N
2 1000373 0.9 9.01 N
Time taken: 0.08 seconds, Fetched 5 row(s)
spark-sql (nyc)>
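If a session-level SET is inconvenient, Iceberg also exposes a per-table read property that should have the same effect (property name per the Iceberg configuration docs; verify it against the Iceberg version in use):
-- make all readers of this table fall back to non-vectorized Parquet reads
ALTER TABLE nyc.taxis SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='false');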
Test scenario 2 is complete: rows inserted by Presto cannot be read by Apache Spark with its default vectorized reader, which could only read the rows inserted by Spark; disabling Iceberg vectorization works around this.
Test scenario 3: table created by Presto and rows inserted by Spark.
a. Create a test table using presto-cli as follows:
presto:nyc> create table test (name varchar);
CREATE TABLE
presto:nyc>
b. Insert into the test table using only Apache Spark.
spark-sql> select *from nyc.test;
Time taken: 0.066 seconds
spark-sql> insert into nyc.test values ("check");
Time taken: 0.274 seconds
spark-sql> select *from nyc.test;
check
Time taken: 0.061 seconds, Fetched 1 row(s)
spark-sql> insert into nyc.test values ("check2");
Time taken: 0.257 seconds
spark-sql> select *from nyc.test;
check
check2
Time taken: 0.056 seconds, Fetched 2 row(s)
spark-sql>
Test scenario 4: table created in Presto and data inserted by both Spark and Presto, in order.
(Continuing from the above test scenario, we insert a row in Presto and observe the result as follows:)
presto:nyc> select *from test;
name
--------
check2
check
(2 rows)
Query 20230601_120145_00010_epxj4, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
294ms [2 rows, 966B] [6 rows/s, 3.21KB/s]
presto:nyc> insert into test values ('inserted_via_presto');
INSERT: 1 row
Query 20230601_120238_00011_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]
presto:nyc> select *from test;
name
---------------------
check2
inserted_via_presto
check
(3 rows)
Query 20230601_120242_00012_epxj4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
248ms [3 rows, 1.25KB] [12 rows/s, 5.04KB/s]
presto:nyc>
b. Read the above in Spark.
spark-sql (nyc)> select *from nyc.test;
inserted_via_presto
check
check2
Time taken: 0.057 seconds, Fetched 3 row(s)
spark-sql (nyc)>
c. Read specific rows created by Spark. [NOTE: the following error is fixed by switching to JDK 11]
spark-sql> select *from nyc.test where name like "check%";
Time taken: 0.064 seconds
spark-sql> select *from nyc.test where name like "check";
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x000000010f03b9da, pid=26137, tid=0x000000000001da03
#
# JRE version: OpenJDK Runtime Environment (8.0_362) (build 1.8.0_362-bre_2023_01_22_03_32-b00)
# Java VM: OpenJDK 64-Bit Server VM (25.362-b00 mixed mode bsd-amd64 compressed oops)
# Problematic frame:
# V [libjvm.dylib+0x5409da] Unsafe_GetByte+0x5c
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /Users/prashantsharma/work/temp/spark-3.3.2-bin-hadoop3/hs_err_pid26137.log
#
# If you would like to submit a bug report, please visit:
# https://github.com/Homebrew/homebrew-core/issues
#
Abort trap: 6
(base) Prashants-MacBook-Pro:spark-3.3.2-bin-hadoop3 prashantsharma$
The area of interest in the generated error report above looks related to a Java version mismatch:
Classes redefined (0 events):
No events
Internal exceptions (10 events):
Event: 2706.672 Thread 0x00007ff4af896800 Exception <a 'java/lang/NoSuchMethodError': <clinit>> (0x00000007b605c1b0) thrown at [/private/tmp/openjdkA8-20230122-5912-1drjnge/jdk8u-jdk8u362-ga/hotspot/src/share/vm/prims/jni.cpp, line 1614]
Event: 3025.902 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$collectProjectsAndFilters$3$adapted(Lscala/collection/Seq;)Ljava/lang/Object;> (0x00000007ba0d3ab8) thrown at [/private/tmp/openjdkA8-20230122-5912-1drjnge/jdk8u-jdk8u362-ga/hotspot/src
Event: 3025.942 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$isNullIntolerant$1$adapted(Lorg/apache/spark/sql/catalyst/expressions/PredicateHelper;Lorg/apache/spark/sql/catalyst/expressions/Expression;)Ljava/lang/Object;> (0x00000007bb060918) thr
Event: 3025.943 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$outputWithNullability$1(Lscala/collection/Seq;Lorg/apache/spark/sql/catalyst/expressions/Attribute;)Lorg/apache/spark/sql/catalyst/expressions/Attribute;> (0x00000007bb0705c0) thrown at
Event: 3025.951 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$generatePredicateCode$4(Lorg/apache/spark/sql/execution/GeneratePredicateHelper;Lscala/collection/Seq;[ZLscala/collection/Seq;Lscala/collection/Seq;Lscala/collection/Seq;Lscala/collecti
Event: 3025.952 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$generatePredicateCode$5(Lorg/apache/spark/sql/execution/GeneratePredicateHelper;Lscala/collection/Seq;[ZLscala/collection/Seq;Lscala/collection/Seq;Lscala/collection/Seq;Lscala/collecti
Event: 3025.952 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$generatePredicateCode$6$adapted(Lorg/apache/spark/sql/catalyst/expressions/Attribute;Lorg/apache/spark/sql/catalyst/expressions/Expression;)Ljava/lang/Object;> (0x00000007bb1793e0) thro
Event: 3025.955 Thread 0x00007ff53f00e000 Exception <a 'java/lang/NoSuchMethodError': java.lang.Object.$anonfun$generatePredicateCode$7(Lorg/apache/spark/sql/execution/GeneratePredicateHelper;[ZLscala/collection/Seq;Lscala/collection/Seq;Lorg/apache/spark/sql/catalyst/expressions/codegen/CodegenC
Event: 3025.987 Thread 0x00007ff4af868800 Exception <a 'java/lang/NoSuchMethodError': <clinit>> (0x00000007ac04d088) thrown at [/private/tmp/openjdkA8-20230122-5912-1drjnge/jdk8u-jdk8u362-ga/hotspot/src/share/vm/prims/jni.cpp, line 1614]
Event: 3059.588 Thread 0x00007ff540b0d000 Implicit null exception at 0x00000001113dbcb8 to 0x00000001113dbd3d
Events (10 events):
Spark claims to support Java 8u362 and above (https://spark.apache.org/docs/latest/), yet it fails above with exactly 8u362.
How to fix: switch to JDK 11.
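On macOS, where the crash above occurred, switching can be done by pointing JAVA_HOME at a JDK 11 installation before relaunching spark-sql (assuming JDK 11 is registered with java_home):
# select JDK 11 and restart the Spark SQL shell
export JAVA_HOME=$(/usr/libexec/java_home -v 11)
$SPARK_HOME/bin/spark-sql
With JDK 11, the same queries succeed: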
spark-sql (nyc)> select *from nyc.test where name like "check%";
check
check2
Time taken: 0.078 seconds, Fetched 2 row(s)
spark-sql (nyc)> select *from nyc.test where name like "check";
check
Time taken: 0.044 seconds, Fetched 1 row(s)
spark-sql (nyc)>
Related issue: