Presto + Spark Interoperability: Spark-based Export
In Part 1 of this blog series, we discussed Spark-based ingestion and learnt how Presto and Spark complement each other in a typical lakehouse setup. While Part 1 focused on simple ingestion use cases, in this blog, i.e. Part 2, we will focus on exporting the data. This is particularly useful because Presto with the Iceberg catalog does not have built-in support for exporting.
Why is exporting the data even needed, especially when we can store it in the lakehouse itself?
- The processed data is to be consumed by another tool, for example to feed report dashboards or to train a machine learning model.
- Sharing it with an external entity: it may not be ideal to give access to the internal lakehouse (or the entire S3 bucket) for one-time access to a particular report, for instance.
- Lastly, we may export when the storage connector is only supported by a third-party library not compatible with Presto, e.g. writing to a table in Cassandra or Elasticsearch via Apache Spark.
The following sample PySpark app can be used for exporting data in any supported format, e.g. CSV, Parquet, ORC, etc.
- Presto does not support Iceberg vectorized read/write, so vectorization needs to be disabled in Spark:
SET spark.sql.iceberg.vectorization.enabled=false;
More on: https://medium.com/@scrapcodes/interoperability-presto-iceberg-spark-4e5299ec7de5#28f0
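Besides the SQL `SET` statement above, the same property can be supplied when launching the job. A minimal sketch, assuming the property name shown above and a hypothetical script name `export_table.py`:

```shell
# Pass the Iceberg vectorization setting at launch time via --conf,
# so the session starts with it already disabled.
spark-submit \
  --conf spark.sql.iceberg.vectorization.enabled=false \
  export_table.py ...
```

Setting it at launch avoids having to run a `SET` statement inside every session that touches Presto-managed tables.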
# Sample code to:
# 1. Read an Iceberg table via Spark.
# 2. Write a file to an S3 bucket or a Hadoop-compatible file system (including HDFS, etc.).
import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # 6 arguments are required (plus the script name itself).
    if len(sys.argv) < 7:
        print(f"Usage: {__file__} <s3a://bucket_name/file.csv>"
              " <format: json/csv/parquet/orc...> <iceberg_catalog_name>"
              " <from-schema> <from-table_name> <mode: e.g. append/overwrite>",
              file=sys.stderr)
        sys.exit(-1)
    output_file = sys.argv[1]
    output_format = sys.argv[2]
    iceberg_catalog = sys.argv[3]
    from_schema = sys.argv[4]
    from_table = sys.argv[5]
    mode = sys.argv[6]
    spark = SparkSession \
        .builder \
        .appName(f"Export {output_format} from presto table: "
                 f"{iceberg_catalog}.{from_schema}.{from_table}") \
        .enableHiveSupport() \
        .getOrCreate()
    print(spark.sparkContext.getConf().toDebugString())
    # 1. Load the Iceberg table into a Spark dataframe.
    df = spark.sql(f"select * from {iceberg_catalog}.{from_schema}.{from_table}")
    r = df.count()
    print(f"Table has: {r} rows.")
    df.printSchema()
    # 2. Write it out in the requested format (match/case requires Python 3.10+).
    match output_format:
        case "csv":
            df.write.csv(output_file, header=True, mode=mode)
        case _:
            df.write.save(path=output_file, format=output_format, mode=mode)
    spark.stop()
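To tie it together, the app above can be submitted as follows. This is a sketch: the script name, bucket, catalog, schema, and table names are all placeholders you would replace with your own.

```shell
# Hypothetical invocation: export a table to CSV on S3,
# overwriting any previous export at that path.
spark-submit \
  --conf spark.sql.iceberg.vectorization.enabled=false \
  export_table.py \
  s3a://my-bucket/exports/report.csv \
  csv \
  iceberg_data \
  my_schema \
  my_table \
  overwrite
```

The positional arguments map one-to-one onto the `sys.argv` reads in the app: output path, format, catalog, schema, table, and write mode.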
References:
- A lab on the same topic, using FPGrowth to mine frequent patterns. Link: https://github.com/IBM/watsonx-data/blob/main/tx3509-labs/Spark.md
- Part 1: Spark-based Ingestion
- Setup your own lakehouse on your laptop: https://medium.com/@scrapcodes/interoperability-presto-iceberg-spark-4e5299ec7de5
- Setup IBM watsonx.data on your laptop: https://github.com/IBM/watsonx-data/tree/main/developer_package