InfluxDB v2 ve alert yönetimi: Dynamic Threshold Alerti nasıl oluşturulur?

Scaltas
TurkNet Technology
Published in
3 min readNov 29, 2023

Giriş

InfluxDB, time-series verileri için optimize edilmiş bir açık kaynaklı veritabanı sistemidir. Time-series verileri, belirli bir zaman aralığı içinde sürekli olarak toplanan ve kaydedilen verileri ifade eder. Sensör verileri, uygulama logları, enerji tüketimi gibi sürekli değişen veri setleri zaman serisi verilerine örnektir. Turknet’te cihazlardan toplanan ve zaman değişkenine bağlı olarak izlenmesi gereken birçok veri tipi için bu veritabanını kullanıyoruz.

Bu yazıdaki amacımız ise, InfluxDb’deki time-series ölçümler üzerinden dinamik anomaly tespitiyle ilgili bir çözüm sunmak. Öncelikle Influxdb 'de UI üzerinde kolaylıkla oluşturulabilen static threshold check’e bakacağız.

Örneğin;

“Bir cihazdan gelen ping response time 100 ms’den fazlaysa alert oluşturarak vereceğim endpointe gönder”

taskını static threshold tanımlayarak oluşturabiliyoruz. Ancak, gerçek dünyada bu herhangi bir sorunumuzu çözmüyor. Çünkü her time-series verisinin threshold değeri farklı olabilir.

“Bir cihaza atılan son üç ping response time, son 1 saatin ortalamasının iki katından fazlaysa alert gönder“

gibi daha kompleks taskları aynı kolaylıkla yapabiliyor muyuz, bunu inceleyeceğiz.

Sadece sonuçla ilginenenler için: static threshold için oluşturulan scriptin üzerinde oynayarak yapabiliyoruz.

Static Threshold Check

Dynamic threshold check’e geçmeden önce, InfluxDB UI üzerinde kolayca kullanılabilen ama ihtiyacımızı çözmeyen static threshold check oluşturma ekranına bakalım:

Son gelen ölçüm için farklı threshold değerlerinin aşılması durumunda farklı düzeylerde alert oluşturabiliyoruz. Basit olması için sadece CRIT alert durumunu seçtiğimizde InfluxDb bizim için FLUX dilinde şöyle bir script oluşturuyor:

//Data Check

data =
from(bucket: "Some_Bucket")
|> range(start: -1m)
|> filter(fn: (r) => r._measurement == "some_measurement" and r._field == "some_value")
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)

//Task Configuration

option task = {name: "Static Threshold Check", every: 1m, offset: 0s}

//Define Alert Condition
check = {_check_id: "0c32c9c686f9d000", _check_name: "Static Threshold Check", _type: "threshold", tags: {}}
crit = (r) => r["some_value"] > 100.0
messageFn = (r) => "Check: ${ r._check_name } is: ${ r._level }"

data |> v1["fieldsAsCols"]() |> monitor["check"](data: check, messageFn: messageFn, crit: crit)

Data Check, Task Configuration, Alert Condition olmak üzere üç adım var. Dynamic Threshold oluştururken bu adımlara ortalama değer hesaplama adımı da eklenecek. Sırayla bu adımlar için scripti geliştirelim.

Dynamic Threshold Check:

1.Average Calculation:

averageValues =
from(bucket: "Some_Bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "some_measurement" and r._field == "some_value")
|> group(columns: ["some_tag"])
|> mean()
|> map(fn: (r) => ({some_tag: r.some_tag, average_value: r._value}))
|> map(fn: (r) => ({r with average_value: string(v: r.average_value)}))
|> group(columns: ["average_value"])

Her bir some_tag için 1 saatlik ortalamayı aldık ve

key: some_tag, value: average_value

olmak üzere bir key-value collection oluşturduk. tüm time-series ölçümlerinin ortalamasını tek query’de alarak performans için önemli bir avantaj sağladık.

2.Data Check:

data =
from(bucket: "Some_Bucket")
|> range(start: -10m)
|> filter(fn: (r) => r["_measurement"] == "Some_Measurement")
|> filter(fn: (r) => r["_field"] == "Some_Value")
|> tail(n: 3)
|> min()

Static threshold check scriptinde sadece gelen son değeri alıp static threshold değeriyle karşılaştırmıştık. Bu kez yazının başında belirttiğimiz gibi false-positive alarm üretmemek için son 3 datanın en küçük değerine bakıyoruz. tail(n: 3) -> min() fonksiyonuyla bunu elde ediyoruz.

3.Task Configuration:

option task = {name: "Dynamic Threshold Check", every: 1m, offset: 0s}

Bu adımda değişen bir şey yok, taskın yine 1 dakikalık periyotlarla çalışmasını sağlıyoruz.

4.Alert Condition:

joined = join(tables: {t1: data, t2: averageValues}, on: ["some_tag"], method: "inner")

joined
|> v1.fieldsAsCols()
|> monitor["check"](
data: {_check_id: "0b2eaf67d3a0b000", _check_name: "Dynamic Threshold Check", _type: "threshold", tags: {}},
messageFn:
(r) =>
"{'check': '${r._check_name}','level': '${r._level}','some_tag': '${r.some_tag}','source': 'some_source','actual': '${r.some_value}','threshold': '${r.average_value}'}",
crit: (r) => r["some_value"] > float(v: r["average_value"]) * 2.0,
)

Scriptin sonunda alert conditionlarını üretiyoruz. Bunun için “some_tag” kolonu üzerinde data ve averageValues collectionları için bir inner join işlemi yapıyoruz. Join işleminden sonra elimizde her bir “some_tag” için hem son gelen üç değerin en küçüğü hem de son 1 saatin ortalaması var. bu iki değeri;

crit: (r) => r["some_value"] > float(v: r["average_value"]) * 2.0,

satırıyla karşılaştırarak critical condition’ı tanımlamış oluyoruz. messageFn fieldıyla da göndereceğimiz alertin içeriğini build ediyoruz.

Sonuç:

Time-series verisi üzerinde dynamic threshold alert üretmek hem performans, hem InfluxDb’nin bunu direkt desteklememesi sebebiyle bir challenge olarak karşımıza çıkmıştı. Bu yazıda bir örneğini gösterdiğimiz flux scripti geliştirme metodu sayesinde bunun gibi görece kompleks ihtiyaçlara kolayca cevap verebiliyoruz.

--

--