
Query Plans — Analyse SQL Performance In Trino

6 min read · Jun 9, 2024

Another Sunday and another #datapains to discuss.

Today I want to dig deeper into how we can understand a query plan in Trino.

[Image: Query Plan Investigation]

SQL Order Of Execution — A Quick Refresher

Before we start looking into this, let’s remind ourselves of the order of execution in a SQL query.

  1. FROM, JOIN
  2. WHERE
  3. GROUP BY
  4. HAVING
  5. SELECT
  6. DISTINCT
  7. ORDER BY
  8. LIMIT

This will help us as we read the query plan.
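To make this concrete, here is a small annotated query (the employees table and its columns are made up purely for illustration), with each clause labelled by the step at which the engine evaluates it:

SELECT department, COUNT(*) AS headcount   -- 5. SELECT
FROM employees                             -- 1. FROM
WHERE active = true                        -- 2. WHERE
GROUP BY department                        -- 3. GROUP BY
HAVING COUNT(*) > 10                       -- 4. HAVING
ORDER BY headcount DESC                    -- 7. ORDER BY
LIMIT 5                                    -- 8. LIMIT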

How do you determine if your SQL is performant? ⚡

Before a query can be planned, the engine first needs to:

  • Identify tables
  • Identify columns used in a query
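In Trino you can exercise just this analysis step on its own: EXPLAIN (TYPE VALIDATE) checks that the query parses and that the tables and columns resolve, without planning or executing anything. A minimal sketch against the table used below:

EXPLAIN (TYPE VALIDATE)
SELECT department, job_title
FROM lakehouse.bronze.jobs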

The SQL below is a basic count of job titles, grouped by department.

EXPLAIN ANALYZE
WITH count_titles AS (
    SELECT
        department,
        COUNT(job_title) AS count_job_titles
    FROM lakehouse.bronze.jobs
    GROUP BY 1
)
SELECT * FROM count_titles

  • The SQL query uses a Common Table Expression (CTE) named count_titles to simplify the query structure and improve readability.
  • It begins by selecting data from the lakehouse.bronze.jobs table.
  • Within the CTE, it groups the data by the department column.
  • For each department group, it counts the number of occurrences of job_title and aliases it as count_job_titles.
  • After the CTE definition, the main query selects all columns from the count_titles CTE.
  • The main purpose of the query is to retrieve the count of job titles for each department from the lakehouse.bronze.jobs table.
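A note on the EXPLAIN variants: plain EXPLAIN only shows the plan and does not run the query, while EXPLAIN ANALYZE executes it and annotates each operator with real runtime statistics; EXPLAIN ANALYZE VERBOSE adds further low-level detail. For example:

-- Plan only; nothing is executed
EXPLAIN
SELECT department, COUNT(job_title) AS count_job_titles
FROM lakehouse.bronze.jobs
GROUP BY 1

-- Executes the query and reports per-operator runtime statistics
EXPLAIN ANALYZE VERBOSE
SELECT department, COUNT(job_title) AS count_job_titles
FROM lakehouse.bronze.jobs
GROUP BY 1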

Query Plan 📣

Queued: 1.84ms, Analysis: 85.69ms, Planning: 58.35ms, Execution: 450.11ms
Fragment 1 [HASH]
    CPU: 7.57ms, Scheduled: 11.12ms, Blocked 1.80s (Input: 933.53ms, Output: 0.00ns), Input: 8 rows (176B); per task: avg.: 4.00 std.dev.: 2.00, Output: 3 rows (67B)
    Amount of input data processed by the workers for this stage might be skewed
    Output layout: [department, count]
    Output partitioning: SINGLE []
    Aggregate[type = FINAL, keys = [department]]
    │   Layout: [department:varchar, count:bigint]
    │   Estimates: {rows: 3 (68B), cpu: 226, memory: 68B, network: 0B}
    │   CPU: 3.00ms (15.00%), Scheduled: 3.00ms (6.52%), Blocked: 0.00ns (0.00%), Output: 3 rows (67B)
    │   Input avg.: 1.00 rows, Input std.dev.: 132.29%
    │   count := count(count_0)
    └─ LocalExchange[partitioning = HASH, arguments = [department::varchar]]
       │   Layout: [department:varchar, count_0:bigint]
       │   Estimates: {rows: 10 (226B), cpu: 226, memory: 0B, network: 0B}
       │   CPU: 1.00ms (5.00%), Scheduled: 1.00ms (2.17%), Blocked: 716.00ms (31.49%), Output: 8 rows (176B)
       │   Input avg.: 1.00 rows, Input std.dev.: 86.60%
       └─ RemoteSource[sourceFragmentIds = [2]]
              Layout: [department:varchar, count_0:bigint]
              CPU: 0.00ns (0.00%), Scheduled: 1.00ms (2.17%), Blocked: 933.00ms (41.03%), Output: 8 rows (176B)
              Input avg.: 1.00 rows, Input std.dev.: 86.60%

Fragment 2 [HASH]
    CPU: 11.24ms, Scheduled: 19.46ms, Blocked 860.54ms (Input: 444.24ms, Output: 0.00ns), Input: 10 rows (362B); per task: avg.: 5.00 std.dev.: 2.00, Output: 8 rows (176B)
    Amount of input data processed by the workers for this stage might be skewed
    Output layout: [department, count_0]
    Output partitioning: HASH [department]
    Aggregate[type = PARTIAL, keys = [department]]
    │   Layout: [department:varchar, count_0:bigint]
    │   Estimates: {rows: 10 (226B), cpu: ?, memory: ?, network: ?}
    │   CPU: 4.00ms (20.00%), Scheduled: 7.00ms (15.22%), Blocked: 0.00ns (0.00%), Output: 8 rows (176B)
    │   Input avg.: 1.25 rows, Input std.dev.: 118.32%
    │   count_0 := count(job_title)
    └─ Aggregate[type = FINAL, keys = [department, job_title]]
       │   Layout: [department:varchar, job_title:varchar]
       │   Estimates: {rows: 10 (362B), cpu: 362, memory: 362B, network: 0B}
       │   CPU: 1.00ms (5.00%), Scheduled: 2.00ms (4.35%), Blocked: 0.00ns (0.00%), Output: 10 rows (362B)
       │   Input avg.: 1.25 rows, Input std.dev.: 118.32%
       └─ LocalExchange[partitioning = HASH, arguments = [department::varchar, job_title::varchar]]
          │   Layout: [department:varchar, job_title:varchar]
          │   Estimates: {rows: 10 (362B), cpu: 362, memory: 0B, network: 0B}
          │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 181.00ms (7.96%), Output: 10 rows (362B)
          │   Input avg.: 1.25 rows, Input std.dev.: 190.79%
          └─ RemoteSource[sourceFragmentIds = [3]]
                 Layout: [department:varchar, job_title:varchar]
                 CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 444.00ms (19.53%), Output: 10 rows (362B)
                 Input avg.: 1.25 rows, Input std.dev.: 190.79%

Fragment 3 [SOURCE]
    CPU: 11.49ms, Scheduled: 32.68ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 10 rows (382B); per task: avg.: 10.00 std.dev.: 0.00, Output: 10 rows (362B)
    Output layout: [department, job_title]
    Output partitioning: HASH [department, job_title]
    Aggregate[type = PARTIAL, keys = [department, job_title]]
    │   Layout: [department:varchar, job_title:varchar]
    │   Estimates: {rows: 10 (362B), cpu: ?, memory: ?, network: ?}
    │   CPU: 1.00ms (5.00%), Scheduled: 3.00ms (6.52%), Blocked: 0.00ns (0.00%), Output: 10 rows (362B)
    │   Input avg.: 10.00 rows, Input std.dev.: 0.00%
    └─ TableScan[table = lakehouse:bronze.jobs]
           Layout: [job_title:varchar, department:varchar]
           Estimates: {rows: 10 (362B), cpu: 362, memory: 0B, network: 0B}
           CPU: 10.00ms (50.00%), Scheduled: 29.00ms (63.04%), Blocked: 0.00ns (0.00%), Output: 10 rows (382B)
           Input avg.: 10.00 rows, Input std.dev.: 0.00%
           job_title := job_title:varchar:REGULAR
           department := department:varchar:REGULAR
           Input: 10 rows (382B), Physical input: 1.32kB, Physical input time: 7.50ms

Overview of the Query Output And Plan

[Image: Execution high-level output]
  • Queued: 1.84ms
  • Analysis: 85.69ms
  • Planning: 58.35ms
  • Execution: 467.68ms

Fragment Breakdown

The query execution is divided into three main fragments. Each fragment represents a stage in the execution process.

Fragment 1 [HASH]

[Image: HASH fragment]

Role: This fragment handles the final aggregation of the results.

Performance:

  • CPU: 7.57ms
  • Scheduled: 11.12ms
  • Blocked: 1.80s (mainly waiting for data from other fragments)

Input/Output:

  • Input: 8 rows (176B)
  • Output: 3 rows (67B)
  • Output Layout: [department, count]

Operations:

  • Aggregation: Final aggregation by department to compute the total count of job titles.
  • Local Exchange: Re-partitioning of data by department to prepare for final aggregation.

Fragment 2 [HASH]

Role: This fragment performs a partial aggregation of job titles by department.

Performance:

  • CPU: 11.24ms
  • Scheduled: 19.46ms
  • Blocked: 860.54ms

Input/Output:

  • Input: 10 rows (362B)
  • Output: 8 rows (176B)
  • Output Layout: [department, count_0]

Operations:

  • Partial Aggregation: Counts job titles by department (a plain-SQL analogy of the partial/final split is sketched after this list).
  • Local Exchange: Re-partitioning of data by department, job_title for further aggregation.
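Conceptually, the PARTIAL/FINAL split means each worker pre-aggregates the rows it holds, and a final aggregation combines those partial results after the shuffle. A rough plain-SQL analogy of what the engine does internally, not something you would write yourself:

WITH partial_counts AS (
    -- step 1: each worker counts job titles over its own slice of the data
    SELECT department, COUNT(job_title) AS partial_count
    FROM lakehouse.bronze.jobs
    GROUP BY 1
)
-- step 2: the final aggregation sums the partial counts per department
SELECT department, SUM(partial_count) AS count_job_titles
FROM partial_counts
GROUP BY 1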

Fragment 3 [SOURCE]

[Image: SOURCE — TableScan]

Role: This fragment reads data from the source table.

Performance:

  • CPU: 11.49ms
  • Scheduled: 32.68ms

Input/Output:

  • Input: 10 rows (382B)
  • Output: 10 rows (362B)
  • Output Layout: [department, job_title]

Operations:

  • Table Scan: Reads only the department and job_title columns from the lakehouse.bronze.jobs table (see the IO-plan sketch after this list).
  • Partial Aggregation: Groups data by department and job_title and prepares it for further processing.
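The scan reads only the two columns the query needs, which is one reason to keep SELECT lists narrow. If you want to see which tables and columns a query will touch before running it, Trino can produce an IO plan:

EXPLAIN (TYPE IO, FORMAT JSON)
SELECT department, COUNT(job_title) AS count_job_titles
FROM lakehouse.bronze.jobs
GROUP BY 1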

Key Takeaways

  1. Reading Query Plan: The fragments are numbered in reverse order of execution; the highest-numbered fragment runs first and the lowest-numbered fragment runs last. This links back to the SQL order of execution we covered above.
  2. Blocked Time: Significant blocked time indicates waiting for data to be transferred between fragments, especially in Fragment 1 and Fragment 2. This is often a sign of data transfer delays or imbalances in processing time across fragments.
  3. Skewed Data: Several stages warn that the input data is skewed across workers, meaning some workers process more rows than others. This can cause inefficiencies (a quick check is sketched after this list).
  4. Aggregation Steps: The query involves multiple steps of partial and final aggregations, which can be optimized if possible by reducing the data shuffling between fragments.
  5. CPU and Scheduling: The CPU time and scheduling time are relatively low compared to the blocked time, suggesting that CPU resources are not the bottleneck.
  6. Tweaking your SQL: Modify your SQL and re-inspect the query plan to find the right balance between CPU time, blocked time, and the amount of data shuffled between fragments.
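For takeaway 3, a quick way to gauge skew on the grouping key is to look at the row distribution directly; if a handful of departments hold most of the rows, the workers assigned to them will do most of the work:

SELECT department, COUNT(*) AS row_count
FROM lakehouse.bronze.jobs
GROUP BY 1
ORDER BY row_count DESC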

Optimisation Suggestions

  • Data Model: Make sure you have a proper data model in place, for example a star schema, and compute an OBT (One Big Table) on top of it.
  • Materialised View: Precompute frequent aggregations (a sketch follows this list). I would still prefer the data-model step above, but a materialised view can be a good stopgap if you are short on time or are working on step one in parallel; take care that it does not turn into tech debt.
  • Simplify Aggregations: Minimise complex aggregation steps to reduce data shuffling.
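As a sketch of the materialised-view suggestion, assuming a catalog whose connector supports materialised views (for example Iceberg), and with lakehouse.silver.department_title_counts as a hypothetical target name:

CREATE MATERIALIZED VIEW lakehouse.silver.department_title_counts AS
SELECT
    department,
    COUNT(job_title) AS count_job_titles
FROM lakehouse.bronze.jobs
GROUP BY 1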
