How Humio Does Scale-Out Clustering

Kresten Krab
Published in Humio
Jun 11, 2018

When scaling a Humio cluster, there are two main concerns to optimize for: ingest and search.

The cluster configuration allows:

  • scaling out to deal with high ingest
  • scaling out to improve query performance

In a Humio cluster, some nodes are arrival nodes (they actually receive the data from the real world), and you can specify which nodes should deal with ingest processing, and which nodes to use for search. The nodes responsible for search also store the underlying segment files. In the default setup all nodes are equal — they all play the roles of arrival, ingest and search nodes, so the load gets evenly distributed across your nodes, which is a good starting point.

We discussed part of the ingest pipeline in a previous post, but left out how we scale out ingest to a cluster.

Data Arrives

Any node in the cluster can act as a point of arrival, as all nodes expose the Humio HTTP API. You control which nodes act as arrival nodes with a load balancer, DNS, or some other mechanism outside of Humio.

When data arrives, the request is validated: the ingest token is validated, and the request is parsed. If, for instance, you send nonconforming JSON, then the request is rejected.
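As a rough illustration, the arrival-side validation can be pictured like the sketch below. The token store and error handling are simplified assumptions for the example, not Humio’s actual implementation.

import json

# Illustrative only: stands in for whatever token store Humio actually consults.
VALID_INGEST_TOKENS = {"token-for-my-dataspace": "my-dataspace"}

def validate_request(token: str, body: str) -> list:
    """Reject the request unless the token is known and the body is valid JSON."""
    if token not in VALID_INGEST_TOKENS:
        raise PermissionError("unknown ingest token")
    try:
        events = json.loads(body)
    except json.JSONDecodeError as e:
        raise ValueError(f"nonconforming JSON: {e}")  # the request is rejected
    return events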

After parsing, we know the destination dataspace and the full set of tags, and thus we’ll know which data source the event belongs to. The first time an arrival node observes a new data source, this information is hashed to decide which ingest partition to send the data to; each ingest partition has its own Kafka partition used to deliver the log entry to an ingest node.
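A minimal sketch of that routing decision, assuming a stable hash over the dataspace and tag combination and a fixed number of ingest partitions (the hash function, key layout, and partition count here are illustrative, not Humio’s actual ones):

import hashlib

NUM_INGEST_PARTITIONS = 24  # assumed configured partition count

def ingest_partition_for(dataspace: str, tags: dict) -> int:
    """Map a data source (dataspace + tag combination) to an ingest partition."""
    key = dataspace + "|" + "|".join(f"{k}={v}" for k, v in sorted(tags.items()))
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_INGEST_PARTITIONS

# The same data source always hashes to the same partition, so its events
# stay ordered on a single Kafka partition.
print(ingest_partition_for("my-dataspace", {"host": "web-1", "app": "nginx"}))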

In larger setups, we recommend having a designated ‘arrival node’, which is responsible for neither ingest processing nor search, only arrival processing.

Arrival nodes are also responsible for coordinating, gathering, and merging the final result of queries that span multiple nodes.

Scaling Out Data Ingest

The next link in the chain is an ingest node, which reads data off Kafka as it is sent there from an arrival node. An ingest node is responsible for two things: assembling segment files, and processing real-time queries.

You can control the assignment of ingest partitions to nodes using the ingest-partitions HTTP API. To get the current ingest partition assignment, you can run something like this on any one of the nodes in the cluster:

> export TOKEN=`cat humio-data/local-admin-token.txt`
> curl -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/v1/clusterconfig/ingestpartitions > ingest-partitions.json

The task of an ingest node is then to receive logs on its designated ingest partitions, create segments out of the data, and process live queries.

The assembly of segment files involves maintaining a 1MB work-in-progress buffer, which is flushed to the segment when the buffer is full or when 10 minutes have passed. As you can imagine, this work is a lot less expensive than having to build a full free-text index.

Whenever a 1MB block is done and flushed, the ingest queue may be pruned to the offset of the oldest entry in that block. The Kafka offset of the last event to go into a data block is simply stored in the block. In case of a crash-restart, the ingest node will re-read enqueued log entries from the last offset safely stored in the last block of an incomplete segment.

Once an entire segment is full (which means that it has a configured number of blocks, defaulting to 1000), the segment is passed on to a storage node. Storage nodes are responsible for processing interactive searches of ‘old’ data, whereas ingest nodes participate in interactive search of data segments under assembly.
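Putting the last few paragraphs together, the block/segment assembly loop might look roughly like the following sketch. The sizes match the defaults mentioned above, but the bookkeeping and hand-off are simplified assumptions, not Humio’s actual code.

import time

BLOCK_SIZE = 1 * 1024 * 1024      # 1MB work-in-progress buffer
FLUSH_INTERVAL_SECS = 10 * 60     # flush at the latest every 10 minutes
BLOCKS_PER_SEGMENT = 1000         # default number of blocks per segment

class SegmentBuilder:
    def __init__(self):
        self.buffer = bytearray()
        self.last_flush = time.monotonic()
        self.blocks = []              # flushed blocks of the current segment
        self.last_offset = None       # Kafka offset of the last buffered event

    def append(self, event_bytes: bytes, kafka_offset: int):
        self.buffer.extend(event_bytes)
        self.last_offset = kafka_offset
        # In this sketch the time-based flush is checked on append; a real
        # implementation would also flush from a timer.
        if len(self.buffer) >= BLOCK_SIZE or \
           time.monotonic() - self.last_flush >= FLUSH_INTERVAL_SECS:
            self.flush_block()

    def flush_block(self):
        # The Kafka offset of the last event is stored with the block, so a
        # crash-restart can resume reading the ingest queue from there.
        self.blocks.append({"data": bytes(self.buffer), "last_offset": self.last_offset})
        self.buffer.clear()
        self.last_flush = time.monotonic()
        if len(self.blocks) >= BLOCKS_PER_SEGMENT:
            self.hand_off_segment()

    def hand_off_segment(self):
        # A full segment is passed on to one or more storage nodes (not shown).
        self.blocks = []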

In addition to building segments, ingest nodes also deal with real-time queries. This involves pushing all events through the pipelines corresponding to current real-time queries.

Since a real-time query may involve data being processed on a number of nodes (it may involve multiple data sources that end up being ingested on distinct nodes), the ingest nodes keep just the part of the query that relates to the data arriving on that node. These partial results are sent to the coordinating arrival node on a periodic basis, where they are merged to form a full result. The coordinating arrival node is typically the node that received the HTTP request to initiate the real-time query, but may be some other node in case of e.g. alerts.
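For an aggregating query such as a count per host, the merge step can be pictured as below. This is a sketch under the assumption that partial results are simple per-key counts; real Humio query states are richer than this.

from collections import Counter

def merge_partial_results(partials: list) -> dict:
    """Merge per-node partial counts into the full result on the coordinator."""
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts rather than replacing
    return dict(total)

# Two ingest nodes each report counts for the data sources they handle.
node_a = {"web-1": 120, "web-2": 87}
node_b = {"web-2": 13, "db-1": 42}
print(merge_partial_results([node_a, node_b]))  # {'web-1': 120, 'web-2': 100, 'db-1': 42}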

Scaling Out Search

The last element of scaling out is search. This covers both ‘historic’ searches and loading the events needed to fill the initial time window of a real-time search.

As described above — when a segment file is “full” it is copied to one or more storage nodes, from which it will also eventually be searched. Thus, the more storage nodes, the faster the search.

To place a segment, it is hashed and, using the storage-partitioning scheme, placed on one or more nodes. The current allocation of storage partitions can be queried using the segment-partitioning HTTP API:

> export TOKEN=`cat humio-data/local-admin-token.txt`
> curl -H "Authorization: Bearer $TOKEN" "http://localhost:8080/api/v1/clusterconfig/segments/partitions" > segments-partitions.json
> cat segments-partitions.json | json_pp
[{ "hosts" : [1,2], "partition" : 0 },
{ "hosts" : [2,3], "partition" : 1 },
{ "hosts" : [3,1], "partition" : 2 }]

The current placement of segments, the time interval covered by the data in each segment, and the data sources those segments belong to are known by all nodes in the system.

When an interactive search is received by an ‘API node’, it determines which segments are relevant for the search and selects which hosts should be used. The hosts are chosen so that the search is spread across as many of them as possible, increasing the available processing power.
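A toy version of that scheduling decision: for each relevant segment, pick the least-loaded host among those that store a copy of it. Humio’s real scheduler likely weighs more factors; this is only a sketch.

from collections import defaultdict

def assign_segments(segment_replicas: dict) -> dict:
    """segment_replicas maps a segment id to the hosts that store a copy of it."""
    load = defaultdict(int)   # segments assigned to each host so far
    plan = defaultdict(list)  # host -> segments it will scan for this search
    for segment_id, hosts in segment_replicas.items():
        host = min(hosts, key=lambda h: load[h])  # least-loaded replica so far
        plan[host].append(segment_id)
        load[host] += 1
    return dict(plan)

print(assign_segments({"seg-a": [1, 2], "seg-b": [2, 3], "seg-c": [3, 1]}))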

As a consequence, you will observe that large queries run faster in terms of GB/sec, because they end up being assigned to more hosts/CPUs and thus exhibit more parallelism in the search. We only parallelize based on whole segments, so if you’re only searching through ~1GB of data, that happens entirely sequentially. But that’s also something a single core can do in a few hundred milliseconds, so it’s not an issue.

In this blog post you can read about how we make searching the individual segments fast.
