Large-Scale Data Ingestion using gRPC, Envoy and Istio in Cortex Data Lake
Cortex Data Lake collects, transforms and integrates your enterprise’s security data to enable Palo Alto Networks solutions. This product helps organizations consume and understand data at a much higher rate — letting system administrators focus on the alerts that matter most. It combines advanced artificial intelligence and machine learning across all your enterprise’s data, and provides for a more meaningful response to sophisticated attacks.
Cortex Data Lake Ingestion Service must ingest millions of requests per second while acknowledging each request at low latency to guarantee no data loss. Cortex Data Lake clients are typically long-lived and send data continuously. With these requirements, gRPC quickly emerged as the technology of choice for building the Cortex Data Lake Ingestion Service. When we started this project, Envoy was the only high-performance proxy that supported gRPC, and Istio was the preferred control plane for Envoy.
Envoy is a high-performance open source edge and service proxy designed for cloud-native applications. It is typically deployed in a distributed fashion as a side-car with application containers in the same application pod. Envoy handles advanced routing, monitoring, tracing, logging, and other cross-cutting concerns. Envoy has a programmatic control plane that allows it to be dynamically configured.
Istio is an open source Service Mesh and API Gateway that provides capabilities to connect, secure, control, and observe services. Istio relies on Envoy for data-plane and implements Envoy’s control plane APIs. It includes all the application networking and operational best practices like timeouts, retries, circuit breaking, advanced load balancing, fault injection, and mTLS. Cortex Data Lake API Gateway as well as Service Mesh for Data Services are powered by Istio. Following is the high-level skeleton of Cortex Data Lake Infrastructure Setup.
At the API Gateway layer Istio supports both mTLS and JWT authentication policies. We are using both authentication mechanisms depending on the use case. There are some challenges in exposing a service to support both mTLS and JWT depending on its client. Those details will be covered in a separate post. Overall, Istio has worked well for us, but with earlier versions of Istio we ran into bottlenecks with Istio telemetry getting overloaded with a large number of streams. We turned off Istio telemetry and are now using Envoy native telemetry.
gRPC was created by Google as an open-source evolution of their internal RPC technology, Stubby. gRPC uses HTTP/2 as its transport protocol. HTTP/2 can multiplex many parallel requests over the same connection and allows full-duplex bidirectional communication.
gRPC uses a channel abstraction to facilitate concurrent use of underlying HTTP/2 connections and to provide flow control capabilities. Within a channel, multiple RPCs may be issued, each of which maps to an underlying HTTP/2 stream.
gRPC uses Protocol Buffers as its Interface Definition Language and as its underlying message interchange format. The source code for gRPC client and server interfaces is generated for different languages using the protoc compiler.
gRPC client and server stubs implement the StreamObserver interface for sending and receiving messages. For outgoing messages, a StreamObserver is provided to the application by the gRPC library. For incoming messages, the application implements the StreamObserver and passes it to the gRPC library for receiving. The StreamObserver interface is rather simple with just three methods:
- onNext: Receives a value from the stream
- onError: Receives a terminating error from the stream
- onCompleted: Receives notification of successful stream completion
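The three methods above can be sketched in Java. This is a self-contained illustration: it declares a local copy of the interface so the example compiles without gRPC on the classpath (the real interface is io.grpc.stub.StreamObserver), and the ACK-counting observer is a hypothetical example, not our production code.

```java
// Local stand-in for io.grpc.stub.StreamObserver, so this sketch is
// self-contained; the method shapes match the real interface.
interface StreamObserver<V> {
    void onNext(V value);          // receives a value from the stream
    void onError(Throwable t);     // receives a terminating error
    void onCompleted();            // stream completed successfully
}

// Hypothetical inbound observer that counts ACKs as they arrive.
public class AckObserver implements StreamObserver<String> {
    int acked = 0;
    boolean terminated = false;

    @Override public void onNext(String ackId) { acked++; }
    @Override public void onError(Throwable t) { terminated = true; }
    @Override public void onCompleted() { terminated = true; }

    public static void main(String[] args) {
        AckObserver obs = new AckObserver();
        obs.onNext("ack-1");
        obs.onNext("ack-2");
        obs.onCompleted();
        System.out.println(obs.acked + " " + obs.terminated);
    }
}
```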
Unary vs. Bidirectional
gRPC applications can be written using different types of service methods and we evaluated unary and bi-directional. The pros and cons of each approach are listed below with the preferred characteristics shown in bold.
With bi-directional streams, the message throughput is higher and latency is lower, thereby meeting our design requirements. Having long-lived streams and multiple messages per stream transfers some responsibilities from the gRPC protocol to the application. The desired functionality had to be implemented within our client and server applications. The increased complexity was worth the higher throughput provided by bi-directional streams.
Message Acknowledgement and Error Handling
Cortex Data Lake has the requirement to acknowledge each request. Our gRPC client application sends discrete request payloads on their outbound stream and receives ACKs for those requests on their inbound stream. This allows clients to use timers and retries to compensate for network problems. Each request contains a unique ID. Each ACK contains the ID of a corresponding request and a description of the result of that request. As the client receives ACKs from the server, it inspects the messages, checks for errors, and decides which messages can be retried and which messages must be dropped. The client also implements exponential backoff on retries to allow the server to recover if it is overloaded.
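The exponential backoff mentioned above can be sketched as a small Java helper. The base delay and cap here are assumed values for illustration, not our production settings:

```java
// Hypothetical backoff schedule: delay doubles per attempt up to a cap,
// giving an overloaded server time to recover before the next retry.
public class Backoff {
    static final long BASE_MS = 100;    // assumed initial delay
    static final long MAX_MS = 30_000;  // assumed cap

    static long delayMillis(int attempt) {
        long exp = BASE_MS * (1L << Math.min(attempt, 16));
        return Math.min(exp, MAX_MS);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println(delayMillis(attempt));
        }
    }
}
```

In practice, a small random jitter is usually added to each delay so that many clients retrying after the same failure do not reconnect in lockstep.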
Flow Control is a mechanism to prevent senders from overwhelming the receiver of data. The receiver may be busy under heavy load and may not have resources to handle the additional load. The receiver, in this case, should exert flow control. gRPC relies on underlying HTTP/2 flow control capabilities.
In our ingestion pipeline, gRPC clients communicate with gRPC servers via the Istio API Gateway, as shown in the diagram below.
There are many stream buffers involved in the pipeline. The larger a buffer, the more memory it can consume when the upstream is congested, and the longer it takes for backpressure to propagate back to the sender.
To implement the backpressure feedback loop in the gRPC client, for each stream we use CallStreamObserver#setOnReadyHandler. gRPC invokes this handler every time the stream's isReady() state changes from false to true.
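The shape of that loop can be shown with a minimal mock. The MockStream class below stands in for the isReady()/on-ready surface of gRPC's CallStreamObserver (so the example runs without the library); the credit counter simulates the HTTP/2 flow-control window:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class OnReadyDemo {
    // Mock of the CallStreamObserver surface we rely on: isReady() plus a
    // handler re-run when readiness flips back to true. In the real client
    // the handler is registered via CallStreamObserver#setOnReadyHandler.
    static class MockStream {
        int credits;                        // messages accepted before "not ready"
        Runnable onReadyHandler;
        final Queue<String> sent = new ArrayDeque<>();

        MockStream(int credits) { this.credits = credits; }
        boolean isReady() { return credits > 0; }
        void send(String msg) { sent.add(msg); credits--; }
        void replenish(int n) { credits += n; onReadyHandler.run(); }
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>();
        for (int i = 1; i <= 5; i++) pending.add("msg-" + i);

        MockStream stream = new MockStream(3);
        // Drain pending messages only while the stream signals readiness.
        Runnable drain = () -> {
            while (stream.isReady() && !pending.isEmpty()) {
                stream.send(pending.poll());
            }
        };
        stream.onReadyHandler = drain;

        drain.run();                             // sends 3 messages, then stops
        System.out.println(stream.sent.size());
        stream.replenish(2);                     // window opens; handler resumes
        System.out.println(stream.sent.size());
    }
}
```

The key property is that the sender never buffers unboundedly: it writes only while the stream is ready and resumes exactly when gRPC signals readiness again.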
gRPC Server Optimizations
In the initial implementation of our gRPC Server, we had large queues and many threads. At high load, we observed cascading failures and limited throughput at much higher latency.
We added detailed metrics at each step to identify where we were spending time and took thread dumps. We identified that threads were contending and the server was not exerting backpressure quickly. We even ran into a JDK bug where java.security.Provider.getService() synchronization became a scalability bottleneck at high load. This required us to upgrade to JDK 13. We reduced the size of thread pools in gRPC server to two times the number of cores and that eliminated most of the thread contention.
Since the pipeline is asynchronous with several buffers/queues, we were simply enqueuing more work than could be processed. We ran a series of controlled load tests that kept the gRPC server CPU busy. We profiled and tuned our code, then tuned the Kafka producer embedded in our server application. We established that the achievable p99 processing time is 70–80 ms on the request-processing thread and 125–200 ms for Kafka writes.
Bounding the input queue causes the server to stop reading from gRPC when the queue is full, which exerts backpressure. We used the following formula (and its rearrangement to solve for queueLength) to size the gRPC server request queue:
maxLatency = (transactionTime / number of threads) * queueLength
queueLength = maxLatency / (transactionTime / number of threads)
We kept maxLatency the same as transactionTime to have maximum backpressure and settled with a queue length the same as the number of threads. With this approach, the workload was mostly CPU bound and it auto-scaled well with varying loads.
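Plugging in illustrative numbers (an assumed 8-core node, so 16 threads per the 2x-cores sizing above, and the ~80 ms p99 transaction time measured earlier) shows why the queue length collapses to the thread count when maxLatency equals transactionTime:

```java
public class QueueSizing {
    public static void main(String[] args) {
        double transactionTimeMs = 80;   // measured p99 per-request time
        int threads = 16;                // 2x an assumed 8-core node
        // Maximum backpressure: cap added queueing latency at one
        // transaction time, per the formula above.
        double maxLatencyMs = transactionTimeMs;

        long queueLength =
            Math.round(maxLatencyMs / (transactionTimeMs / threads));
        System.out.println(queueLength);
    }
}
```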
gRPC keeps the TCP session open as long as possible to maximize throughput and minimize overhead, but the long-lived sessions make load balancing complex. This is more of an issue in auto-scaled Kubernetes environments, where new pods are added under increased load but clients stay connected to the same gRPC server pods, resulting in unequal load distribution.
The designers of gRPC had already thought about this problem and added support for a connection expiration policy on the gRPC Server. This expiration policy will force clients to disconnect and reconnect to another server. Connection expiration can be performed by causing connections to expire after a certain amount of time has elapsed. The Java gRPC library implements this with the maxConnectionAge() and maxConnectionAgeGrace() server builder options. These functions serve to limit and then forcibly terminate a gRPC channel, respectively. When a gRPC channel expires, the server will send an HTTP/2 GOAWAY, indicating that the client may not start new requests but may finish existing ones. At the end of the max connection age grace, the gRPC server will send a second HTTP/2 GOAWAY and close the channel.
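As a configuration sketch, the expiration policy looks like the fragment below (grpc-java NettyServerBuilder; the port, durations, and IngestionService name are illustrative, not our production values):

```java
import io.grpc.netty.NettyServerBuilder;
import java.util.concurrent.TimeUnit;

// Illustrative durations: first GOAWAY after 30 minutes, hard close
// of the channel 5 minutes later if streams are still open.
NettyServerBuilder.forPort(9443)
    .maxConnectionAge(30, TimeUnit.MINUTES)
    .maxConnectionAgeGrace(5, TimeUnit.MINUTES)
    .addService(new IngestionService())   // hypothetical service impl
    .build()
    .start();
```

A small random jitter is applied to maxConnectionAge by the library, which helps avoid many clients reconnecting simultaneously.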
We used fixed-size streams to send batches of requests and had to consider the following trade-offs:
- Larger stream sizes permit higher throughput but use more memory.
- Smaller stream sizes reduce memory usage but cause the client and server to block more frequently while waiting for messages to be acknowledged.
Stream size played a very important role in load balancing. With larger streams, their distribution on different Ingestion Server pods was non-uniform and would result in a wide range of CPU utilization across Ingestion Server pods thereby affecting Kubernetes Horizontal Pod Autoscaling. The table below shows the summary results of our tests with different stream sizes.
We are using GKE, and it required additional node-level tuning for our applications.
Node Kernel Tuning
At high load, nodes were becoming unresponsive because of low conntrack and thread-max limits. We increased CONNTRACK_MAX to 2 million, CONNTRACK_HASHSIZE to 0.5 million, and THREAD_MAX to 4 million.
We were using regular disks and ran into IO throttling, which caused the Docker daemon and Kubernetes to become unstable. We moved our workloads to node pools with SSDs to avoid throttling.
Node Memory Exhaustion
Some of our workloads were not tuned initially and did not have proper resource limits set, resulting in memory exhaustion, node instability, and frequent Docker and kubelet restarts. We profiled our workloads and tuned resource requests and limits so they would not exhaust node resources.
With all these changes and tuning in place, here are the results of one test in which we ran a load of 800k requests per second and the system auto-scaled quickly to absorb the load.
The pipeline is very efficient: Istio ILB can easily handle 10,000 requests per core at 65% average CPU utilization, and the Ingestion Frontend can handle 1,000 requests per core at 65% average CPU utilization.
This was a marathon effort by gRPC Client, API Gateway and Ingestion team members namely
Have questions? Reach out on Twitter or email: