Presto + Spark Interoperability: Spark-based Export

Prashant Sharma
Presto + Spark: A Lakehouse story.
2 min read · Sep 22, 2023

In Part 1 of this blog series, we discussed Spark-based ingestion and learnt how Presto and Spark complement each other in a typical lakehouse setup. While Part 1 focused on simple ingestion use cases, in this blog, i.e. Part 2, we will focus on exporting the data. This is particularly useful because Presto with the Iceberg catalog does not support exporting.

Why is exporting the data even needed, especially when we can store it in the lakehouse itself?

  • The processed data is to be consumed by another tool, for example to feed report dashboards or to train a machine learning model.
  • Sharing it with an external entity: it may not be ideal to grant access to the internal lakehouse (or the entire S3 bucket) when all that is needed is one-time access to a particular report, for instance.
  • Lastly, we may export when the storage connector is only supported by a third-party library that is not compatible with Presto, e.g. writing to a table in Cassandra or Elasticsearch via Apache Spark (a sketch follows below).
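
As a concrete example of that last point, here is a minimal sketch of writing a Spark DataFrame to Cassandra using the DataStax Spark Cassandra Connector. The host, keyspace, and table names are placeholders, and the connector jar must be added to the job (e.g. via --packages com.datastax.spark:spark-cassandra-connector_2.12:<version>); treat it as a sketch to adapt, not a drop-in snippet.

# Sketch: write a DataFrame to Cassandra via the Spark Cassandra Connector.
# Host, keyspace and table names below are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("export-to-cassandra")
    .config("spark.cassandra.connection.host", "cassandra-host")
    .getOrCreate()
)

df = spark.table("my_catalog.my_schema.my_table")  # placeholder source table

(df.write
   .format("org.apache.spark.sql.cassandra")
   .mode("append")
   .options(keyspace="my_keyspace", table="my_table")
   .save())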

The following sample PySpark app can be used to export data in any supported format, e.g. CSV, Parquet, ORC, etc.

# Sample code to:
# 1. Read an Iceberg table into a Spark DataFrame.
# 2. Write it out to an S3 bucket or another Hadoop-compatible file system (e.g. HDFS).

import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) < 7:
        print(f"Usage: {__file__} <s3a://bucket_name/file.csv>"
              " <format: json/csv/parquet/orc...> <iceberg_catalog_name>"
              " <from-schema> <from-table_name> <mode: e.g. append/overwrite>",
              file=sys.stderr)
        sys.exit(-1)

    output_file = sys.argv[1]
    output_format = sys.argv[2]
    iceberg_catalog = sys.argv[3]
    from_schema = sys.argv[4]
    from_table = sys.argv[5]
    mode = sys.argv[6]

    spark = SparkSession \
        .builder \
        .appName(f"Export {output_format} from Presto table: "
                 f"{iceberg_catalog}.{from_schema}.{from_table}") \
        .enableHiveSupport() \
        .getOrCreate()

    print(spark.sparkContext.getConf().toDebugString())

    # 1. Load the Iceberg table into a Spark DataFrame.
    df = spark.sql(f"select * from {iceberg_catalog}.{from_schema}.{from_table}")
    r = df.count()
    print(f"Table has {r} rows.")
    df.printSchema()

    # 2. Write the DataFrame out in the requested format (match/case needs Python 3.10+).
    match output_format:
        case "csv":
            df.write.csv(output_file, header=True, mode=mode)
        case _:
            df.write.save(path=output_file, format=output_format, mode=mode)

    spark.stop()
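
A note on configuration: the app above assumes the Spark runtime already knows about the Iceberg catalog (which is the case when Spark is wired to the lakehouse, e.g. via the watsonx.data developer package). On a plain Spark cluster the catalog can be declared explicitly when building the session. The catalog name, metastore URI, and warehouse path below are placeholders, so treat this as a rough sketch rather than a drop-in configuration:

# Sketch: declaring an Iceberg catalog explicitly when the cluster does not provide one.
# Catalog name, metastore URI, warehouse path and package version are placeholders.
# The Iceberg runtime jar is assumed to be on the classpath, e.g. via
#   --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:<version>
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("export-with-explicit-iceberg-catalog")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hive")
    .config("spark.sql.catalog.my_catalog.uri", "thrift://metastore-host:9083")
    .config("spark.sql.catalog.my_catalog.warehouse", "s3a://my-bucket/warehouse")
    .getOrCreate()
)

# Tables are then addressable as my_catalog.<schema>.<table>, exactly as in the app above.
df = spark.sql("select * from my_catalog.my_schema.my_table")

The same settings can also be passed as --conf flags to spark-submit without changing the code.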

References:

  1. A lab on the same topic, using FPGrowth to mine patterns: https://github.com/IBM/watsonx-data/blob/main/tx3509-labs/Spark.md
  2. Part 1: Spark based Ingestion
  3. Set up your own lakehouse on your laptop: https://medium.com/@scrapcodes/interoperability-presto-iceberg-spark-4e5299ec7de5
  4. Set up IBM watsonx.data on your laptop: https://github.com/IBM/watsonx-data/tree/main/developer_package

Prashant Sharma
Committee member, maintainer, and contributor for Apache Spark, Kubeflow, and Tekton CD