We have been developing our monitoring system for two years. Click to…

Vladimir Kolobaev
AvitoTech
Jan 21, 2020

Hi everybody. Earlier I wrote about the organization of a modular monitoring system for microservice architecture and about the transition from Graphite+Whisper to Graphite+ClickHouse for metrics storage under high loads. Then my colleague Sergey Noskov wrote about the very first element of our monitoring system — Bioyino, our own distributed scalable metrics aggregator.

The time has come for an update on how we handle monitoring at Avito. The most recent article dates back to 2018, and since then there have been interesting changes in the monitoring architecture, trigger and notification management, and data optimization in ClickHouse, along with other innovations that I want to tell you about.

But first things first.

Back in 2017, I showed the interaction diagram of the components in use at that time, and I would like to return to it so that you do not have to switch tabs in your browser:

Since then, the following happened:

  • The number of servers in the Graphite cluster has grown from 3 to 6 (56 CPU 2.60GHz, 384GB RAM, 10 SSD SAS 745GB, RAID 6, 10Gbit/s network).
  • We have migrated from brubeck to Bioyino, our own implementation of StatsD in Rust; there is an entire post about that. Since that post was published, we have added support for Graphite tags and Raft-based leader election.
  • We added support for running Bioyino as a StatsD agent and placed such agents next to the monolith instances and, where necessary, in k8s.
  • We finally got rid of the obsolete monitoring system Munin (formally we still have it, but the data from Munin is no longer used).
  • Data collection from Kubernetes clusters was channeled through Prometheus/Federations, since Heapster is not supported in the newer versions of Kubernetes.

Monitoring

Over the past two years, the number of metrics collected and processed in Avito has grown by a factor of 9.

The share of occupied server storage space has also soared, and we are working to reduce it. This is visible in the chart below.

What exactly are we doing?

  • We have applied complex compression methods to the data columns.
ALTER TABLE graphite.data MODIFY COLUMN Time UInt32 CODEC(Delta, ZSTD)
  • We continue to reduce the temporal resolution of data older than five weeks from 30 seconds to five minutes. To do this, we have set up a cron task, which approximately once a month runs the following procedure:
10 10 10 * * clickhouse-client -q "select distinct partition from system.parts where active=1 and database='graphite' and table='data' and max_date between today()-55 AND today()-35;" | while read PART; do clickhouse-client -u systemXXX --password XXXXXXX -q "OPTIMIZE TABLE graphite.data PARTITION ('"$PART"') FINAL";done
  • We have sharded data tables. Now we have three shards with two replicas each with a metric name-based hash as a shard key. This approach enables us to run rollup procedures, since all values for a specific metric are within the same shard, and the disk space usage is uniform across all shards.
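As a rough illustration of the resolution reduction described in the list above, the following Python sketch collapses 30-second points into 5-minute buckets. Averaging is an assumption made for the example: in ClickHouse the aggregation function comes from the rollup configuration of the table, and the function name `downsample` is hypothetical.

```python
from collections import defaultdict


def downsample(points, step=300):
    """Collapse (timestamp, value) points into buckets of `step` seconds.

    Averaging is an assumption for this sketch; the real rollup function
    is whatever the table's rollup configuration specifies.
    """
    buckets = defaultdict(list)
    for ts, value in points:
        # Align every timestamp to the start of its bucket.
        buckets[ts - ts % step].append(value)
    return [(start, sum(vals) / len(vals)) for start, vals in sorted(buckets.items())]


# Ten minutes of 30-second samples (20 points) become two 5-minute points.
raw = [(1_500_000_000 + i * 30, float(i)) for i in range(20)]
rolled = downsample(raw)
```

Each metric shrinks by a factor of ten in this range, which is where the storage saving for older data comes from.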

The schema of the Distributed table is as follows:

CREATE TABLE graphite.data_all (
`Path` String,
`Value` Float64,
`Time` UInt32,
`Date` Date,
`Timestamp` UInt32
)
ENGINE = Distributed (
'graphite_cluster',
'graphite',
'data',
jumpConsistentHash(cityHash64(Path), 3)
)
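The effect of the sharding expression above can be sketched in Python. `jump_consistent_hash` follows the algorithm published by Lamping and Veach; `path_hash` is a stand-in built on SHA-256, not cityHash64, so the shard numbers it produces will not match what ClickHouse computes. The metric names are hypothetical.

```python
import hashlib

MASK64 = 0xFFFFFFFFFFFFFFFF


def path_hash(path: str) -> int:
    # Stand-in for cityHash64 (an assumption): any stable 64-bit hash works
    # for the illustration, but the values differ from ClickHouse's.
    return int.from_bytes(hashlib.sha256(path.encode()).digest()[:8], "big")


def jump_consistent_hash(key: int, num_buckets: int) -> int:
    # Jump consistent hash as published by Lamping and Veach.
    b, j = -1, 0
    while j < num_buckets:
        b = j
        key = (key * 2862933555777941757 + 1) & MASK64
        j = int((b + 1) * (float(1 << 31) / float((key >> 33) + 1)))
    return b


def shard_for(path: str, shards: int = 3) -> int:
    return jump_consistent_hash(path_hash(path), shards)


# Hypothetical metric names: every point of a given metric gets the same shard.
paths = [f"apps.service{i}.requests.count" for i in range(1000)]
placement = {p: shard_for(p) for p in paths}
```

Because the shard depends only on the metric name, all values of one metric end up together, which is what makes per-shard rollup possible. A useful property of jump consistent hashing is that growing from 3 to 4 shards only moves keys onto the new shard, never between the existing ones.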

We also made the default user read-only and moved all write operations on the tables to a dedicated user, systemXXX.

The configuration of the Graphite cluster in ClickHouse is as follows:

<remote_servers>
<graphite_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>graphite-clickhouse01</host>
<port>9000</port>
<user>systemXXX</user>
<password>XXXXXX</password>
</replica>
<replica>
<host>graphite-clickhouse04</host>
<port>9000</port>
<user>systemXXX</user>
<password>XXXXXX</password>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>graphite-clickhouse02</host>
<port>9000</port>
<user>systemXXX</user>
<password>XXXXXX</password>
</replica>
<replica>
<host>graphite-clickhouse05</host>
<port>9000</port>
<user>systemXXX</user>
<password>XXXXXX</password>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>graphite-clickhouse03</host>
<port>9000</port>
<user>systemXXX</user>
<password>XXXXXX</password>
</replica>
<replica>
<host>graphite-clickhouse06</host>
<port>9000</port>
<user>systemXXX</user>
<password>XXXXXX</password>
</replica>
</shard>
</graphite_cluster>
</remote_servers>

In addition to the write load, the number of requests to read data from Graphite has grown. The data is used for:

  1. Trigger processing and alert generation.
  2. Displaying graphs on screen in the office and on laptops and PCs of the company’s growing staff.

To keep the monitoring system from collapsing under this load, we used another hack: we store the data for the last two days in a separate, smaller table and redirect all read requests for that period to it, which reduces the load on the main sharded table. In this smaller table we also use a reverse metric storage scheme, which greatly speeds up searching the data stored in it, and daily partitioning. The schema of the smaller table:

CREATE TABLE graphite.data_reverse (
`Path` String,
`Value` Float64,
`Time` UInt32 CODEC(Delta(4), ZSTD(1)),
`Date` Date,
`Timestamp` UInt32
)
ENGINE = ReplicatedGraphiteMergeTree (
'/clickhouse/tables/{cluster}/data_reverse',
'{replica}',
'graphite_rollup'
) PARTITION BY Date
ORDER BY (Path, Time)
SETTINGS index_granularity = 4096

To direct data to this table, we added a new section to the carbon-clickhouse configuration file:

[upload.graphite_reverse]
type = "points-reverse"
table = "graphite.data_reverse"
threads = 2
url = "http://systemXXX:XXXXXXX@localhost:8123/"
timeout = "60s"
cache-ttl = "6h0m0s"
zero-timestamp = true
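A minimal sketch of what reverse storage means, assuming the `points-reverse` upload type reverses the dot-separated components of the metric name before writing it. The function name and the sample metric are hypothetical.

```python
def reverse_path(path: str) -> str:
    # "a.b.c" -> "c.b.a": reverse the dot-separated components, not the characters.
    return ".".join(reversed(path.split(".")))


original = "hosts.web01.cpu.user"
stored = reverse_path(original)  # "user.cpu.web01.hosts"
```

With the table ordered by (Path, Time), a query that fixes the tail of a metric name and wildcards its beginning turns into a prefix lookup on the reversed name, which is presumably where the speed-up comes from.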

To delete partitions older than two days, we wrote a cron task. It looks like this:

1 12 * * * clickhouse-client -q "select distinct partition from system.parts where active=1 and database='graphite' and table='data_reverse' and max_date<today()-2;" | while read PART; do clickhouse-client -u systemXXX --password XXXXXXX -q "ALTER TABLE graphite.data_reverse DROP PARTITION ('"$PART"')";done
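The selection logic of this cron task can be mirrored in a few lines of Python. The partition identifiers and the helper name `partitions_to_drop` are hypothetical; only the `max_date < today() - 2` condition is taken from the query above.

```python
from datetime import date, timedelta


def partitions_to_drop(max_dates, today, keep_days=2):
    """Return partition ids whose newest row is older than `keep_days` days.

    `max_dates` maps a partition id to its max_date, mirroring the columns
    the cron query reads from system.parts.
    """
    cutoff = today - timedelta(days=keep_days)
    return sorted(p for p, d in max_dates.items() if d < cutoff)


today = date(2020, 1, 21)
# Hypothetical daily partitions for Jan 16 through Jan 21.
parts = {f"2020-01-{day:02d}": date(2020, 1, day) for day in range(16, 22)}
stale = partitions_to_drop(parts, today)
```

Because the comparison is strict and the task runs once a day, the table holds somewhat more than two full days of data at any moment, which is enough to cover the 48-hour read window.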

To read data from the table, the following section was added to the configuration file graphite-clickhouse:

[[data-table]]
table = "graphite.data_reverse"
max-age = "48h"
reverse = true
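The routing that `max-age` enables can be pictured with the following simplified Python sketch. It is not the graphite-clickhouse implementation: the names `DataTable` and `pick_table`, the first-match rule, and the choice of `graphite.data_all` as the fallback for historical reads are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DataTable:
    name: str
    max_age: Optional[int] = None  # seconds; None means no age limit


def pick_table(tables, now, query_from):
    # Return the first table whose max_age covers the start of the query window.
    age = now - query_from
    for table in tables:
        if table.max_age is None or age <= table.max_age:
            return table.name
    raise LookupError("no table covers the requested window")


TABLES = [
    DataTable("graphite.data_reverse", max_age=48 * 3600),
    DataTable("graphite.data_all"),  # assumed fallback for older data
]

now = 1_579_600_000
recent = pick_table(TABLES, now, now - 3600)           # last hour
historical = pick_table(TABLES, now, now - 72 * 3600)  # three days back
```

A dashboard showing the last hour is served from the small table, while a three-day query falls through to the sharded one.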

As a result, we have one table with 100% of the data, replicated to all six servers, which handles the entire read load from requests with a window of less than two days (95% of all requests). We also have a sharded table, with one third of the data on each shard, for reading historical data. Although there are far fewer of these requests, the load from them is much higher.

What about the CPU? As a result of the increase in the volume of written and read data in the Graphite cluster, the total CPU load on the servers has also increased.

I would like to draw attention to the following: half of the CPU resources are spent on parsing and initial processing of metrics in carbon-c-relay (v3.2 of 2018-09-05, responsible for transporting metrics), which is hosted on three of the six servers. As you can see from the chart above, these are the three servers with the highest load.

Alerting

We still use Moira and the custom moira-client as our alerting system. For flexible management of triggers, notifications, and escalations, we use a declarative description called alert.yaml. It is generated automatically when a service is created through PaaS and is stored in the service's repository.

To handle alert.yaml, we developed a wrapper around moira-client and called it alert-autoconf (we plan to make it open source). The service build in TeamCity has a step that exports triggers and notifications to Moira via alert-autoconf. When changes to alert.yaml are committed, automatic tests check that the yaml file is valid and send a request to Graphite for each metric template to verify that the template is valid.

For infrastructure teams that do not use PaaS, we have set up a dedicated repository called Alerting. It contains the following structure: Team/Project/alert.yaml. For each alert.yaml, we generate a separate build in TeamCity, which runs tests and pushes alert.yaml to Moira.

Thus, any staff member can manage their triggers, notifications, and escalations using a single approach.

Since we already had triggers configured via the GUI, we implemented a function for exporting them in yaml format. The contents of the resulting yaml document can be inserted into alert.yaml with practically no additional changes, after which the changes are pushed to master. During the build, alert-autoconf recognizes that the trigger already exists and registers it in our registry in Redis.

Recently, we set up a 24x7 team of support engineers. To hand triggers over to them, one only needs to fill in the "what to do if you see this" description in alert.yaml, add the tag [24x7], and push the changes to master. After alert.yaml is rolled out, all the triggers described in it are automatically monitored 24x7 by the engineers. S stands for Simplify!

Collecting business metrics

Since the last blog post about collecting and processing our business metrics was published, Bioyino has undergone major improvements.

1. Instead of choosing a leader through Consul, the built-in Raft is used.

2. Graphite tags are handled correctly.

3. Now you can use Bioyino (StatsD-server) as an agent.

4. The “set” format is supported for counting unique values.

5. The final aggregation of metrics can be done in several threads.

6. Data can be sent to Graphite in chunks via multiple parallel connections.

7. All bugs found have been fixed.
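For readers unfamiliar with the StatsD "set" type mentioned in item 4, here is a minimal Python sketch of counting unique values from lines of the form `name:value|s`. It illustrates the metric type only and says nothing about how Bioyino implements it; the metric names are hypothetical.

```python
from collections import defaultdict


def count_unique(lines):
    """Count distinct values per metric from StatsD set lines ("name:value|s")."""
    seen = defaultdict(set)
    for line in lines:
        name, _, rest = line.partition(":")
        value, _, kind = rest.partition("|")
        if kind == "s":  # ignore counters, timers and other types
            seen[name].add(value)
    return {name: len(values) for name, values in seen.items()}


events = [
    "users.active:42|s",
    "users.active:7|s",
    "users.active:42|s",  # duplicate value, counted once
    "api.hits:1|c",       # a counter, not a set
]
uniques = count_unique(events)  # {"users.active": 2}
```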

This is how it works now.

We began to actively implement StatsD agents next to all the major metric generators: in containers with monolith instances, in k8s pods next to services, on hosts with infrastructure components, etc.

The StatsD agent runs next to the application. It receives metrics from the application over UDP as before, but the traffic no longer passes through the network subsystem (thanks to optimizations in the Linux kernel). All events are pre-aggregated, and the collected data is sent every second (the interval is adjustable) to the main cluster of StatsD servers (bioyino0[1-3]) in Cap’n Proto format.
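The benefit of pre-aggregation on the agent can be shown with a toy Python model. It handles only counters and returns the snapshot instead of sending it anywhere, so the class `Agent` and its methods are hypothetical and unrelated to Bioyino's actual code or its Cap’n Proto transport.

```python
from collections import defaultdict


class Agent:
    """Toy pre-aggregating agent: sums counters locally between flushes."""

    def __init__(self):
        self.counters = defaultdict(float)

    def receive(self, line: str) -> None:
        # Accepts StatsD counter lines of the form "name:value|c".
        name, _, rest = line.partition(":")
        value, _, kind = rest.partition("|")
        if kind == "c":
            self.counters[name] += float(value)

    def flush(self) -> dict:
        # Hand over one aggregated snapshot and start a new interval.
        snapshot, self.counters = dict(self.counters), defaultdict(float)
        return snapshot


agent = Agent()
for _ in range(1000):
    agent.receive("api.hits:1|c")  # a thousand local events...
first = agent.flush()              # ...leave the host as a single value
second = agent.flush()             # nothing new arrived in the next interval
```

A thousand events reach the central cluster as one pre-summed value per flush interval, which is what keeps traffic to the StatsD servers low.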

Further processing and aggregation of metrics, leader selection in the StatsD cluster, and the transmission of the metrics by the leader to Graphite are practically unchanged. You can read about this in detail in a recent post.

The new figures are as follows:

Graph of received StatsD events
Graph of metrics sent from StatsD to Graphite

The bottom line

The general interaction diagram between the monitoring components presently looks like this:

Total number of metric values: 2,189,484,898,474.

Total storage depth of metrics: 3 years.

Number of unique metric names: 6,585,413,171.

There are 1,053 triggers, each covering from 1 to 15,000 metrics.

Plans for the near future:

  • begin converting product services to a tagged metric storage scheme;
  • add three more servers to the Graphite cluster;
  • make Moira compatible with persistent fabric.

I look forward to your comments and questions.
