Building a SQL Engine Infrastructure at QuintoAndar

Kenji
Blog Técnico QuintoAndar
Jan 27, 2021

At QuintoAndar, the Data Availability Engineering team (a.k.a. DAE) is essential to the success of all our operations. The team is responsible for keeping our data infrastructure running 24/7 and for maintaining high-quality code.

Our data infrastructure is complex, and one important component is the SQL Engine that provides access to our data lake. For quite a while we used AWS Athena to query our data. Like other SQL Engines, it helps us make better decisions and spreads the data culture across the company.

AWS Athena is a serverless service that is easy to set up, with everything ready to connect to the data lake and query the data. It’s a good choice for companies like QuintoAndar that need to grow their analytics quickly, but we reached a point where we could not scale Athena any further: it raised many resource-exhaustion errors, which broke our pipelines.

That’s why we decided to start building our own infrastructure to provide a SQL Engine that we’d have more control over. An internal implementation has its pros and cons, but the deciding factors were the flexibility to customize the application, better scalability of the components, and more transparent monitoring.

There are many open-source and commercial tools with interesting features, such as Starburst, Dremio and Impala, but we decided to go straight to Trino (formerly PrestoSQL, a.k.a. the community version of Presto) for the reasons discussed below. In addition, Athena uses PrestoDB under the hood, which makes the integration with our data pipeline easier.

Why Trino?

Trino is a highly parallel, distributed query engine that supports ANSI SQL, works with many BI tools, and is capable of querying petabytes of data. It can be deployed both on-premises and in the cloud, and it lets us access data from different data sources within a single query. The community is very strong and helpful, and we can reach it via Slack.

We believe that a SQL Engine should be scalable, maintainable, and simple to plug into other data sources, as mentioned earlier. Trino matches these expectations well. It is also compatible with our LGPD (Brazil’s General Data Protection Law) stack, which we’ll share more about soon.

Trino Components

First of all, let’s explain some important Trino components:

  • Coordinator: the main server, responsible for parsing statements, planning queries, and managing Trino workers. Every Trino installation must have a coordinator with one or more workers, but we can also configure a single instance that acts as both coordinator and worker.
  • Worker: the server responsible for executing tasks and processing data. Workers fetch data from connectors and exchange intermediate data with each other.
  • Connector: adapts Trino to a data source such as Apache Hive or a relational database; it’s like a driver for a database.
  • Catalog: contains schemas and references a data source via a connector.

All of these components are needed to deploy Trino and to serve JDBC connections correctly.

Implementation

We deployed Trino on dedicated high-memory nodes in Kubernetes. Each node has 16 vCPUs and 64 GB of memory. Initially, the configuration for the coordinator and the workers is the one the documentation recommends, but in the future we’ll define a baseline according to the workload.

The deployment uses those nodes according to Trino’s workload. Because we implemented an HPA (Horizontal Pod Autoscaler), we can scale the pods up when query concurrency is high and scale them down when few or no queries are running (on weekends, for example). We can also set the minimum and maximum number of running pods.

Unfortunately, we don’t have graceful shutdown working with the HPA yet. On each scale-up or scale-down, K8s manages one pod at a time, so when it scales down, some queries can be killed too.

To implement all of this on K8s, we had to develop a new Helm Chart under QuintoAndar Infrastructure. Now, let’s describe how we did it.

Trino Vanilla Implementation

The image above shows a simplified overview of our current implementation.

We’re using Hive Metastore as the Metadata Layer, which is responsible for storing the metadata of databases, schemas, and tables. The metastore is also deployed on Kubernetes, and we’ll talk about it in a future post.

For the Query Engine Layer, which is Trino, we had to develop a new Helm Chart under QuintoAndar Infrastructure, as mentioned above.

The last layer is the DataViz layer, whose main users are SWEs, Product Owners and Data Analysts. It is used to run ad-hoc queries and generate reports.

Helm Chart

Helm is a package manager for Kubernetes that makes applications easier to deploy, maintain, and scale. A chart is the packaging format used by Helm: a collection of files that describe a related set of Kubernetes resources, such as Deployments, HPAs and Ingresses.

In short, a Helm Chart is a set of files that configures the application deployment and serves as a template, which gives us good flexibility in each deployment. It also removes complexity for developers, because all default settings are already configured.

We designed:

  • 2 Deployments: coordinator + worker
  • 2 ConfigMaps: coordinator + worker
  • 1 Secret: for both components
  • HPA: worker
  • Ingress

Deployment Object

The Deployment object represents an application running on your cluster. If you specify that two replicas must be running (in the spec field of the YAML), K8s will work to keep that many replicas running.

Example of Trino worker Deployment

Here is an example of a Deployment; in this case, we’re showing the worker configuration. We should also specify some labels on the Deployment object, which let us map our organizational structures onto the system. The value of trino.commonLabels is a set of key-value labels that are also used on other K8s objects.

Using the labels, we can easily find the worker pods with the kubectl command: kubectl get pods -l component=worker[,key=value].
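The embedded manifest from the original post is not reproduced here, so the following is only a minimal sketch of what a worker Deployment with such labels could look like once rendered. The object name, the `app` label, the image tag and the resource figures are all assumptions for illustration; in a Helm chart the labels would typically come from a helper such as trino.commonLabels rather than being written inline.

```yaml
# Hypothetical rendered worker Deployment (names and values are illustrative).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trino-worker            # assumed name
  labels:
    app: trino                  # assumed common label
    component: worker           # label used by `kubectl get pods -l component=worker`
spec:
  replicas: 2                   # usually omitted when an HPA manages the replica count
  selector:
    matchLabels:
      app: trino
      component: worker
  template:
    metadata:
      labels:                   # must match spec.selector.matchLabels
        app: trino
        component: worker
    spec:
      containers:
        - name: trino-worker
          image: trinodb/trino:351   # assumed image tag
          ports:
            - name: http
              containerPort: 8080
          resources:
            requests:           # assumed sizing for a 16 vCPU / 64 GB node
              cpu: "14"
              memory: 56Gi
            limits:
              memory: 56Gi
```

Because the pod template carries the same labels as the Deployment, the selector shown in the kubectl command matches the worker pods directly.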

ConfigMap Object

The ConfigMap is an object used to store non-sensitive data in key-value pairs. This object can be consumed by Pods as environment variables, configuration files in a volume, or command-line arguments.

Example of Trino worker Configmap

The example above sets some important configuration for the Trino Deployment; the key-value pairs are defined in the data section.

A ConfigMap is not designed to hold large chunks of data: the data it stores cannot exceed 1 MiB.
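As an illustration of the data section, here is a minimal sketch of a worker ConfigMap in which each key becomes one Trino configuration file. The property names are standard Trino settings, but the object name, the coordinator address and every value are assumptions, not the settings used at QuintoAndar.

```yaml
# Hypothetical worker ConfigMap; each key under `data` becomes a file when mounted.
apiVersion: v1
kind: ConfigMap
metadata:
  name: trino-worker-config     # assumed name
  labels:
    app: trino
    component: worker
data:
  config.properties: |
    # This node is a worker, not the coordinator.
    coordinator=false
    http-server.http.port=8080
    # Assumed in-cluster address of the coordinator Service.
    discovery.uri=http://trino-coordinator:8080
    # Illustrative memory limits; tune them to the workload.
    query.max-memory=100GB
    query.max-memory-per-node=20GB
  node.properties: |
    node.environment=production
    node.data-dir=/data/trino
  jvm.config: |
    -server
    -Xmx48G
    -XX:+UseG1GC
    -XX:+ExitOnOutOfMemoryError
```

Configuration files of this size are far below the 1 MiB limit; the limit matters mostly when someone tries to ship data files or large binaries through a ConfigMap.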

Now, to connect the ConfigMap to the Deployment, we need to declare a volume and mount it inside the container.

Example of mapping the ConfigMap inside the Deployment

The match happens at spec.volumes, where the value of configMap.name must be equal to metadata.name in the ConfigMap file. The spec.containers.volumeMounts entry then mounts this ConfigMap as a volume at /usr/lib/trino/etc.
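The relationship between the two objects can be sketched as the fragment below, which shows only the relevant part of a Deployment. The ConfigMap name trino-worker-config, the volume name and the image tag are assumptions; note that inside a Deployment the Pod-level paspec fields live under spec.template.spec.

```yaml
# Fragment of a hypothetical worker Deployment: only the volume wiring is shown.
spec:
  template:
    spec:
      volumes:
        - name: trino-config              # volume name, referenced by volumeMounts below
          configMap:
            name: trino-worker-config     # must equal metadata.name of the ConfigMap
      containers:
        - name: trino-worker
          image: trinodb/trino:351        # assumed image tag
          volumeMounts:
            - name: trino-config          # must equal the volume name above
              mountPath: /usr/lib/trino/etc
```

Mounting a ConfigMap over a directory hides whatever the image had at that path, so the ConfigMap needs to contain every file Trino expects to find there.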

Secret Object

The Secret object gives us a way to store and manage sensitive information, such as passwords. Storing confidential information in a Secret is safer than putting it verbatim in a Pod definition or in a container image.

Example of Secret

This is an example of a Secret object in which we set connection information, such as the Hive Metastore connection. We also have a service that injects Vault credentials into the Pods, but we won’t go into the details of that service here. To attach the Secret to the Deployment, we follow the same steps as for the ConfigMap.
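A minimal sketch of such a Secret, holding a Hive catalog file, could look like the following. The object name, the metastore address and the placeholder credentials are assumptions; recent Trino releases use the connector name `hive`, while older ones used `hive-hadoop2`.

```yaml
# Hypothetical Secret holding a Trino catalog file with connection details.
apiVersion: v1
kind: Secret
metadata:
  name: trino-catalog           # assumed name
type: Opaque
stringData:                     # plain text here; stored base64-encoded by Kubernetes
  hive.properties: |
    connector.name=hive
    # Assumed in-cluster address of the Hive Metastore Thrift endpoint.
    hive.metastore.uri=thrift://hive-metastore:9083
    # Placeholders only; real credentials would be injected, e.g. from Vault.
    hive.s3.aws-access-key=REPLACE_ME
    hive.s3.aws-secret-key=REPLACE_ME
```

Attaching it follows the ConfigMap pattern: declare a volume with `secret.secretName` set to the Secret’s name and mount it where Trino reads catalog files, for example the catalog subdirectory of the configuration directory.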

HPA Object

Example of worker HPA

We implemented scaling for the Trino workers using an HPA, but we couldn’t scale to more than one coordinator, because that would require guaranteeing the communication between N coordinators and N workers.
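The worker autoscaling described here can be sketched as the HPA below. The target name, the replica bounds, the CPU threshold and the scale-down policy are all assumptions chosen for illustration, and the `autoscaling/v2` API version assumes a reasonably recent cluster (older clusters exposed the same fields under `autoscaling/v2beta2`).

```yaml
# Hypothetical HPA for the worker Deployment (all numbers are illustrative).
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: trino-worker
spec:
  scaleTargetRef:               # the Deployment whose replica count is managed
    apiVersion: apps/v1
    kind: Deployment
    name: trino-worker          # assumed Deployment name
  minReplicas: 2                # floor kept even when no queries are running
  maxReplicas: 10               # ceiling under high query concurrency
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 600   # wait before acting on a drop in load
      policies:
        - type: Pods
          value: 1                      # remove at most one pod per period
          periodSeconds: 120
```

A slow scale-down policy does not replace graceful shutdown, but it reduces how often a worker that is still running queries gets removed.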

Ingress Object

The Ingress is an object that manages external access to the services in a cluster; it exposes HTTP and HTTPS routes from outside the cluster to services within the cluster.

Ingress image from kubernetes documentation

The Big Picture

The image above shows the big picture of what we have built on Kubernetes. All requests coming from outside the cluster hit the Ingress component, which forwards them to Trino’s Coordinator.

If the request is for the Trino UI, the Coordinator serves it directly; if the request comes from a JDBC connection, the Coordinator processes it and distributes tasks to the workers.
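The routing described above can be sketched as a single Ingress rule that sends all traffic to the coordinator. The host name, the ingress class and the Service name trino-coordinator are assumptions, and the sketch presumes a Service of that name exposing the coordinator on port 8080.

```yaml
# Hypothetical Ingress routing every external request to the coordinator Service.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: trino
spec:
  ingressClassName: nginx               # assumed ingress controller
  rules:
    - host: trino.example.internal      # assumed host name
      http:
        paths:
          - path: /
            pathType: Prefix            # UI and client requests share one entry point
            backend:
              service:
                name: trino-coordinator # assumed Service in front of the coordinator
                port:
                  number: 8080
```

Workers need no Ingress of their own in this layout, since only the coordinator talks to clients.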

Monitoring

Our Kubernetes cluster is already monitored by Prometheus, and we can create alerts through AlertManager using PromQL. Such alerts can be sent to Slack channels; for example, we can configure thresholds for CPU, memory and network consumption and get notified when they are crossed.
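A threshold alert of this kind can be sketched as a Prometheus alerting rule. The namespace, the pod name pattern, the 14-core threshold and the severity label are assumptions, and the metric labels (`namespace`, `pod`, `container`) assume the container metrics exposed by a recent kubelet.

```yaml
# Hypothetical Prometheus alerting rule for sustained high CPU on worker pods.
groups:
  - name: trino-workers
    rules:
      - alert: TrinoWorkerHighCpu
        # CPU cores used per worker pod, averaged over 5 minutes.
        expr: |
          sum by (pod) (
            rate(container_cpu_usage_seconds_total{namespace="data", pod=~"trino-worker-.*", container!=""}[5m])
          ) > 14
        for: 10m                # the condition must hold this long before firing
        labels:
          severity: warning     # can be used by AlertManager routing
        annotations:
          summary: "Trino worker {{ $labels.pod }} has used more than 14 CPU cores for 10 minutes"
```

Delivery to a Slack channel is configured separately, in an AlertManager receiver that matches on labels such as `severity`.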

We also have Grafana, which makes querying the metrics more user-friendly and gives a quick view of the application state and of problems that may affect the usability of the service.

Monitoring and alerting are important for the Trino infrastructure because they let us be proactive rather than reactive when facing problems. That is, we can adjust and improve the infrastructure and machine resources, and avoid problems in both the short and the long term.

Future Implementations

After Trino’s v1 implementation, we’re already looking at some interesting features that we want to implement in the Trino ecosystem:

  • Queries caching (Rubix)
  • A gateway for Trino compute engine at multiple clusters (presto-gateway)
  • Monitoring Trino cluster stats at Prometheus (presto_exporter)
  • Integration with LGPD stack

Conclusion

Trino is a sophisticated engine that stands out among other tools. It supports many use cases, such as ad-hoc analyses and massive batch queries, provides access to diverse data sources, and is used by large organizations such as Lyft, LINE, and Shopify.

We’ll run benchmarks and load tests against AWS Athena to validate the implementation, but we want to emphasize that there are many other tools, and we don’t claim that Trino is the right choice for every case or for everyone.

First, we need to understand the problem to be solved, regardless of the tool or application that will be used. It’s a small detail, but it makes an enormous difference in the end.

If you liked this article and want to be part of a high-performance data team that is building a scalable data platform and changing the way people live, join us!