How SSB Queries Run in StarRocks
This article walks through how StarRocks executes a typical SSB query (Query 1). The goal is to help readers understand how queries are executed, so that they can then analyze more complex queries on their own and troubleshoot everyday issues.
To simplify the analysis, the test environment consists of two CN nodes, and the walkthrough focuses on execution in a shared-data cluster. The execution process is the same in shared-nothing mode.
Query
select sum(lo_extendedprice*lo_discount) as revenue from lineorder, dates where lo_orderdate = d_datekey and d_year = 1993 and lo_discount between 1 and 3 and lo_quantity < 25;
Query Plan
mysql> explain select sum(lo_extendedprice*lo_discount) as revenue from lineorder, dates where lo_orderdate = d_datekey and d_year = 1993 and lo_discount between 1 and 3 and lo_quantity < 25;
+------------------------------------------------------------------------------------------+
| Explain String |
+------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:36: sum |
| PARTITION: UNPARTITIONED |
| |
| RESULT SINK |
| |
| 9:AGGREGATE (merge finalize) |
| | output: sum(36: sum) |
| | group by: |
| | |
| 8:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 08 |
| UNPARTITIONED |
| |
| 7:AGGREGATE (update serialize) |
| | output: sum(CAST(10: lo_extendedprice AS BIGINT) * CAST(12: lo_discount AS BIGINT)) |
| | group by: |
| | |
| 6:Project |
| | <slot 10> : 10: lo_extendedprice |
| | <slot 12> : 12: lo_discount |
| | |
| 5:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | colocate: false, reason: |
| | equal join conjunct: 6: lo_orderdate = 18: d_datekey |
| | |
| |----4:EXCHANGE |
| | |
| 1:Project |
| | <slot 6> : 6: lo_orderdate |
| | <slot 10> : 10: lo_extendedprice |
| | <slot 12> : 12: lo_discount |
| | |
| 0:OlapScanNode |
| TABLE: lineorder |
| PREAGGREGATION: ON |
| PREDICATES: 12: lo_discount >= 1, 12: lo_discount <= 3, 9: lo_quantity < 25 |
| partitions=7/7 |
| rollup: lineorder |
| tabletRatio=28/28 |
| tabletList=98793,98794,98795,98796,98797,98798,98799,98800,98801,98802 ... |
| cardinality=58779223 |
| avgRowSize=16.0 |
| |
| PLAN FRAGMENT 2 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 04 |
| UNPARTITIONED |
| |
| 3:Project |
| | <slot 18> : 18: d_datekey |
| | |
| 2:OlapScanNode |
| TABLE: dates |
| PREAGGREGATION: ON |
| PREDICATES: 22: d_year = 1993 |
| partitions=1/1 |
| rollup: dates |
| tabletRatio=1/1 |
| tabletList=98839 |
| cardinality=365 |
| avgRowSize=8.0 |
+------------------------------------------------------------------------------------------+
72 rows in set (0.01 sec)
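Analyzing the query plan, we can see that three Fragments are generated:
Fragment 2: Scans the dates table with the predicate d_year = 1993, keeps only the d_datekey column (3:Project), and sends the result to Fragment 1 through the STREAM DATA SINK (EXCHANGE ID 04). Because the join is a broadcast join, every instance of Fragment 1 receives the full set of d_datekey values. In the visualized Profile, it looks like this:
Fragment 1: Scans the lineorder table with the predicates on lo_discount and lo_quantity, keeps the columns lo_orderdate, lo_extendedprice, and lo_discount (1:Project), and joins them with the d_datekey values received from Fragment 2 (4:EXCHANGE) in a broadcast hash join (5:HASH JOIN). A second Project operator (6:Project) then keeps only lo_extendedprice and lo_discount, and the AGG operator (7:AGGREGATE, update serialize) computes a partial sum, sum(CAST(10: lo_extendedprice AS BIGINT) * CAST(12: lo_discount AS BIGINT)), which is sent to Fragment 0 through EXCHANGE ID 08. In the visualized Profile, it looks like this:
Fragment 0: Receives the partial sums from all Fragment 1 instances (8:EXCHANGE), merges them into the final revenue value (9:AGGREGATE, merge finalize), and returns the result through the RESULT SINK. Its PARTITION is UNPARTITIONED, so it runs as a single instance.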
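The cooperation between the three Fragments can be mimicked in a few lines of Python. This is a minimal single-process sketch, not StarRocks code: the toy rows, the function names (fragment_2, fragment_1_instance, fragment_0, reference_revenue), and the split of lineorder into two instances are all hypothetical. It shows why the plan aggregates twice: each Fragment 1 instance produces a partial sum (update serialize), and Fragment 0 merges the partial sums (merge finalize), giving the same answer as evaluating the SQL directly.

```python
from collections import Counter

# Hypothetical toy rows (not real SSB data); only the columns the plan uses.
DATES = [
    {"d_datekey": 19930101, "d_year": 1993},
    {"d_datekey": 19930615, "d_year": 1993},
    {"d_datekey": 19940101, "d_year": 1994},
]
LINEORDER = [
    {"lo_orderdate": 19930101, "lo_extendedprice": 100, "lo_discount": 2, "lo_quantity": 10},
    {"lo_orderdate": 19930615, "lo_extendedprice": 50, "lo_discount": 3, "lo_quantity": 5},
    {"lo_orderdate": 19940101, "lo_extendedprice": 80, "lo_discount": 2, "lo_quantity": 10},
    {"lo_orderdate": 19930101, "lo_extendedprice": 70, "lo_discount": 5, "lo_quantity": 10},
    {"lo_orderdate": 19930615, "lo_extendedprice": 40, "lo_discount": 1, "lo_quantity": 30},
    {"lo_orderdate": 19930615, "lo_extendedprice": 30, "lo_discount": 1, "lo_quantity": 24},
]


def lineorder_predicates(row):
    # Predicates pushed down into 0:OlapScanNode
    return 1 <= row["lo_discount"] <= 3 and row["lo_quantity"] < 25


def fragment_2(dates):
    # 2:OlapScanNode (d_year = 1993) followed by 3:Project (keep d_datekey)
    return [row["d_datekey"] for row in dates if row["d_year"] == 1993]


def fragment_1_instance(lineorder_rows, broadcast_keys):
    # Build side of 5:HASH JOIN: with BROADCAST, every instance gets all keys.
    # A Counter keeps inner-join semantics even if a key appeared twice.
    hash_table = Counter(broadcast_keys)
    partial_sum = 0
    for row in filter(lineorder_predicates, lineorder_rows):
        matches = hash_table[row["lo_orderdate"]]  # probe; 0 when no match
        # 6:Project + 7:AGGREGATE (update serialize): local partial sum
        partial_sum += matches * row["lo_extendedprice"] * row["lo_discount"]
    return partial_sum


def fragment_0(partial_sums):
    # 8:EXCHANGE gathers the partials; 9:AGGREGATE (merge finalize) merges them
    return sum(partial_sums)


def reference_revenue(lineorder, dates):
    # Plain nested-loop evaluation of the SQL, for comparison
    return sum(
        lo["lo_extendedprice"] * lo["lo_discount"]
        for lo in lineorder
        for d in dates
        if lo["lo_orderdate"] == d["d_datekey"]
        and d["d_year"] == 1993
        and lineorder_predicates(lo)
    )


keys = fragment_2(DATES)
# Pretend each CN holds half of the lineorder rows
partials = [fragment_1_instance(LINEORDER[:3], keys), fragment_1_instance(LINEORDER[3:], keys)]
revenue = fragment_0(partials)
print(partials, revenue)  # [350, 30] 380
```

Merging partial sums is only valid because SUM is decomposable; the same two-phase shape appears for COUNT, MIN and MAX.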
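As the plan shows, Fragments are divided at EXCHANGE operators. Whenever data must be moved between nodes, the planner inserts an EXCHANGE and starts a new Fragment below it, and different Fragments pass data to each other only through EXCHANGE (a STREAM DATA SINK on the sending side, an EXCHANGE node on the receiving side).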
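The splitting rule can be sketched as a tree walk over the plan above. This is a simplified illustration, not the StarRocks planner: it assumes the EXCHANGE node stays in the receiving Fragment (as 8:EXCHANGE does in Fragment 0 and 4:EXCHANGE in Fragment 1) and that the subtree below it becomes a new Fragment. PlanNode, node and split_into_fragments are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class PlanNode:
    node_id: int
    name: str
    children: list[PlanNode] = field(default_factory=list)


def split_into_fragments(root: PlanNode) -> list[list[int]]:
    """Return the plan node ids of each fragment, root fragment first."""
    fragments: list[list[int]] = [[]]

    def visit(node: PlanNode, fragment: list[int]) -> None:
        fragment.append(node.node_id)
        for child in node.children:
            if node.name == "EXCHANGE":
                # The EXCHANGE stays in the receiving fragment; the subtree
                # below it becomes a new fragment ending in a DATA SINK.
                new_fragment: list[int] = []
                fragments.append(new_fragment)
                visit(child, new_fragment)
            else:
                visit(child, fragment)

    visit(root, fragments[0])
    return fragments


def node(node_id: int, name: str, *children: PlanNode) -> PlanNode:
    return PlanNode(node_id, name, list(children))


# The plan tree of the query above (probe side first under the HASH JOIN)
plan = node(9, "AGGREGATE",
    node(8, "EXCHANGE",
        node(7, "AGGREGATE",
            node(6, "Project",
                node(5, "HASH JOIN",
                    node(1, "Project", node(0, "OlapScanNode")),
                    node(4, "EXCHANGE",
                        node(3, "Project", node(2, "OlapScanNode"))))))))

for i, ids in enumerate(split_into_fragments(plan)):
    print(f"Fragment {i}: {ids}")
# Fragment 0: [9, 8]
# Fragment 1: [7, 6, 5, 1, 0, 4]
# Fragment 2: [3, 2]
```

The node ids per Fragment match the three PLAN FRAGMENT sections of the EXPLAIN output.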
Fragment Execution
We’ll use Fragment 1, the most complex of the three, as an example. The Profile shows the key steps of its execution.
The test cluster has two CN nodes, and the lineorder table has 7 partitions with 4 tablets each (28 tablets in total, per tabletRatio=28/28 in the plan). Fragment 1 generated two fragment instances, one on each CN node:
Fragment 1:
BackendAddresses: 172.26.92.212:9060, 172.26.92.212:9062
InstanceIds: 601a635c-6ce4-11ef-8e27-2af996751df8, 601a635c-6ce4-11ef-8e27-2af996751df9
BackendNum: 2
Each fragment instance scans half of the tablets of every partition (2 of the 4 tablets in each of the 7 partitions, or 14 tablets per instance), as the CN logs show:
# The first CN
I20240907 14:42:39.257463 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98784, tablet: 98794
I20240907 14:42:39.257468 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98784, tablet: 98796
I20240907 14:42:39.257471 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98785, tablet: 98798
I20240907 14:42:39.257473 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98785, tablet: 98800
I20240907 14:42:39.257476 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98786, tablet: 98802
I20240907 14:42:39.257478 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98786, tablet: 98804
I20240907 14:42:39.257480 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98787, tablet: 98806
I20240907 14:42:39.257483 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98787, tablet: 98808
I20240907 14:42:39.257485 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98788, tablet: 98810
I20240907 14:42:39.257487 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98788, tablet: 98812
I20240907 14:42:39.257490 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98789, tablet: 98814
I20240907 14:42:39.257493 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98789, tablet: 98816
I20240907 14:42:39.257495 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98790, tablet: 98818
I20240907 14:42:39.257497 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98790, tablet: 98820
# The second CN
I20240907 14:42:39.273023 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98784, tablet: 98793
I20240907 14:42:39.273028 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98784, tablet: 98795
I20240907 14:42:39.273031 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98785, tablet: 98797
I20240907 14:42:39.273033 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98785, tablet: 98799
I20240907 14:42:39.273035 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98786, tablet: 98801
I20240907 14:42:39.273038 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98786, tablet: 98803
I20240907 14:42:39.273040 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98787, tablet: 98805
I20240907 14:42:39.273043 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98787, tablet: 98807
I20240907 14:42:39.273045 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98788, tablet: 98809
I20240907 14:42:39.273047 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98788, tablet: 98811
I20240907 14:42:39.273050 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98789, tablet: 98813
I20240907 14:42:39.273052 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98789, tablet: 98815
I20240907 14:42:39.273054 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98790, tablet: 98817
I20240907 14:42:39.273057 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98790, tablet: 98819
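On a larger cluster it is easier to tally these log lines than to read them one by one. The Python sketch below groups the scan ranges by fragment instance and partition. It assumes the fragment_executor.cpp log format shown above, which may differ between StarRocks versions; SCAN_RANGE_RE and tablets_by_instance are hypothetical names, and only four of the log lines are reproduced.

```python
import re
from collections import defaultdict

# Matches the fragment_executor.cpp scan-range lines shown above (assumed format)
SCAN_RANGE_RE = re.compile(
    r"fragment instance: (?P<instance>[0-9a-f-]+), scan ranges: .*?"
    r"partition: (?P<partition>\d+), tablet: (?P<tablet>\d+)"
)


def tablets_by_instance(log_lines):
    """Map instance id -> partition id -> list of tablet ids."""
    layout = defaultdict(lambda: defaultdict(list))
    for line in log_lines:
        m = SCAN_RANGE_RE.search(line)
        if m:
            layout[m["instance"]][int(m["partition"])].append(int(m["tablet"]))
    return {inst: dict(parts) for inst, parts in layout.items()}


SAMPLE_LOG = [
    "I20240907 14:42:39.257463 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98784, tablet: 98794",
    "I20240907 14:42:39.257468 139780171531840 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df8, scan ranges: db: , table: , partition: 98784, tablet: 98796",
    "I20240907 14:42:39.273023 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98784, tablet: 98793",
    "I20240907 14:42:39.273028 140253722486336 fragment_executor.cpp:452] fragment instance: 601a635c-6ce4-11ef-8e27-2af996751df9, scan ranges: db: , table: , partition: 98784, tablet: 98795",
]

for instance, parts in sorted(tablets_by_instance(SAMPLE_LOG).items()):
    # Number of tablets each instance scans per partition
    print(instance, {p: len(t) for p, t in parts.items()})
```

Fed the full logs from both CNs, each instance should report 2 tablets for each of the 7 partitions.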
In addition, Fragment 1 is split into 5 pipelines (ids 0 to 4), which can be seen in detail in the Profile:
Pipeline (id=4):
EXCHANGE_SINK (plan_node_id=8):
...
AGGREGATE_BLOCKING_SOURCE (plan_node_id=7):
...
Pipeline (id=3):
AGGREGATE_BLOCKING_SINK (plan_node_id=7):
...
PROJECT (plan_node_id=6):
...
CHUNK_ACCUMULATE (plan_node_id=5):
...
HASH_JOIN_PROBE (plan_node_id=5):
...
LOCAL_EXCHANGE_SOURCE (plan_node_id=5):
...
Pipeline (id=2):
LOCAL_EXCHANGE_SINK (plan_node_id=5):
...
PROJECT (plan_node_id=1):
...
CHUNK_ACCUMULATE (plan_node_id=0):
...
CONNECTOR_SCAN (plan_node_id=0):
...
Pipeline (id=1):
HASH_JOIN_BUILD (plan_node_id=5):
...
LOCAL_EXCHANGE_SOURCE (plan_node_id=5):
...
Pipeline (id=0):
LOCAL_EXCHANGE_SINK (plan_node_id=5):
...
EXCHANGE_SOURCE (plan_node_id=4):
...
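In the excerpt above, every pipeline lists its sink operator first and its source operator last, so reading each pipeline bottom-up follows the data. The following Python sketch parses the outline (with the "..." detail lines removed) and prints each pipeline in source-to-sink order. PROFILE, data_flow and the regular expressions are hypothetical helpers that only understand this simplified outline, not the full Profile format.

```python
import re

# The pipeline outline from the Profile above, with the "..." detail lines dropped
PROFILE = """\
Pipeline (id=4):
  EXCHANGE_SINK (plan_node_id=8):
  AGGREGATE_BLOCKING_SOURCE (plan_node_id=7):
Pipeline (id=3):
  AGGREGATE_BLOCKING_SINK (plan_node_id=7):
  PROJECT (plan_node_id=6):
  CHUNK_ACCUMULATE (plan_node_id=5):
  HASH_JOIN_PROBE (plan_node_id=5):
  LOCAL_EXCHANGE_SOURCE (plan_node_id=5):
Pipeline (id=2):
  LOCAL_EXCHANGE_SINK (plan_node_id=5):
  PROJECT (plan_node_id=1):
  CHUNK_ACCUMULATE (plan_node_id=0):
  CONNECTOR_SCAN (plan_node_id=0):
Pipeline (id=1):
  HASH_JOIN_BUILD (plan_node_id=5):
  LOCAL_EXCHANGE_SOURCE (plan_node_id=5):
Pipeline (id=0):
  LOCAL_EXCHANGE_SINK (plan_node_id=5):
  EXCHANGE_SOURCE (plan_node_id=4):
"""

PIPELINE_RE = re.compile(r"Pipeline \(id=(\d+)\):")
OPERATOR_RE = re.compile(r"([A-Z_]+) \(plan_node_id=(\d+)\):")


def data_flow(profile_text):
    """Map pipeline id -> [(operator, plan_node_id)] in source-to-sink order."""
    pipelines = {}
    current = None
    for line in profile_text.splitlines():
        line = line.strip()
        if m := PIPELINE_RE.fullmatch(line):
            current = int(m.group(1))
            pipelines[current] = []
        elif (m := OPERATOR_RE.fullmatch(line)) and current is not None:
            pipelines[current].append((m.group(1), int(m.group(2))))
    # The Profile lists the sink first, so reverse to follow the data
    return {pid: ops[::-1] for pid, ops in pipelines.items()}


for pid, ops in sorted(data_flow(PROFILE).items()):
    print(f"Pipeline {pid}: " + " -> ".join(f"{name}({nid})" for name, nid in ops))
```

Read this way, Pipeline 2, for example, runs CONNECTOR_SCAN(0) -> CHUNK_ACCUMULATE(0) -> PROJECT(1) -> LOCAL_EXCHANGE_SINK(5).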
Following the data flow, we can draw the pipeline diagram (numbers are plan node IDs):
Pipelines 0 to 4 look like this:
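As the diagram shows, each Fragment 1 instance is organized around the hash join (plan node 5). Pipelines 2 and 3 form the probe side: Pipeline 2 reads the lineorder tablets on the current node and hands the chunks through a local exchange to Pipeline 3, which runs HASH_JOIN_PROBE. Pipelines 0 and 1 form the build side: Pipeline 0 receives the dates rows sent by Fragment 2 (possibly running on the other CN) through the EXCHANGE operator, and Pipeline 1 builds the hash table from them. Pipeline 3 then passes the join output to the Project and Agg operators. Because aggregation is a blocking operation, it is split across two pipelines: Pipeline 3 ends with AGGREGATE_BLOCKING_SINK, and Pipeline 4 starts from AGGREGATE_BLOCKING_SOURCE and sends the partial aggregation result through EXCHANGE_SINK to Fragment 0.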
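The ordering constraints among the five pipelines can be made explicit with Python's standard graphlib module (Python 3.9+). This is a simplified model, not the StarRocks scheduler: it records only the two blocking hand-offs (the hash table must be built before it can be probed, and the blocking aggregation must finish before its result is emitted) and ignores the streaming local exchanges between Pipelines 0 and 1 and between Pipelines 2 and 3. BLOCKING_DEPS and phases are hypothetical names.

```python
from graphlib import TopologicalSorter

# Hypothetical model: pipeline id -> pipelines whose completion it must wait for.
# Streaming hand-offs through local exchanges (0 -> 1, 2 -> 3) are left out
# because they do not block.
BLOCKING_DEPS = {
    0: set(),
    1: set(),
    2: set(),
    3: {1},  # HASH_JOIN_PROBE needs the hash table built by Pipeline 1
    4: {3},  # AGGREGATE_BLOCKING_SOURCE waits for AGGREGATE_BLOCKING_SINK in Pipeline 3
}


def phases(deps):
    """Group pipelines into waves that may run once earlier waves are done."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(phases(BLOCKING_DEPS))  # [[0, 1, 2], [3], [4]]
```

In this model, Pipelines 0, 1 and 2 can start right away, Pipeline 3 can probe only after Pipeline 1 has finished the hash table, and Pipeline 4 runs last.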
I hope you found this SSB query tutorial useful. If you did, join me on Slack (https://try.starrocks.com/join-starrocks-on-slack) and let me know what you think.