Prevendo acidentes com vítimas nas rodovias federais brasileiras com Amazon SageMaker

Carlos Eduardo Souza
Data Hackers
Published in
12 min readJul 14, 2020
Photo by Markus Spiske on Unsplash

No Brasil, há um grande número mortes nas rodovias federais, entre 2010 e 2017, aconteceram 62.120 mortes nas rodovias federais, uma média de 21 mortes por dia, quase uma vítima por hora; adicionalmente 201.006 pessoas ficaram seriamente feridas enquanto outras 578.954 levemente feridas, como resultado de 2.392.205 acidentes.

Ainda que, por um lado, não se possa calcular o que a perda de vida humana representa ou os danos psicológicos e estresses traumáticos aos quais as vítimas de trânsito e suas famílias estão sujeitas após eventos dessa natureza, por outro lado, há também a formação de custos econômico-financeiros que impactam diretamente as famílias, bem como a sociedade em geral, e que podem ser estimados usando metodologias específicas de cálculo.

Para essa finalidade, em 2005, o IPEA em conjunto com o DENATRAM desenvolveu uma metodologia para medir os impactos sociais e econômicos dos acidentes de trânsito nas rodovias brasileiras e posteriormente esse atualizou esse estudo 2015, que serve como orientador na formulação de políticas públicas para a prevenção de acidentes.

A partir desse cenário criei um projeto para analisar as condições em que ocorreram acidentes nas estradas brasileiras entre 2017 e junho de 2020, com o objetivo de construir um modelo capaz de prever se em um determinado acidente haverá vítimas fatais, pessoas feridas ou ilesas. O conjunto de informações que descreve esse problema contém variáveis ​​sobre as condições da estrada, condições climáticas, horário, número de passageiros, número de veículos, causa do acidente e danos observados. Esses dados foram obtidos no site da Polícia Rodoviária Federal.

Figura 1 — Quantidade de tipo de acidente em milhares (0 (vítimas fatais), 1 (feridos), 2 (ilesos)) entre 2017 e 2020
Figura 2 — Custo por tipo de acidente

Podemos observar que a maioria dos acidentes envolve pessoas feridas em uma proporção de 10 para 1 comparada com os acidentes com vítimas fatais. Já os custos associados aos acidentes com vítimas fatais é quase 4x maior, quando comparado com os custos dos acidentes com feridos.

Logo, se for possível prever o resultado de um acidente com um certo grau de confiança, o modelo pode ser usado como uma ferramenta de suporte no gerenciamento de recursos de prevenção de acidentes, como determinar pontos de inspeção, distribuir equipes de resgate e sistemas de alerta para os motoristas, além de auxiliar na gestão de políticas públicas para reduzir acidentes nas estradas federais.

Essa tarefa é, portanto, um problema de classificação com várias classes e se encaixa no grupo de aprendizado supervisionado. Vários algoritmos de machine learning podem ser usados ​​nesta tarefa; para este projeto, focaremos no modelo de regressão linear como baseline. A partir dos resultados observados, procuraremos melhorar a previsão com um modelo mais complexo.

Como métrica de avaliação foi usado o Precision, para minimizar o erro Tipo 1, que ocorre quando o modelo prediz que um evento ocorre quando na verdade ele não ocorre. A escolha da métrica para esse problema está relacionada com os custos observados na Figura 2, é particularmente importante predizer corretamente que se um acidente terá determinado tipo de vítima esse fato realmente ocorrerá.

Resumo do Data Cleaning e EDA e Feature Engineering:

Como já temos os argumentos para desenvolver um produto de dados e, conforme o título do artigo, utilizou-se o SageMaker para construir o modelo, vou resumir os pontos mais importantes do EDA para descrever mais sobre a ferramenta.

No total foram analisados 245.877 registros no período, com 34 variáveis; no site da Polícia Rodoviaria Federal é possível encontrar um descritivo das variáveis, o que facilita a análise dos dados.

  • Foram identificadas 127 rodovias federais monitoradas pela PRF.
  • As variáveis 'id', 'km', 'municipio', 'regional', 'delegacia' e 'uop' foram removidas, assim como 391 registros sem informação
  • A maior parte dos acidentes acontece aos finais de semanas.
  • MG, SC, PR, RS, RJ concentram o maior número de acidentes.
  • br101, br116, br381, br40, br153, são as rodovias federais com maior número de acidentes.
  • A maior de causa de acidentes é "Falta de atenção".
  • O tipo mais comum de acidente é "Colisão traseira".
  • O balanceamento entre as classes é de 68,8% acidentes com vítimas, 24,7% ilesos, 0.6% com vítimas fatais.
  • Na maior parte dos casos a condição do tempo é de céu claro.
  • Meses de janeiro e dezembro concentram a maior parte dos casos.

Foram criadas variáveis adicionais com a informação dos feriados nacionais, identificados os meses de férias, indicador das 10 rodovias com o maior percentual de acidentes registrados e indicador de trecho urbano. O objetivo é explicitar mais informações para o modelo.

A modelagem

Antes de treinar os modelos, os dois últimos ajustes foram, transformar as variáveis categóricas em variáveis numéricas e o balanceamento dos dados.

A sequência do projeto até esse ponto pode ser considerada trivial considerando os recursos empregados, pode ser processado na maioria dos computadores atuais, em uma sessão do Google Colab ou em Kernel do Kaggle.

A partir de agora vamos explorar as capacidades do Amazon SageMaker em processar modelos, ajustar hiperparametros e fazer o deploy em produção diretamente de uma seção do Jupyter Notebook. Na minha opinião essa característica acelera muito o processo de prototipagem de um produto de dados, por outro lado, requer do cientista de dados uma atenção maior com a qualidade do código. Nesse projeto utilizarei a versão de "alto nível" da API do SageMaker, ela permite um menor grau de customização em comparação com a linguagem de "baixo nível".

O primeiro passo será capturar as informações da sessão, configuração de acesso e definir um bucket de armazenamento dentro da S3.

# sagemaker session, role
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# S3 bucket name
bucket = sagemaker_session.default_bucket()

O segundo passo será importar o modelo e definir os hiper parâmetros:

# import LinearLearner
from sagemaker import LinearLearner

# specify an output path
prefix = 'baseline'
output_path = 's3://{}/{}'.format(bucket, prefix)

# define the model instantiate LinearLearner
linear = LinearLearner(role=role, # identifica em qual sessão o modelo será instanciado
train_instance_count=1, # quantidade de instâncias que serão utilizadas durante o treino train_instance_type='ml.c4.xlarge', # tipo de CPU que será utlizada output_path=output_path, # onde o modelo será salvo sagemaker_session=sagemaker_session, # qual a sessão do SageMaker será utilizada epochs=25, # por quantas o modelo será treinado num_classes=3, # Especifica o tipo de variável target predictor_type='multiclass_classifier', # tipo de preditor balance_multiclass_weights='true', # Da a cada classe a mesma importância na função custo early_stopping_patience=5, # Se nenhuma melhoria for feita na métrica relevante, o número de épocas a aguardar antes de terminar o treinamento loss='softmax_loss'# função custo)

O modelo é o LinearLearner otimizado para o SageMaker, os parâmetros 'role', 'train_instance_count', 'train_instance_type', 'output_path' e 'sagemaker_session' são parâmetros utilizados para indicar em qual ambiente o modelo será treinado e qual tipo de CPU será utilizada, lembrando que a Amazon oferece diversos tipos de instância, inclusive com GPU, cobradas por hora e em dólar.

Os demais parâmetros, a partir de 'epochs', são configurações do modelo e alguns deles podem se ajustados para melhorar o resultado das predições. Nesse caso, por ser o modelo baseline, não explorarei essas opções.

Uma característica importante do LinearLearner é que os dados precisam ser convertidos em um numpy array. Em geral isso acelera o treinamento, quando comparado aos dados no formato dataframe.

# convert features/labels to numpy
train_x_np = train_features.astype('float32')
train_y_np = train_labels.astype('float32')

# create RecordSet formatted_train_data = linear.record_set(train_x_np, labels=train_y_np)

Abaixo o inicio do treino do modelo. Essa etapa pode ser monitorada pelo serviço Amazon CloudWatch, todas as etapas do treinamento do modelo ficam disponíveis nesse serviço, o que facilita analisar possíveis erros no código.

%%time 
# train the estimator on formatted training data
linear.fit(formatted_train_data)

Assim que o treino é concluído é necessário fazer o deploy do modelo para que possamos fazer as predições. Nessa etapa também é necessário definir o tipo e a quantidade de CPU's que serão utilizadas, o que também gera custos enquanto o serviço estiver disponível para predições. O deploy também gera um endpoint dentro do SageMaker, a partir dele uma url é criada e pode ser utilizada em qualquer API para realizar predições. Quaisquer transformações necessárias podem estruturadas no serviço AWS Lambda.

%%time 
# deploy and create a predictor
linear_predictor = linear.deploy(initial_instance_count=1, instance_type='ml.t2.medium')

A função abaixo permite fazer as predições e calcular as métricas do modelo treinado.

# code to evaluate the endpoint on test data
# returns a variety of model metrics
def evaluate(predictor, test_features, test_labels, verbose=True):
"""
Evaluate a model on a test set given the prediction endpoint.
Return binary classification metrics.
:param predictor: A prediction endpoint
:param test_features: Test features
:param test_labels: Class labels for test data
:param verbose: If True, prints a table of all performance metrics
:return: A dictionary of performance metrics.
"""

prediction_batches = [predictor.predict(batch) for batch in np.array_split(test_features, 100)]

test_preds = np.concatenate([np.array([x.label['predicted_label'].float32_tensor.values[0] for x in batch]) for batch in prediction_batches])

# calculate true positives, false positives, true negatives, false negatives
tp = np.logical_and(test_labels, test_preds).sum()
fp = np.logical_and(1-test_labels, test_preds).sum()
tn = np.logical_and(1-test_labels, 1-test_preds).sum()
fn = np.logical_and(test_labels, 1-test_preds).sum()

# calculate binary classification metrics
recall = tp / (tp + fn)
precision = tp / (tp + fp)
accuracy = (tp + tn) / (tp + fp + tn + fn)

# printing a table of metrics
if verbose:
print(pd.crosstab(test_labels, test_preds, rownames=['actual (row)'], colnames=['prediction (col)']))
print("\n{:<11} {:.3f}".format('Recall:', recall))
print("{:<11} {:.3f}".format('Precision:', precision))
print("{:<11} {:.3f}".format('Accuracy:', accuracy))
print()


return {'TP': tp, 'FP': fp, 'FN': fn, 'TN': tn,
'Precision': precision, 'Recall': recall, 'Accuracy': accuracy}
print('Metrics for simple, LinearLearner.\n')
# get metrics for linear predictor metrics = evaluate(linear_predictor, test_features.astype('float32'), test_labels, verbose=True)

É possível implementar métricas do scikit-learn para avaliar o modelo.

import itertools
from sklearn import metrics

class_names = ['no victims', 'injured', 'dead victims']
def plot_confusion_matrix(cm, classes,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.Greens):
"""
This function prints and plots the confusion matrix.
Normalization can be applied by setting `normalize=True`.
"""
if normalize:
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
print("Normalized confusion matrix")
else:
print('Confusion matrix, without normalization')

print(cm)

plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)

fmt = '.2f' if normalize else 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black")

plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
# Compute confusion matrix

cnf_matrix = metrics.confusion_matrix(test_labels, test_preds)
np.set_printoptions(precision=2)
# Plot non-normalized confusion matrix

plt.figure()
plot_confusion_matrix(cnf_matrix, classes=class_names,
title='Confusion matrix, without normalization')
# Plot normalized confusion matrix

plt.figure()
plot_confusion_matrix(cnf_matrix, classes=class_names, normalize=True,
title='Normalized confusion matrix')

Em resumo, um modelo de regressão foi capaz de produzir um Precision 0.62, esse valor pode ser considerado bom. Comparado com o estudo Traffic Accident Analysis Using Machine Learning Paradigms, que embora essa não utilizasse o Precision como avaliação do modelo, forneceu essa métrica na publicação, tendo os autores deste estudo, utilizando redes neurais para abordar um problema semelhante, obtido um Precision de 0.6. Considerando o esforço e o tempo para implementar um modelo bastante simples temos um bom baseline.

Por fim, é importante encerrar o deploy do modelo para cessar a cobrança pelo serviço.

# Deletes a precictor.endpoint
def delete_endpoint(predictor):
try:
boto3.client('sagemaker').delete_endpoint
(EndpointName=predictor.endpoint)

print('Deleted {}'.format(predictor.endpoint))
except:
print('Already deleted: {}'.format(predictor.endpoint))
# delete the predictor endpoint
delete_endpoint(linear_predictor)

O próximo passo é treinar um modelo mais complexo para melhorar o baseline, para isso vamos usar a implementação do XGBoost diretamente do SageMaker.

import sagemaker
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import get_image_uri
import time
from time import gmtime, strftime

session = sagemaker.Session()

role = get_execution_role()
data_dir = './basic_xgboost'
if not os.path.exists(data_dir):
os.makedirs(data_dir)
X = train_smote.iloc[:,:-1]
y = train_smote.iloc[:,-1]
X.shape, y.shape
import sklearn

# We split the dataset into 2/3 training and 1/3 testing sets.
X_train, X_test, Y_train, Y_test = sklearn.model_selection.train_test_split(X, y, test_size=0.33)

# Then we split the training set further into 2/3 training and 1/3 validation sets.
X_train, X_val, Y_train, Y_val = sklearn.model_selection.train_test_split(X_train, Y_train, test_size=0.33)
X_test.to_csv(os.path.join(data_dir, 'test.csv'), header=False, index=False)

pd.concat([Y_val, X_val], axis=1).to_csv(os.path.join(data_dir, 'validation.csv'), header=False, index=False)

pd.concat([Y_train, X_train], axis=1).to_csv(os.path.join(data_dir, 'train.csv'), header=False, index=False)
prefix = 'basic-xgboost-deploy'

test_location = session.upload_data(os.path.join(data_dir, 'test.csv'), key_prefix=prefix)

val_location = session.upload_data(os.path.join(data_dir, 'validation.csv'), key_prefix=prefix)

train_location = session.upload_data(os.path.join(data_dir, 'train.csv'), key_prefix=prefix)
container = get_image_uri(session.boto_region_name, 'xgboost')

xgb = sagemaker.estimator.Estimator(
container, # The name of the training container
role, # The IAM role to use (our current role in this case)
train_instance_count=1, # The number of instances to use for training
train_instance_type='ml.m4.xlarge', # The type of instance ot use for training
output_path='s3://{}/{}/output'.format(session.default_bucket(), prefix), # Where to save the output the model artifacts
sagemaker_session=session) # The current SageMaker session

Primeiramente são importadas algumas bibliotecas do SageMaker, capturadas as informações da sessão, criados os arquivos de treino, teste e validação e salvos como CSV dentro da S3, de onde os dados serão utilizados para treinar o modelo. O XGBoost está em um container dentro da AWS, logo é preciso dizer em qual sessão esse container precisa estar disponível. A definição do estimador é bastante similar a do LinerLearner.

O segundo passo é definir os parâmetros e treinar o modelo. O SageMaker espera que os dados estejam em CSV, então devem ser informados os arquivos de treino e validação nessa etapa.

xgb.set_hyperparameters(
max_depth=6,
eta=0.4,
gamma=4,
min_child_weight=6,
subsample=0.8,
objective='multi:softmax',
eval_metric= 'merror', early_stopping_rounds=10,
num_class=3,
num_round=200,
seed=42)
%%time
# This is a wrapper around the location of our train and validation data, to make sure that SageMaker
# knows our data is in csv format.
s3_input_train = sagemaker.s3_input(s3_data=train_location, content_type='csv')

s3_input_validation = sagemaker.s3_input(s3_data=val_location, content_type='csv')

xgb.fit({'train': s3_input_train, 'validation': s3_input_validation})

Uma vez que o modelo concluir o processo de treinamento precisamos fazer o deploy para calcular as predições e avaliar a qualidade do modelo.

%%time  xgb_transformer = xgb.transformer(instance_count = 1, instance_type = 'ml.m4.xlarge')X_test.to_csv(os.path.join(data_dir, 'test.csv'), header=False, index=False)
test_location = session.upload_data(os.path.join(data_dir, 'test.csv'), key_prefix=prefix)
%%time
xgb_transformer.transform(test_location, content_type='text/csv', split_type='Line')
%%time
xgb_transformer.wait()
!aws s3 cp --recursive $xgb_transformer.output_path $data_dirY_pred = pd.read_csv(os.path.join(data_dir, 'test.csv.out'), header=None)from sklearn.metrics import accuracy_score, precision_score, recall_score

print("Test Evaluation: ")
print("\n{:<11} {:.3f}".format('Recall:', recall_score(Y_test, Y_pred, average='weighted')))
print("{:<11} {:.3f}".format('Precision:', precision_score(Y_test, Y_pred, average='weighted')))
print("{:<11} {:.3f}".format('Accuracy:', accuracy_score(Y_test, Y_pred)))
print()

Usando o XGBoost obteve-se um Precision de 0.77, uma evolução interessante em relação ao modelo base.

Uma outra etapa importante é ajuste de hiper parâmetros. Para isso vamos utilizar as implementações do SageMaker:

data_dir = './tuned_xgboost'
if not os.path.exists(data_dir):
os.makedirs(data_dir)
X_test.to_csv(os.path.join(data_dir, 'test.csv'), header=False, index=False)

pd.concat([Y_val, X_val], axis=1).to_csv(os.path.join(data_dir, 'validation.csv'), header=False, index=False)

pd.concat([Y_train, X_train], axis=1).to_csv(os.path.join(data_dir, 'train.csv'), header=False, index=False)
prefix = 'tuned-xgboost-deploy'

test_location = session.upload_data(os.path.join(data_dir, 'test.csv'), key_prefix=prefix)

val_location = session.upload_data(os.path.join(data_dir, 'validation.csv'), key_prefix=prefix)

train_location = session.upload_data(os.path.join(data_dir, 'train.csv'), key_prefix=prefix)
container = get_image_uri(session.boto_region_name, 'xgboost') # Now that we know which container to use, we can construct the estimator object. xgb = sagemaker.estimator.Estimator(
container, # The name of the training container role, # The IAM role to use (our current role in this case) train_instance_count=1, # The number of instances to use for training train_instance_type='ml.m4.xlarge', # The type of instance ot use for training output_path='s3://{}/{}/output'.format(session.default_bucket(), prefix), # Where to save the output the model artifacts sagemaker_session=session) # The current SageMaker session

O início do processo segue a mesma estrutura do treino do modelo, para essa etapa criamos um diretório específico com os mesmo dados e um identificador de modelo específico.

xgb.set_hyperparameters(max_depth=6,
eta=0.4,
gamma=4,
min_child_weight=6,
subsample=0.8,
objective='multi:softmax',
early_stopping_rounds=10,
num_class=3,
num_round=200,
seed=42)
from sagemaker.tuner import IntegerParameter, ContinuousParameter, HyperparameterTuner

xgb_hyperparameter_tuner = HyperparameterTuner(
estimator = xgb,
# The estimator object to use as the basis for the training jobs.
objective_metric_name = 'validation:merror', # The metric used to compare trained models.
objective_type = 'Minimize', # Whether we wish to minimize or maximize the metric.
max_jobs = 20, # The total number of models to train
max_parallel_jobs = 10, # The number of models to train in parallel
hyperparameter_ranges = {'max_depth': IntegerParameter(3, 15),
'eta' : ContinuousParameter(0.05, 0.5),
'min_child_weight': IntegerParameter(2, 12),
'subsample': ContinuousParameter(0.5, 0.9),
'gamma': ContinuousParameter(0, 10),})

Acima importamos a função de ajuste de hiper parâmetros do SageMaker, identificamos quais são os parâmetros que queremos testar e identificamos o intervalo de teste. Importante notar que alguns parâmetros são contínuos, enquanto outros são números inteiros. A vantagem dessa implementação é que, segundo a Amazon, o processo é feito usando otimização bayesiana, o que representa um grande ganho de tempo comparado com outras técnicas, como o Grid Search e o Random Search.

s3_input_train = sagemaker.s3_input(s3_data=train_location, content_type='csv')

s3_input_validation = sagemaker.s3_input(s3_data=val_location, content_type='csv')

xgb_hyperparameter_tuner.fit({'train': s3_input_train, 'validation': s3_input_validation})
%%time
xgb_hyperparameter_tuner.wait()
%%time
xgb_hyperparameter_tuner.best_training_job()
%%time
xgb_attached = sagemaker.estimator.Estimator.attach(xgb_hyperparameter_tuner.best_training_job())
%%time
xgb_tuned = xgb_attached.transformer(instance_count = 1, instance_type = 'ml.m4.xlarge')
%%time
xgb_tuned.transform(test_location, content_type='text/csv', split_type='Line')
%%time
xgb_tuned.wait()
!aws s3 cp --recursive $xgb_tuned.output_path $data_dirY_pred = pd.read_csv(os.path.join(data_dir, 'test.csv.out'), header=None)

O ajuste de hiper parâmetros resultou em um Precision de 0.797, o que não é um ganho expressivo e deixa espaço para explorar outras configurações.

Voltando ao problema proposto, os dados de 2020 foram deixados para uma validação do modelo. O objetivo foi simular o comportamento do modelo recebendo dados não balanceados, como se estivesse em produção, todos os outros pré-processamentos foram aplicados para que os dados estivessem de acordo com o esperado pelo modelo. Esse teste é particularmente interessante para avaliar a capacidade de generalização do modelo e com isso obtivemos um Precision de 0.71.

Conclusão

Com os dados sobre as condições dos acidentes de trânsito nas rodovias federais brasileiras podemos desenvolver uma ferramenta para auxiliar na prevenção desses acidentes com razoável grau de precisão, orientando a alocação de recursos, como unidades de atendimento e pontos de fiscalização.

Com as funcionalidades do SageMaker podemos rapidamente construir um produto de dados, percorrendo todas as etapas de um projeto de machine learning com apenas uma ferramenta.

Os códigos e todo o detalhamento do problema, em inglês, estão disponíveis no GitHub.

--

--