Note: I wrote this in summer 2015 while working at Jut. It was intended as content marketing material and was published on http://thenewstack.io/.
At Jut, we built a streaming analytics data stack: one place to send, store, analyze, and visualize any combination of logs, events, and metrics.
This post lays out the blueprint for the pieces we used and how we put them together. We’ll cover:
- Ingest: how to bring in many different types of data streams.
- Index and querying: efficient storage and unified queries.
- Wiring it up: how data flows through the system.
- Optimization: making queries fast.
We hope that this will be useful and can serve as a high-level orientation for those who are getting started on the exciting (but often daunting, at first) task of building a similar system.
When it comes to operational analytics and monitoring, there’s no shortage of relevant data types, formats and transport protocols. Users need to be able to point all sorts of disparate sources and senders at this analytics system. For example, that data might include any of the following:
- Custom application events.
- Container-level metrics and logs.
- Statsd or collectd metrics.
- Webhook events from third parties, like GitHub or Stripe.
- Application or server logs.
- User activity.
While these all have different formats and representations, they need to be transformed into a uniform representation inside the system. We went with a simple and flexible representation: each record (“point”) is a set of key/value pairs, which can be conveniently represented as a JSON object. Points must have a “time” field, and metric points also have a numeric “value” field; other than that points can have any “shape.”
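As a rough illustration of this representation, the sketch below checks the two rules just described. The function name `validate_point`, the `is_metric` flag, and the sample field names (`name`, `host`, `status_code`) are assumptions made for the example, not Jut's actual code.

```python
import json
from numbers import Number


def validate_point(point, is_metric=False):
    """Check the two rules from the text: every point has "time",
    and metric points also carry a numeric "value"."""
    if "time" not in point:
        raise ValueError('point is missing the required "time" field')
    if is_metric:
        value = point.get("value")
        # bool is a Number subclass in Python, so exclude it explicitly.
        if isinstance(value, bool) or not isinstance(value, Number):
            raise ValueError('metric point needs a numeric "value" field')
    return point


# Two points with different "shapes": an event and a metric.
event = {"time": "2015-07-01T12:00:00Z", "data_type": "web_log", "status_code": 200}
metric = {"time": "2015-07-01T12:00:00Z", "name": "cpu.user", "host": "prod-1", "value": 0.42}

validate_point(event)
validate_point(metric, is_metric=True)
print(json.dumps(metric))  # each point serializes directly to a JSON object
```

Beyond these two fields, nothing is enforced, which is what lets points from very different sources share one pipeline.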
A frontend HTTP server (running NGINX) receives incoming data, which is de-multiplexed and sent on to local, per data-type “connector” processes (running Node.js). These processes translate the incoming data into the internal representation described above, then publish them into a Kafka topic, from which they can later be fetched for indexing and/or live processing.
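To make the connector idea concrete, here is a minimal sketch in Python (the real connectors ran on Node.js). A plain list stands in for the Kafka topic, and the names `statsd_connector`, `webhook_connector`, `CONNECTORS`, and `ingest` are hypothetical.

```python
import time

# Stand-in for a Kafka topic: in this sketch, publishing just appends to a list.
topic = []


def statsd_connector(line, now=None):
    """Translate a statsd line such as "api.latency:12.5|ms" into a point."""
    name, rest = line.split(":", 1)
    raw_value, metric_type = rest.split("|")[:2]
    return {
        "time": now if now is not None else time.time(),
        "name": name,
        "value": float(raw_value),
        "statsd_type": metric_type,
    }


def webhook_connector(payload, now=None):
    """Wrap an already-parsed JSON webhook body as an event point."""
    point = dict(payload)
    point.setdefault("time", now if now is not None else time.time())
    return point


# Per-data-type routing, mirroring the de-multiplexing step described above.
CONNECTORS = {"statsd": statsd_connector, "webhook": webhook_connector}


def ingest(data_type, raw, now=None):
    point = CONNECTORS[data_type](raw, now)
    topic.append(point)  # a real connector would publish to Kafka here
    return point


ingest("statsd", "api.latency:12.5|ms", now=100.0)
ingest("webhook", {"source": "github", "action": "push"}, now=101.0)
```

Each connector only needs to know its own input format; everything downstream sees points.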
Indexing and Storing Data
All this data needs to be stored in some sort of database or indexed storage. And the more analytics-like querying that database supports, the better. If this system were only for logs and events, then we could simply have chosen ElasticSearch. Similarly, if it were only for metrics, a time-series database (TSDB) is all we’d need. But to handle both categories efficiently, each one needs to be backed by a different data store.
Logs and Events go in ElasticSearch
We use ElasticSearch as our events database. These events can have varying “shapes,” depending on which source they come from. We use a number of ElasticSearch APIs to great effect, notably the Query and Aggregations APIs.
Metrics go in Cassandra and ElasticSearch
While metrics could, in principle, be entirely stored in ElasticSearch (or any other database), it is far more efficient to use a specialized database with a data model that matches the inherent structure and redundancy of metrics data.
The obvious approach would be to take an existing open source time series database (TSDB). That’s what we did initially — we used an open source TSDB that used Cassandra as its backend. The challenge in this approach is that the TSDB has its own query API, which is different from ElasticSearch’s. And with this underlying disparity, presenting a uniform search and query interface across events and metrics is hard.
This is why we changed course and decided to write our own TSDB on top of both Cassandra and ElasticSearch. Specifically, we store the time/value pairs in Cassandra and the meta-data in ElasticSearch, and we have a query and management layer on top. This way, searching and querying for events and metrics can be done uniformly inside ElasticSearch.
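The split can be pictured with a small in-memory sketch. Here a dict named `metadata` plays the role of ElasticSearch and a dict named `values` plays the role of Cassandra; the class `TinyTSDB` and its methods are assumptions for illustration and say nothing about the actual schema or query layer.

```python
from collections import defaultdict


class TinyTSDB:
    """In-memory sketch: `metadata` stands in for ElasticSearch,
    `values` stands in for Cassandra."""

    def __init__(self):
        self.metadata = {}               # series_id -> tags (searchable)
        self.values = defaultdict(list)  # series_id -> [(time, value), ...]

    @staticmethod
    def series_id(tags):
        # A series is identified by its tag set; sort keys for a stable id.
        return "|".join(f"{k}={tags[k]}" for k in sorted(tags))

    def write(self, point):
        tags = {k: v for k, v in point.items() if k not in ("time", "value")}
        sid = self.series_id(tags)
        self.metadata.setdefault(sid, tags)  # stored once per series
        self.values[sid].append((point["time"], point["value"]))

    def read(self, start, end, **match):
        # 1. Metadata query: find the series whose tags match.
        sids = [sid for sid, tags in self.metadata.items()
                if all(tags.get(k) == v for k, v in match.items())]
        # 2. Fetch time/value pairs per series and 3. rebuild full points.
        for sid in sids:
            for t, v in self.values[sid]:
                if start <= t < end:
                    yield {**self.metadata[sid], "time": t, "value": v}


db = TinyTSDB()
db.write({"time": 1, "name": "cpu", "host": "prod-1", "value": 0.5})
db.write({"time": 2, "name": "cpu", "host": "prod-1", "value": 0.7})
db.write({"time": 2, "name": "cpu", "host": "dev-1", "value": 0.1})
prod_points = list(db.read(0, 10, host="prod-1"))
```

The redundancy mentioned earlier shows up here: the tags for a series are written once, while only compact time/value pairs accumulate per point.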
The Stream Processing Engine
So now we have an ingest pathway and some databases. Are we ready to tack on a frontend app and play with our data? Not yet! Even though ElasticSearch can do some log and events analytics natively, we still need a processing engine. Because:
- We want one unified way to access events and metrics, for realtime or historical data.
- For certain use cases (monitoring, alerting) we need to be able to process live data in real time, as it comes in.
- Metrics! We want to do more than just search for metrics and read them out: metrics are there so that we can crunch them.
- Even for events, we need a more general processing capability than the one provided by the ElasticSearch APIs. For example, to join data from different sources, or do string parsing, or custom aggregations.
This is the point where things depart from simpler stacks like the ELK stack, and start to get really interesting.
You could spend days (or more!) studying up on how others have built data pipelines, learn about Lambda, Kappa and other exotic-sounding data architectures. There’s actually a lot of really good material out there. We’ll cut to the chase and throw our hat in the ring: we think the way to go is a unified processing engine that’s used for both live streaming and batch computation. In that regard, we’ve always fully espoused the ideas that are nicely articulated here and here.
A simple flowgraph that joins a metrics and an event stream after doing some processing on each.
Here, unlike for storage and ingest, we built the processing engine from the ground up — not because there aren’t other stream processing engines out there, but because of our focus on querying and performance, which we’ll discuss in separate sections below.
More specifically, we built a stream processing engine that implements a dataflow processing model. In a dataflow model, computation is expressed as a directed graph in which operators such as transforms, aggregations, windowing, filters or joins turn input streams into output streams. Dataflow is a natural way to model queries and computations: it is composable, suited to both live and batch processing, and maps directly onto a distributed runtime.
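A toy version of this model can be written with Python generators, where each operator consumes one stream of points and produces another. This is only a sketch of the dataflow idea, not the engine described here; `filter_points` and `count_every` are hypothetical names, and the windowing assumes points arrive in time order with `time` in seconds.

```python
from collections import Counter


def filter_points(points, predicate):
    """Operator: pass through only the points matching the predicate."""
    return (p for p in points if predicate(p))


def count_every(points, width, by):
    """Operator: tumbling-window count grouped by a field.
    Assumes time-ordered input and "time" expressed in seconds."""
    window_start, counts = None, Counter()
    for p in points:
        start = p["time"] - p["time"] % width
        if window_start is not None and start != window_start:
            # The window closed: emit one point per group, then reset.
            for key, n in sorted(counts.items()):
                yield {"time": window_start, by: key, "count": n}
            counts = Counter()
        window_start = start
        counts[p[by]] += 1
    # Flush the final, possibly partial, window.
    for key, n in sorted(counts.items()):
        yield {"time": window_start, by: key, "count": n}


logs = [
    {"time": 0, "data_type": "web_log", "status_code": 200},
    {"time": 10, "data_type": "web_log", "status_code": 500},
    {"time": 20, "data_type": "syslog", "status_code": 0},
    {"time": 65, "data_type": "web_log", "status_code": 200},
]

# Compose operators into a small linear graph: source -> filter -> windowed count.
graph = count_every(
    filter_points(iter(logs), lambda p: p["data_type"] == "web_log"),
    width=60,
    by="status_code",
)
result = list(graph)
```

Because the operators only see an iterator, the same graph runs unchanged whether the source is a finite historical read or an unbounded live feed.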
Of course, unless you’re really looking for a big new project to take on, you’ll probably want to use an open source streaming engine. We’d recommend taking a look at Riemann, Spark Streaming or Apache Flink.
Expressing Queries and Computation
Ok, so we’re using a stream processing engine that’s based on a dataflow model of computation. But how does a user express queries and create such dataflow graphs?
One approach is to provide an API or embedded DSL. That API needs to provide ways to query and filter data, define transforms and other processing operations, and most of all, provide a way to assemble individual processing stages into flow graphs. Each of the projects mentioned above has its own API, and while individual preferences may vary, one common challenge is that APIs aren’t accessible to non-developers such as SQL analysts or Excel users.
One likely course of action, at this point, is to declare that those users will have to interact with the system via tools (for example, a simple web app) that are written to these APIs.
Another approach is to provide a simple query language. This is what we did at Jut. Because there’s no established query language for dataflow (the way SQL is for relational queries), we created a dataflow query language called Juttle. At its core, Juttle consists of a flow graph language with simple syntax to declare processing pipelines like the one above. It has primitives such as search, windowing, join, aggregations and group-by. Of course, before processing data in a flow graph, you need to fetch it. Juttle allows you to define queries that fetch data from any combination of events and/or metrics, live and/or historical, all with the same syntax and constructs. A common structure for simple queries is `query | analyze | view`. Here’s an example that computes per-minute counts of web logs seen for each status code, and displays the results in a streaming time chart.
read -from :1 day ago: data_type = 'web_log'
| reduce -every :minute: count() by status_code
| @timechart
Note the familiar shell-like syntax for chaining using a pipe operator — there’s also additional syntax to express non-linear topologies where the stream is forked and merged. And Juttle also has constructs for modules, functions, and aggregators — it’s basically a simple dynamic language with special syntax for defining flowgraphs like the one above.
Putting the Pieces Together: An Example With Anomaly Detection
So far we’ve taken a component-centric lens: we’ve discussed building blocks and their roles, but without saying much about how they fit together. Now we’re going to switch to a data-centric lens, to see what pathways need to be set up to support live and historical queries. Let’s use the example of running an anomaly detection algorithm. It’s a good example because we need to query historical data to train the underlying statistical model, live streaming data to test for anomalies, and then we need to write back results to the system while alerting on anomalies.
But before we do any sort of querying, we need to wire up the ingest path so that incoming data is written to indexed storage. This is done by the import service, which amongst other things implements the write path of the time series database, splitting metrics data and meta-data across ElasticSearch and Cassandra.
Indexing incoming data.
Now a user comes along and launches an anomaly detection job. This job first reads historical data, which the processing engine does by querying the underlying databases directly. Depending on the query and data this may include optimization (discussed below), and exercising the read paths of the events and/or metrics databases.
An events read is done against ElasticSearch only, while a metrics read requires issuing metadata queries to ElasticSearch, pulling the metric values out of Cassandra, and combining the results to produce the actual metric points.
Historical, live and write-back flows with an anomaly detection query.
The historical data covers some past range up to “now.” After that data has been streamed into the flow graph, the processing engine switches this flow graph to live — reading data as it arrives into the system and streaming it into the flowgraph. In order to do this, the processing engine taps directly into the incoming stream of points from the import service. Note that this cut-over must be done carefully to avoid dropping or duplicating points.
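One simple way to reason about the cut-over is a half-open time boundary: stored points strictly before the cut-over time come from the databases, and live points at or after it come from the stream. The sketch below assumes the live tap was opened before the historical read started and that every point older than the cut-over is already indexed; `historical_then_live` and `read_history` are hypothetical names, and this is not necessarily how the engine described here handles it.

```python
def historical_then_live(read_history, live_points, cutover):
    """Yield stored points with time < cutover, then live points with
    time >= cutover, so each point is taken from exactly one side."""
    for p in read_history(end=cutover):
        yield p
    for p in live_points:
        if p["time"] >= cutover:
            yield p


stored = [{"time": 1}, {"time": 2}, {"time": 3}]
# The point at time 3 was both indexed and seen on the live tap.
live = [{"time": 3}, {"time": 4}]


def read_history(end):
    return [p for p in stored if p["time"] < end]


merged = list(historical_then_live(read_history, iter(live), cutover=3))
times = [p["time"] for p in merged]
```

In this run the point at time 3 appears once, taken from the live side, even though it exists in both places.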
At this point we have a trained anomaly-detection flow graph running over live data. When an anomaly is detected, we want it to send an alert to some external system, which is done by having the processing engine POST to some external HTTP service. In addition to sending out this alert, we’ll also want to keep track of it inside the system. In other words we want the ability to write back data streams into the system. Conceptually this is done by having the processing engine pipe back data into the ingest pathway.
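The whole train, detect, alert, and write-back loop can be sketched with a deliberately simple z-score detector. This is a stand-in technique chosen for brevity, not the algorithm used at Jut; `ZScoreDetector`, `run_live`, and the two callbacks are hypothetical, with plain lists replacing the external HTTP service and the ingest pathway.

```python
from statistics import mean, stdev


class ZScoreDetector:
    def __init__(self, threshold=3.0):
        self.threshold = threshold
        self.mu = None
        self.sigma = None

    def train(self, historical_values):
        # Fit the "model" on historical data: just a mean and a standard deviation.
        self.mu = mean(historical_values)
        self.sigma = stdev(historical_values)

    def is_anomaly(self, value):
        if self.sigma == 0:
            return value != self.mu
        return abs(value - self.mu) / self.sigma > self.threshold


def run_live(detector, live_points, send_alert, write_back):
    for p in live_points:
        if detector.is_anomaly(p["value"]):
            alert = {"time": p["time"], "data_type": "anomaly", "value": p["value"]}
            send_alert(alert)  # stand-in for the HTTP POST to an external service
            write_back(alert)  # stand-in for piping the point back into ingest


detector = ZScoreDetector()
detector.train([10, 11, 9, 10, 12, 10, 9, 11])  # historical phase

alerts, written = [], []
live = [{"time": 1, "value": 10}, {"time": 2, "value": 25}, {"time": 3, "value": 11}]
run_live(detector, live, alerts.append, written.append)  # live phase
```

Only the point at time 2 is far enough from the historical mean to trigger an alert, and the same alert point is what gets written back.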
Making it Fast
So we have a working system with an ingest pathway and some databases and a processing engine. Are we ready to tack on a frontend app and play with our data? Well, yes, but the problem is we’ll have slow query performance for some historical queries.
So let’s revisit the notion of a “unified processing engine” for a second. We explained that it is the same system doing both historical and live processing with the same constructs, abstractions and queries.
The performance challenge comes from the fact that there’s far more historical data than live data. For example, assume we have one million points/sec coming into the system, and a processing path that is fast enough to run a live query on these points as they come in. Now take the same query and run it over the past day’s worth of data: that is 10^6 points/sec × 86,400 seconds, or roughly 86 billion points, to be processed all at once (or, at least, as fast as they can be read from storage). Assuming the computation is distributable, we could address this by adding compute nodes, but even in the best case, this would be inefficient and costly.
So this is where optimization comes in. There are many ways to optimize dataflow queries. Some of them consist of transforming the query itself — for example, moving filters or aggregations upstream when that can be done without changing query semantics. The one we’ll talk about here is a form of pushdown optimization, which pushes filtering and processing down into the databases wherever possible. This requires doing the following:
- Automatically recognizing which parts of a query can be pushed down into the database.
- Taking those parts and translating them into the query language of the target database.
- Running the backend query and injecting the results at the right point in the dataflow graph.
As an illustration, let’s take the following Juttle program, which simply counts the number of events imported daily from all production hosts, over the past month.
read -last :month: hostname~'prod-*'
| reduce -every :day: count()
Without optimization, we would need to read all points out of the database, removing those points whose hostname field does not start with “prod-”. Then we would bucket each point by day and maintain a count for each bucket. With optimization, all these operations can be pushed down to ElasticSearch, using a combination of its filter and date aggregation APIs. The net result is that only a tiny fraction of the total number of points is actually read into the processing engine.
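As a rough idea of what the translated backend query might look like, the sketch below builds an ElasticSearch search body as a plain Python dict and turns the aggregation buckets back into points. The field names `hostname` and `time`, the aggregation name `per_day`, and both function names are assumptions; `calendar_interval` is the parameter name in recent ElasticSearch releases, while older versions used `interval`, so adjust for the version in use.

```python
def pushdown_count_by_day(hostname_prefix, time_field="time"):
    """Build a search body that filters and counts inside the database."""
    return {
        "size": 0,  # return aggregation buckets only, no individual documents
        "query": {
            "bool": {
                "filter": [
                    {"prefix": {"hostname": hostname_prefix}},
                    {"range": {time_field: {"gte": "now-1M"}}},
                ]
            }
        },
        "aggs": {
            "per_day": {
                "date_histogram": {"field": time_field, "calendar_interval": "day"}
            }
        },
    }


def buckets_to_points(response):
    """Inject results into the flow graph: one point per daily bucket."""
    for bucket in response["aggregations"]["per_day"]["buckets"]:
        yield {"time": bucket["key"], "count": bucket["doc_count"]}


body = pushdown_count_by_day("prod-")

# A hand-written response in the shape ElasticSearch returns for this aggregation.
fake_response = {
    "aggregations": {"per_day": {"buckets": [
        {"key": 1435708800000, "doc_count": 42},
        {"key": 1435795200000, "doc_count": 17},
    ]}}
}
points = list(buckets_to_points(fake_response))
```

The engine then receives one point per day instead of every matching event, which is where the savings come from.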
Of course, while there are many other possible optimizations, not all Juttle programs can be pushed down into the database. Far more common is that a program is partially optimized, where some of the query is pushed down into the backend, and some still happens in the processing engine. But in our experience, a large class of useful and common queries can be sped up this way by orders of magnitude.
We’re done! Well, we’re done if we’re willing to do without a visualization layer, and only query the system via an API. But building a client app to create queries, stream and visualize data, and compose dashboards is a whole other can of worms, so we’ll leave it for another day.
For now, let’s just summarize what we’ve seen on the way to building this data hub:
- An ingest pathway that accepts incoming data from heterogeneous sources, translates it to a uniform representation, and stores it for later consumption. (In the case of Jut, this is built on top of Kafka.)
- Databases for events and metrics. For Jut we use ElasticSearch for events and have built a custom metrics database on top of Cassandra and ElasticSearch.
- A processing engine (or two, if you’re going with a lambda-ish architecture).
- An API or query language to run queries on the system.
Phew. If you’ve gotten this far, congratulations. It’s a long, but fun, journey. And either way, even if you’re building your own, check out Jut. You might just be inspired.