Presto + Spark Interoperability: Spark-based Ingestion.

Prashant Sharma
Presto + Spark: A Lakehouse story.
8 min read · Sep 22, 2023

A typical lakehouse setup has a multi-engine configuration, e.g. Presto and Spark. This way the lakehouse not only gets the unique advantages of both engines, but also the ability to load data from a greater number of data sources, and to do so faster (how? read on).

Get a Spark + Presto lakehouse setup by either:

  1. Using open source Presto and Apache Spark
  2. A managed lakehouse service, e.g. IBM watsonx.data: https://www.ibm.com/products/watsonx-data

About the setup: we opted for IBM watsonx.data while preparing this blog. Presto + Spark interoperability is most easily achieved via a shared Iceberg catalog. Apache Iceberg is a table format and needs a metastore service, for example Hive, to access its metadata. The setup therefore needs a shared Iceberg catalog (possibly hosted on an object store, e.g. Amazon S3 / MinIO / IBM COS), a metastore service such as the Hive Metastore Service (HMS), and Presto and Spark configured to connect to both HMS and the Iceberg catalog. In a managed service such as IBM watsonx.data, all of this is already configured. For setting it up yourself using the open source HMS/Presto/Iceberg and Spark, read: https://medium.com/@scrapcodes/interoperability-presto-iceberg-spark-4e5299ec7de5 .
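If you are wiring this up yourself, the Spark side boils down to registering the shared Iceberg catalog (named iceberg_data throughout this post) against HMS and the object store. Below is only a minimal sketch; the HMS URI, S3 endpoint, bucket and credentials are placeholders, and in a managed service like watsonx.data these values are already configured for you:

from pyspark.sql import SparkSession

# Placeholders: replace the HMS URI, endpoint, bucket and credentials with your own.
spark = (
    SparkSession.builder
    .appName("iceberg-shared-catalog")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.iceberg_data", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg_data.type", "hive")  # metadata served by HMS
    .config("spark.sql.catalog.iceberg_data.uri", "thrift://<hms-host>:9083")
    .config("spark.sql.catalog.iceberg_data.warehouse", "s3a://<bucket>/warehouse")
    .config("spark.hadoop.fs.s3a.endpoint", "<s3-or-minio-or-cos-endpoint>")
    .config("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY>")
    .config("spark.hadoop.fs.s3a.secret.key", "<SECRET_KEY>")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .enableHiveSupport()
    .getOrCreate()
)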

A note about ingestion: copying a large amount of data is generally both time and resource consuming. But sometimes the data needs to be:

a) Prepared (selecting only the relevant columns or rows)
b) Cleansed (e.g. a CSV with bad records, such as rows with missing fields)
c) Optimized (compressed storage or a format change, e.g. CSV to Parquet, etc.)
d) Transformed, i.e. Extract, Transform and Load (ETL): if the "transform" operation is expensive, it may make sense to store the transformed data in Iceberg format. A short sketch of a), b) and c) follows this list.
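For illustration, a), b) and c) can all be done at ingestion time in a few lines of PySpark. This is only a sketch to run in the PySpark shell; the selected columns come from the sample dataset used later in this post, and the curated output path is hypothetical:

# a) Prepare: keep only the relevant columns.
# b) Cleanse: drop rows that are missing key fields.
# c) Optimize: store as compressed Parquet instead of raw CSV.
df = spark.read.csv("s3a://testbucket/large/21-Appliances.csv", header=True)
curated = (
    df.select("asin", "title", "brandName", "buyBoxPrice")
      .dropna(subset=["asin", "title"])
)
curated.write.mode("overwrite").parquet("s3a://testbucket/curated/appliances/")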

Part 1. Ingestion.

How do we ingest a large Parquet, ORC or CSV file hosted on an S3 bucket, HDFS, or any Hadoop-compatible source (the list is long)? In fact, the source can be a streaming source too (more on that later).

Sample data source (Apache License v2): https://www.kaggle.com/datasets/regulad/amazon?resource=download (extracted and uploaded to an S3 bucket).

See it in action on the Spark (PySpark) terminal:

>>> df = spark.read.csv("s3a://testbucket/large/21-Appliances.csv", header=True)
>>> df.show(2) # preview the data!
+-------+-------------+--------------------+----------+--------------------+----------+----------+---------------------------+----+---------+---------------+-----------+----------------------+---------+----------------+-------------------+-----------+------------+----------------+-----------+------------------+------------+-------------+------------+-------------+----------------+------------+------------+-----------+-----------+-------------+------------+---------+-----------+----+
|brandId|subcategoryId| imageUrl| asin| title| brandName|categoryId|subcategory.subcategoryName|rank|amazonIsr|numberOfSellers|isVariation|monthlyRevenueEstimate| ttm|monthlyUnitsSold| listedSince|reviewCount|reviewRating|numberFbaSellers|buyBoxPrice|averageBuyBoxPrice|buyBoxEquity|revenueEquity|marginEquity|outOfStockNow|productPageScore|manufacturer| upc| partNumber| model|numberOfItems|totalRatings|momGrowth|momGrowth12|note|
+-------+-------------+--------------------+----------+--------------------+----------+----------+---------------------------+----+---------+---------------+-----------+----------------------+---------+----------------+-------------------+-----------+------------+----------------+-----------+------------------+------------+-------------+------------+-------------+----------------+------------+------------+-----------+-----------+-------------+------------+---------+-----------+----+
| 181347| 2399939011|data:image/jpeg;b...|B09ZRNP19N|GE Profile Opal 1...|GE PROFILE| 21| Ice Makers|null| 0.0| 1| False| 3223122.0|890585.53| 8078|2022-05-07T00:00:00| 2667| 4.3| 1| 399.0| 306.5| 4039.0| 1611561.0| 241734.15| False| 9.2| GE Profile|084691897880|P4INAASSTSS|P4INAASSTSS| 1| 0| -0.77| null|null|
| 297220| 3741161|data:image/jpeg;b...|B0C7M5MKDQ|Maxblue RPWFE (wi...| Maxblue| 21| Water Filters|null| 0.0| 1| False| 474807.34| null| 3466|2023-06-10T00:00:00| 3528| null| 1| 136.99| 68.0| 1733.0| 237403.67| 35610.55| False| 7.2| Maxblue| null| MB-F19C-D| MB-F19C-D| 1| null| null| null|null|
+-------+-------------+--------------------+----------+--------------------+----------+----------+---------------------------+----+---------+---------------+-----------+----------------------+---------+----------------+-------------------+-----------+------------+----------------+-----------+------------------+------------+-------------+------------+-------------+----------------+------------+------------+-----------+-----------+-------------+------------+---------+-----------+----+
only showing top 2 rows

>>> df.printSchema() # inferred schema!
root
|-- brandId: string (nullable = true)
|-- subcategoryId: string (nullable = true)
|-- imageUrl: string (nullable = true)
|-- asin: string (nullable = true)
|-- title: string (nullable = true)
|-- brandName: string (nullable = true)
|-- categoryId: string (nullable = true)
|-- subcategory.subcategoryName: string (nullable = true)
|-- rank: string (nullable = true)
|-- amazonIsr: string (nullable = true)
|-- numberOfSellers: string (nullable = true)
|-- isVariation: string (nullable = true)
|-- monthlyRevenueEstimate: string (nullable = true)
|-- ttm: string (nullable = true)
|-- monthlyUnitsSold: string (nullable = true)
|-- listedSince: string (nullable = true)
|-- reviewCount: string (nullable = true)
|-- reviewRating: string (nullable = true)
|-- numberFbaSellers: string (nullable = true)
|-- buyBoxPrice: string (nullable = true)
|-- averageBuyBoxPrice: string (nullable = true)
|-- buyBoxEquity: string (nullable = true)
|-- revenueEquity: string (nullable = true)
|-- marginEquity: string (nullable = true)
|-- outOfStockNow: string (nullable = true)
|-- productPageScore: string (nullable = true)
|-- manufacturer: string (nullable = true)
|-- upc: string (nullable = true)
|-- partNumber: string (nullable = true)
|-- model: string (nullable = true)
|-- numberOfItems: string (nullable = true)
|-- totalRatings: string (nullable = true)
|-- momGrowth: string (nullable = true)
|-- momGrowth12: string (nullable = true)
|-- note: string (nullable = true)

>>> df.createOrReplaceTempView("Appliances") # no processing!
>>> spark.sql("use iceberg_data") # iceberg_data is a iceberg based catalog common to both spark & Presto.
>>> spark.sql("create schema amazon")
DataFrame[]
>>> spark.sql("create table amazon.appliances as select * from Appliances") # Actual ingestion
DataFrame[]
>>>

In the run above, we ingested data using Spark. Next, let us look at the data using Presto via the common catalog, i.e. iceberg_data. On the Presto terminal:

-- bin/presto-cli --catalog iceberg_data
presto> show schemas;
Schema
------------
amazon
example_03
test
(3 rows)

Query 20230717_115051_00002_i29jx, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:01 [3 rows, 35B] [2 rows/s, 24B/s]

presto> use amazon;
USE
presto:amazon> show tables;
Table
------------
appliances
(1 row)

Query 20230717_115112_00004_i29jx, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:01 [1 rows, 26B] [0 rows/s, 17B/s]

presto:amazon> select count(title), title from appliances group by title having count(title) > 1 LIMIT 2;
_col0 | title
-------+---------------------------------------------------------------------------------------------------------------------------------------
12 | Riley & Higgs Curved Front Espresso Range Hood Cover With Decorative Molding - Wall Mounted Wood Range Hood Covers, Plywood and Maple
12 | Riley & Higgs Curved Front Primmed Range Hood Cover With Decorative Molding - Wall Mounted Wood Range Hood Covers, Plywood and Maple
(2 rows)

Query 20230717_123802_00015_i29jx, FINISHED, 1 node
Splits: 54 total, 54 done (100.00%)
0:02 [23.1K rows, 822KB] [14.2K rows/s, 504KB/s]

It is also possible to ingest multiple files with the same schema, either by giving the path to a directory, as follows:

>>> df = spark.read.csv("s3a://test/smartscout-extended", header=True)
>>> df.printSchema()
root
|-- asin: string (nullable = true)
|-- desc: string (nullable = true)
|-- about: string (nullable = true)
|-- aplus: string (nullable = true)

>>> df.inputFiles() # list of files ingested
['s3a://test/smartscout-extended/2-Arts,%20Crafts%20and%20Sewing-extended.csv', 's3a://test/smartscout-extended/12-Pet%20Supplies-extended.csv', 's3a://test/smartscout-extended/47-Apps%20and%20Games-extended.csv', 's3a://test/smartscout-extended/45-Home%20and%20Business%20Services-extended.csv', 's3a://test/smartscout-extended/8-Patio,%20Lawn%20and%20Garden-extended.csv', 's3a://test/smartscout-extended/21-Appliances-extended.csv', 's3a://test/smartscout-extended/44-Audible%20Books%20and%20Originals-extended.csv', 's3a://test/smartscout-extended/9-Clothing,%20Shoes%20and%20Jewelry-extended.csv', 's3a://test/smartscout-extended/37-Kitchen%20and%20Dining-extended.csv', 's3a://test/smartscout-extended/69-Climate%20Pledge%20Friendly-extended.csv', 's3a://test/smartscout-extended/3-Home%20and%20Kitchen-extended.csv', 's3a://test/smartscout-extended/22-Handmade%20Products-extended.csv', 's3a://test/smartscout-extended/32-Team%20Sports-extended.csv', 's3a://test/smartscout-extended/25-Gift%20Cards-extended.csv', 's3a://test/smartscout-extended/50-Spine-extended.csv', 's3a://test/smartscout-extended/68-Private%20Brands-extended.csv', 's3a://test/smartscout-extended/43-Kindle%20Store-extended.csv', 's3a://test/smartscout-extended/26-Entertainment-extended.csv', 's3a://test/smartscout-extended/6-Baby%20Products-extended.csv', 's3a://test/smartscout-extended/23-Video%20Games-extended.csv', 's3a://test/smartscout-extended/31-Books-extended.csv', 's3a://test/smartscout-extended/5-Sports%20and%20Outdoors-extended.csv', 's3a://test/smartscout-extended/38-Movies%20and%20TV-extended.csv', 's3a://test/smartscout-extended/70-Private%20Brands-extended.csv', 's3a://test/smartscout-extended/18-Office%20Products-extended.csv', 's3a://test/smartscout-extended/49-Purchase%20Circles-extended.csv', 's3a://test/smartscout-extended/62-Camera%20and%20Photo-extended.csv', 's3a://test/smartscout-extended/7-Tools%20and%20Home%20Improvement-extended.csv', 's3a://test/smartscout-extended/42-Digital%20Music-extended.csv', 's3a://test/smartscout-extended/46-Special%20Features-extended.csv', 's3a://test/smartscout-extended/20-Collectibles%20and%20Fine%20Art-extended.csv', 's3a://test/smartscout-extended/14-Musical%20Instruments-extended.csv', 's3a://test/smartscout-extended/11-Industrial%20and%20Scientific-extended.csv', 's3a://test/smartscout-extended/19-Cell%20Phones%20and%20Accessories-extended.csv', 's3a://test/smartscout-extended/53-Alexa%20Skills-extended.csv', 's3a://test/smartscout-extended/48-Digital%20Educational%20Resources-extended.csv', 's3a://test/smartscout-extended/4-Beauty%20and%20Personal%20Care-extended.csv', 's3a://test/smartscout-extended/27-CDs%20and%20Vinyl-extended.csv', 's3a://test/smartscout-extended/39-Everything%20Else-extended.csv', 's3a://test/smartscout-extended/1-Toys%20and%20Games-extended.csv', 's3a://test/smartscout-extended/55-Amazon%20Devices%20and%20Accessories-extended.csv', 's3a://test/smartscout-extended/15-Electronics-extended.csv', 's3a://test/smartscout-extended/30-Software-extended.csv', 's3a://test/smartscout-extended/56-Credit%20and%20Payment%20Cards-extended.csv', 's3a://test/smartscout-extended/72-Campaigns-extended.csv', 's3a://test/smartscout-extended/71-Climate%20Pledge%20Friendly-extended.csv', 's3a://test/smartscout-extended/51-Magazine%20Subscriptions-extended.csv', 's3a://test/smartscout-extended/59-Luxury%20Stores-extended.csv', 's3a://test/smartscout-extended/13-Automotive-extended.csv', 's3a://test/smartscout-extended/10-Grocery%20and%20Gourmet%20Food-extended.csv', 
's3a://test/smartscout-extended/54-Amazon%20Explore-extended.csv', 's3a://test/smartscout-extended/57-Prime%20Video-extended.csv', 's3a://test/smartscout-extended/17-Health%20and%20Household-extended.csv']
>>> df.dropna().show(2) # dropna drops all the rows with null fields.

+--------------------+--------------------+--------------------+--------------------+
| asin| desc| about| aplus|
+--------------------+--------------------+--------------------+--------------------+
| SAFETY FEATURE...| auto-off and mor...| preparing side d...| appetizers|
| DISHWASHER SAF...| lids| discs| and blade assemb...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows

Or by specifying each path explicitly:

>>> df = spark.read.csv(path=["s3a://test/smartscout-extended/1-Toys and Games-extended.csv","s3a://test/smartscout-extended/10-Grocery and Gourmet Food-extended.csv"], header=True)
>>> df.inputFiles()
['s3a://test/smartscout-extended/1-Toys%20and%20Games-extended.csv', 's3a://test/smartscout-extended/10-Grocery%20and%20Gourmet%20Food-extended.csv']
>>> df.count()
73791

Next, let us look at ingestion using a user-specified schema. In the case of CSV, Spark tries to prune the columns based on the provided schema. The ordering (and/or number) of columns in the CSV and in the provided schema should match. This behaviour can be adjusted by setting the spark.sql.csv.parser.columnPruning.enabled config to false (it is true by default).

Example of a user-specified schema:

>>> df = spark.read.schema("asin STRING, desc STRING, about STRING, aplus STRING").csv("s3a://test/smartscout-extended", header=True)
>>> df.printSchema()
root
|-- asin: string (nullable = true)
|-- desc: string (nullable = true)
|-- about: string (nullable = true)
|-- aplus: string (nullable = true)

Effect of column pruning (with pruning enabled, Spark reads only the three specified columns and merely warns that the CSV header has more columns than the schema):

>>> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "true")
>>> df = spark.read.schema("brandID STRING, subcat STRING, img STRING").option("enforceSchema", "true").option("columnPruning", "true").csv(path=["s3a://test/large/"], header=True)
>>> df.show(2)
23/07/25 06:52:10 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
Header length: 35, schema size: 3
CSV file: s3a://test/large/21-Appliances.csv
+-------+----------+--------------------+
|brandID| subcat| img|
+-------+----------+--------------------+
| 181347|2399939011|data:image/jpeg;b...|
| 297220| 3741161|data:image/jpeg;b...|
+-------+----------+--------------------+
only showing top 2 rows

Parquet format:

As shown above for the CSV format, ORC, Parquet and other compatible formats can be ingested in the same way.
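For example, ingesting a Parquet (or ORC) file follows exactly the same steps as the CSV walk-through above; only the reader changes. The file path below is hypothetical:

>>> df = spark.read.parquet("s3a://testbucket/large/appliances.parquet") # or spark.read.orc(...)
>>> df.createOrReplaceTempView("appliances_parquet")
>>> spark.sql("use iceberg_data")
>>> spark.sql("create table amazon.appliances_parquet as select * from appliances_parquet")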

Finally, here is a generic Python ingestion app for ingesting via Parquet/ORC/CSV, etc.

# Sample code to
# 1. Read a file into a Spark DataFrame using the inferred schema.
# 2. Register a temp view for the DataFrame created above.
# 3. Connect to the Iceberg catalog.
# 4. Create the target schema in the Iceberg catalog.
# 5. CTAS / insert into the target table.
# 6. Print the record counts.

import sys

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

if __name__ == "__main__":
    if len(sys.argv) < 7:
        print(f"Usage: {__file__} <s3a://bucket_name/file.csv>"
              " <input_format> <iceberg_catalog_name> <bucket_name> <target-schema>"
              " <target-table_name>", file=sys.stderr)
        sys.exit(-1)

    input_file = sys.argv[1]
    input_format = sys.argv[2]
    iceberg_catalog = sys.argv[3]
    bucket_name = sys.argv[4]
    target_schema = sys.argv[5]
    target_table = sys.argv[6]

    # 3. Connect to the Iceberg catalog: the catalog itself is configured via
    #    Spark properties (see the setup section); Hive support is enabled here.
    spark = SparkSession \
        .builder \
        .appName(f"Ingest {input_format} {input_file} to presto "
                 f"{iceberg_catalog}.{target_schema}.{target_table}") \
        .enableHiveSupport() \
        .getOrCreate()

    print(str(spark.sparkContext.getConf().toDebugString()))

    # 1. Load the input file in Spark; the schema is inferred.
    df = spark.read.format(source=input_format).options(header=True).load(path=input_file)
    r = df.count()
    df.printSchema()
    # 2. Register a temp view (no processing happens yet).
    df.createOrReplaceTempView(target_table)
    # 4. Create the target schema in the Iceberg catalog, if it does not exist.
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {iceberg_catalog}.{target_schema}"
              f" LOCATION 's3a://{bucket_name}/{target_schema}/'")
    # 5. Create table as select, or insert-select if the table already exists.
    try:
        spark.sql(
            f"create table {iceberg_catalog}.{target_schema}.{target_table}"
            f" as select * from {target_table}").show()  # Actual ingestion
    except AnalysisException as ae:  # if the table already exists, append to it.
        print(f"Unable to create, cause: {ae.desc} - {ae.cause} - {ae.args}")
        if ae.desc.startswith(f"Table {target_schema}.{target_table} already exists"):
            print(f"Since table {target_schema}.{target_table} already"
                  f" exists, trying to append to it.")
            spark.sql(f"insert into {iceberg_catalog}.{target_schema}.{target_table}"
                      f" select * from {target_table}")

    # 6. Print the record counts.
    c = spark.sql(f"select * from {iceberg_catalog}.{target_schema}.{target_table}").count()
    print(f"\n\nInput records: {r} - Updated record count of the target table: {c}\n\n")
    spark.stop()
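Assuming the script above is saved as ingest.py, it can be submitted with spark-submit. The file name and the Iceberg runtime coordinate (Spark/Scala/Iceberg versions) below are illustrative; in a managed setup such as watsonx.data the Iceberg runtime is typically already on the classpath:

spark-submit \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2 \
  ingest.py \
  s3a://testbucket/large/21-Appliances.csv csv iceberg_data testbucket amazon appliances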

In Part 2, we will explore exporting data. Export is particularly useful because, as of today, Presto with an Iceberg catalog does not support exporting data.

References:

  1. A lab on the same topic, using FPGrowth to mine patterns. Link: https://github.com/IBM/watsonx-data/blob/main/tx3509-labs/Spark.md
  2. Part 2. Exporting data using Apache Spark.
  3. Setup your own lakehouse on your laptop: https://medium.com/@scrapcodes/interoperability-presto-iceberg-spark-4e5299ec7de5
  4. Setup IBM watsonx.data on your laptop: https://github.com/IBM/watsonx-data/tree/main/developer_package
