Indexing and Querying Telemetry Logs with Lucene
Modern web applications are typically composed of tens to thousands of individual services, each providing a subset of the application’s functions. This microservice architecture provides distinct advantages (isolation of concerns, independent scalability, faster iteration and deployment, congruent software and team architecture, etc.), but it comes at a cost. Understanding application behavior, including resource utilization, performance, and bugs, requires investigating a complex, distributed system. Local developer tools such as IDEs or debuggers are not sufficient for analyzing and understanding the behavior of unsynchronized processes, written in a number of different languages, running on heterogeneous infrastructure. Telemetry provides a solution.
Telemetry — the distributed-systems analog to IDEs and debuggers for local development workflows — allows developers and SREs to understand performance, health, and usage patterns of applications. Standard debug tools are replaced by distributed metrics and log collection and aggregation infrastructure, including such tools as Datadog, OpenTracing/Zipkin, Grafana, Influx, and Prometheus. This array of tools makes it possible to achieve end-to-end telemetry of system health and log information, which is a critical piece of infrastructure for any distributed, large-scale application.
But this end-to-end understanding doesn’t come for free. To make use of the various log streams, indexing, search, and analysis infrastructure must be built. For example, a developer might want to investigate which API calls against a service have triggered certain known error states, as evidenced by an error log entry. Assuming that each service of each deployment of an application emits a stream of log lines, we must collect, index, and answer search queries against the entirety of logs.
In this post, we describe how we built a stable, scalable, and performant telemetry infrastructure by developing a system based on pre-computed Lucene indices and a combination of cold and hot storage for serving searches against these indices.
Components of a telemetry pipeline
A typical end-to-end log pipeline has the following stages:
Each service of an application is responsible for adding logging statements that convey pivotal information regarding its state, progress, success, and failure conditions. We formalize and standardize three aspects of log production across all services and applications:
- First, each log level (WARN, INFO, ERROR, etc.) is associated with semantics and alerting thresholds. For example, ERROR-level logs typically trigger pager duty alerts to the affected team.
- Next, we maintain guidelines explaining what type of information is acceptable to include in logs. For example, authentication tokens or passwords may never occur in logs.
- Finally, we define a JSON format for structured logs, including fields like originating host, datetime, message, log level, log type, etc. Libraries for commonly used languages (Java, Go, Python) transparently encode messages emitted from standard logging frameworks (e.g., SLF4J for Java services) into the JSON format.
The JSON-formatted logs are emitted to file or stdout depending on the environment. Per-service log collectors pick up all logs and push them to a global stream (think Kafka or AWS Kinesis).
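As a rough illustration of the structured format described above, the following sketch encodes records from Python's standard `logging` framework as one JSON object per line. The class name `JsonLogFormatter`, the exact field names, and the `log_type` value are assumptions made for this example; the post does not specify the actual schema or library API.

```python
import json
import logging
import socket
from datetime import datetime, timezone


class JsonLogFormatter(logging.Formatter):
    """Encode standard logging records as one JSON object per line."""

    def __init__(self, log_type: str = "service"):
        super().__init__()
        self.log_type = log_type  # hypothetical log type identifier

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "type": self.log_type,
            "host": socket.gethostname(),
            "datetime": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        return json.dumps(entry)


def format_example() -> dict:
    """Format a hand-built ERROR record and parse the JSON line back."""
    record = logging.LogRecord(
        name="demo", level=logging.ERROR, pathname="demo.py", lineno=1,
        msg="request %s failed", args=("abc123",), exc_info=None,
    )
    return json.loads(JsonLogFormatter().format(record))
```

A service would attach the formatter to a handler, for example `handler.setFormatter(JsonLogFormatter())` on a `logging.StreamHandler`, so that existing logging statements need no changes.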
Filtering and standardization
Before indexing logs from the stream, we filter logs by whitelist and blacklist: only a defined set of environments is allowed to submit logs, and logs must conform to syntax and content restrictions. Since the log schema may evolve over time, we harmonize logs with different schema versions by mapping them to the latest supported schema.
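The filter-then-harmonize step might look roughly like the sketch below. The environment names, the forbidden substrings, the `schema_version` field, and the v1-to-v2 migration are all hypothetical; they only illustrate the pattern of dropping non-conforming records and upgrading older schema versions step by step.

```python
from typing import Optional

ALLOWED_ENVIRONMENTS = {"prod", "staging"}      # hypothetical allowlist
FORBIDDEN_SUBSTRINGS = ("password=", "token=")  # hypothetical content blocklist
LATEST_SCHEMA_VERSION = 2


def upgrade_v1_to_v2(record: dict) -> dict:
    """Hypothetical migration: v1 called the level field 'severity'."""
    upgraded = dict(record)
    upgraded["level"] = upgraded.pop("severity", "UNKNOWN")
    upgraded["schema_version"] = 2
    return upgraded


# Maps a schema version to the function that upgrades it by one version.
UPGRADES = {1: upgrade_v1_to_v2}


def filter_and_standardize(record: dict) -> Optional[dict]:
    """Return the record in the latest schema, or None if it must be dropped."""
    if record.get("environment") not in ALLOWED_ENVIRONMENTS:
        return None
    message = record.get("message")
    if not isinstance(message, str):
        return None  # syntax restriction: message must be a string
    if any(s in message for s in FORBIDDEN_SUBSTRINGS):
        return None  # content restriction
    version = record.get("schema_version")
    while version != LATEST_SCHEMA_VERSION:
        upgrade = UPGRADES.get(version)
        if upgrade is None:
            return None  # unknown or unsupported schema version
        record = upgrade(record)
        version = record["schema_version"]
    return record
```

Chaining single-version upgrades keeps each migration small: adding a new schema version means adding one function rather than touching every older mapping.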
Finally, we index the filtered and standardized logs. Indexing is done in anticipation of typical search workloads: we index the full-text log message and all of the structured fields, including datetime, log type, error level, originating host and environment, etc.
Developers and SREs interact with indexed logs via a custom UI or through APIs. For example, users query for a live stream of all logs from some service or environment, logs containing a particular token or string, logs corresponding to a call trace id, etc.
Elasticsearch telemetry pipeline
Let us now take a closer look at the index and search stages of the log pipeline. As the need for a centralized log infrastructure for Palantir Foundry became apparent, we built a first version using readily available components and picked Elasticsearch for the Index and Search stages.
The architecture is simple: a number of indexing nodes read blocks of log lines from the stream (see above) and index each structured log line into an Elasticsearch cluster. Stream reads are acknowledged once the Elasticsearch put succeeds. Users and UIs use the Elasticsearch API to perform searches against the Elasticsearch cluster.
The advantages of the Elasticsearch-based solution include:
- Time to market: By assembling existing components, we were able to deliver a first version of the log infrastructure on the order of weeks.
- Documented, stable API: The indexing and search workflows use stable, battle-tested APIs that are readily accessible to all developers and teams involved.
- System management tooling: Elasticsearch provides system metrics that allow us to understand the health of the log pipeline, as well as tools and configuration for tuning the cluster behavior.
Reaching the limits of Elasticsearch
As the number of services and application deployments grew, so did the number of log lines ingested into Elasticsearch. By 2017, we were running a production cluster of over 100 machines to handle a log volume of multiple terabytes per day. At this point it became apparent that Elasticsearch was no longer the correct backend choice for our log pipeline, not due to principled deficiencies of Elasticsearch, but rather due to an impedance mismatch between the design goals of Elasticsearch and the requirements for log indexing:
- We spent a disproportionate amount of time managing the Elasticsearch cluster. Routine cluster management activities such as node restarts and cluster and node resizing require a lot of handholding, and we experienced frequent outages.
- Retention is difficult to manage. Debugging and monitoring workflows require only a limited window of log data (for example, the last 30 days), but enforcing such a retention window is difficult in Elasticsearch.
- All data is in “hot” storage. Elasticsearch stores all data on local hard drives, typically with redundancy. This is in contrast to standard access patterns: more recent logs are queried much more frequently than older logs, yielding a poor cost/performance trade-off.
- Indexing and search infrastructure is tightly coupled. Indexing and search workloads share the same Elasticsearch infrastructure and can thus not be scaled independently. However, indexing and search workloads typically have very different load characteristics: indexing load is roughly constant since it’s a function of the size of the log-producing applications. Conversely, search load depends on the number of concurrent users and thus spikes as a function of time-of-day and day-of-week. Further, an outage of the index pipeline implies an outage of the search capability and vice versa.
- Index and search throughput cannot be scaled dynamically. For example, assume the cluster is sized such that the steady-state indexing workload is handled at 75% of the maximum throughput. A planned or unplanned outage of the upstream log infrastructure of x minutes then requires 3x minutes for the indexers to catch up: the backlog amounts to 0.75x minutes of full-throughput work, and only the spare 25% of capacity is available to work it off while new logs keep arriving. It is not easy to increase the throughput of the indexers by adding temporary capacity.
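The catch-up arithmetic from the last bullet can be checked with a few lines of code. This is a minimal sketch that assumes a constant ingest rate and a constant maximum throughput; the function name is made up for the example.

```python
def catch_up_minutes(outage_minutes: float, utilization: float) -> float:
    """Minutes needed to work off the backlog accumulated during an outage.

    utilization is the steady-state load as a fraction of maximum indexing
    throughput (0.75 for the example in the text).
    """
    if not 0 <= utilization < 1:
        raise ValueError("utilization must be in [0, 1)")
    # Work that piled up during the outage, in minutes of full throughput.
    backlog = outage_minutes * utilization
    # Capacity left over for the backlog while new logs keep arriving.
    spare = 1.0 - utilization
    return backlog / spare
```

For a 10-minute outage at 75% utilization this yields 30 minutes, matching the 3x figure; at 90% utilization the same outage would take 90 minutes to recover from.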
We had hit the ground running with a general-purpose database, but reached its limits when our requirements grew in a direction that Elasticsearch wasn’t designed for: comparatively simple queries over high-scale structured, immutable streams.
Back to first principles: indexing and searching
Let us now revisit the requirements for the index and search infrastructure and then sketch an alternative architecture based on those requirements:
- Log data is presented as a stream of small (order of bytes to low kilobytes), immutable, structured records.
- Log data typically has stable yet steadily growing scale. The indexers should be able to handle occasional bursts; for example, due to outages or delays of the upstream log producers.
- More recent logs are more valuable and queried more frequently than older logs. It is acceptable for the search system to provide better search performance for recent logs, and worse performance for older logs. Logs can be deleted after a configurable retention threshold, e.g., 30 days.
- Searches are performed as keyword queries against the fields of the structured records. These can be exact match queries, sub-string match queries, range queries, or full-text queries.
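To make the four query types from the last requirement concrete, here is a small sketch that evaluates them as predicates over structured records. It is a linear scan written for illustration only: a Lucene-backed system would answer such queries from index structures rather than by filtering records, and the helper names and field names are assumptions.

```python
import re
from typing import Callable, Iterable

Record = dict
Predicate = Callable[[Record], bool]


def exact(field: str, value) -> Predicate:
    """Exact match on a structured field."""
    return lambda r: r.get(field) == value


def substring(field: str, needle: str) -> Predicate:
    """Sub-string match on a field's string value."""
    return lambda r: needle in str(r.get(field, ""))


def in_range(field: str, low, high) -> Predicate:
    """Half-open range match: low <= value < high."""
    return lambda r: field in r and low <= r[field] < high


def full_text(field: str, *terms: str) -> Predicate:
    """All terms must occur as whole, case-insensitive tokens."""
    wanted = {t.lower() for t in terms}

    def predicate(r: Record) -> bool:
        tokens = set(re.findall(r"\w+", str(r.get(field, "")).lower()))
        return wanted <= tokens

    return predicate


def search(records: Iterable[Record], *predicates: Predicate) -> list:
    """Return the records that satisfy every predicate."""
    return [r for r in records if all(p(r) for p in predicates)]
```

Range queries on the `datetime` field work on ISO 8601 strings here only because strings in one fixed format and time zone sort chronologically.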
A key difference between these requirements and Elasticsearch’s data model is the immutability of the indexed records: we never update log lines once produced or indexed. In terms of distributed systems design, this means that the main driver of the coordination and synchronization complexity in Elasticsearch vanishes and we can simplify the interaction points between the indexing and search components while maintaining the same eventual consistency guarantees. In particular, it is possible to precompute search indices and store them in cheap cold storage (e.g., S3) and then load them into search nodes in order to answer queries.
A second difference is the query complexity: in order to support non-local operations such as joins and aggregations, a distributed database requires a substantial amount of cluster coordination. This generally leads to decreased indexing performance and necessitates complex cluster maintenance protocols.
Here is a sketch of our revised index and search architecture.
In a nutshell, we decouple indexing and searching into two different phases: indexers consume the log stream, produce time- and space-bounded Lucene indices, and push them to cold storage. Search nodes fetch a subset of the relevant index shards into hot storage (local disk and memory) in order to serve queries. Let’s look at the components in more detail.
Assuming a sharded log stream (e.g., the AWS Kinesis or Kafka model), each Indexer consumes a time-ordered subset of the log stream (e.g., a Kinesis “shard”) and produces a time- and space-bounded Lucene index backed by local storage (e.g., the index is closed after 1 hour or at 10 GB, whichever comes first). Since the schema of the log records is known, we can use a static mapping from field to index configuration, e.g., how to tokenize, what data types to use, etc. When the time or space limit is reached, the Indexer pushes the compressed Lucene index to cold storage and registers the corresponding metadata with the Index Catalog. It then commits its position in the stream and repeats with a fresh, empty index.
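The Indexer's rollover loop can be sketched as follows. `BoundedIndex` is a stand-in for a real Lucene index, and the `publish` and `commit` callbacks are hypothetical placeholders for "push to cold storage and register with the Catalog" and "commit the stream position". The sketch assumes the stream yields `(offset, timestamp, record)` tuples in time order and that limits are checked only when a record arrives.

```python
import json
from dataclasses import dataclass, field
from typing import Callable, Iterable, Tuple


@dataclass
class BoundedIndex:
    """Stand-in for a Lucene index: collects records and tracks their size."""
    opened_at: float
    records: list = field(default_factory=list)
    size_bytes: int = 0

    def add(self, record: dict) -> None:
        self.records.append(record)
        self.size_bytes += len(json.dumps(record).encode("utf-8"))


def run_indexer(
    stream: Iterable[Tuple[int, float, dict]],
    publish: Callable[[BoundedIndex], None],
    commit: Callable[[int], None],
    max_age_seconds: float = 3600.0,
    max_bytes: int = 10 * 1024**3,
) -> None:
    index = None
    last_offset = None
    for offset, timestamp, record in stream:
        if index is not None and (
            timestamp - index.opened_at >= max_age_seconds
            or index.size_bytes >= max_bytes
        ):
            publish(index)       # cold storage upload + catalog registration
            commit(last_offset)  # commit only after the publish succeeded
            index = None
        if index is None:
            index = BoundedIndex(opened_at=timestamp)
        index.add(record)
        last_offset = offset
    # A partially filled index at the end of a finite stream stays
    # unpublished; a real stream does not end.
```

Committing only after a successful publish means a crash between the two steps leads to re-processing rather than data loss, which is why downstream consumers must tolerate duplicate records.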
The Catalog stores the list of produced index shards as a pointer to the cold storage location and index metadata such as log type and index start/end date. Catalog storage should be durable; we typically use Cassandra for this purpose.
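A Catalog entry and the lookup that the search side needs could be modeled as in the sketch below. The field names and the in-memory list are assumptions for illustration; the post only states that the Catalog holds a cold-storage pointer plus metadata such as log type and start/end date, and that a durable store such as Cassandra backs it.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass(frozen=True)
class CatalogEntry:
    shard_id: str
    location: str    # pointer into cold storage (hypothetical key format)
    log_type: str
    start: datetime  # inclusive
    end: datetime    # exclusive


class InMemoryCatalog:
    """Illustrative stand-in for a durable catalog store."""

    def __init__(self) -> None:
        self._entries: List[CatalogEntry] = []

    def register(self, entry: CatalogEntry) -> None:
        self._entries.append(entry)

    def find(self, log_type: str, start: datetime, end: datetime) -> List[CatalogEntry]:
        """Entries of the given type whose time range overlaps [start, end)."""
        return [
            e for e in self._entries
            if e.log_type == log_type and e.start < end and start < e.end
        ]
```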
Search data node
Each search data node is responsible for serving a subset of the precomputed Lucene shards. It fetches an allocation of relevant shards from the Coordinator (see below), then fetches the corresponding indices from cold storage, and exposes an RPC API for searching against the served indices.
The coordinator keeps track of the available search nodes and manages the allocation of relevant index shards to nodes. There are a number of dials that we can tune here, in particular the redundancy level of the allocation. Since all data is immutable, it is acceptable for multiple search nodes to serve the same index shard without requiring any complex synchronization logic. The shard allocation respects per-index retention settings; for example, it may advertise shards holding data for the past 2 days for some log type, or for the last 7 days for some other log type.
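One simple way to combine the redundancy dial with per-log-type retention is a round-robin allocation over the eligible shards, sketched below. The function name, the `(shard_id, log_type, age_days)` tuple shape, and the round-robin strategy are assumptions; the post does not describe the actual allocation algorithm.

```python
from typing import Dict, Iterable, List, Tuple


def allocate(
    shards: Iterable[Tuple[str, str, int]],
    nodes: List[str],
    retention_days: Dict[str, int],
    redundancy: int = 2,
) -> Dict[str, List[str]]:
    """Assign each eligible shard to `redundancy` distinct search nodes.

    shards holds (shard_id, log_type, age_days) tuples. A shard is eligible
    if it is younger than the retention configured for its log type.
    """
    if not nodes:
        raise ValueError("at least one search node is required")
    eligible = sorted(
        shard_id
        for shard_id, log_type, age_days in shards
        if age_days < retention_days.get(log_type, 0)
    )
    copies = min(redundancy, len(nodes))  # cannot exceed the node count
    allocation: Dict[str, List[str]] = {node: [] for node in nodes}
    for i, shard_id in enumerate(eligible):
        for k in range(copies):
            allocation[nodes[(i + k) % len(nodes)]].append(shard_id)
    return allocation
```

Because shards are immutable, the replicas need no synchronization: each node simply downloads its own copy from cold storage.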
Search aggregator node
The aggregator exposes an RPC query API (e.g., HTTP/JSON or protobuf) and forwards the incoming query requests to the data nodes. It learns from the Coordinator which data nodes serve shards relevant for a query w.r.t. the log type and the time window of the query. The aggregator implements the query language: it forwards filter queries, applies limits to the aggregated query results, and de-duplicates results (see below).
Since logs are immutable, it is straightforward to de-duplicate search results by their intrinsic id, for instance a hash of the log record. The need for de-duplication may not be immediately evident, but consider the following edge case as an example: an Indexer successfully pushes an index shard to cold storage and registers it with the Catalog, but the stream commit fails. Then another Indexer node may pick up the same log records from the stream and push them as an additional, partially duplicative index shard. This pattern is not unique to the log indexing workflow or the presented architecture: exactly-once processing in distributed systems requires either coordinated transactions with a commit protocol, or idempotent downstream processing. Since our records are immutable, the latter option is much simpler to implement.
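A minimal sketch of hash-based de-duplication in the aggregator follows. It assumes the intrinsic id is a SHA-256 hash over a canonical JSON encoding of the record and that results are ordered newest first by a `datetime` field; both choices, along with the function names, are illustrative rather than taken from the actual system.

```python
import hashlib
import json
from typing import Iterable, List


def record_id(record: dict) -> str:
    """Content hash over a canonical (key-sorted, compact) JSON encoding."""
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def merge_results(per_node_results: Iterable[List[dict]], limit: int) -> List[dict]:
    """Merge data-node results, drop duplicates, and apply the query limit."""
    seen = set()
    merged = []
    for results in per_node_results:
        for record in results:
            rid = record_id(record)
            if rid in seen:
                continue  # same record served from an overlapping shard
            seen.add(rid)
            merged.append(record)
    # Newest first; assumes sortable datetime strings in a single format.
    merged.sort(key=lambda r: r.get("datetime", ""), reverse=True)
    return merged[:limit]
```

Note that a pure content hash also collapses two genuinely distinct log lines that happen to be identical in every field, so this approach relies on records carrying enough detail (host, precise timestamp, and so on) to be unique.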
The described data flow has a worst-case end-to-end latency bounded by the maximum index shard age, for instance 1 hour. This is of course insufficient for debug and log tailing workflows. To reduce the latency, we add a set of low-latency indexers that consume a separate sharded copy of the log stream and produce local, short-lived indices (the low-latency indices) for the latest few hours of log data. These co-located index and search nodes may look very similar to the Elasticsearch model at first glance, but have two key differences: first, they only keep a few hours’ worth of logs, and second, they don’t have to deal with any coordination complexity.
Retention and tunable cost-vs-performance
The presented system holds index data in three places: first, in the low-latency indices on disk or in memory; second, in search data nodes on disk; and third, in cold storage (S3). This storage hierarchy allows us to tune search performance against infrastructure cost as a function of the access frequency of logs by age: the vast majority of queries are against recent logs, often for the last hour, sometimes for the current day; only very infrequently are users interested in data older than a few days. We thus serve the bulk of queries from the low-latency indices holding the past hour’s worth of data in memory or on fast SSDs. Next, we serve queries against data from the past day from indices held by the search data nodes on disk. Finally, queries against older data are served by indices prepared by the search data nodes on demand: when such a query enters the system, the coordinator adds the relevant index shards to the allocation and the search UI indicates to the user that the relevant data is being prepared before the query is dispatched to the backend. The relevant shards are cached by the data nodes so that additional queries against the same time range can be dispatched instantaneously. We have found that most such on-demand queries can be answered within a few minutes.
The storage hierarchy offers three different retention windows that can be tuned to reach an acceptable cost/performance trade-off: the maximum age of data in the low-latency indices, in the default allocation of the search data nodes, and in cold storage. The first two parameters affect performance only, while the last dictates the maximum age of data available for users to search against.
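The three retention windows can be read as a routing rule: the oldest data a query touches decides which tier has to serve it. The sketch below encodes that rule; the default windows (1 hour, 1 day, 30 days) are taken from the examples in the text, while the `Tier` names and the function signature are assumptions.

```python
from datetime import datetime, timedelta
from enum import Enum


class Tier(Enum):
    LOW_LATENCY = "low-latency index"
    DATA_NODE = "search data node (default allocation)"
    ON_DEMAND = "loaded on demand from cold storage"
    EXPIRED = "beyond cold-storage retention"


def tier_for(
    query_start: datetime,
    now: datetime,
    low_latency_window: timedelta = timedelta(hours=1),
    data_node_window: timedelta = timedelta(days=1),
    cold_retention: timedelta = timedelta(days=30),
) -> Tier:
    """Pick the cheapest tier that still holds data back to query_start."""
    age = now - query_start
    if age <= low_latency_window:
        return Tier.LOW_LATENCY
    if age <= data_node_window:
        return Tier.DATA_NODE
    if age <= cold_retention:
        return Tier.ON_DEMAND
    return Tier.EXPIRED
```

Widening the first two windows buys lower latency for more queries at higher hot-storage cost; only `cold_retention` changes what users can search at all.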
The on-demand workflow clearly benefits from an indexing strategy that allows us to load only a small number of index shards holding the subset of data relevant for the given query. The obvious parameter to tune here is the time range. Further, since logs can be attributed to the originating service or component and to the environment in which they were produced, we shard indices by (time range, component, environment) and ask users of the on-demand flow to specify those parameters as part of their query. We have found that this presents an acceptable usability-latency trade-off.
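To illustrate why sharding by (time range, component, environment) keeps on-demand loads small, the sketch below enumerates the shard keys a query needs. It assumes, purely for the example, that shards are aligned to whole hours; the real shards are time- and space-bounded and need not align this neatly, and the `ShardKey` type is hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, NamedTuple


class ShardKey(NamedTuple):
    hour: datetime    # start of the hour covered by the shard
    component: str
    environment: str


def shard_keys_for_query(
    start: datetime, end: datetime, component: str, environment: str
) -> List[ShardKey]:
    """Keys of the hourly shards overlapping [start, end) for one
    component and environment."""
    hour = start.replace(minute=0, second=0, microsecond=0)
    keys = []
    while hour < end:
        keys.append(ShardKey(hour, component, environment))
        hour += timedelta(hours=1)
    return keys
```

A query for one component in one environment over a window of just under two hours then touches three shards, instead of every shard produced by every service in that window.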
We are in the process of switching over from the Elasticsearch backend to the described Lucene backend. So far, the redesigned architecture looks promising: it is more stable and, given our current retention settings, has reduced the amount of compute resources required by about 50%. More importantly, the redesign is a critical step towards future-proofing our log infrastructure, as we anticipate two- to threefold growth in log volume over the coming year and are already hitting the feasibility limits of the Elasticsearch backend at today’s scale.
Retrospectively, the initial choice of Elasticsearch was correct: we hit the ground running with a stable data foundation and had time to iterate on the other pieces of the infrastructure (log formats and libraries, whitelisting/blacklisting, UIs, etc.) while already being able to serve queries against the service logs to our development teams.
The architecture redesign was a natural next step as our requirements changed and the data scale grew. It is also the starting point for implementing features such as granular access control, auto scaling, and rate limiting. It is really fun to witness how a small team of two or three engineers can rethink and rebuild a substantial part of our data infrastructure over the course of a few months. Don’t be scared to embark on such a mission, even if it means replacing a well-established system like Elasticsearch with more bare-bones components such as Lucene and custom coordination logic. Of course, it is always beneficial to stand on the shoulders of giants, but sometimes it pays to think twice about which family of Titans to pick.
Amr AM., Haithem T., Robert F.