Deep dive into managing latency, throughput, and cost in Snowflake

This blog post provides guidance on translating constructs such as workload (a collection/grouping of queries), desired completion time for the workload (a.k.a. Service Level Objective), and cost budget into virtual warehouse configurables such as size, type, cluster count, scaling policy, and timeouts, in order to balance latency, throughput, and cost. Along the way, it also provides a deep dive into the internals of virtual warehouses and the scheduling of jobs in virtual warehouses.

Virtual Warehouse basics

Snowflake exposes the abstraction of “virtual warehouses” to run customer jobs. These virtual warehouses are Massively Parallel Processing (MPP) compute clusters composed of multiple Virtual Machines (VMs) provisioned by Snowflake and run software managed by Snowflake.

VMs that comprise a warehouse cluster are interconnected to share data during job execution. The number and type of VMs that make up a virtual warehouse, the details of the software running in it, and related management are abstracted away from you.

Each virtual warehouse includes independent compute clusters that do not share resources with other virtual warehouses. VMs that comprise a Virtual Warehouse are dedicated solely to that Virtual Warehouse which results in strong performance, resource, and security isolation. Each job runs on exactly one Virtual Warehouse.

Virtual warehouses consume billable compute as Snowflake credits on a per-second granularity as they execute jobs, load data, and perform other DML operations. Your spend is a function of the size of the virtual warehouse and how long it runs. Virtual warehouses are only billed while they are started; when suspended, they are not billed.
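As a starting point for understanding spend, credit consumption per warehouse can be inspected via the ACCOUNT_USAGE views. The following is a minimal sketch (the one-month window is illustrative) that aggregates credits consumed per warehouse over the last month:

SELECT WAREHOUSE_NAME
,SUM(CREDITS_USED) AS TOTAL_CREDITS_USED
FROM "SNOWFLAKE"."ACCOUNT_USAGE"."WAREHOUSE_METERING_HISTORY"
WHERE START_TIME >= DATEADD(month,-1,CURRENT_TIMESTAMP())
GROUP BY 1
ORDER BY 2 DESC
;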

Primer on job scheduling in virtual warehouses

After compilation in the Snowflake Cloud Services layer, jobs are scheduled for execution by the Warehouse Scheduling Service (WSS), which tracks and throttles load on the warehouse with the aim of balancing throughput and latency, maximizing utilization of warehouse clusters, and providing deterministic performance under contention.

WSS tracks memory and CPU usage across all the VMs of each Virtual Warehouse cluster. The memory capacity of a cluster is the available memory on each VM (after subtracting the footprint of the OS and other system software) multiplied by the number of VMs. When main memory is exhausted during execution, Snowflake allows data to “spill” to disk. If memory pressure is excessive, jobs may get killed and retried.

CPU usage is a proxy for tracking concurrency. Each job runs with a Degree Of Parallelism (DOP), which is the number of processes executing the job simultaneously. The maximum DOP for a job is the number of processes per VM (upper bounded by the number of CPU cores on each VM) multiplied by the number of VMs available to execute the job (limited by the warehouse size selected by the customer). As an example, a warehouse with 4 VMs, each with 8 vCPUs, has 8*4 = 32 units of DOP available, which means it can simultaneously run up to 32 jobs with a DOP of 1, or 8 jobs with a DOP of 4 (in both cases for a workload with a concurrency of 1). Other combinations are possible, as long as the total DOP across all running jobs does not exceed 32. Furthermore, if the maximum concurrency for the workload is greater than 1, the effective number of jobs that can run simultaneously would be MAX_CONCURRENCY * MAX_DOP.
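To make the arithmetic above concrete, the following is a plain worked calculation (not a Snowflake API, just SQL arithmetic) for the hypothetical 4-VM, 8-vCPU cluster:

SELECT 4 * 8 AS TOTAL_DOP -- 32 units of DOP across the cluster
,(4 * 8) / 1 AS MAX_JOBS_AT_DOP_1 -- up to 32 concurrent jobs with a DOP of 1
,(4 * 8) / 4 AS MAX_JOBS_AT_DOP_4 -- up to 8 concurrent jobs with a DOP of 4
;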

The Cloud Services layer receives jobs to schedule on available warehouse clusters and creates a “scheduling request” with WSS containing the required memory and requested DOP, which is estimated during compilation. WSS maintains a queue of other jobs waiting to run on available warehouse clusters. WSS decides whether the job is allowed to run or whether it has to wait in the queue. Eventually, WSS chooses a cluster of the warehouse for the job to run and informs the control plane which sends the job to the chosen warehouse cluster for execution.

When jobs arrive at WSS, they are moved into a queue with FIFO behavior. If a job fails during execution and is rescheduled, it keeps its place in the queue and does not have to join at the back of the line. WSS iterates over jobs in the queue starting at the front and checks whether it is allowed to run on an available cluster based on availability of free memory and concurrency. If there are multiple eligible clusters, the job is scheduled on the one with the lowest current concurrency level. If no cluster is eligible, the job continues to wait in the queue. WSS makes a simplifying assumption that load will be evenly distributed across all VMs of the cluster.

Warehouse scheduling in action

When WSS schedules jobs on the cluster, it increases the tracked resource balance of the cluster. The “scheduling response” contains the cluster chosen by WSS for the job and a list of all available VMs on that cluster. Based on this response, the control plane may perform a “DOP Downgrade” and determine the VMs used to execute the job. Despite the name, the adjusted DOP may be larger or smaller than the requested DOP.

Once the DOP Downgrade has been completed, the job is sent to the Virtual Warehouse cluster for execution. After the execution has been completed, WSS frees the allocated resources.

WSS performs all scheduling and associated operations in memory to achieve high throughput and low latency. If a WSS node crashes, state is recovered by aggregating replicated state information from other nodes in the cluster.

Scaling up vs scaling out to balance across throughput & latency

There are two warehouse types (Standard and high-memory, branded as Snowpark-optimized) and multiple T-shirt sizes (XS, S, M, L, XL, … 6XL). The warehouse type determines the memory-to-CPU ratio of the virtual warehouse, while the size determines the total amount of CPU and memory available. You can modify a virtual warehouse's size and type, even while the warehouse is currently executing queries.
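For example, a warehouse can be resized with ALTER WAREHOUSE; the statements below are a sketch using a hypothetical warehouse named my_wh and illustrative sizes:

ALTER WAREHOUSE my_wh SET WAREHOUSE_SIZE = 'XLARGE'; -- scale up for a heavier workload
ALTER WAREHOUSE my_wh SET WAREHOUSE_SIZE = 'MEDIUM'; -- scale back down afterwards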

While warehouse size and type scale a warehouse “up” and increase single-query performance, using multiple clusters in the same warehouse helps with query concurrency. Each cluster is scheduled independently and can be spun up/down dynamically based on load.

With multi-cluster warehouses, Snowflake supports allocating, either statically or dynamically, additional clusters to make a larger pool of compute resources available to the same virtual warehouse to increase job concurrency without managing multiple distinct virtual warehouses. To use this capability, you can specify a minimum and maximum number of clusters for a given virtual warehouse.

When different values for maximum and minimum are specified, Snowflake automatically starts and stops additional clusters as needed to react to dynamic incoming load. As the number of concurrent user sessions and/or jobs increases, and jobs start to queue due to insufficient resources, additional clusters are automatically started up to the specified maximum. Similarly, if load decreases, Snowflake automatically shuts down clusters.

When the same value is specified for both the maximum and minimum number of clusters, Snowflake statically allocates those clusters to the virtual warehouse. This configuration is effective when the incoming load does not fluctuate significantly and the delay of starting additional clusters is not acceptable.
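For illustration, the following sketch (hypothetical warehouse names and counts) creates an auto-scaling multi-cluster warehouse and a statically allocated one:

-- Auto-scale mode: Snowflake starts and stops clusters between MIN and MAX based on load
CREATE WAREHOUSE IF NOT EXISTS elastic_wh WITH
  WAREHOUSE_SIZE = 'MEDIUM'
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 4
  SCALING_POLICY = 'STANDARD';

-- Maximized mode: identical MIN and MAX statically allocates all clusters while the warehouse runs
CREATE WAREHOUSE IF NOT EXISTS static_wh WITH
  WAREHOUSE_SIZE = 'MEDIUM'
  MIN_CLUSTER_COUNT = 3
  MAX_CLUSTER_COUNT = 3;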

As an example, in the case of interleaved workloads, when a warehouse is scaled up by one size, queries can potentially run in half the time — it is the same amount of work, but run on twice the compute capacity. Similarly, when a warehouse is scaled out by the addition of a cluster, each workload can run on its own cluster, thereby halving the time taken to complete the jobs.

Virtual warehouse configurables such as size and cluster count provide controls to balance latency, throughput, and cost.

Although the amount of work is the same, total run-time is significantly reduced with either scale-up or scale-out. Since you pay per-VM on a per-second basis, the cost is the same.

The following query lists warehouses and times that could benefit from multi-cluster warehouses.

-- LIST OF WAREHOUSES AND DAYS WHERE MCW COULD HAVE HELPED
SELECT TO_DATE(START_TIME) as DATE
,WAREHOUSE_NAME
,SUM(AVG_RUNNING) AS SUM_RUNNING
,SUM(AVG_QUEUED_LOAD) AS SUM_QUEUED
FROM "SNOWFLAKE"."ACCOUNT_USAGE"."WAREHOUSE_LOAD_HISTORY"
WHERE TO_DATE(START_TIME) >= DATEADD(month,-1,CURRENT_TIMESTAMP())
GROUP BY 1,2
HAVING SUM(AVG_QUEUED_LOAD) >0
;

Memory spilling to inform warehouse type

The warehouse type determines the memory-to-CPU ratio of the virtual warehouse, while the size determines the total amount of CPU and memory available. The Snowpark-optimized warehouse type (which can help unlock ML training and memory-intensive analytics use cases) provides 16x more memory and 10x more local SSD cache per VM compared to standard warehouses. The larger memory speeds up computations, while the larger local storage provides a speedup when cached intermediate results and artifacts such as Python packages and JARs are reused on subsequent runs.
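As a sketch, a Snowpark-optimized warehouse is created by specifying the warehouse type (the warehouse name below is hypothetical):

CREATE OR REPLACE WAREHOUSE snowpark_opt_wh WITH
  WAREHOUSE_SIZE = 'MEDIUM'
  WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED';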

When operators in Snowflake's execution engine write intermediate data, main memory on the virtual warehouse VMs is used first; if memory is full, data is “spilled” to local disk/SSD on the virtual warehouse VMs; when local disk is also full, data spills to remote persistent storage (object storage such as Amazon S3). This scheme removes the need to handle out-of-memory or out-of-disk errors.

For performance-critical workloads, you can choose larger warehouse sizes to ensure that the intermediate data fits in memory, or at least on local disk, and does not spill to object storage (e.g., S3 on AWS).

Metrics exposed under the QUERY_HISTORY view, such as BYTES_SPILLED_TO_LOCAL_STORAGE and BYTES_SPILLED_TO_REMOTE_STORAGE, indicate the extent of memory pressure, which, in many cases, can be addressed in a cost-efficient manner by moving to Snowpark-optimized warehouses of the same size. The following query lists queries and warehouses that can benefit from an increased size or from changing the warehouse type to Snowpark-optimized.

SELECT QUERY_ID
,USER_NAME
,WAREHOUSE_NAME
,WAREHOUSE_SIZE
,BYTES_SCANNED
,BYTES_SPILLED_TO_REMOTE_STORAGE
,BYTES_SPILLED_TO_REMOTE_STORAGE / BYTES_SCANNED AS SPILLING_READ_RATIO
FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
WHERE BYTES_SPILLED_TO_REMOTE_STORAGE > BYTES_SCANNED * 5 -- Each byte read was spilled 5x on average
ORDER BY SPILLING_READ_RATIO DESC
;

The following query identifies the top 10 worst offending queries in terms of bytes spilled to local and remote storage.

SELECT query_id, SUBSTR(query_text, 1, 50) partial_query_text, user_name, warehouse_name,
bytes_spilled_to_local_storage, bytes_spilled_to_remote_storage
FROM snowflake.account_usage.query_history
WHERE (bytes_spilled_to_local_storage > 0
OR bytes_spilled_to_remote_storage > 0 )
AND start_time::date > dateadd('days', -45, current_date)
ORDER BY bytes_spilled_to_remote_storage DESC, bytes_spilled_to_local_storage DESC
LIMIT 10;

Limiting concurrently running jobs

Since jobs running concurrently in a virtual warehouse share available resources, each incremental job gets a smaller share of resources. The MAX_CONCURRENCY_LEVEL parameter, which is set to 8 by default, limits the number of concurrent queries running in a virtual warehouse.

Lowering the concurrency level can boost performance for individual jobs (especially if the jobs are complex or multi-statement queries) since each job gets a greater share of resources. However, lowering the concurrency level can cause increased job queuing. Other strategies such as using a dedicated virtual warehouse or using the Query Acceleration Service can boost the performance of a large or complex query without impacting the rest of the workload.
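MAX_CONCURRENCY_LEVEL can be adjusted at the warehouse level; the statement below is a sketch using a hypothetical warehouse my_wh and an illustrative value:

ALTER WAREHOUSE my_wh SET MAX_CONCURRENCY_LEVEL = 4; -- give each job a larger share of resources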

Furthermore, the STATEMENT_QUEUED_TIMEOUT_IN_SECONDS parameter can be used to cancel queries based on a timeout rather than let them remain in the queue for extended periods of time. This parameter can be set within the session hierarchy or at the warehouse level; when it is set for both a warehouse and a session, the lowest non-zero value is enforced. Its value is set to 0 by default (i.e., no timeout).

Similarly, the STATEMENT_TIMEOUT_IN_SECONDS parameter specifies the amount of time, in seconds, after which a running job (SQL query, DDL/DML statement, etc.) is canceled by the system. By default, it is set to 172800 seconds (i.e., 2 days).
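Both timeouts can be set on a warehouse; the values below are illustrative:

ALTER WAREHOUSE my_wh SET
  STATEMENT_QUEUED_TIMEOUT_IN_SECONDS = 300 -- cancel jobs queued for more than 5 minutes
  STATEMENT_TIMEOUT_IN_SECONDS = 3600; -- cancel jobs running for more than 1 hour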

Warehouse load & Warehouse utilization to inform warehouse size and cluster counts

Metrics exposed by Snowflake such as warehouse load metrics and warehouse utilization metrics (in private preview) can inform and guide optimizations to “right size” compute capacity.

Warehouse job load metrics measure the average number of jobs that were running or queued within a specific interval. It is computed by dividing the execution time (in seconds) of all jobs in an interval by the total time (in seconds) for the interval. A job load chart shows current and historical usage patterns along with total job load in intervals of 5 minutes to 1 hour (depending on the range selected) and individual load for each job status that occurred within the interval (Running, Queued).

Warehouse utilization metrics (in private preview) show the utilization of resources as a percentage at per-cluster and per-warehouse levels.

Per-cluster utilization data, when used with warehouse load metrics, can help identify idle capacity and inform rightsizing decisions.

Warehouse load and utilization metrics can be used in conjunction to inform capacity allocation decisions. Some high-level postulates include:

If your workload has adequate throughput and/or latency performance AND (queued query load is low OR total query load <1 for prolonged periods) AND utilization is low (e.g., less than 50%):

  • Consider downsizing the warehouse or reducing the number of clusters. Additionally, consider starting a separate warehouse and moving queued jobs to that warehouse

If your workload is running slower than desired (based on throughput and/or latency measurements) AND running query load is low AND utilization is high (e.g., greater than 75%):

  • Consider upsizing the warehouse or adding clusters

If there are recurring usage spikes (based on warehouse utilization history):

  • Consider moving queries that represent the spikes to a new warehouse or adding clusters. Also, consider running the remaining workload on a smaller warehouse

If a workload has considerably higher than normal load:

  • Investigate which jobs are contributing to the higher load.

If the warehouse runs during recurring time periods, but the total job load is < 1 for substantial periods of time:

  • Consider decreasing the warehouse size and/or reduce the number of clusters.

Multi-cluster auto-scaling policy tradeoffs around cost and responsiveness

To help control the usage footprint of auto-scaling multi-cluster warehouses, scaling policy options (economy, standard) can be used to control the relative rate at which clusters automatically start or shut down.

The Standard policy is the default and minimizes queuing by favoring starting additional clusters over conserving credits. With the standard policy, the first cluster starts immediately when either a query is queued or the system detects that there’s one more query than the currently running clusters can execute. Each successive cluster waits to start 20 seconds after the prior one has started. For example, if your warehouse is configured with 10 max clusters, it can take ~200 seconds to start all 10 clusters. Clusters shut down after 2 to 3 consecutive successful checks (performed at 1-minute intervals), which determine whether the load on the least-loaded cluster could be redistributed to the other clusters without spinning up the cluster again.

The Economy policy conserves credits by favoring keeping running clusters fully loaded rather than starting additional clusters, which may result in queries being queued and taking longer to complete. With the economy policy, a new cluster is started only if the system estimates there’s enough query load to keep the cluster busy for at least 6 minutes. Clusters shut down after 5 to 6 consecutive successful checks (performed at 1 minute intervals), which determine whether the load on the least-loaded cluster could be redistributed to the other clusters without spinning up the cluster again.
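The scaling policy is set per warehouse; for example, using a hypothetical warehouse name:

ALTER WAREHOUSE my_wh SET SCALING_POLICY = 'ECONOMY'; -- favor conserving credits over starting additional clusters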

Scale-to-zero timeouts to balance cost and responsiveness

A warehouse can be set to automatically resume or suspend, based on activity to only consume resources based on actual usage. By default, Snowflake automatically suspends the warehouse if it is inactive for a specified period of time. Also, by default, Snowflake automatically resumes the warehouse when any job arrives at the warehouse. Auto-suspend and auto-resume behaviors apply to the entire warehouse and not to the individual clusters in the warehouse.

The following query identifies all warehouses that do not have auto-suspend enabled. Enabling auto-suspend ensures that warehouses suspend after a specific amount of inactive time in order to prevent runaway costs.

SHOW WAREHOUSES
;
SELECT "name" AS WAREHOUSE_NAME
,"size" AS WAREHOUSE_SIZE
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE IFNULL("auto_suspend",0) = 0
;

The following query identifies all warehouses that do not have auto-resume enabled. Enabling auto-resume automatically resumes a warehouse when queries are submitted against it.

SHOW WAREHOUSES
;
SELECT "name" AS WAREHOUSE_NAME
,"size" AS WAREHOUSE_SIZE
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "auto_resume" = 'false'
;

We recommend setting the auto-suspend duration based on the ability of the workload to take advantage of warehouse caches. This exercise involves finding the sweet spot for cost efficiency by balancing the compute cost savings of suspending quickly against the price-performance benefits of Snowflake’s sophisticated caching. In general, our high-level directional guidance is as follows (see the sketch after this list):

· For tasks, loading, and ETL/ELT use cases, immediate suspension of virtual warehouses is likely to be the optimal choice.

· For BI and SELECT use cases, query warehouses are likely to be cost-optimal with ~10 minutes for suspension to keep data caches warm for end users.

· For DevOps, DataOps, and Data Science use cases, warehouses are usually cost-optimal at ~5 minutes for suspension as a warm cache is not as important for ad-hoc & highly unique queries.
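The following sketch applies this guidance with illustrative auto-suspend values (in seconds) to hypothetical warehouses:

ALTER WAREHOUSE etl_wh SET AUTO_SUSPEND = 60; -- suspend quickly after loading/ETL work
ALTER WAREHOUSE bi_wh SET AUTO_SUSPEND = 600; -- ~10 minutes to keep caches warm for BI users
ALTER WAREHOUSE ds_wh SET AUTO_SUSPEND = 300; -- ~5 minutes for ad-hoc data science queries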

The following query flags potential optimization opportunities by identifying virtual warehouses with an egregiously long duration for automatic suspension after a period of no activity on that warehouse.

SHOW WAREHOUSES
;
SELECT "name" AS WAREHOUSE_NAME
,"size" AS WAREHOUSE_SIZE
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "auto_suspend" >= 3600 // 3600 seconds = 1 hour
;

The following statement timeouts provide additional controls around how long a query can run before being canceled. This can help ensure that any queries that get hung up for extended periods of time do not cause excessive consumption of credits or prevent the virtual warehouse from being suspended. This parameter is set at the account level by default and can also be optionally set at both the warehouse and user levels.

SHOW PARAMETERS LIKE 'STATEMENT_TIMEOUT_IN_SECONDS' IN ACCOUNT;
SHOW PARAMETERS LIKE 'STATEMENT_TIMEOUT_IN_SECONDS' IN WAREHOUSE <warehouse-name>;
SHOW PARAMETERS LIKE 'STATEMENT_TIMEOUT_IN_SECONDS' IN USER <username>;

The following query aggregates the percentage of data scanned from the ephemeral storage layer (cache) across all queries broken out by warehouse. Warehouses running querying/reporting workloads with a low percentage of data returned from caches indicate optimization opportunities (because warehouses may be suspending too quickly).

SELECT WAREHOUSE_NAME
,COUNT(*) AS QUERY_COUNT
,SUM(BYTES_SCANNED) AS BYTES_SCANNED
,SUM(BYTES_SCANNED*PERCENTAGE_SCANNED_FROM_CACHE) AS BYTES_SCANNED_FROM_CACHE
,SUM(BYTES_SCANNED*PERCENTAGE_SCANNED_FROM_CACHE) / SUM(BYTES_SCANNED) AS PERCENT_SCANNED_FROM_CACHE
FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
WHERE START_TIME >= dateadd(month,-1,current_timestamp())
AND BYTES_SCANNED > 0
GROUP BY 1
ORDER BY 5
;

Query Acceleration Service to handle bursts from short-running queries

Query Acceleration Service (QAS) enables another form of vertical scaling to accelerate query performance. It automatically adds VMs, beyond the inter-networked VMs in the warehouse cluster, on an on-demand basis to offload and accelerate SQL queries. QAS is particularly useful for large table scans.

Additionally, when Snowflake detects a massive query that will scan gigabytes of data, using query acceleration can free up resources on the virtual warehouse cluster to execute other short-running queries from other users. This is usually less expensive than scaling up to a larger warehouse and leads to more efficient use of resources. Effectively, the Query Acceleration Service acts like a powerful additional cluster that is temporarily deployed alongside existing virtual warehouses when needed.

QAS compute resources are billed per second. The SYSTEM$ESTIMATE_QUERY_ACCELERATION function and the QUERY_ACCELERATION_ELIGIBLE view help identify queries that might benefit from QAS.
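As a sketch, eligibility for a specific query can be estimated with the system function, and QAS can then be enabled on the warehouse (the query ID and warehouse name below are placeholders):

SELECT PARSE_JSON(SYSTEM$ESTIMATE_QUERY_ACCELERATION('<query_id>'));

ALTER WAREHOUSE my_wh SET ENABLE_QUERY_ACCELERATION = TRUE;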

The following query identifies the queries that might benefit the most from the service by the amount of query execution time that is eligible for acceleration:

SELECT query_id, eligible_query_acceleration_time
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE
ORDER BY eligible_query_acceleration_time DESC;

The following query identifies the queries that might benefit the most from the service in a specific warehouse mywh:

SELECT query_id, eligible_query_acceleration_time
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE
WHERE warehouse_name = 'mywh'
ORDER BY eligible_query_acceleration_time DESC;

The following query identifies the warehouses with the most queries eligible in a given period of time for the query acceleration service:

SELECT warehouse_name, COUNT(query_id) AS num_eligible_queries
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE
WHERE start_time > 'Mon, 29 May 2023 00:00:00'::timestamp
AND end_time < 'Tue, 30 May 2023 00:00:00'::timestamp
GROUP BY warehouse_name
ORDER BY num_eligible_queries DESC;

The following query identifies the warehouses with the most eligible time for the query acceleration service:

SELECT warehouse_name, SUM(eligible_query_acceleration_time) AS total_eligible_time
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE
GROUP BY warehouse_name
ORDER BY total_eligible_time DESC;

Query Acceleration scale factor sets an upper bound on the amount of compute resources a warehouse can lease for acceleration. The scale factor is a multiplier based on warehouse size and cost. For example, if the scale factor is 5 for a Medium-sized warehouse (4 credits/hour), the warehouse can lease compute up to 5 times its size (i.e., 4 x 5 = 20 credits/hour). By default, the scale factor is set to 8 when Query Acceleration Service is used.
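The scale factor cap is configured alongside enabling the service; the following is a sketch with an illustrative limit on a hypothetical warehouse:

ALTER WAREHOUSE my_wh SET
  ENABLE_QUERY_ACCELERATION = TRUE
  QUERY_ACCELERATION_MAX_SCALE_FACTOR = 5; -- lease at most 5x the warehouse's compute for acceleration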

The following query identifies the upper limit for scale factor for a given warehouse with QAS enabled:

SELECT MAX(upper_limit_scale_factor)
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE
WHERE warehouse_name = 'mywh';

The following query identifies the distribution of scale factors for a given warehouse with QAS enabled:

SELECT upper_limit_scale_factor, COUNT(upper_limit_scale_factor)
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE
WHERE warehouse_name = '<warehouse_name>'
GROUP BY 1 ORDER BY 1;

Sizing the Snowpark sandbox & number of nodes to balance tradeoffs between throughput & latency

Unlike SQL, which comes with a limited language surface, code in Java/Python/Scala for User Defined Functions (UDFs) and stored procedures presents a larger security attack surface. For performance reasons, this code runs on the same virtual warehouse VMs that execute the rest of the job. Therefore, in addition to the use of VMs that aren’t reused across accounts for multi-tenant compute isolation, a secure sandbox built from Linux kernel primitives such as cgroups, namespaces, seccomp, eBPF, and chroot prevents the code from accessing information outside of its environment (scoped to the current job) or affecting the operation of other parts of Snowflake; other virtual warehouse resources, such as its network and namespaces, are also isolated from the contents of the sandbox. Each Java/Python/Scala job gets a new sandbox that includes “just enough” read-only software needed to run the code, while a chroot directory tree provides a few required writable directories and a temporary one that serves as scratch space. The sandbox is run in a cgroup to limit the executing code’s memory, CPU, and PID usage.

Stored procedures on Snowflake run on only a single node in a virtual warehouse and aren’t parallelized to run across multiple nodes of a virtual warehouse cluster. We recommend taking advantage of this insight to inform warehouse sizing. When using stored procedures, we recommend using single-node warehouse sizes (especially for use cases such as single-node ML training). This can be achieved by setting WAREHOUSE_SIZE = MEDIUM and MAX_CONCURRENCY_LEVEL = 1, which ensures that the Snowpark-optimized warehouse consists of a single Snowpark-optimized node with the Snowpark sandbox configured to have the maximum memory (~240GiB) and CPU possible via appropriate cgroup changes behind the scenes. Additionally, the use of Snowpark-optimized warehouses for stored procedures can help further improve throughput by supporting the ability to spawn new threads inside the sandbox for multi-processor use cases.
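A sketch of such a single-node configuration for stored procedures (the warehouse name is hypothetical):

CREATE OR REPLACE WAREHOUSE snowpark_sp_wh WITH
  WAREHOUSE_SIZE = 'MEDIUM'
  WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
  MAX_CONCURRENCY_LEVEL = 1; -- dedicate the node's memory and CPU to a single job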

We recommend using multi-cluster Snowpark-optimized warehouses to support multiple concurrently running stored procedures. Additionally, we recommend using a separate warehouse for executing nested queries from the stored procedure; the session.use_warehouse() API can be used to select the warehouse for the query inside the stored procedure.

Unlike stored procedures, with UDFs, Snowflake attempts to use the full power of your warehouse by parallelizing computation. As a result, for UDFs, we recommend using warehouses with multiple nodes (such as a Snowpark-optimized warehouse of size L or larger).

To the extent possible, we also recommend against using LIMIT clauses or a heavily skewed GROUP BY, PARTITION BY, or JOIN in your query — these hinder Snowflake’s ability to parallelize UDFs. Instead, we recommend using the batch API when using Snowpark Python libraries such as xgboost or PyTorch to operate efficiently on batches of rows.

Conclusion

As illustrated in the blog post, Snowflake offers a number of mechanisms to manage latency, throughput, and cost based on the needs of the workload. Selecting the right strategy to meet the needs of your workload and cost budget may require some experimentation.
