MPP Pipeline VS Grouped Execution VS Stage By Stage

Kaisen Kang
StarRocks Engineering
5 min read · Jul 7, 2023

The Presto team recently published a new paper, Presto: A Decade of SQL Analytics at Meta. As it happens, Presto supports three distributed execution modes: MPP Pipeline, Grouped Execution, and Stage By Stage. In this article I want to compare these three execution modes.

What Is MPP Pipeline

MPP Pipeline vs MapReduce

As shown in the figure above, MPP Pipeline in this article refers specifically to MPP query engines such as StarRocks and Presto, which have the following characteristics:

  • The entire distributed execution happens in memory
  • Execution is pipelined: each Fragment streams its data to the next Fragment as it is produced, so a Fragment does not have to process all of its data before transmitting anything downstream (see the sketch after this list)
  • Shuffle is supported, and it is in-memory and streaming
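To make the streaming behavior concrete, here is a minimal Python sketch (the fragment functions are invented for illustration, not real StarRocks or Presto APIs): each fragment is a generator that forwards every batch downstream as soon as it is produced, so the root of the plan sees data long before the scan finishes.

```python
# Minimal sketch of pipelined, in-memory execution: each "fragment"
# is a generator that consumes upstream batches as they arrive.
# All names here are illustrative, not real StarRocks/Presto APIs.

def scan_fragment():
    """Leaf fragment: produce row batches from storage."""
    for batch_id in range(4):
        yield [(batch_id, v) for v in range(3)]  # one small batch of rows

def filter_fragment(upstream):
    """Middle fragment: streams each batch downstream immediately."""
    for batch in upstream:
        yield [row for row in batch if row[1] > 0]

def result_fragment(upstream):
    """Root fragment: the user sees the first rows before the scan finishes."""
    for batch in upstream:
        print("received batch:", batch)

# Fragments are chained in memory; no batch waits for the full input.
result_fragment(filter_fragment(scan_fragment()))
```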

What Is Grouped Execution

Colocate Join

Grouped Execution is actually easy to understand, starting from Colocate Join. As shown in the figure above, the data of tables T1 and T2 was bucketed by the Id column at import time, so rows with the same Id sit on the same node and the Join needs no re-shuffle. There is another important point: as soon as the Join for one bucket finishes, that bucket's final Join result can be returned directly. So when we are short on memory, we do not need to start the Join on all nodes or all buckets at the same time; we can execute the Join bucket by bucket, which can cut the required memory by a factor of dozens. At the same time, retries and fault tolerance can be performed at bucket granularity. These are the core principles of Grouped Execution.

What if the data of the two tables was not bucketed by the Join column in advance? The answer is simple: as shown in the figure below, it is handled the same way as a Shuffle Join at query time. We just add an extra Shuffle, persist the Shuffle result, and from there the process is the same as the Colocate case described above.
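The following Python sketch illustrates the principle with invented names (it is not Presto's implementation): buckets are joined one at a time, so only a single bucket's hash table lives in memory, each bucket's result is emitted as soon as it is ready, and a failed bucket can be retried on its own.

```python
# Illustrative sketch of Grouped Execution (names invented, not Presto APIs):
# T1 and T2 are pre-bucketed on the join key, so bucket i of T1 only ever
# joins bucket i of T2, and buckets are processed (and retried) one by one.

def hash_join(left_rows, right_rows):
    """Join one bucket pair; only this bucket's hash table is in memory."""
    table = {}
    for key, payload in left_rows:
        table.setdefault(key, []).append(payload)
    return [(key, l, r) for key, r in right_rows for l in table.get(key, [])]

def grouped_join(t1_buckets, t2_buckets, max_retries=3):
    for b1, b2 in zip(t1_buckets, t2_buckets):
        for attempt in range(max_retries):    # retry at bucket granularity
            try:
                yield from hash_join(b1, b2)  # result returned per bucket
                break
            except Exception:
                if attempt == max_retries - 1:
                    raise

# Two tables bucketed by id: bucket 0 holds even ids, bucket 1 odd ids.
t1 = [[(0, "a"), (2, "b")], [(1, "c")]]
t2 = [[(0, "x")], [(1, "y"), (3, "z")]]
print(list(grouped_join(t1, t2)))  # [(0, 'a', 'x'), (1, 'c', 'y')]
```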

What Is Stage By Stage

MPP Pipeline vs MapReduce

As shown in the figure above, Stage By Stage in this article refers specifically to batch-processing engines such as MapReduce and Spark, which have the following characteristics:

  • The entire distributed execution proceeds Stage By Stage, and each Stage's output is materialized to disk (see the sketch after this list)
  • Shuffle persists its results to disk
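A minimal Python sketch of this behavior, again with invented names rather than Spark's real API: every stage reads its input from disk and persists its complete output before the next stage starts, which is exactly what enables per-stage recovery after a failure.

```python
# Illustrative sketch of Stage By Stage execution (invented names, not Spark):
# each stage reads its input from disk, computes its full output, and
# persists it before the next stage starts, so a failure only re-runs
# the failed stage.

import json, os

def run_stage(compute, input_path, output_path):
    if os.path.exists(output_path):  # a previous run already materialized
        return                       # this stage: recover by skipping it
    with open(input_path) as f:
        rows = json.load(f)
    result = compute(rows)           # the stage must finish completely...
    with open(output_path, "w") as f:
        json.dump(result, f)         # ...before anything flows downstream

# Seed input, then run two stages; stage 2 only starts after stage 1's
# output is fully on disk (unlike a pipelined engine).
with open("input.json", "w") as f:
    json.dump(list(range(10)), f)
run_stage(lambda rows: [r * 2 for r in rows], "input.json", "stage1.json")
run_stage(lambda rows: [r for r in rows if r > 5], "stage1.json", "stage2.json")
```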

MPP Pipeline VS Stage By Stage

1 End-to-end latency:

MPP query engines such as StarRocks and Presto have lower end-to-end latency, because each Fragment sends data downstream immediately, so the user receives the first piece of data sooner.

2 Fault tolerance or batch processing capability:

The Stage By Stage execution mode represented by Spark has better fault tolerance: the end of each Stage is a materialization or checkpoint point, so after a failure the query can recover from the previous Stage. Once an MPP Pipeline execution fails, the entire query must be re-executed, so that mode is not suitable for long-running ETL SQL.

3 Memory usage:

MPP Pipeline may require more memory for complex queries. The Stage By Stage mode can release memory immediately after each Stage finishes, while the MPP Pipeline mode may need to keep multiple hash tables alive across multiple Fragments (for Hash Join and Hash Aggregate), and those hash tables can only be released together once the entire query completes.

4 Scheduling methods:

StarRocks and Presto use all-at-once scheduling: all Fragments of the entire query must be ready before execution starts. Spark schedules Stage By Stage, starting tasks one Stage at a time. (Note: Presto also supports phased scheduling, which optimizes this.)
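The contrast can be sketched roughly as follows (illustrative logic only, not either engine's actual scheduler): all-at-once scheduling must acquire a slot for every Fragment up front, while staged scheduling only needs slots for the current Stage.

```python
# Rough illustration of the two scheduling strategies (not real engine code).

def schedule_all_at_once(fragments, acquire_slot):
    # MPP style: every Fragment needs a slot before the query can start.
    slots = {f: acquire_slot(f) for f in fragments}
    if not all(slots.values()):
        raise RuntimeError("cannot start: some Fragments have no slot")
    return slots  # the whole query now runs concurrently

def schedule_stage_by_stage(stages, acquire_slot, run_stage):
    # Batch style: only the current Stage's tasks hold slots at any moment.
    for stage in stages:
        slots = {task: acquire_slot(task) for task in stage}
        run_stage(stage, slots)  # slots free up when the Stage finishes

# Tiny usage example with a fake, always-successful slot allocator.
print(schedule_all_at_once(["scan", "join", "agg"], lambda f: f"slot-{f}"))
schedule_stage_by_stage([["map"], ["reduce"]], lambda t: f"slot-{t}",
                        lambda stage, slots: print("ran", stage, "with", slots))
```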

5 Scalability:

The in-memory Shuffle of StarRocks and Presto may be limited by resources such as memory and connection counts once clusters reach hundreds or thousands of nodes.

6 Adaptive execution:

Spark's Stage By Stage execution makes it easier to collect statistics after each Stage and re-plan the query, while the MPP Pipeline mode is much harder to adapt at runtime.

7 Slow node problem:

Relying on speculative execution and fault tolerance, Spark's Stage By Stage execution handles slow-node problems relatively better.

Grouped Execution VS Stage By Stage

Because both Grouped Execution and Stage By Stage aim to solve the problem of ETL execution, this comparison mainly focuses on ETL-related points.

1 Fault tolerance granularity:

Grouped Execution's fault-tolerance granularity is relatively coarse: Shuffle must serve as the fault-tolerance point. Spark can use any Mapper or Reducer as the fault-tolerance point, so query retries are finer-grained, and Mappers and Reducers can be scheduled and retried independently.

Grouped Execution cannot tolerate node-granularity crashes, but Spark can.

2 Isolation levels:

The isolation granularity of Grouped Execution is at the node level, while Spark isolates at the container level, which is finer-grained.

3 Scalability and stability:

Grouped Execution still uses in-memory Shuffle, and its stability is not as good as Spark's Shuffle Service.

Due to these shortcomings of Grouped Execution, Presto started the Presto on Spark project and gradually replaced Grouped Execution with it, which also effectively declared the Grouped Execution project a failure.

References

1. Presto 2018 paper: Presto: SQL on Everything

2. Presto 2023 paper: Presto: A Decade of SQL Analytics at Meta

3. Presto Unlimited: MPP SQL Engine at Scale

4. Scaling with Presto on Spark

Kaisen Kang: StarRocks Query Team Leader, StarRocks PMC, Apache Kylin PMC (Inactive), Apache Doris PMC (Retired). https://www.bcmeng.com/