Chaining Custom PySpark DataFrame Transformations
Matthew Powers

Koby Karp (replying to Matthew Powers)

This also seems to work for me (unlike compose, which applies its functions in reverse order):

from toolz import pipe

dest_df = pipe(
    source_df,
    with_stuff1(arg1="nice", arg2="person"),
    with_stuff2(arg="yoyo"),
)
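For comparison, here is a minimal sketch of the same pipeline written with toolz.compose, assuming the with_stuff1 and with_stuff2 definitions below; compose applies its functions right to left, so the transformations have to be listed in the reverse of their execution order:

from toolz import compose

# compose runs right to left, so with_stuff1 is listed last
# in order to run first; pipe avoids this reversed ordering.
transform = compose(
    with_stuff2(arg="yoyo"),
    with_stuff1(arg1="nice", arg2="person"),
)
dest_df = transform(source_df)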

NOTE: you need to pass the arguments explicitly as keywords (e.g. arg1=...); otherwise curry binds them positionally, starting with df. See the sketch after the full example below.
NOTE: your DataFrame must be the first parameter of each transformation function:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from toolz import pipe, curry


# curry lets us supply the keyword arguments now and have pipe
# supply the DataFrame (the remaining first argument) later.
@curry
def with_stuff1(df, arg1, arg2):
    print(df)
    return df.withColumn("stuff1", lit("{0} {1}".format(arg1, arg2)))


@curry
def with_stuff2(df, arg):
    print(df)
    return df.withColumn("stuff2", lit("{}".format(arg)))


data = [("jose", 1), ("li", 2), ("liz", 3)]
spark = SparkSession.builder.getOrCreate()
source_df = spark.createDataFrame(data, ["name", "age"])

source_df.show()
dest_df = pipe(
    source_df,
    with_stuff1(arg1="nice", arg2="person"),
    with_stuff2(arg="yoyo"),
)
dest_df.show()
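To see why the keyword arguments matter, here is a sketch of the failure mode, assuming the definitions above; it is a hypothetical misuse for illustration, not part of the working example. curry fills parameters from left to right, so positional strings get bound to df:

# Broken on purpose: "nice" is bound to df and "person" to arg1,
# so pipe passes source_df in as arg2. The body then evaluates
# "nice".withColumn(...), which raises
# AttributeError: 'str' object has no attribute 'withColumn'.
broken_df = pipe(
    source_df,
    with_stuff1("nice", "person"),  # binds df="nice", arg1="person"
    with_stuff2(arg="yoyo"),
)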