StarRocks Inside Scoop: The Magic Journey of an SQL Statement in StarRocks
--
SQL is a common programming language used for managing data stored in relational databases. It is simple but is capable of performing complex queries. Data analysts can use SQL to extract useful insights from data and power informed decision-making.
In a distributed relational database, an SQL statement roughly goes through the following three stages:
- Translating the SQL text into an optimal distributed physical execution plan
- Scheduling and distributing the physical execution plan to multiple compute nodes
- Executing the plan on multiple compute nodes
This article dives deep into how StarRocks processes an SQL statement and what makes StarRocks different, offering a new perspective for those who want to accelerate queries in a more cost-effective way.
Before we go deep into the tech details, let’s get familiar with two basic components in StarRocks:
- Frontend (FE): responsible for query parsing, optimization, scheduling, and metadata management
- Backend (BE): responsible for query execution and data storage
Stage 1: SQL Text to Execution Plan
StarRocks performs the following steps to translate SQL text into a distributed physical execution plan:
- SQL Parse: parses the SQL text and breaks it up into an abstract syntax tree (AST).
- SQL Analyze: analyzes the syntax and semantics of the AST.
- SQL Logical Plan: converts the AST into a logical plan.
- SQL Optimize: rewrites and transforms the logical plan based on relational algebra, statistical information, and the cost model, and then chooses the physical execution plan with the lowest estimated cost.
- Generation of Plan Fragments: converts the physical execution plan into plan fragments that can be directly executed on multiple BEs.
SQL Parse
The SQL Parser receives an SQL string and generates an AST, in which each node is a parse node.
In the figure below, a SelectStmt is generated after the SQL string is parsed. SelectStmt may contain the following elements: SelectList, FromClause, WhereClause, GroupByClause, HavingPredicate, OrderByElement, and LimitElement. Each of these elements corresponds directly to a clause in the SQL text.
Currently, StarRocks uses an ANTLR4 parser. You can find the grammar rules file, StarRocks.g4, by searching for “StarRocks g4” on GitHub.
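To make that concrete, here is a minimal, hypothetical sketch of how the clauses of a SELECT statement map onto the fields of a parse node. The real parser is Java code generated by ANTLR4 from that grammar file; the C++ structs below are illustrative only, and every field and type name not mentioned above is invented.

```cpp
#include <memory>
#include <optional>
#include <string>
#include <vector>

// An expression subtree, e.g. the predicate of a WHERE clause.
struct Expr {
    std::string text;
    std::vector<std::shared_ptr<Expr>> children;
};

// One entry in the SELECT list, e.g. "SUM(revenue) AS total".
struct SelectListItem {
    std::string expr_text;
    std::optional<std::string> alias;
};

// The parse node for a SELECT statement; each field mirrors one of the
// elements named in the article.
struct SelectStmt {
    std::vector<SelectListItem> select_list;  // SelectList
    std::vector<std::string>    from_tables;  // FromClause
    std::shared_ptr<Expr>       where;        // WhereClause (may be null)
    std::vector<std::string>    group_by;     // GroupByClause
    std::shared_ptr<Expr>       having;       // HavingPredicate (may be null)
    std::vector<std::string>    order_by;     // OrderByElement list
    std::optional<long>         limit;        // LimitElement
};
```

A statement like SELECT c1, SUM(c2) FROM t WHERE c3 > 0 GROUP BY c1 would populate select_list, from_tables, where, and group_by, leaving the optional elements empty.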
SQL Analyze
After the AST is obtained, StarRocks performs the following steps to analyze the syntax and semantics of the AST:
- Check and bind metadata such as database, table, and column.
- Validate the statement. For example, the WHERE clause cannot contain grouping operations and SUM cannot be performed on HyperLogLog (HLL) and Bitmap columns.
- Process table and column aliases.
- Validate parameters in a function. For example, parameters passed to SUM must be of the numeric type; the second and third parameters in the LEAD and LAG window functions must be constants.
- Check and convert data types, for example, converting BIGINT values into DECIMAL values using CAST.
The SQL Analyzer generates a hierarchy of relations. As shown in the following figure, the FROM clause corresponds to a TableRelation and a subquery corresponds to a SubqueryRelation.
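As a rough illustration, two of the checks listed above might look like the following sketch. The Type enum and function names are invented for this example and are not StarRocks’ actual analyzer API (the FE is written in Java).

```cpp
#include <stdexcept>

enum class Type { BIGINT, DECIMAL, VARCHAR, HLL, BITMAP };

bool is_numeric(Type t) { return t == Type::BIGINT || t == Type::DECIMAL; }

// Rule: parameters passed to SUM must be numeric; in particular, SUM cannot
// be performed on HLL or Bitmap columns.
void check_sum_argument(Type arg_type) {
    if (!is_numeric(arg_type))
        throw std::invalid_argument("SUM requires a numeric argument");
}

// Rule: the WHERE clause cannot contain grouping (aggregate) operations.
void check_where_clause(bool contains_aggregate) {
    if (contains_aggregate)
        throw std::invalid_argument("WHERE clause cannot contain aggregates");
}
```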
SQL Logical Plan
StarRocks maps the relations into a logical plan tree, in which each operator corresponds to one logical node.
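For a query such as SELECT c1, SUM(c2) FROM t WHERE c3 > 0 GROUP BY c1, the resulting tree is roughly an Aggregate node over a Filter node over a Scan node. Here is a toy sketch of that mapping; the node type and its fields are invented for illustration.

```cpp
#include <memory>
#include <string>
#include <vector>

struct LogicalNode {
    std::string op;      // "Scan", "Filter", "Aggregate", ...
    std::string detail;  // table name, predicate, grouping keys, ...
    std::vector<std::shared_ptr<LogicalNode>> children;
};

// Builds the logical plan for the example query above.
std::shared_ptr<LogicalNode> build_example_plan() {
    auto scan   = std::make_shared<LogicalNode>(LogicalNode{"Scan", "t", {}});
    auto filter = std::make_shared<LogicalNode>(LogicalNode{"Filter", "c3 > 0", {scan}});
    return std::make_shared<LogicalNode>(
        LogicalNode{"Aggregate", "SUM(c2) GROUP BY c1", {filter}});
}
```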
SQL Optimize
The SQL Optimizer receives a logical plan tree and generates a distributed physical plan tree that has the “lowest” cost.
Generally, a complex SQL statement means more tables to join and larger data volumes to process. An optimizer is critical because it may explore thousands of different ways of executing the same query, and the performance of different execution plans may vary significantly. StarRocks developed a brand-new cost-based optimizer (CBO) designed with reference to the Cascades framework and the Orca optimizer papers. This CBO is deeply customized and optimized for StarRocks’ execution engine and scheduler.
StarRocks CBO fully supports all 99 TPC-DS queries and has implemented several key features and optimizations, including common table expression (CTE) reuse, subquery rewriting, Lateral Join, Join Reorder, policy selection for distributed join execution, pushdown of global runtime filters, and a global dictionary for low-cardinality optimization.
Logical Plan Rewrite
Before the logical plan enters the CBO, StarRocks performs a series of rewrites to generate a better plan. The major rewrite rules are as follows:
- Expression rewrite and simplification (see the sketch after this list)
- Column pruning
- Predicate pushdown
- Limit merge and Limit pushdown
- Aggregate merge
- Derivation of equivalent predicates (constant propagation)
- Outer join to inner join
- Constant folding
- CTE reuse
- Subquery rewrite
- Lateral join simplification
- Partition and bucket pruning
- Empty node optimization
- Empty Union, INTERSECT, EXCEPT pruning
- INTERSECT reorder
- Aggregate function rewrite, such as COUNT DISTINCT
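To give a feel for what these rewrites do, here is a toy sketch of two of them: constant folding (1 + 2 becomes 3) and expression simplification (x + 0 becomes x). The Expr structure and rewrite function are invented for illustration; the real rules operate on StarRocks’ own expression trees.

```cpp
#include <memory>

struct Expr;
using ExprPtr = std::shared_ptr<Expr>;

struct Expr {
    enum Kind { LITERAL, COLUMN, ADD } kind;
    long value = 0;    // valid when kind == LITERAL
    ExprPtr lhs, rhs;  // valid when kind == ADD
};

ExprPtr rewrite(const ExprPtr& e) {
    if (e->kind != Expr::ADD) return e;
    ExprPtr l = rewrite(e->lhs), r = rewrite(e->rhs);
    // Constant folding: both children are literals, so evaluate at plan time.
    if (l->kind == Expr::LITERAL && r->kind == Expr::LITERAL)
        return std::make_shared<Expr>(Expr{Expr::LITERAL, l->value + r->value, {}, {}});
    // Simplification: x + 0 rewrites to x.
    if (r->kind == Expr::LITERAL && r->value == 0) return l;
    return std::make_shared<Expr>(Expr{Expr::ADD, 0, l, r});
}
```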
CBO Transformation
After the logical plan is rewritten, the system performs cost-based optimization. These optimizations include:
- Multi-Stage Aggregate Optimization
Common aggregate functions (such as COUNT, SUM, MAX, and MIN) are split into two stages, and a single COUNT DISTINCT is split into three or four stages. A sketch of the two-stage split follows this list.
- Left/Right Table Adjustment
StarRocks always uses the right table to build the hash table, so the right table should usually be the smaller one. StarRocks can automatically swap the left and right tables based on cost, and can also automatically convert a Left Join into a Right Join.
- Multi-Table Join Reorder
When determining the optimal order for a multi-table join, the CBO first checks how many tables are being joined. If the number is 5 or fewer, StarRocks performs Join Reorder based on the commutative and associative laws of joins. If the number is greater than 5, StarRocks performs Join Reorder based on a greedy algorithm and dynamic programming.
- Selection of Distributed Joins
StarRocks supports the following types of distributed joins: Broadcast, Shuffle, unilateral Shuffle (shuffling only one side of the join), Colocate, and Replicated. StarRocks chooses the “best” method to execute distributed joins based on cost estimation and the property enforcement mechanism.
- Other Optimizations
The CBO also considers pushing aggregations down through joins, as well as selecting and rewriting materialized views.
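As an example of the multi-stage split mentioned first in this list, here is a toy sketch of SUM computed in two stages: each BE produces a partial sum over its local rows, and a second stage merges the partials. The function names are illustrative, not StarRocks code. COUNT DISTINCT needs extra stages because distinct values must first be redistributed by key before they can be counted.

```cpp
#include <numeric>
#include <vector>

// Stage 1: each BE aggregates its local portion of the data.
long local_sum(const std::vector<long>& local_rows) {
    return std::accumulate(local_rows.begin(), local_rows.end(), 0L);
}

// Stage 2: one instance merges the partial aggregates into the final result.
long merge_sums(const std::vector<long>& partial_sums) {
    return std::accumulate(partial_sums.begin(), partial_sums.end(), 0L);
}
```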
During optimization, the logical plan is first translated into the data structure shown under Memo in the figure above. All logical and physical plans are recorded in the Memo, and the Memo constitutes the entire search space.
In the following figure, StarRocks uses various rules to expand the search space and generate corresponding physical execution plans. It chooses a physical execution plan that has the lowest cost from Memo based on statistical information and cost estimation.
Statistical Information Collection and Cost Estimation
How good a CBO is hinges greatly on cost estimation, which in turn depends on whether statistical information is collected in a timely and accurate manner.
StarRocks allows for automatic and manual collection of table-level and column-level statistical information. Both of these collection methods support full and sampled collection.
StarRocks estimates cost based on statistical information and on resource factors such as CPU, memory, network, and I/O. Each resource factor is assigned a weight, and different execution operators have different cost formulas.
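In spirit, the cost of an operator is a weighted combination of its estimated resource usage, something like the sketch below. The weights and the formula are made up for illustration; StarRocks assigns its own weights and uses a different formula per operator.

```cpp
// Estimated resource usage of one execution operator.
struct CostEstimate {
    double cpu, memory, network, io;
};

double total_cost(const CostEstimate& c) {
    // Illustrative weights only; the real values are operator-specific.
    const double w_cpu = 1.0, w_mem = 2.0, w_net = 1.5, w_io = 3.0;
    return w_cpu * c.cpu + w_mem * c.memory + w_net * c.network + w_io * c.io;
}
```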
If you notice that the left and right tables of a join are in the wrong order, or that an inappropriate distributed join strategy was chosen, you can collect fresh statistical information so that StarRocks’ CBO can make better decisions.
Generation of Plan Fragments
StarRocks’ CBO outputs a distributed physical execution plan tree. However, the plan cannot be directly executed by BE nodes; it must first be converted into plan fragments that the BEs can execute. The conversion is a one-to-one mapping, as seen below.
Stage 2: Plan Scheduling
After a physical distributed plan is generated, the FE decomposes the plan into multiple plan fragments, generates an execution instance for each plan fragment, schedules the plan fragment instances, manages BE execution states, and receives query results.
To schedule the plan, the system must work out the following issues:
- Which BE executes which plan fragments?
- Which replicas of a tablet will be queried?
- How to schedule multiple plan fragments?
StarRocks FE first determines which BE nodes will execute the fragments that contain the Scan operator. Each Scan operator has a list of tablets to access. For each tablet, StarRocks first selects the replicas that are healthy and match the required data version, then randomly selects one of them to access, which keeps requests balanced across BEs. Suppose we have 10 BEs and 10 tablets: theoretically, each BE will scan exactly one tablet.
After the FE determines the mapping between BEs and the fragments that contain the Scan operator, the remaining plan fragment instances are also executed on these BE nodes, with a random allocation (you can direct them to other BE nodes through configuration parameters).
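A simplified sketch of the replica-selection step might look like the following; the Replica structure and pick_backend function are hypothetical stand-ins for the FE’s actual logic.

```cpp
#include <cstdint>
#include <random>
#include <stdexcept>
#include <vector>

struct Replica {
    int64_t backend_id;  // the BE that stores this replica
    bool healthy;
    int64_t version;     // data version of the replica
};

// Keep only healthy replicas at the required version, then pick one at
// random so that scan requests stay balanced across BEs.
int64_t pick_backend(const std::vector<Replica>& replicas,
                     int64_t required_version, std::mt19937& rng) {
    std::vector<int64_t> candidates;
    for (const auto& r : replicas)
        if (r.healthy && r.version == required_version)
            candidates.push_back(r.backend_id);
    if (candidates.empty()) throw std::runtime_error("no usable replica");
    std::uniform_int_distribution<size_t> pick(0, candidates.size() - 1);
    return candidates[pick(rng)];
}
```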
Then, the FE sends parameters related to plan fragment execution to the BEs through Thrift.
Currently, the FE schedules plan fragments in an All-At-Once manner. It traverses the plan fragment tree from top to bottom, and distributes the execution information of each plan fragment to the corresponding BE.
Stage 3: Plan Execution
StarRocks leverages a massively parallel processing (MPP) architecture to fully utilize the resources of multiple machines, the pipeline mechanism to fully utilize the resources of multiple cores on a single machine, and the vectorized execution engine to make full use of the resources of a single CPU core. These mechanisms work together to deliver blazing-fast query performance.
MPP: Parallel Execution on Multiple Machines
MPP is an architecture for large-scale parallel processing. Its core idea is to split one query plan into multiple compute instances that can be executed in parallel on multiple nodes. These nodes share no CPU, memory, or disk resources. The query performance of an MPP database scales linearly as the cluster scales out.
In the preceding figure, StarRocks logically splits one query into multiple query fragments. Each query fragment can have one or more fragment instances, and each instance will be scheduled to one BE. One query fragment can contain one or more operators. In the preceding figure, the query fragment has three operators: Scan, Filter, and Agg. Different fragments can be executed with different parallelism.
As shown in the figure above, multiple query fragments are executed in parallel in different in-memory pipelines, rather than stage by stage as in a batch processing engine. The Shuffle operation (data redistribution) plays a key role in improving query performance, and it is essential for high-cardinality aggregations and large table joins.
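The heart of a Shuffle is hash partitioning: each row is routed to the fragment instance responsible for its key, so rows with equal keys (for a join or an aggregation) meet on the same node. A minimal sketch, with invented names:

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// For each row key, returns the index of the fragment instance that
// should receive the row.
std::vector<size_t> shuffle_targets(const std::vector<int64_t>& keys,
                                    size_t num_instances) {
    std::vector<size_t> targets(keys.size());
    for (size_t i = 0; i < keys.size(); ++i)
        targets[i] = std::hash<int64_t>{}(keys[i]) % num_instances;
    return targets;
}
```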
Pipeline: Parallel Execution on One Stand-Alone Machine
StarRocks introduces the pipeline, a concept that sits between fragments and operators. Data in a pipeline does not need to be materialized before it reaches its destination. If the physical plan contains operators that require materialization (such as Agg, Sort, and Join), a new pipeline is split off from the original one. Therefore, one fragment can be executed using multiple pipelines.
As shown in the preceding figure, one pipeline consists of multiple operators. The first operator is the source operator, which is responsible for generating data. The last operator is the sink operator, which is responsible for materializing or consuming data. The operators in between are responsible for transforming data.
So how can pipelines run in parallel? Pipelines, like fragments, can generate multiple instances, and each instance is called a pipeline driver. When a pipeline needs to run with a parallelism of N, it generates N pipeline drivers. In the preceding figure, the pipeline generates three pipeline drivers.
During pipeline execution, if the current operator can generate data and the next operator can consume it, the pipeline’s execution thread pulls data from the current operator and pushes it to the next one. Each pipeline driver is in one of three execution states: Ready, Running, or Blocked. If the current operator cannot generate data or the next operator does not need to consume data, the driver enters the Blocked state.
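Below is a hedged sketch of that pull/push handshake. The has_output/need_input interface and the state handling are illustrative stand-ins, not StarRocks’ actual operator API; Running is simply the state a driver is in while a worker thread executes it.

```cpp
struct Chunk { /* a batch of columnar data */ };

struct Operator {
    virtual ~Operator() = default;
    virtual bool has_output() const = 0;  // can this operator produce data now?
    virtual bool need_input() const = 0;  // can this operator accept data now?
    virtual Chunk pull() = 0;
    virtual void push(Chunk chunk) = 0;
};

enum class DriverState { Ready, Running, Blocked };

// One scheduling quantum over a two-operator segment of a pipeline.
DriverState run_once(Operator& current, Operator& next) {
    if (current.has_output() && next.need_input()) {
        next.push(current.pull());
        return DriverState::Ready;  // more work may be available
    }
    return DriverState::Blocked;    // wait until data or capacity appears
}
```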
As shown in the figure, the core of the pipeline parallel execution framework is user-space coroutine scheduling, which no longer relies on the kernel-mode thread scheduling of the operating system. This reduces the cost of thread creation, thread destruction, and thread context switching.
In the parallel execution framework, StarRocks starts execution threads equal to the number of CPU cores. Each execution thread will pick up Ready pipeline drivers from a multi-level feedback queue (MLFQ). At the same time, a global poller thread constantly checks whether the pipeline drivers in the blocked queue become unblocked. If so, such pipeline drivers will be moved from the blocked queue to the MLFQ.
Vectorized Execution
The performance bottleneck of query execution in databases has gradually shifted from I/O to CPU. To maximize CPU efficiency, StarRocks rebuilt its entire execution engine around vectorization. Vectorized execution makes full use of a single CPU core and speeds up queries.
Vectorization mainly means vectorizing operators and expressions, with the goal of batched, columnar execution. Compared to row-based execution, batch execution involves fewer virtual function calls and fewer branches, and columnar execution is friendlier to the CPU cache and to SIMD optimization.
However, vectorized execution is more than operator and expression vectorization. It is a huge and complex performance optimization project that involves the columnar organization of data on disk, in memory, and over the network; the redesign of data structures, algorithms, and memory management; SIMD instruction optimization; CPU cache optimization; and C++-level optimization. After continuous improvements, StarRocks’ vectorized execution engine delivers a 5x to 10x query performance improvement over row-based execution engines.
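The toy example below shows the shape of the win from batched, columnar execution: one tight loop over contiguous arrays with no per-row virtual calls or branches, which compilers can auto-vectorize with SIMD instructions. It is a conceptual sketch, not StarRocks code.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Columnar, batched evaluation of "a + b": the whole column is processed in
// one pass over contiguous memory, which is cache-friendly and SIMD-friendly.
void add_columns(const std::vector<int64_t>& a,
                 const std::vector<int64_t>& b,
                 std::vector<int64_t>& out) {
    out.resize(a.size());
    for (size_t i = 0; i < a.size(); ++i)
        out[i] = a[i] + b[i];
}
```

A row-based engine would instead dispatch a virtual call per row per expression, which is exactly the overhead the batched loop removes.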
Take the Next Step in Your Journey
This article provides a solid foundation for understanding how StarRocks processes SQL statements and how these capabilities contribute to its amazing performance. There’s still a lot more to learn, however. If this article has you interested in StarRocks, we encourage you to visit the project’s website, check out its GitHub page, and join the community on Slack.
And if you like what you’ve read here today but are looking for something a bit more enterprise-friendly, take a look at CelerData.