Query Plans — Analyse SQL Performance In Trino
Another Sunday and another #datapains to discuss.
Today I want to dig deeper into how we can understand a query plan in Trino.
SQL Order Of Execution — Let’s get reminded
Before we start looking into this, let's remind ourselves of the order of execution in a SQL query.
- FROM, JOIN
- WHERE
- GROUP BY
- HAVING
- SELECT
- DISTINCT
- ORDER BY
- LIMIT
This will help us as we read the query plan.
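As a quick refresher, here is a hypothetical query (the table and column names are made up for illustration) annotated with where each clause falls in that order:

```sql
-- Hypothetical example: the numbered comments show logical evaluation order
SELECT department, COUNT(*) AS headcount  -- 5. SELECT computes the output columns
FROM lakehouse.bronze.jobs                -- 1. FROM (and any JOINs) builds the row set
WHERE job_title IS NOT NULL               -- 2. WHERE filters individual rows
GROUP BY department                       -- 3. GROUP BY buckets the surviving rows
HAVING COUNT(*) > 1                       -- 4. HAVING filters whole groups
ORDER BY headcount DESC                   -- 6. ORDER BY sorts the result
LIMIT 10;                                 -- 7. LIMIT trims the output
```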
How to determine if your SQL is performant?⚡
Before a query can be planned, the engine first needs to:
- Identify tables
- Identify columns used in a query
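Trino lets you run just this analysis step on its own: EXPLAIN (TYPE VALIDATE) resolves tables and columns and type-checks the statement without planning or executing it. A minimal sketch against the table used below:

```sql
-- Analysis only: fails fast if a table or column cannot be resolved
EXPLAIN (TYPE VALIDATE)
SELECT department, job_title
FROM lakehouse.bronze.jobs;
```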
The SQL below is a basic count of job titles, grouped by department.
EXPLAIN ANALYZE WITH
count_titles AS (
SELECT
department,
COUNT(job_title) AS count_job_titles
FROM lakehouse.bronze.jobs
GROUP BY 1
)
SELECT * FROM count_titles
- The SQL query uses a Common Table Expression (CTE) named count_titles to simplify the query structure and improve readability.
- It begins by selecting data from the lakehouse.bronze.jobs table.
- Within the CTE, it groups the data by the department column.
- For each department group, it counts the number of occurrences of job_title and aliases it as count_job_titles.
- After the CTE definition, the main query selects all columns from the count_titles CTE.
- The main purpose of the query is to retrieve the count of job titles for each department from the lakehouse.bronze.jobs table.
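One thing to keep in mind: EXPLAIN ANALYZE actually executes the query in order to collect runtime statistics. If you only want to inspect the plan without running anything, plain EXPLAIN works too (its default type is the distributed plan), and ANALYZE VERBOSE adds extra per-operator detail:

```sql
-- Plan only, no execution: shows the distributed (fragmented) plan
EXPLAIN (TYPE DISTRIBUTED)
WITH count_titles AS (
    SELECT department, COUNT(job_title) AS count_job_titles
    FROM lakehouse.bronze.jobs
    GROUP BY 1
)
SELECT * FROM count_titles;

-- Executes the query and reports additional low-level operator statistics
EXPLAIN ANALYZE VERBOSE
WITH count_titles AS (
    SELECT department, COUNT(job_title) AS count_job_titles
    FROM lakehouse.bronze.jobs
    GROUP BY 1
)
SELECT * FROM count_titles;
```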
Query Plan 📣
Queued: 1.84ms, Analysis: 85.69ms, Planning: 58.35ms, Execution: 450.11ms
Fragment 1 [HASH]
CPU: 7.57ms, Scheduled: 11.12ms, Blocked 1.80s (Input: 933.53ms, Output: 0.00ns), Input: 8 rows (176B); per task: avg.: 4.00 std.dev.: 2.00, Output: 3 rows (67B)
Amount of input data processed by the workers for this stage might be skewed
Output layout: [department, count]
Output partitioning: SINGLE []
Aggregate[type = FINAL, keys = [department]]
│ Layout: [department:varchar, count:bigint]
│ Estimates: {rows: 3 (68B), cpu: 226, memory: 68B, network: 0B}
│ CPU: 3.00ms (15.00%), Scheduled: 3.00ms (6.52%), Blocked: 0.00ns (0.00%), Output: 3 rows (67B)
│ Input avg.: 1.00 rows, Input std.dev.: 132.29%
│ count := count(count_0)
└─ LocalExchange[partitioning = HASH, arguments = [department::varchar]]
│ Layout: [department:varchar, count_0:bigint]
│ Estimates: {rows: 10 (226B), cpu: 226, memory: 0B, network: 0B}
│ CPU: 1.00ms (5.00%), Scheduled: 1.00ms (2.17%), Blocked: 716.00ms (31.49%), Output: 8 rows (176B)
│ Input avg.: 1.00 rows, Input std.dev.: 86.60%
└─ RemoteSource[sourceFragmentIds = [2]]
Layout: [department:varchar, count_0:bigint]
CPU: 0.00ns (0.00%), Scheduled: 1.00ms (2.17%), Blocked: 933.00ms (41.03%), Output: 8 rows (176B)
Input avg.: 1.00 rows, Input std.dev.: 86.60%
Fragment 2 [HASH]
CPU: 11.24ms, Scheduled: 19.46ms, Blocked 860.54ms (Input: 444.24ms, Output: 0.00ns), Input: 10 rows (362B); per task: avg.: 5.00 std.dev.: 2.00, Output: 8 rows (176B)
Amount of input data processed by the workers for this stage might be skewed
Output layout: [department, count_0]
Output partitioning: HASH [department]
Aggregate[type = PARTIAL, keys = [department]]
│ Layout: [department:varchar, count_0:bigint]
│ Estimates: {rows: 10 (226B), cpu: ?, memory: ?, network: ?}
│ CPU: 4.00ms (20.00%), Scheduled: 7.00ms (15.22%), Blocked: 0.00ns (0.00%), Output: 8 rows (176B)
│ Input avg.: 1.25 rows, Input std.dev.: 118.32%
│ count_0 := count(job_title)
└─ Aggregate[type = FINAL, keys = [department, job_title]]
│ Layout: [department:varchar, job_title:varchar]
│ Estimates: {rows: 10 (362B), cpu: 362, memory: 362B, network: 0B}
│ CPU: 1.00ms (5.00%), Scheduled: 2.00ms (4.35%), Blocked: 0.00ns (0.00%), Output: 10 rows (362B)
│ Input avg.: 1.25 rows, Input std.dev.: 118.32%
└─ LocalExchange[partitioning = HASH, arguments = [department::varchar, job_title::varchar]]
│ Layout: [department:varchar, job_title:varchar]
│ Estimates: {rows: 10 (362B), cpu: 362, memory: 0B, network: 0B}
│ CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 181.00ms (7.96%), Output: 10 rows (362B)
│ Input avg.: 1.25 rows, Input std.dev.: 190.79%
└─ RemoteSource[sourceFragmentIds = [3]]
Layout: [department:varchar, job_title:varchar]
CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 444.00ms (19.53%), Output: 10 rows (362B)
Input avg.: 1.25 rows, Input std.dev.: 190.79%
Fragment 3 [SOURCE]
CPU: 11.49ms, Scheduled: 32.68ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 10 rows (382B); per task: avg.: 10.00 std.dev.: 0.00, Output: 10 rows (362B)
Output layout: [department, job_title]
Output partitioning: HASH [department, job_title]
Aggregate[type = PARTIAL, keys = [department, job_title]]
│ Layout: [department:varchar, job_title:varchar]
│ Estimates: {rows: 10 (362B), cpu: ?, memory: ?, network: ?}
│ CPU: 1.00ms (5.00%), Scheduled: 3.00ms (6.52%), Blocked: 0.00ns (0.00%), Output: 10 rows (362B)
│ Input avg.: 10.00 rows, Input std.dev.: 0.00%
└─ TableScan[table = lakehouse:bronze.jobs]
Layout: [job_title:varchar, department:varchar]
Estimates: {rows: 10 (362B), cpu: 362, memory: 0B, network: 0B}
CPU: 10.00ms (50.00%), Scheduled: 29.00ms (63.04%), Blocked: 0.00ns (0.00%), Output: 10 rows (382B)
Input avg.: 10.00 rows, Input std.dev.: 0.00%
job_title := job_title:varchar:REGULAR
department := department:varchar:REGULAR
Input: 10 rows (382B), Physical input: 1.32kB, Physical input time: 7.50ms
Overview of the Query Output And Plan
- Queued: 1.84ms
- Analysis: 85.69ms
- Planning: 58.35ms
- Execution: 450.11ms
Fragment Breakdown
The query execution is divided into three main fragments. Each fragment represents a stage in the execution process.
Fragment 1 [HASH]
Role: This fragment handles the final aggregation of the results.
Performance:
- CPU: 7.57ms
- Scheduled: 11.12ms
- Blocked: 1.80s (mainly waiting for data from other fragments)
Input/Output:
- Input: 8 rows (176B)
- Output: 3 rows (67B)
- Output Layout: [department, count]
Operations:
- Aggregation: Final aggregation by department to compute the total count of job titles.
- Local Exchange: Re-partitioning of data by department to prepare for final aggregation.
Fragment 2 [HASH]
Role: This fragment performs a partial aggregation of job titles by department.
Performance:
- CPU: 11.24ms
- Scheduled: 19.46ms
- Blocked: 860.54ms
Input/Output:
- Input: 10 rows (362B)
- Output: 8 rows (176B)
- Output Layout: [department, count_0]
Operations:
- Partial Aggregation: Counts job titles by department.
- Local Exchange: Re-partitioning of data by department, job_title for further aggregation.
Fragment 3 [SOURCE]
Role: This fragment reads data from the source table.
Performance:
- CPU: 11.49ms
- Scheduled: 32.68ms
Input/Output:
- Input: 10 rows (382B)
- Output: 10 rows (362B)
- Output Layout: [department, job_title]
Operations:
- Table Scan: Reads department and job_title columns from the lakehouse.bronze.jobs table.
- Partial Aggregation: Groups data by department and job_title and prepares it for further processing.
Key Takeaways
- Reading the Query Plan: Fragments are numbered in reverse execution order: the highest-numbered fragment (here Fragment 3, the table scan) runs first, and the lowest-numbered (Fragment 1, the final aggregation) runs last. This links back to the SQL order of execution we covered above.
- Blocked Time: Significant blocked time indicates waiting for data to be transferred between fragments, especially in Fragment 1 and Fragment 2. This is often a sign of data transfer delays or imbalances in processing time across fragments.
- Skewed Data: Notice the skew in the amount of data processed by workers in various stages, leading to some workers processing more data than others. This can cause inefficiencies.
- Aggregation Steps: The query involves multiple steps of partial and final aggregations, which can be optimized if possible by reducing the data shuffling between fragments.
- CPU and Scheduling: The CPU time and scheduling time are relatively low compared to the blocked time, suggesting that CPU resources are not the bottleneck.
- Tweaking your SQL: Modify your SQL and re-examine the query plan to find the right balance between CPU time, scheduling time, and blocked time.
Optimisation Suggestions
- Data Model: Make sure you have a proper data model in place, for example a star schema, and compute an OBT (one big table) on top of the star schema.
- Materialised View: Precompute frequent aggregations. I would still prefer the step above over this, but a materialised view can be a good stopgap if you are short on time or are working on step one in parallel. Take caution to avoid building up tech debt.
- Simplify Aggregations: Minimise complex aggregation steps to reduce data shuffling.
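For the materialised-view suggestion above, a minimal sketch in Trino might look like the following. Support depends on the connector (the Iceberg connector supports it, for example), and the lakehouse.silver target schema here is a made-up name:

```sql
-- Precompute the per-department counts once, then query the view instead
CREATE MATERIALIZED VIEW lakehouse.silver.department_title_counts AS
SELECT department, COUNT(job_title) AS count_job_titles
FROM lakehouse.bronze.jobs
GROUP BY 1;

-- Refresh when the underlying bronze table changes
REFRESH MATERIALIZED VIEW lakehouse.silver.department_title_counts;
```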
