Maximize Performance: The BestSecret to Scaling Trino Clusters with KEDA

Isa Inalcik · Published in BestSecret Tech · 11 min read · Feb 19, 2024

(Cover image generated with DALL-E)

In the fast-paced world of data analytics and scalable technology solutions, integrating Kubernetes Event-Driven Autoscaling (KEDA) with Trino clusters stands as a pivotal advancement. This integration facilitates efficient, demand-driven scaling by leveraging metrics such as CPU utilization, memory pressure index, and queued queries, ensuring optimal resource utilization and performance. The adoption of this method marks a significant departure from traditional scaling mechanisms, heralding a new era of responsive and cost-effective data processing environments.

Figure: The KEDA concept

Prerequisites

Before implementing the dynamic scaling solution outlined in this article, ensure that the following prerequisites are met:

  1. Kubernetes Cluster: A running Kubernetes cluster configured with sufficient permissions to deploy and manage resources.
  2. Trino Setup: An operational Trino cluster within the Kubernetes environment.
  3. KEDA Installation: KEDA must be installed and configured in your Kubernetes cluster.
  4. Prometheus Monitoring: A Prometheus setup configured to collect metrics from your Trino cluster.
  5. JMX Exporting to Prometheus: It’s essential to export Java Management Extensions (JMX) metrics from Trino to Prometheus. This involves setting up a JMX exporter on your Trino nodes.

Configuration Overview

Dockerfile for JMX Exporting

To enable JMX exporting to Prometheus, you can use a Dockerfile like the one below. This Dockerfile extends the official Trino image to include the JMX Prometheus Java agent, allowing Prometheus to collect JMX metrics from Trino:

FROM trinodb/trino:439

USER root

# Install the JMX Prometheus Java agent
RUN mkdir -p /opt/java_metrics && \
wget -U "Any User Agent" https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.20.0/jmx_prometheus_javaagent-0.20.0.jar -P /opt/java_metrics/

COPY prometheus_config.yaml /opt/java_metrics/config.yaml
RUN chown -R trino:trino /opt/java_metrics

USER trino:trino

Prometheus Configuration for JMX Metrics

To configure Prometheus to collect JMX metrics, you’ll need a config.yaml file for the JMX exporter. This file might start as simple as:

rules:
- pattern: ".*"

However, you should customize the rules based on the specific metrics you wish to capture.
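For example, a narrower configuration could limit scraping to the MBeans used later for the scaling decisions in this article. The snippet below is an illustrative sketch only: the whitelistObjectNames option and the exact Trino MBean names (trino.memory, trino.execution:name=QueryManager) should be verified against your Trino and JMX-exporter versions with a JMX client.

# config.yaml for the JMX exporter (illustrative sketch)
lowercaseOutputName: false
whitelistObjectNames:
  - "trino.memory:*"                     # memory pool MBeans (MaxBytes, ReservedBytes, ...)
  - "trino.execution:name=QueryManager"  # queued/running query counters
rules:
  - pattern: ".*"                        # expose every attribute of the whitelisted MBeans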

JVM Configuration for Trino

Adjust the etc/jvm.config file within your Trino setup to include the Java agent and specify memory settings that match your worker specifications. An example configuration could look like this:

-server
-Xmx184G
...
-javaagent:/opt/java_metrics/jmx_prometheus_javaagent-0.20.0.jar=9483:/opt/java_metrics/config.yaml

Make sure to replace -Xmx184G with the appropriate memory allocation for your environment.

Adjustments and Considerations

  • JMX Exporting: The JMX Prometheus Java agent enables Prometheus to collect detailed metrics from Trino, enhancing the information available for scaling decisions.
  • -Xmx Adjustment: Tailor the -Xmx value in the jvm.config to align with the memory capabilities of your Trino workers to ensure optimal performance.

Creating a PodMonitor/ServiceMonitor

To complete your monitoring setup, create a PodMonitor or ServiceMonitor resource. This resource specifies how Prometheus should discover and scrape metrics from your application pods. The initial configuration provided serves as the basis for this step, detailing the application to be monitored and the metrics endpoint.

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: trino-worker
  labels:
    team: <your-team>
spec:
  selector:
    matchLabels:
      app: trino-worker
  podMetricsEndpoints:
    - port: 9483
      path: /metrics

With these prerequisites in place, you’re well on your way to implementing a dynamic scaling solution for Trino that responds adeptly to your data processing workload demands.

KEDA: ScaledObject

Implementing dynamic scaling involves configuring a ScaledObject in Kubernetes, utilizing KEDA to adjust the number of Trino worker pods based on specific metrics. Below is an example configuration that illustrates how to dynamically scale the Trino worker deployment within a Kubernetes namespace, driven by a mix of CPU utilization, memory pressure, and queued queries metrics:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: keda-scaler-trino-worker
  namespace: [your-namespace]
spec:
  scaleTargetRef:
    name: trino-worker
  minReplicaCount: 1
  maxReplicaCount: 15
  pollingInterval: 30 # Seconds
  cooldownPeriod: 600 # Seconds
  fallback:
    failureThreshold: 3
    replicas: 8
  advanced:
    horizontalPodAutoscalerConfig:
      behavior:
        scaleDown:
          stabilizationWindowSeconds: 600
          policies:
            - type: Pods
              value: 1
              periodSeconds: 300
        scaleUp:
          stabilizationWindowSeconds: 0
          policies:
            - type: Pods
              value: 1
              periodSeconds: 120
  triggers:
    - type: cpu
      metricType: Utilization
      metadata:
        value: '80' # Target CPU utilization percentage
    - type: prometheus
      metricType: Value
      metadata:
        serverAddress: [your-prometheus-server-address]
        metricName: 'memory_pressure_index'
        threshold: '0.01' # Target for free memory across the cluster
        query: 1 / (sum by (job) ((avg_over_time(trino_memory_MemoryPool_MaxBytes{job="your-job-name"}[1m]) - avg_over_time(trino_memory_MemoryPool_ReservedBytes{job="your-job-name"}[1m])) / 1024 / 1024 / 1024))
    - type: prometheus
      metricType: Value
      metadata:
        serverAddress: [your-prometheus-server-address]
        threshold: '1'
        metricName: queued_queries
        query: sum by (job) (avg_over_time(trino_execution_QueryManager_QueuedQueries{job="your-coordinator-job-name"}[1m]))

Scaling Metrics Explained

The dynamic scaling of Trino clusters using Kubernetes Event-Driven Autoscaling (KEDA) is predicated on the thoughtful selection and monitoring of specific metrics that directly influence the scaling behavior of your data processing infrastructure. Each metric serves a unique purpose, acting as a gauge for when to scale up (add more pods) or scale down (remove pods), ensuring that resources are optimally utilized to match workload demands. Here’s a deeper dive into each metric and its role in the scaling process:

1. CPU Utilization

  • Purpose: The CPU utilization metric measures the percentage of CPU resources being used by the Trino worker nodes. It’s a fundamental indicator of how much work the nodes are doing relative to their capacity.
  • How It Works: When CPU utilization reaches or exceeds the specified threshold (e.g., 80%), it signals that the nodes are nearing their capacity and additional resources may be needed to maintain performance levels. Conversely, if CPU utilization is consistently below this threshold, it may indicate over-provisioning, and the cluster can scale down to reduce costs without impacting performance.
  • Impact on Scaling: High CPU utilization triggers scaling up to distribute the workload across more nodes, preventing any single node from becoming a bottleneck. Low CPU utilization triggers scaling down to reduce unnecessary resource usage (see the note on CPU requests after this list).
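One practical note on the cpu trigger: the Utilization target is evaluated against the CPU requests defined on the pods, so the trino-worker containers need resource requests declared. A minimal illustrative excerpt (the numbers are placeholders, not recommendations):

# Illustrative excerpt from the trino-worker container spec
resources:
  requests:
    cpu: "28"        # the 80% Utilization target is measured against this request
    memory: "200Gi"  # keep in line with the -Xmx setting in jvm.config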

2. Memory Pressure Index

  • Purpose: The memory pressure index is an innovative metric that helps ensure there is a sufficient buffer of free memory across the cluster to handle fluctuations in workloads without running into out-of-memory errors.
  • How It Works: This metric is calculated using a Prometheus query that sums the free memory (maximum minus reserved memory-pool bytes) across all worker nodes and takes its inverse. The threshold (e.g., 0.01, corresponding to 100 GB of free memory at the cluster level) acts as a trigger point. When available memory falls below that level, the index rises above the threshold, indicating that the cluster is under memory pressure and may benefit from scaling up.
  • Impact on Scaling: By monitoring memory pressure, the system can proactively scale up before memory constraints negatively impact performance, ensuring smooth operation under varying loads.

3. Queued Queries

  • Purpose: This metric tracks the number of queries waiting to be processed, serving as a direct indicator of demand exceeding current processing capacity.
  • How It Works: A threshold is set (e.g., 1 queued query), and if the number of queued queries exceeds this value, it’s a signal that the current infrastructure is unable to efficiently handle incoming queries. This situation typically occurs during peak usage times or with complex queries that take longer to process.
  • Impact on Scaling: Scaling out (increasing the number of worker nodes) in response to an increase in queued queries helps reduce wait times for query processing, improving overall system responsiveness and user satisfaction.

Understanding Polling Interval, Cooldown Period, and Advanced Configurations

In addition to the crucial metrics like CPU Utilization, Memory Pressure Index, and Queued Queries, the ScaledObject configuration for KEDA in Kubernetes includes several other important settings: Polling Interval, Cooldown Period, and Advanced configurations. Let’s break down what these settings mean and how they impact the dynamic scaling of your Trino cluster.

Polling Interval

  • Definition: The Polling Interval is the frequency at which KEDA checks the specified metrics to decide whether to scale up or down. It’s set in seconds.
  • How It Works: If you set the polling interval to 30 seconds, KEDA will query the metrics (CPU utilization, memory pressure index, queued queries) every 30 seconds to evaluate if the scaling conditions are met.
  • Impact on Scaling: A shorter polling interval allows for more responsive scaling, adjusting quickly to workload changes. However, it may also lead to more frequent scaling actions, which could cause system instability if not managed properly. A longer interval might delay scaling actions but can provide more stability.

Cooldown Period

  • Definition: The Cooldown Period is the time KEDA waits after the last trigger stops reporting activity before scaling the workload down to zero replicas.
  • How It Works: With a 600-second cooldown period, once all triggers report inactive, KEDA waits 10 minutes before scaling to zero. Note that this setting only governs scaling to zero; with minReplicaCount set to 1, as in the example above, routine scale-down pacing is controlled by the HPA behavior settings described below.
  • Impact on Scaling: This setting prevents rapid, repeated swings between zero and non-zero replicas (scale thrashing), giving the cluster time to stabilize before workers are fully removed.

Advanced Configuration: HorizontalPodAutoscalerConfig

  • Definition: This section allows for more granular control over how the Horizontal Pod Autoscaler (HPA) behaves, specifically regarding how it scales down and scales up.
  • Stabilization Window: For scale-down, this is the window over which the autoscaler looks at recently computed desired replica counts and acts on the highest of them, which prevents premature scale-down actions.
  • Policies: These are the rules that define how many pods can be added or removed per scaling action and how often such actions can occur. In the configuration above, at most one pod is removed every 300 seconds and at most one pod is added every 120 seconds.

More Detailed Explanation of the Memory Pressure Index

The Memory Pressure Index is a sophisticated metric designed to preemptively address memory resource constraints, ensuring that the Trino cluster has enough free memory to handle incoming workloads effectively.

  • How It Works More Deeply: This metric uses a Prometheus query to compute the inverse of the total free memory, in gigabytes, across the cluster (the difference between maximum and reserved memory-pool bytes, summed over all workers). Because it is an inverse, the value increases as available memory decreases:
1 / (sum by (job) ((avg_over_time(trino_memory_MemoryPool_MaxBytes{job="your-job-name"}[1m]) - avg_over_time(trino_memory_MemoryPool_ReservedBytes{job="your-job-name"}[1m])) / 1024 / 1024 / 1024))
  • Threshold Interpretation: Setting the threshold to 0.01 is akin to saying “trigger scaling up when the available memory across the cluster drops below 100GB”. It’s a proactive measure to ensure that there’s always a buffer of free memory to accommodate spikes in demand without hitting memory limits that could lead to performance degradation or failures (a quick worked example follows this list).
  • Impact on Scaling: Monitoring memory pressure closely allows the system to scale up in advance of memory becoming a bottleneck. This is crucial for maintaining smooth operation, especially for memory-intensive queries or during peak usage times, preventing out-of-memory errors and ensuring that queries run efficiently.
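As a quick sanity check of the threshold arithmetic (the free-memory figures are illustrative):

memory_pressure_index = 1 / free_memory_in_GB

free memory = 250 GB  ->  index = 1 / 250 = 0.004   (below the 0.01 threshold: no scale-up pressure)
free memory = 100 GB  ->  index = 1 / 100 = 0.010   (at the threshold)
free memory =  80 GB  ->  index = 1 /  80 = 0.0125  (above the threshold: KEDA scales up)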
Figure: Scaling behavior based on the “memory_pressure_index” metric

Additional Configuration Details

This configuration sets the framework for dynamic scaling, from minimum and maximum replicas to polling intervals, cooldown periods, and fallback mechanisms, all aimed at ensuring a stable, responsive scaling process.
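For instance, the fallback block in the ScaledObject above governs what happens when a metric source cannot be queried. The exact triggers it applies to depend on your KEDA version (it does not apply to the CPU scaler), so treat the annotated excerpt below as a sketch of the intent rather than a guarantee:

fallback:
  failureThreshold: 3   # after 3 consecutive failed metric fetches for a trigger...
  replicas: 8           # ...KEDA steers the deployment toward 8 replicas until the source recovers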

Implementation Steps

  1. Configure Prometheus to Collect Metrics: Set up Prometheus to gather the necessary metrics from the Trino cluster.
  2. Apply the ScaledObject Configuration: Deploy the ScaledObject configuration to your Kubernetes cluster.
  3. Observe and Optimize: Monitor the scaling behavior and adjust thresholds and policies as needed to fine-tune performance and resource usage.
Figure: The HPA created by KEDA

Implementing Effective Scaling Strategies

Understanding and configuring these metrics appropriately is crucial for implementing an effective scaling strategy. The goal is to maintain a balance where resources are neither underutilized (leading to unnecessary costs) nor overstretched (resulting in degraded performance). Regular monitoring and adjustment of the threshold values based on actual performance data and workload patterns are essential for optimizing the scaling behavior. This ensures that the Trino cluster remains responsive and efficient, capable of dynamically adapting to both sudden surges and drops in demand.

Configuration: Enhanced with Graceful Shutdown

Implementing dynamic scaling for Trino clusters using Kubernetes and KEDA focuses not just on scaling up efficiently but also on minimizing disruptions during scale-down operations. An essential part of this process is ensuring that Trino instances shut down gracefully, preserving the integrity of ongoing queries and maintaining a high level of service reliability. To achieve this, we incorporate the terminationGracePeriodSeconds parameter in our deployment configurations and implement Trino’s graceful shutdown procedures.

Adding Termination Grace Period to Deployment

When configuring your Trino worker deployment in Kubernetes, it’s crucial to specify an appropriate terminationGracePeriodSeconds. This setting gives Trino pods enough time to finish current tasks and clean up resources before being terminated.

The terminationGracePeriodSeconds should be set based on your workload characteristics and query execution times to ensure that even long-running queries have a chance to complete or reach a safe stopping point.
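A minimal sketch of where this setting lives in the worker Deployment (the 1800-second value is purely illustrative; size it to your longest expected queries):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: trino-worker
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 1800   # illustrative: allow up to 30 minutes for in-flight work
      containers:
        - name: trino-worker
          image: <your-trino-image>
          # remaining container spec (ports, resources, probes) unchanged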

Implementing Graceful Shutdown in Trino

Trino supports graceful shutdown natively, allowing workers to finish processing current tasks while not accepting new ones. To initiate a graceful shutdown:

  1. Disable New Queries: Prevent the coordinator from assigning new tasks to the worker being shut down.
  2. Allow Current Queries to Complete: Let the worker continue processing its assigned tasks without interruption.
  3. Shutdown: Once tasks are complete, the worker shuts down cleanly.

For more details on implementing graceful shutdown in Trino, refer to the official documentation.
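One common way to wire this into Kubernetes is a preStop hook that puts the worker into the SHUTTING_DOWN state before the pod receives SIGTERM. The sketch below assumes curl is available in the image, the default HTTP port 8081 without TLS, and that the user passed in X-Trino-User is permitted to trigger shutdowns under your access-control rules; also set shutdown.grace-period in the worker configuration as described in the Trino documentation.

# Illustrative excerpt from the trino-worker container spec
lifecycle:
  preStop:
    exec:
      command:
        - /bin/sh
        - -c
        - |
          # ask the worker to stop accepting new tasks and drain current ones
          curl -X PUT \
            -H "Content-Type: application/json" \
            -H "X-Trino-User: admin" \
            -d '"SHUTTING_DOWN"' \
            http://localhost:8081/v1/info/state
          # keep the hook alive while the worker drains (illustrative; bounded by terminationGracePeriodSeconds)
          sleep 1800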

The Importance of Graceful Shutdown

Graceful shutdowns are vital for maintaining data consistency and ensuring a high-quality user experience. By allowing queries to complete before terminating a worker:

  • Data Integrity: Ensures that in-flight queries are not lost or corrupted, maintaining the accuracy and reliability of query results.
  • Resource Utilization: Optimizes the use of compute resources, as queries are allowed to complete without the need for restarts, which could incur additional computational overhead.
  • User Experience: Minimizes disruptions to end-users by reducing the likelihood of query failures or unexpected delays.

Incorporating a thoughtful approach to graceful shutdowns within your dynamic scaling strategy is key to achieving a resilient, efficient, and user-friendly data analytics environment. Balancing the need for rapid scaling with the necessity of uninterrupted service delivery ensures that your Trino clusters can handle both the challenges of peak loads and the nuances of scaling back resources when demand subsides.

Conclusion

In the world of big data analytics, being able to quickly adjust to changing data demands is key. Using Kubernetes Event-Driven Autoscaling (KEDA) with Trino clusters shows how flexible and forward-thinking this approach is. It means our system can handle today’s needs and also be ready for what comes next. This smart scaling makes sure we use resources wisely, saving money while keeping our system running smoothly.

An important part of making sure our system can scale up and down without problems is the graceful shutdown of Trino clusters. This means when we need to turn off some parts of our system, we do it in a way that doesn’t interrupt ongoing work. Setting a terminationGracePeriodSeconds in our setup and following Trino’s steps for a smooth shutdown are crucial. This careful shutdown process shows we care about keeping our service stable and reliable, even when making adjustments.

Graceful shutdowns are more than just a technical step; they show our dedication to providing a steady and dependable service. By finishing ongoing tasks before shutting down, we protect our data and make sure our users don’t face sudden stops or errors. This careful approach to scaling down is as important as scaling up, helping to keep our data systems strong even when demand changes.

To sum up, combining smart scaling with careful shutdown practices offers a solid plan for managing data analytics infrastructure. This strategy helps us use our resources efficiently and keep our system performing well, all while making sure our data processing is stable and reliable. As we deal with more and larger data tasks, these methods will be essential in building a data strategy that’s ready for the future.

About Me

I am Isa Inalcik, a Principal Data Engineer at BestSecret Group, with an extensive background in testing and data engineering spanning over 12 years. My career has taken me through the entire software lifecycle across various industries, where I’ve honed my skills in designing and orchestrating efficient, robust ELT/ETL pipelines, and deploying data-driven solutions. My toolkit is extensive, including Trino and Starburst Enterprise Platform, Snowflake, Airflow, Kafka, Python, Spark, Kubernetes, Docker, Cloud technologies, Hive, Apache Iceberg, Data Build Tool (DBT), and Hadoop.
