Streaming Data Platform at Exness: Monitoring and Alerting

Yury Smirnov
Published in Exness Tech Blog
Mar 10, 2024

Co-authors: Gleb Shipilov, Aleksei Perminov, Ilya Soin

Read more on Streaming Data Platform at Exness:

  1. Overview by Gleb Shipilov
  2. Flink SQL and PyFlink by Aleksei Perminov
  3. Deployment Process by Ilya Soin
  4. Monitoring and Alerting by Yury Smirnov (you are here!)

In our SDP project, all metrics are exposed in Prometheus format, which makes them easy to visualize in Grafana. Both Flink and Kafka Connect ship with a wide range of built-in metrics that are invaluable for monitoring flow health, relative latency, and other parameters.

For more nuanced checks, we combine two tools: Nerve, our in-house framework, and Airflow. Nerve runs database queries on cron schedules and exposes the results as Prometheus-style metrics; it is an analog of query-exporter. Airflow lets us schedule complex checks against sources ranging from APIs and S3 to Salesforce, and bring together data from various databases for real-time comparison. The results of these checks are stored in a ClickHouse MetaBase and turned into metrics through query-exporter.
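
The SQL-to-metric idea behind tools of this kind can be sketched in a few lines. This is a minimal illustration, not Nerve's or query-exporter's actual code: `sqlite3` stands in for the real database, and the table `checks`, the metric `table_lag_seconds`, and the helper `rows_to_prometheus` are all hypothetical names. It renders query rows in the Prometheus text exposition format and does not escape special characters in label values.

```python
import sqlite3


def rows_to_prometheus(metric, label_names, rows):
    """Render (label..., value) rows as Prometheus text exposition lines."""
    lines = [f"# TYPE {metric} gauge"]
    for row in rows:
        *label_values, value = row
        labels = ",".join(
            f'{name}="{val}"' for name, val in zip(label_names, label_values)
        )
        lines.append(f"{metric}{{{labels}}} {float(value)}")
    return "\n".join(lines)


# Stand-in database; a real setup would query ClickHouse or another source.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checks (db TEXT, tbl TEXT, lag_seconds REAL)")
conn.executemany(
    "INSERT INTO checks VALUES (?, ?, ?)",
    [("clickhouse", "orders", 42.0), ("clickhouse", "trades", 7.5)],
)
rows = conn.execute(
    "SELECT db, tbl, lag_seconds FROM checks ORDER BY tbl"
).fetchall()

# The last column becomes the gauge value, the others become labels.
exposition = rows_to_prometheus("table_lag_seconds", ["db", "table"], rows)
print(exposition)
```

In a scheduled setup, a function like this would run on each cron tick and the resulting text would be served on an HTTP endpoint for Prometheus to scrape.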

We use Grafana dashboards to oversee services, flows, and data, and to set alerts tailored to our needs. These alerts are integrated with PagerDuty, which lets us configure thresholds, services, priorities, and urgency levels. The highest-priority alerts are forwarded to dedicated Slack channels, one for each SDP user team.

Wide coverage example

ClickHouse is a pivotal component of our infrastructure and hosts numerous tables populated by Kafka Connect and Flink, so we prioritized knowing when each table was last updated. This check is instrumental in maintaining data freshness and ensuring the integrity of our database. Because every table has a unified ‘updated’ column, computing a ‘last_seen’ metric is straightforward.

Example of an Airflow job:

import asyncio
import datetime
import json
import logging

import pendulum
from airflow.decorators import dag, task
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook

# (database, table) pairs excluded from the freshness check.
dq_last_seen_exception = [
    ('xdata', 'exness_allagentcommission_ext3'),
    ('research', 'tdf_199_account_category'),
]


@task()
def get_all_last_seen():
    click_ro = ClickHouseHook('clickhouse')
    # Select every regular table that has an 'updated' column, skipping
    # sandboxes, views, and temporary, schema, or backup tables.
    sql = '''
        SELECT database, name
        FROM system.tables
        WHERE database NOT LIKE '%sandbox%'
          AND engine <> 'View'
          AND lower(name) NOT LIKE '%tmp%'
          AND lower(name) NOT LIKE '%schema%'
          AND lower(name) NOT LIKE '%backup%'
          AND lower(name) NOT LIKE '%view%'
          AND lower(name) NOT LIKE '%.%'
          AND (database, name) IN
              (SELECT (database, table)
               FROM system.columns
               WHERE name = 'updated');
    '''
    click_rw = ClickHouseHook('clickhouse_insert')
    # Convert rows to tuples so they compare equal to the exclusion entries.
    all_tables = [(i[0], i[1]) for i in
                  json.loads(click_ro.get_pandas_df(sql)
                             .to_json(orient='values'))]
    loop = asyncio.get_event_loop()
    looper = asyncio.gather(*[get_last_seen(schema_table, click_rw)
                              for schema_table in all_tables
                              if schema_table not in
                              dq_last_seen_exception])
    loop.run_until_complete(looper)
    return all_tables


async def get_last_seen(schema_table, click_rw):
    schema, table = schema_table
    logging.info(f'{schema}.{table}')
    # Seconds since the newest 'updated' value, limited to the last 10 days.
    sql = f'''
        INSERT INTO qa.dq_metrics (database,
                                   schema,
                                   table,
                                   metric_name,
                                   num)
        SELECT 'clickhouse',
               '{schema}',
               '{table}',
               'last_seen',
               now() - toDateTime(max(updated)) AS last_seen_seconds
        FROM {schema}.{table}
        WHERE updated > today() - 10;
    '''
    # execute() is a blocking call, so the gathered coroutines
    # run one after another rather than in parallel.
    click_rw.execute(sql)


@dag(dag_id="ch.all_tables.dq.last_seen",
     schedule="15 * * * *",
     start_date=pendulum.datetime(2023, 1, 1),
     catchup=False,
     tags=["clickhouse", "dq", "last_seen"],
     dagrun_timeout=datetime.timedelta(minutes=60))
def ch_all_tables_dq_last_seen():
    get_all_last_seen()


ch_all_tables_dq_last_seen()
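
A note on concurrency: `get_last_seen` in the job above contains no `await`, so the coroutines passed to `asyncio.gather` execute sequentially. If parallel execution were wanted, one possible approach is to hand each blocking call to a worker thread. The sketch below is only an illustration under assumptions: `blocking_check` is a hypothetical stand-in for the hook's `execute` call, `max_parallel` is an invented parameter, and whether a shared ClickHouse hook is safe to use from several threads would need to be verified first.

```python
import asyncio
import time


def blocking_check(table):
    """Stand-in for a blocking database call such as a hook's execute()."""
    time.sleep(0.1)
    return f"{table}: ok"


async def run_checks(tables, max_parallel=4):
    # Cap the number of checks that hit the database at the same time.
    semaphore = asyncio.Semaphore(max_parallel)

    async def one(table):
        async with semaphore:
            # to_thread moves the blocking call off the event loop,
            # so several checks can be in flight at once.
            return await asyncio.to_thread(blocking_check, table)

    # gather returns results in the same order as the input tables.
    return await asyncio.gather(*(one(t) for t in tables))


results = asyncio.run(run_checks(["a.t1", "a.t2", "b.t3", "b.t4"]))
print(results)
```

`asyncio.to_thread` requires Python 3.9 or later. The semaphore keeps the number of simultaneous queries bounded, which matters when the table list is long.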

Example of a SQL to Prometheus task:

config:
  query: |
    SELECT dq.db, dq.schema, dq.table, kc.namespace, dq.last_seen
    FROM (SELECT database AS db,
                 schema,
                 table,
                 argMax(num, updated) AS last_seen
          FROM qa.dq_metrics
          WHERE date >= today() - 1
            AND metric_name = 'last_seen'
          GROUP BY db, schema, table) dq
    LEFT JOIN qa.sdp_sink_connectors kc
           ON dq.db = kc.db
          AND dq.schema = kc.db_schema
          AND dq.table = kc.db_table;
  gauge: dq_last_seen{}
  label_columns: [ "db", "schema", "table", "namespace" ]

Figure: The resulting metrics in Grafana

The Airflow DAG ‘ch.all_tables.dq.last_seen’ shown above calculates timeliness for every table and stores the results in the ‘qa.dq_metrics’ table. Query-exporter then retrieves these metrics with the simple SQL query shown above.
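
The query relies on `argMax(num, updated)`, which returns the `num` value from the row with the greatest `updated` in each group, so only the most recent measurement per table is exported. A small in-memory sketch of the same selection, using made-up rows and a hypothetical helper `latest_per_key`:

```python
from datetime import datetime


def latest_per_key(rows):
    """Emulate argMax(num, updated) with GROUP BY key for in-memory rows."""
    latest = {}
    for key, num, updated in rows:
        # Keep the measurement with the newest timestamp for each key.
        if key not in latest or updated > latest[key][1]:
            latest[key] = (num, updated)
    return {key: num for key, (num, _) in latest.items()}


# Illustrative rows: (key, last_seen seconds, time the row was written).
rows = [
    (("clickhouse", "xdata", "orders"), 120, datetime(2024, 3, 1, 10, 15)),
    (("clickhouse", "xdata", "orders"), 35, datetime(2024, 3, 1, 11, 15)),
    (("clickhouse", "xdata", "trades"), 900, datetime(2024, 3, 1, 11, 15)),
]
last_seen = latest_per_key(rows)
print(last_seen)
```

Here the older measurement of 120 seconds for the first table is discarded in favor of the newer value of 35.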

With this approach we get essential metrics such as ‘lag’, ‘delay’, ‘count’, and ‘seq_num control’. Some cases need more specialized checks, and for those we create dedicated Airflow DAGs or write custom SQL queries whose results are exported to Prometheus.
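
The article does not spell out what ‘seq_num control’ covers. One plausible reading, offered here purely as an assumption, is detecting gaps in a stream of sequence numbers. A minimal sketch of such a check, with a hypothetical helper `find_gaps`:

```python
def find_gaps(seq_nums):
    """Return (first_missing, last_missing) ranges in a set of integers."""
    # Sort and de-duplicate so late or repeated events do not matter.
    ordered = sorted(set(seq_nums))
    gaps = []
    for prev, cur in zip(ordered, ordered[1:]):
        if cur - prev > 1:
            gaps.append((prev + 1, cur - 1))
    return gaps


gaps = find_gaps([1, 2, 3, 7, 8, 10])
print(gaps)  # [(4, 6), (9, 9)]
```

The number of gaps, or the total count of missing values, could then be stored as a metric in the same way as ‘last_seen’.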

Specific check example

One of our Flink jobs has a rather unusual integration with the MaxMind GeoIP database. What sets it apart is the method: we download the entire database as a file to our Kubernetes pod on a cron schedule, and the Flink job then works with that file. Our monitoring objective was to ensure that the file is downloaded successfully and to verify its integrity against the original source, which keeps our data processing pipeline reliable.
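
The integrity comparison itself is easy to reproduce locally. The following sketch uses only the Python standard library instead of shell commands inside a pod; the helper names `file_md5` and `matches_published` and the sample file are hypothetical, and the checksum is the MD5 of the bytes `hello`.

```python
import hashlib
import tempfile
from pathlib import Path


def file_md5(path, chunk_size=1 << 20):
    """Hash a file in chunks so large archives are not read into memory."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_published(path, published_md5):
    # Published checksum files may carry a trailing newline or upper case.
    return file_md5(path) == published_md5.strip().lower()


with tempfile.TemporaryDirectory() as tmp:
    archive = Path(tmp) / "example.tar.gz"
    archive.write_bytes(b"hello")
    ok = matches_published(archive, "5D41402ABC4B2A76B9719D911017C592\n")
    stale = matches_published(archive, "0" * 32)

print(ok, stale)
```

Normalizing whitespace and case before comparing avoids false alarms caused by formatting differences between the two checksum sources.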

Airflow DAG implementation:

import datetime
import logging

import pendulum
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.slack.hooks.slack import SlackHook
from kubernetes import client
from tasks.common.k8s import bash_in_k8s, load_k8s_config


@task()
def afc_maxmind_load_checking(pod_name):
    namespace = "flink-afc"
    slack_hook = SlackHook(slack_conn_id="slack_api_di")
    notification = "#flink-afc-alerts"
    load_k8s_config(cluster='sdp-rke')
    kube_api = client.CoreV1Api()
    # MD5 of the archive as published by MaxMind.
    command_remote_md5 = (f'curl --silent "https://download.maxmind.com/'
                          f'app/geoip_download?edition_id=GeoIP2-City&'
                          f'license_key={Variable.get("GEOIP_KEY", "")}'
                          f'&suffix=tar.gz.md5"; echo ""\n')
    remote_md5 = bash_in_k8s(kube_api=kube_api, pod_name=pod_name,
                             namespace=namespace, command=command_remote_md5)
    logging.info(f"Remote MD5: {remote_md5}")
    # MD5 of the archive downloaded to the pod.
    command_local_md5 = ('md5sum /opt/flink/data/maxmind/GeoIP2-City.tar.gz'
                         '| cut -f1 -d " "\n')
    local_md5 = bash_in_k8s(kube_api=kube_api, pod_name=pod_name,
                            namespace=namespace, command=command_local_md5)
    logging.info(f"Local MD5: {local_md5}")
    if remote_md5 != local_md5:
        # The first mismatch only sets a flag file; the second one alerts.
        bad_flag = '''if [ -f /opt/flink/log/file_check_flag_exists.txt ]; then
            echo "SECOND_NOT_UPDATED"
        else
            touch /opt/flink/log/file_check_flag_exists.txt; echo "FIRST_NOT_UPDATED"
        fi
        '''
        bad_check_result = bash_in_k8s(kube_api=kube_api, pod_name=pod_name,
                                       namespace=namespace, command=bad_flag)
        if bad_check_result == "SECOND_NOT_UPDATED":
            check_logs = 'cat /opt/flink/log/cron.log\n'
            logs = bash_in_k8s(kube_api=kube_api, pod_name=pod_name,
                               namespace=namespace, command=check_logs)
            message = (f"[{Variable.get('ENV')}] There were no maxmind "
                       f"file update on *{pod_name}*, last log message:\n"
                       f"{logs}")
            logging.info(message)
            slack_hook.client.chat_postMessage(channel=notification,
                                               text=message)
    else:
        # Hashes match: clear the flag file if a previous run set it.
        good_flag = '''if [ -f /opt/flink/log/file_check_flag_exists.txt ]; then
            rm /opt/flink/log/file_check_flag_exists.txt;
        else
            echo "ALL_IS_SUPER_FINE"
        fi
        '''
        good_check_result = bash_in_k8s(kube_api=kube_api, pod_name=pod_name,
                                        namespace=namespace, command=good_flag)
        if good_check_result == "ALL_IS_SUPER_FINE":
            # Read the previously stored hash, then store the current one.
            last_hash_check = (f'cat /opt/flink/log/file_last_hash.txt;'
                               f'echo "{local_md5}" > '
                               f'/opt/flink/log/file_last_hash.txt;')
            last_hash = bash_in_k8s(kube_api=kube_api, pod_name=pod_name,
                                    namespace=namespace,
                                    command=last_hash_check)
            if last_hash != local_md5:
                message = (f"[INFO][{Variable.get('ENV')}] On '{pod_name}' "
                           f"there was a successful maxmind "
                           f"file update!\n Last hash: {local_md5}")
                logging.info(message)
                slack_hook.client.chat_postMessage(channel=notification,
                                                   text=message)


@task()
def get_afc_standalone_pods():
    load_k8s_config(cluster='sdp-rke')
    kube_api = client.CoreV1Api()
    afc_standalone_pods = []
    for pod in kube_api.list_pod_for_all_namespaces(watch=False).items:
        # Parentheses are required for a condition that spans two lines.
        if (pod.metadata.namespace == 'flink-afc'
                and 'accesslog' in pod.metadata.name):
            afc_standalone_pods.append(pod.metadata.name)
    logging.info(f"Found accesslog pods: {afc_standalone_pods}")
    return afc_standalone_pods


@dag(dag_id="afc_maxmind_load_check",
     schedule="47 * * * *",
     start_date=pendulum.datetime(2024, 2, 1),
     catchup=False,
     dagrun_timeout=datetime.timedelta(minutes=20),
     tags=["afc", "k8s", "maxmind"])
def afc_maxmind_load_check():
    # One mapped task instance per discovered pod.
    afc_maxmind_load_checking.expand(pod_name=get_afc_standalone_pods())


afc_maxmind_load_check()

Figure: Example of a Slack notification

Since the integration depends only on the presence and integrity of the downloaded file, a metric-based approach isn’t necessary. Instead, we notify users directly through the Airflow-Slack integration. By comparing the hash of the source file with the hash of the downloaded one, we verify data consistency and alert users promptly when they differ. We also report when the file was last updated, which keeps the integration transparent and easy to manage.
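
The notification rules in the DAG above amount to a small state machine: the first hash mismatch is tolerated silently, a second consecutive mismatch raises an alert, and a newly recorded hash produces an informational message. The sketch below restates that logic as a pure function so it can be reasoned about without a cluster. `CheckState` and `evaluate` are hypothetical names, and in-memory fields replace the flag and hash files kept on the pod.

```python
from dataclasses import dataclass


@dataclass
class CheckState:
    mismatch_flag: bool = False  # set after the first failed comparison
    last_hash: str = ""          # hash recorded after the last good run


def evaluate(state, remote_md5, local_md5):
    """Return (new_state, notification or None) for one scheduled run."""
    if remote_md5 != local_md5:
        if state.mismatch_flag:
            return state, "ALERT: file not updated on two consecutive checks"
        return CheckState(True, state.last_hash), None
    if state.mismatch_flag:
        # Hashes match again: clear the flag, report nothing this run.
        return CheckState(False, state.last_hash), None
    if state.last_hash != local_md5:
        return CheckState(False, local_md5), "INFO: file updated successfully"
    return state, None


state = CheckState(last_hash="aaa")
state, note1 = evaluate(state, "bbb", "aaa")  # first mismatch: silent
state, note2 = evaluate(state, "bbb", "aaa")  # second mismatch: alert
state, note3 = evaluate(state, "bbb", "bbb")  # recovered: flag cleared
state, note4 = evaluate(state, "bbb", "bbb")  # new hash recorded: info
print(note1, note2, note3, note4, sep="\n")
```

Tolerating one mismatch gives the cron download on the pod time to catch up after MaxMind publishes a new file, so only a persistent problem reaches Slack.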

Grafana Dashboards overview

Figure: Example of a Kafka Connect Grafana dashboard
Figure: Example of a Flink Grafana dashboard

Each team has its own Grafana dashboards, which keeps monitoring focused. Because not every team uses both tools, we keep Flink and Kafka Connect dashboards separate. Currently, these dashboards are built by hand with the help of templating. We are working on automating dashboard creation, PagerDuty routing, service configuration, and the setup of dedicated Slack channels, with the aim of making our monitoring infrastructure more efficient and responsive across teams.
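
As an illustration of what generating dashboards from code could look like, the sketch below builds one dashboard definition per team from a single function. It is a hypothetical outline, not the automation the team is building: the structure is only loosely modelled on Grafana's dashboard JSON and should be checked against the Grafana version in use, and the team names and the `flink-research` namespace are placeholders. The `dq_last_seen` gauge and its `namespace` label come from the query-exporter task shown earlier.

```python
import json


def build_dashboard(team, namespace):
    """Build a minimal dashboard definition for one team (illustrative)."""
    return {
        "title": f"SDP / {team}",
        "tags": ["sdp", team],
        "panels": [
            {
                "type": "timeseries",
                "title": "Table freshness (last_seen, seconds)",
                "gridPos": {"x": 0, "y": 0, "w": 24, "h": 8},
                "targets": [
                    {"expr": f'dq_last_seen{{namespace="{namespace}"}}'}
                ],
            }
        ],
    }


# Placeholder team-to-namespace mapping.
dashboards = {
    team: build_dashboard(team, namespace)
    for team, namespace in [("afc", "flink-afc"), ("research", "flink-research")]
}
print(json.dumps(dashboards["afc"], indent=2))
```

Keeping the panel layout in one function means a change to the template reaches every team's dashboard on the next generation run.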
