Slowness of ML pipeline in Spark
It might not be very intuitive to link the SQL engine with the ML pipeline, but in Spark the ML library is actually backed by SparkSQL. The older MLlib component is based on RDD operations. As the Spark community continues to improve the SparkSQL runtime and the Catalyst implementation, there are efforts to migrate ML to DataFrame/Dataset-based operations, which rely on SparkSQL.
With the increased complexity of SparkSQL, ML developers may not be able to grasp its details. This sometimes makes an efficient ML pipeline difficult to achieve if you only follow the ML path in Spark.
One such example is a config I added to SparkSQL months ago: spark.sql.constraintPropagation.enabled. This SQL config controls the constraint propagation behavior during query optimization.
Let’s see the description of this config:
“When true, the query optimizer will infer and propagate data constraints in the query plan to optimize them. Constraint propagation can sometimes be computationally expensive for certain kinds of query plans (such as those with a large number of predicates and aliases) which might negatively impact overall runtime.”
Before a query runs, it goes through several phases, and optimization is one of them. Query optimization transforms the original query plan: pruning unnecessary columns, reducing the amount of data read from the data store, and so on.
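To see these phases for yourself, you can ask Spark to print the plan produced at each stage. A minimal sketch, assuming a local SparkSession; the data and column names here are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: a local session and a tiny sales-like DataFrame.
val spark = SparkSession.builder().master("local[*]").appName("plan-phases").getOrCreate()
import spark.implicits._

val sales = Seq((1, 5000.0), (2, 20000.0), (3, 15000.0)).toDF("id", "price")

// explain(true) prints the parsed and analyzed logical plans, the
// optimized logical plan, and the final physical plan.
sales.filter($"price" > 10000).select("id").explain(true)
```

The optimized logical plan in the output is the result of the phase this article is concerned with.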
To make optimization work properly, we need to provide some information to the optimizer. Constraints are invariants that hold true for all the rows produced by an operator in SparkSQL.
For example, in a query like SELECT * FROM sales WHERE sales.price > 10000, we can guarantee that all rows coming out of the Filter operator satisfy price > 10000. These guarantees are what we call constraints.
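You can peek at what the optimizer inferred through the constraints field on the optimized logical plan. A sketch under the assumption of a local session; note that this field is Catalyst-internal, not a stable public API:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("constraints-demo").getOrCreate()
import spark.implicits._

val sales = Seq((1, 5000.0), (2, 20000.0)).toDF("id", "price")
val filtered = sales.filter($"price" > 10000)

// The Filter's optimized plan carries a set of constraint expressions,
// typically including the predicate itself (price > 10000.0) and an
// inferred isnotnull(price).
filtered.queryExecution.optimizedPlan.constraints.foreach(println)
```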
Although constraints are useful in query optimization, the process of obtaining the complete set of constraints (i.e., constraint propagation) can be computationally expensive if you have a very large query plan.
It is not rare to stack many ML stages in an ML pipeline. Because the algorithms are actually carried out by DataFrame/Dataset operations in Spark ML, the pipeline goes through the same optimization phase.
Consider the following ML pipeline:
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.internal.SQLConf

val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))(
  (df, i) => df.withColumn(s"x$i", $"x0"))

val indexers = df.columns.tail.map(c => new StringIndexer()
  .setInputCol(c)
  .setOutputCol(s"${c}_indexed")
  .setHandleInvalid("skip"))

val encoders = indexers.map(indexer => new OneHotEncoder()
  .setInputCol(indexer.getOutputCol)
  .setOutputCol(s"${indexer.getOutputCol}_encoded")
  .setDropLast(true))

val stages: Array[PipelineStage] = indexers ++ encoders
val pipeline = new Pipeline().setStages(stages)

val startTime = System.nanoTime
pipeline.fit(df).transform(df).show
val runningTime = System.nanoTime - startTime
Previously, it took about half an hour to finish this. Most of the time is spent locally optimizing the huge query plan; the time needed to actually run the ML algorithms is comparatively small.
After the config spark.sql.constraintPropagation.enabled was added, we can simply disable constraint propagation:
spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)
This reduces the time to run the above pipeline to less than half a minute.
Next time you construct an ML pipeline and find its performance is bad, it may simply be because query optimization costs too much time. Try disabling this config and see if things improve.
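If you prefer not to change application code, the same flag can also be passed at submit time. A sketch; the class name and jar path below are placeholders:

```shell
# Disable constraint propagation for the whole application.
spark-submit \
  --conf spark.sql.constraintPropagation.enabled=false \
  --class com.example.MyMLPipeline \
  my-ml-pipeline.jar
```

The flag can likewise be set cluster-wide in conf/spark-defaults.conf.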
