Utilizando Python e métricas de aplicações para detecção de anomalias

Yves M. Galvão
Livelo
5 min readMay 7, 2023

--

Continuando a série de artigos sobre machine learning e IA, veremos hoje como podemos aplicar algoritmos de detecção de anomalia para geração de alertas.

Acompanhe os demais artigos aqui:

Sempre crucial para uma correta operação das aplicações, a disciplina de observabilidade ganhou um contexto ainda maior com o advento da computação distribuída ou, mais especificamente, a arquitetura de microserviços. Se antes tínhamos um ponto único de entrada, com os dados centralizados em um único servidor, hoje temos diversos clientes, pontos de entradas distintos e um maior encadeamento de aplicações envolvidas em uma única requisição. Com isso, tornou-se mais complexo coletar, rastrear e realizar troubleshooting em produção, sendo necessário a utilização de novos padrões de arquitetura para mitigar tais problemas.

Projetos como o Prometheus e Grafana em conjunto com bibliotecas de tracing distribuído, como o Micrometer, permitem a aplicação dos padrões de coleta e rastreamento facilmente. Já, iniciativas como o OpenTelemetry auxiliaram na criação de um padrão agnóstico, sendo inclusive incubada pela iniciativa Cloud Native Computing Foundation (CNCF). Por fim, ferramentas de Application Performance Monitoring (APM) auxiliam no monitoramento da performance de fluxos críticos de negócio e consequentemente identificação de problemas em produção.

Sendo a porta de entrada para exposição de Application Programming Interface (APIs), o API Gateway é uma ferramenta crítica em um arquitetura de microserviços e seu monitoramento nos permite ter uma visibilidade total dos serviços expostos em nossas camadas de borda, tornando essencial termos uma observabilidade completa para cada serviço.

Neste artigo, veremos como utilizar métricas recuperadas de um API Gateway para criar um algoritmo que irá identificar padrões de anomalia utilizando a linguagem Python. Let's go!

Para começar, definimos o conjunto de bibliotecas a serem utilizados:

import os
import pandas as pd
import glob
import numpy as np
import matplotlib.pyplot as plt

Utilizamos cinco conjuntos de dados distribuídos em diferentes arquivos Comma-separated Values (CSV) com o seguinte formato:

Time,url,status
5,/api/v1/test/,200
5,/api/v1/test/,200
5,/api/v1/test/,200
5,/api/v1/test/,200
5,/api/v1/test/,200
10,/api/v1/test/,400
10,/api/v1/test/,400
10,/api/v1/test/,400

Precisamos definir os valores de threshold que serão utilizados como base para identificar se uma anomalia ocorreu.

upper_threshold = 4
lower_threshold = -4
anomaly_average = 3

Em seguida, vamos criar uma função que vai carregar todos os conjuntos de dados numa lista de nome data_frames e uma função para formatar e agrupar corretamente os status code que iremos utilizar para análise: 200, 400 e 500.

data_frames = []
for x in range(1,6):
file = f"api-{x}.csv"
df = pd.DataFrame(pd.read_csv(file))
del df["url"]
data_frames.append(df)

def format_day(current_day):

day = current_day.groupby(['Time', 'status'])['status'].count().unstack().fillna(0).reset_index()

day_200 = day[['Time', 200]].rename(columns={200: 'status_code'})
day_400 = day[['Time', 400]].rename(columns={400: 'status_code'})
day_500 = day[['Time', 500]].rename(columns={500: 'status_code'})

return day_200, day_400, day_500

Agora, vamos agrupar os diferentes dias em listas específicas por status code analisados.

#All 200 codes
all_days = {}
status_200 = []
status_400 = []
status_500 = []

#Get all days
day_1_200, day_2_400, day_2_500 = format_day(data_frames[0])
day_2_200, day_2_400, day_2_500 = format_day(data_frames[1])
day_3_200, day_3_400, day_3_500 = format_day(data_frames[2])
day_4_200, day_4_400, day_4_500 = format_day(data_frames[3])
day_5_200, day_5_400, day_5_500 = format_day(data_frames[4])

#Append 200 status code
status_200.append(day_1_200)
status_200.append(day_2_200)
status_200.append(day_3_200)
status_200.append(day_4_200)
status_200.append(day_5_200)

#Append 400 status code
status_400.append(day_1_400)
status_400.append(day_2_400)
status_400.append(day_3_400)
status_400.append(day_4_400)
status_400.append(day_5_400)

#Append 500 status code
status_500.append(day_1_500)
status_500.append(day_2_500)
status_500.append(day_3_500)
status_500.append(day_4_500)
status_500.append(day_5_500)

all_days["200"] = status_200
all_days["400"] = status_400
all_days["500"] = status_500

Por fim, vamos criar nossa função que detecta se uma anomalia ocorreu baseado no algoritmo de médias móveis, com valores de Threshold máximo, mínimo e janela conforme definidos anteriormente.

def anomaly_detection(status_code="200",window = anomaly_average):
current_index = 1
row = 1
status = all_days[status_code]
#Percore os 5 dias dos conjuntos de dados
for x in range(0,5):
cur_day = status[x]

#calcula a média móvel baseada na janela
cur_day['anomaly_calculation'] = cur_day['status_code'].rolling(window=window).mean()
#verifica a diferença entre o valor calculado e o real
df_result = cur_day['status_code'] - cur_day['anomaly_calculation']

#Marca os pontos de anomalia como 1 e os demais como 0
df_points_day = np.where((df_result>=upper_threshold)|(df_result<=lower_threshold),1,0)
anomalies = cur_day.loc[np.where(df_points_day == 1)]
moving_average = cur_day.loc[np.where(~np.isnan(cur_day['anomaly_calculation']))]

#Cria o gráfico
plt.subplot(row, 2, current_index)
plt.title(f"Resultado dia {x+1}")

#Plota os dados do dataset
plt.plot(cur_day['Time'], cur_day['status_code'], '-')

#Plota os pontos de anomalia
plt.plot(anomalies['Time'], anomalies['status_code'], '*', color='red');

#Plota os dados da média móvel
plt.plot(moving_average['Time'], moving_average['anomaly_calculation'], '--', color='orange');

#Exibe o gráfico na tela
plt.show()

Desta forma, podemos chamar a função anomaly_detection e verificar dia a dia baseado em um valor de janela definido (window) e status_code se alguma anomalia ocorreu, obtendo os seguintes resultados:

anomaly_detection("200",window=2)
Gráficos em linha de tempo, onde a linha azul indica o número de chamadas com status code 200 de um determinado dia, a linha amarela é a média móvel calculada e os pontos em vermelho são as anomalias identificadas.

Nos gráficos é possível identificar na linha azul o número de chamadas de uma API a cada 5 segundos. A linha amarela tracejada exibe a média móvel calculada nessa janela de tempo e, por fim, os pontos vermelhos as anomalias identificadas.

Também podemos simular a inclusão de novos pontos, verificando dinamicamente se uma anomalia ocorreu.

#Adicionando um total de 0 requisições no timestamp 100 para o dataset com status code 200
new_row = {'Time':100, 'status_code':0}
day_1_200 = day_1_200.append(new_row, ignore_index=True)

day_1_200['anomaly_calculation'] = day_1_200['status_code'].rolling(window=5).mean()
df_result = day_1_200['status_code'] - day_1_200['anomaly_calculation']


df_points_day_1 = np.where((df_result>=upper_threshold)|(df_result<=lower_threshold),1,0)


anomalies = day_1_200.loc[np.where(df_points_day_1 == 1)]
moving_average = day_1_200.loc[np.where(~np.isnan(day_1_200['anomaly_calculation']))]


plt.plot(day_1_200['Time'], day_1_200['status_code'], '-')
plt.plot(anomalies['Time'], anomalies['status_code'], '*', color='red');
plt.plot(moving_average['Time'], moving_average['anomaly_calculation'], '--', color='orange')
plt.grid(linestyle=':')
plt.show()
Gráfico em linha de tempo, onde a linha azul indica o número de chamadas com status code 200, a linha amarela é a média móvel calculada e os pontos em vermelho são as anomalias identificadas.

Com isso, verificamos que é possível recuperar informações de métricas de uma aplicação e, utilizando conceitos de detecção de anomalia, identificar comportamentos inesperados, garantindo uma maior observabilidade e realizando assim ações de maneira proativa.

--

--