Bioyino — a distributed, scalable metric aggregator

So, you collect metrics. So do we. We can assure you that they are far from useless: business people need them and take them into account when making crucial decisions. Today we are going to talk about our front-end solution for metric collection, a statsd-compatible server called Bioyino. You are in for a story of how and why we wrote it, and of what was wrong with Brubeck, the statsd solution we used before.

We used GitHub’s Brubeck as our front-end UDP collecting solution for a long time (see our other posts: 1, 2). It’s written in C, its code is very simple (which is important when you want to contribute) and, most importantly, it was able to handle our huge load of 2m metrics per second (MPS) at peak. Brubeck’s documentation claims support for 4m MPS, with a caveat: reaching such a big value requires Linux network tuning. Despite all the advantages above, this solution has some serious shortcomings.

Shortcoming #1. The developer, GitHub, stopped supporting the project after publishing it. They didn’t respond to issues and didn’t merge PRs, including ours. At the time of writing, the project has become a bit more alive again (starting around Feb 2018), but before that it went through an almost 2-year period of total inactivity. We also found a comment on one of the PRs in which one of the authors says they only accept PRs that are useful for GitHub. That could become a serious blocker for us in the future.

Shortcoming #2. The precision of the data. As you can see from the code, Brubeck samples only 65536 values per metric. In our case we get far more than that during an aggregation period (1 527 392 is the most we’ve seen at peak). As a result of such sampling, the “max” and “min” aggregates become useless:

(what it looks like)
(what it should look like)

For the same reason, sum aggregation is incorrect as well. Add to this the 32-bit float overflow bug (still not fixed, by the way), which leads to a server segfault.
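To make the precision problem concrete, here is a minimal sketch comparing exact streaming aggregates with a fixed-size sample. It assumes a generic reservoir sample (Algorithm R) with a toy xorshift generator; `Exact` and `Reservoir` are hypothetical names for illustration, and this is not Brubeck's actual sampling code.

```rust
/// Exact aggregates that need constant memory: every value is seen once.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Exact {
    pub count: u64,
    pub sum: f64,
    pub min: f64,
    pub max: f64,
}

impl Exact {
    pub fn new() -> Self {
        Exact { count: 0, sum: 0.0, min: f64::INFINITY, max: f64::NEG_INFINITY }
    }

    pub fn update(&mut self, value: f64) {
        self.count += 1;
        self.sum += value;
        self.min = self.min.min(value);
        self.max = self.max.max(value);
    }
}

/// Fixed-capacity reservoir sample (Algorithm R). Illustrative only:
/// an assumption about how a sampling aggregator could work.
pub struct Reservoir {
    capacity: usize,
    seen: u64,
    state: u64, // xorshift64 state, must be non-zero
    pub values: Vec<f64>,
}

impl Reservoir {
    pub fn new(capacity: usize, seed: u64) -> Self {
        Reservoir { capacity, seen: 0, state: seed.max(1), values: Vec::with_capacity(capacity) }
    }

    fn next_random(&mut self) -> u64 {
        let mut x = self.state;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.state = x;
        x
    }

    pub fn update(&mut self, value: f64) {
        self.seen += 1;
        if self.values.len() < self.capacity {
            self.values.push(value);
        } else {
            // Replace a random slot with probability capacity / seen.
            let slot = self.next_random() % self.seen;
            if (slot as usize) < self.capacity {
                self.values[slot as usize] = value;
            }
        }
    }

    /// Max over the sample only: a rare spike is usually not in the sample.
    pub fn max(&self) -> f64 {
        self.values.iter().cloned().fold(f64::NEG_INFINITY, f64::max)
    }
}
```

Feeding both structures the same stream with a single rare spike shows the effect: `Exact` always reports the spike as the max, while the sample reports it only if the spike happens to survive in one of its slots.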

And, at last, The Issue #X. The one we are ready to pin on all 14 statsd implementations we were able to find at the time of writing. Imagine an infrastructure growing so big that the number of metrics collected there significantly exceeds the theoretical limit of 4m MPS. Or imagine it hasn’t grown that much yet, but the collected metrics have already become important to some big bosses. Imagine a level of importance where a 2–3 minute absence of data is critical enough for managers to fall into deep depression. Obviously, treating depression is not what techs do best, so we prefer to find a technical solution to this problem. The requirements are well known and obvious:

First, fault tolerance: we don’t want a single failure on one of the servers to create a psychiatric zombie apocalypse at the office.

Second, scalability. We need a way to receive more than 4m MPS without digging deep into the network stack, while being able to grow horizontally to the necessary size.

Since we already had some scalability headroom, we decided to start with fault tolerance. “Hey, it’s just fault tolerance, we’ve done this hundreds of times before, it’s simple”, we thought. So we took two copies of Brubeck and ran them in parallel; to do that, we wrote a small UDP traffic multiplexing utility. The fault tolerance problem seemed to be solved, but… ehmm… not very well. At first it all seemed to go fine: every Brubeck instance computed its own variant of the aggregation and sent it to Graphite every 30 seconds, overwriting the previous interval (Graphite is able to do things like that). If one server failed, we had another one holding a full copy of the aggregated data. But there was a problem: the 30-second aggregation periods were not synchronised between the nodes, so every time a server had a problem or was just stopped for maintenance, you could see a “saw” on the graphs, because one of the last aggregations was lost and didn’t get overwritten. The same thing happened when the stopped server was relaunched.

Also, this scheme did not solve the scalability problem, because we still got 2–4m MPS per server and still could not increase that. If you try to find a solution while digging some snow (we have snowy winters in Russia!), you may come to the same idea as we did: we need a statsd that can work in distributed mode. This means we need to keep metrics in sync between nodes, taking their timestamps into account. “Of course there is such a solution”, we said, and went googling… and found nothing. Digging through the documentation of many implementations (as of 11.12.2017), we found literally nothing! It seems other developers and sysadmins either have never met such a problem or have said nothing about their solutions.

And then we recalled that we had written a toy statsd implementation for a hackathon project. We called it Bioyino, which means nothing, because it’s the name the hackathon script generated semi-randomly for the project. And then it crossed our minds that we needed our own statsd implementation. Here are the arguments for it:

  • because there are too few statsd clones in the world,
  • because we can get the level of fault tolerance and scalability that we really need and solve the problems mentioned above, at least the metric sync and metric sending conflicts,
  • because we can ensure better precision than Brubeck gives us,
  • because we can add more useful statistics about incoming and outgoing metrics, which were almost absent in Brubeck,
  • because we got a chance to program our own high performance distributed scalable application, which is not a clone of another high perfor… you get it.

What language to choose? Rust, of course! But why?

  • because there was a prototype,
  • because the author of this article already knew Rust and was nuts about writing an open source solution in it,
  • because we cannot afford GC languages, whose pauses affect the near-real-time traffic of endlessly incoming metrics,
  • because we needed top performance comparable to C,
  • because Rust gives us fearless concurrency without overhead, and if we wrote this in C/C++ (which we definitely know worse), we would surely get more security issues, race conditions etc.

There was an argument against using Rust: our company has no experience with projects in Rust, and we are not going to use it in our main project in the future. So there were serious concerns, but we took the risk and tried it.


Time went by…

At last, after some unsuccessful attempts, a working solution was found. It looks like this:

Each node receives its own set of metrics and collects them in memory. Note that only simple metrics are aggregated at this stage; for some types (mostly timers) one needs the full set of values to aggregate correctly. Nodes interact with each other using a distributed lock or consensus protocol, which allows only one node to be the Boss and to send metrics to the backend. The current implementation uses HashiCorp Consul, but in the future the author’s ambitions extend to our own implementation of Raft, where the consensus leader is the boss instead of whoever acquired the lock.

Apart from the consensus, all nodes send each other snapshots of pre-aggregated metrics at a relatively short interval (around 1 second by default). A snapshot contains only the metrics received within this short interval. With this scheme, both the scalability and the fault tolerance requirements are met: every node has a copy of all metrics, but the exchange is not limited by the incoming rate, because the metrics are already aggregated and take much less space (around tens of megabytes in our case). They are sent over TCP, not UDP, using a fast binary protocol, so cloning takes significantly fewer resources than cloning the raw UDP traffic. As a bonus, we reduce the load on the backend because we don’t do any rewrites. The amount of data lost because of a server failure or restart is also way lower: around 1 second of data (and it may be lowered further by tuning).

To distribute incoming metrics in this setup, we can use a simple network device able to do round-robin distribution. Basically, almost any production-grade device can do this at a speed way greater than 4m MPS, because it doesn’t look into the packet content and because many metrics can come batched in one packet. So we don’t expect any performance problems at the network level in the near future.

In case of a server failure, the same networking device is able to detect it fast enough (1–2 seconds) and remove the host from rotation. As a result, the passive (i.e. non-leader) nodes can be turned off without any consequences, but turning off the leader node still produces a small anomaly, because we lose a small part of the metrics during the detection interval. Since the nodes talk to each other, this issue can be solved relatively easily, for example with some kind of synchronization messages.
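The snapshot exchange described above can be sketched roughly as follows. This is a simplified in-memory model, not Bioyino's real code or wire protocol: the `Metric`, `Snapshot` and `Node` types, the method names, and the choice of reporting only `count` and `max` for timers are all assumptions made for illustration. Leader election (Consul in the real system) is reduced to a boolean flag.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum Metric {
    /// Simple type: can be pre-aggregated immediately by summing.
    Counter(f64),
    /// Timers keep all raw values until the final aggregation.
    Timer(Vec<f64>),
}

impl Metric {
    pub fn merge(&mut self, other: Metric) {
        match (self, other) {
            (Metric::Counter(a), Metric::Counter(b)) => *a += b,
            (Metric::Timer(a), Metric::Timer(mut b)) => a.append(&mut b),
            // Type mismatch: keep the existing value (a real server would report an error).
            _ => {}
        }
    }
}

pub type Snapshot = HashMap<String, Metric>;

fn merge_into(map: &mut Snapshot, name: String, metric: Metric) {
    match map.entry(name) {
        Entry::Occupied(mut entry) => entry.get_mut().merge(metric),
        Entry::Vacant(entry) => {
            entry.insert(metric);
        }
    }
}

pub struct Node {
    pub is_leader: bool,
    storage: Snapshot, // everything known for the current aggregation period
    pending: Snapshot, // received locally since the last snapshot
}

impl Node {
    pub fn new(is_leader: bool) -> Self {
        Node { is_leader, storage: HashMap::new(), pending: HashMap::new() }
    }

    pub fn receive(&mut self, name: &str, metric: Metric) {
        merge_into(&mut self.pending, name.to_string(), metric);
    }

    /// Called on a short interval: keep the data locally and return a copy for peers.
    pub fn take_snapshot(&mut self) -> Snapshot {
        let snapshot = std::mem::take(&mut self.pending);
        self.apply_snapshot(snapshot.clone());
        snapshot
    }

    /// Merge a snapshot (our own or a peer's) into the long-lived storage.
    pub fn apply_snapshot(&mut self, snapshot: Snapshot) {
        for (name, metric) in snapshot {
            merge_into(&mut self.storage, name, metric);
        }
    }

    /// End of the aggregation period: every node drops its data, only the leader reports.
    pub fn flush(&mut self) -> Vec<(String, f64)> {
        let drained = std::mem::take(&mut self.storage);
        if !self.is_leader {
            return Vec::new();
        }
        let mut out = Vec::new();
        for (name, metric) in drained {
            match metric {
                Metric::Counter(sum) => out.push((name, sum)),
                Metric::Timer(values) => {
                    let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
                    out.push((format!("{}.count", name), values.len() as f64));
                    out.push((format!("{}.max", name), max));
                }
            }
        }
        out.sort_by(|a, b| a.0.cmp(&b.0));
        out
    }
}
```

In this model, two nodes that exchange their snapshots end up with identical storage, so either of them could take over as the leader and report the same aggregates for the period.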

Now, what Bioyino looks like inside. It’s multithreaded by all means, but the inner architecture is a bit different from Brubeck’s. The threads in Brubeck are symmetric: each of them is responsible for both network interaction and metric aggregation. In Bioyino there are two groups of worker threads: network workers and counting workers. This separation gives flexibility in tuning the application depending on the incoming metric types. If you need intensive computation, you just add counting threads; if you have lots of incoming network packets (like we do), you can add network workers without adding counting ones. The configuration for the result described above uses 8 networking threads and 4 counting threads.

The computational part is pretty boring. Buffers filled by the networking workers are distributed between the counting workers, where they are parsed and aggregated. On request, metrics are popped from the storages, go through final aggregation and are sent to Graphite. All parts except the networking one are asynchronous and based on the tokio framework.
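The split into network workers and counting workers can be sketched with plain threads and channels. This is a simplified model under several assumptions: the real counting side is asynchronous and built on tokio, while this sketch uses `std::thread` and `std::sync::mpsc`; sockets are replaced by pre-filled vectors of buffers; only statsd counters (`name:value|c`) are parsed; and `parse_counter` and `run_pipeline` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Parse one statsd counter line "name:value|c"; other types are skipped in this sketch.
pub fn parse_counter(line: &str) -> Option<(&str, f64)> {
    let (name, rest) = line.split_once(':')?;
    let (value, kind) = rest.split_once('|')?;
    if kind != "c" {
        return None;
    }
    Some((name, value.parse::<f64>().ok()?))
}

/// `inputs[i]` stands in for the buffers that network worker `i` would read from a socket.
pub fn run_pipeline(inputs: Vec<Vec<String>>, counting_workers: usize) -> HashMap<String, f64> {
    let counting_workers = counting_workers.max(1);

    // Counting workers: each owns a queue, parses buffers and aggregates locally.
    let mut senders = Vec::new();
    let mut counters = Vec::new();
    for _ in 0..counting_workers {
        let (tx, rx) = mpsc::channel::<String>();
        senders.push(tx);
        counters.push(thread::spawn(move || {
            let mut local: HashMap<String, f64> = HashMap::new();
            for buffer in rx {
                for line in buffer.lines() {
                    if let Some((name, value)) = parse_counter(line) {
                        *local.entry(name.to_string()).or_insert(0.0) += value;
                    }
                }
            }
            local
        }));
    }

    // Network workers: do no parsing, only hand unparsed buffers over, round robin.
    let mut network = Vec::new();
    for (i, buffers) in inputs.into_iter().enumerate() {
        let senders = senders.clone();
        network.push(thread::spawn(move || {
            for (n, buffer) in buffers.into_iter().enumerate() {
                let target = (i + n) % senders.len();
                senders[target].send(buffer).expect("counting worker stopped");
            }
        }));
    }
    drop(senders); // queues close once all network workers finish

    for handle in network {
        handle.join().expect("network worker panicked");
    }

    // Final aggregation: merge the per-worker storages.
    let mut total = HashMap::new();
    for handle in counters {
        for (name, value) in handle.join().expect("counting worker panicked") {
            *total.entry(name).or_insert(0.0) += value;
        }
    }
    total
}
```

The point of the design is visible in the network worker loop: it never touches the buffer contents, so the number of network and counting threads can be tuned independently.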

The networking part is a bit more interesting, and it produced way more problems during development. The main point of making the networking threads separate was to reduce the time a thread spends not reading data from the socket into buffers. We tried async UDP and the usual recvmsg syscall, and neither worked: the first eats too much user-space CPU for handling events, while the second produces too many context switches. That’s why the current solution uses the recvmmsg syscall with big buffers. Support for regular UDP is kept for non-highload cases, where there is no need for recvmmsg. The multi-message mode allows us to achieve the main goal: most of the time, the network worker thread is busy doing its job, pulling data from the socket queue and placing it into a user-space buffer. Only a small amount of time is spent out of band, when the thread needs to hand the filled buffer over to a counting worker’s queue. The socket queue does not fill up in this mode, and the UDP drop rate is pretty low.

If you want to try Bioyino someday, this may confuse you when you test it on a small number of metrics. Those metrics may not reach Graphite at all, because they are still in the buffer and not even parsed! For this case we have the bufsize and task-queue-size parameters, which we recommend keeping low so that the buffer is flushed more often.
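The effect is easy to reproduce in a toy model. The sketch below assumes that bufsize behaves as a byte threshold after which the network worker hands the buffer over for parsing; the `NetBuffer` type is hypothetical, and the actual semantics of bufsize and task-queue-size in Bioyino may differ.

```rust
/// Accumulates raw datagrams and releases them only when the threshold is reached.
pub struct NetBuffer {
    bufsize: usize,
    data: Vec<u8>,
}

impl NetBuffer {
    pub fn new(bufsize: usize) -> Self {
        NetBuffer { bufsize, data: Vec::with_capacity(bufsize) }
    }

    /// Append one datagram; return the filled buffer once it reaches `bufsize` bytes.
    pub fn push(&mut self, datagram: &[u8]) -> Option<Vec<u8>> {
        self.data.extend_from_slice(datagram);
        self.data.push(b'\n');
        if self.data.len() >= self.bufsize {
            // Hand the full buffer away and start a fresh one.
            Some(std::mem::replace(&mut self.data, Vec::with_capacity(self.bufsize)))
        } else {
            None
        }
    }

    /// Bytes received but not yet handed over for parsing.
    pub fn pending(&self) -> usize {
        self.data.len()
    }
}
```

With a large threshold, a single test metric stays in `pending()` and never reaches the parser; with a small threshold, the same metric is handed over immediately.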

Finally, here are some graphs for graph-lovers:

Ingress rate in metrics, per server: more than 2m MPS
Turning off one of the nodes, where you can see all traffic being redistributed to another node
Egress metrics statistics. Note the only one raid boss-node sending them
Useful internal statistics of subsystems and errors in different parts of the daemon
The detailed statistics about number of updates for each metric (the metric names are under NDA, sorry)

So, what is our future plan? First of all, write the f…ng code, dude! The project was intended to be open source from its very beginning and is promised to stay so for all its lifetime. Our plans include moving to our own implementation of Raft, changing the protocol between peers to a more compatible one, adding more internal statistics, more metric types and many other improvements. And of course we appreciate any contribution to the project’s development, like PRs and new issues, which we will try to answer and fix.

That’s all folks, thanks for reading, «pokupayte nashikh slonov» (“buy our elephants”).
