Criando um pipeline de ingestão de dados orientado a eventos

Lucas Ribeiro N dos Santos
OPANehtech

--

Nesse artigo irei comentar sobre uma das primeiras entregas que conduzi no time de dados do Pan ao longo de 2021. Vamos ver quais foram as soluções aplicadas, como elas evoluíram e discorrer alguns aspectos de arquitetura e code design.

O cenário

O objetivo da entrega é a ingestão dos dados oriundos da API de um fornecedor externo no Data Lake, para posterior consumo pelas áreas de negócio. A área de dados estava em sua fase inicial, construindo uma plataforma de dados cloud native na AWS.

Os solicitantes da demanda, embora da área de negócio, têm um background na engenharia de dados e por isso eram capazes de entender, questionar e propor ideias.

E por fim, naquele momento, nossa stack de ingestão tinha como linguagem o Python e se apoiava em 3 serviços da AWS:

  • Database Migration Service (DMS)
  • Glue
  • Lambda

Diante disso, vamos à parte divertida:

V1 — Hello World!

A AWS oferece alguns serviços para a camada de computação: EC2, Lambda e os containers — ECS e EKS. Na camada de analytics, existe o Glue. Todos viáveis para resolver o problema, mas o EC2 é um IaaS e isso traria a complexidade de provisionar e gerenciar instâncias. Além disso, uma vez que a ingestão é feita, a instância EC2 continuaria ligada, gerando custos desnecessários.

O ECS e o EKS são escolhas melhores, pois o container pode ser terminado depois de cumprir sua função, mas tirando o ECS no modo Fargate, ainda existe a complexidade de gerenciar a infraestrutura. Outro ponto que pesou na escolha foi que, naquele momento, o time de dados ainda não trabalhava com esses serviços na sua stack.

O Glue é serverless. Porém não foi projetado para problemas de computação e sim de analytics: descoberta, extração, enriquecimento, limpeza, normalização e combinação de dados usando por traz um cluster baseado no Apache Spark — ele é ideal em um cenário de dados massivos, como Big Data ou preparação de dados para Machine Learning. Toda essa robustez o tornam num dos serviços mais caros da AWS.

O Lambda atende melhor. Ele é serverless como o Glue, com a vantagem de ser bem mais leve e barato.

A arquitetura desse cenário já foi abordada no nosso artigo sobre Arquiteturas de Dados na AWS. A solução inicial foi muito similar ao que foi descrito lá.

O EventBridge age como barramento de eventos, disparando um Lambda em horário determinado.

  • O Lambda é responsável por autenticar, consultar o endpoint do fornecedor, converter os dados para Parquet e persisti-los no S3.
  • Os dados são catalogados por um Crawler do Glue e convertidos em tabela do Athena.

A priori, essa solução cumpriu o objetivo bem. Tem como vantagem ser serverless e é uma solução barata. No entanto, ela possui três problemas:

  1. Recursos repetidos — para consultar diferentes endpoints do mesmo fornecedor (por exemplo, um que retorna dados de clientes e outro que retorna dados de transações), a mesma stack precisava ser implantada modificando apenas alguns parâmetros. Isso gerava redundância tanto de código quanto de serviços.
  2. Baixo nível de tolerância a falhas — nos momentos em que a API do fornecedor ou a AWS estava indisponível, era necessário atuação manual.
  3. Runtime Timeout — o Lambda possui um limite de execução de 15 minutos. Alguns endpoints tinham um limite de dados que podiam ser extraídos, então o código precisava fazer N requisições (que são operações custosas em termos de tempo) e esse limite era estourado, retornando erro.

V2 — Resolvendo problemas

A evolução foi manter uma única stack, capaz de determinar qual e como o endpoint deveria ser consultado a partir das informações contidas no evento. O Lambda também teve suas responsabilidades definidas em duas partes:

Lambda de Orquestração — interpreta o evento inicial e a partir dele, decide qual endpoint deve ser consultado, com quais informações e principalmente, qual a estratégia de consulta.

Lambda Filho — recebe o(s) evento(s) do Lambda de Orquestração por meio de uma fila SQS, contendo os parâmetros de consulta do endpoint e, por fim, converte e persiste os dados no S3.

Em resumo, há dois tipos de cenário — consulta simples e múltiplas consultas. O cenário simples é no endpoint em que há uma única requisição:

  1. O Lambda de Orquestração é disparado com o EventBridge e interpreta o evento. Por meio do Design Pattern Strategy, ele define que basta uma única consulta no endpoint do evento, modela o evento com uma body e demais parâmetros e cadastra esse evento no SQS
  2. O Lambda Filho é invocado por meio do SQS, consulta o endpoint e persiste os dados no S3

No de múltiplas requisições, é necessário bater N vezes na API:

  1. O Lambda de Orquestração é disparado com o EventBridge e interpreta o evento. Por meio do Strategy, ele define que serão necessárias N consultas. Primeiro bate uma vez no endpoint do fornecedor para obter o total de dados e, então, iterativamente, ele cria e cadastra N eventos no SQS
  2. O Lambda Filho é invocado N vezes por meio do SQS, consulta o endpoint e persiste os dados no S3

O padrão Strategy se mostrou muito útil nesse contexto, pois ele é a chave para a execução, sem que seja necessário tornar o código complexo. Há uma classe abstrata Strategy, que fornece comportamentos genéricos do código e contém o método abstrato “send_message”. Duas classes concretas: SimpleCallStrategy e MultipleCallStrategy, implementam o que será enviado para o SQS que dispara o Lambda filho

Mais sobre esse Design Pattern e outros podem ser consultados aqui.

Os problemas anteriormente mencionados são removidos:

  1. Agora há uma única stack e código, cujo comportamento varia em função do evento inicial
  2. O uso de uma fila de mensagens como o SQS garante que uma mensagem será reprocessada em caso de falha, o que confere mais resiliência e confiabilidade à aplicação.
  3. Em vez de ter uma única instância do Lambda que consulta a API N vezes, agora são criadas N instâncias Lambda Filho que consultam apenas uma vez cada uma, respeitando o limite dos 15 minutos.

Embora interessante, esse desenho possui problemas que vamos discutir mais adiante no artigo.

V3 — Monitoração

Um ponto frágil do fluxo de trabalho, é que os solicitantes tinham como premissa que os dados estariam disponíveis diariamente em determinado horário. Com essa informação disponível, eles criavam dashboards e relatórios que eram disparados para outras áreas e para a alta gestão de maneira automática. Em caso de falha no processamento, informações incorretas seriam propagadas e a tomada de decisão estaria comprometida.

Para o time de dados ter uma atuação preventiva e também alertar os usuários sobre o status da ingestão, foi adicionado um tópico no SNS para notificar os usuários de negócio em casos de falha. Os usuários podem então se inscrever nesse tópico e escolher a forma como desejam ser notificados (SMS ou e-mail, por exemplo).

O SNS é um serviço de pub/sub, onde é possível ter diversos leitores subscritos no tópico. Reportar os erros por ele é uma prática interessante porque outros serviços e fluxos podem ser acoplados. Por exemplo, um serviço que captura erros e os envia para um dashboard central com a saúde do Data Lake, processos de data quality, ou até mecanismos de recuperação de falhas.

V4 — Mais funcionalidades

O desenho foi ampliado quando o negócio solicitou a ingestão dos dados de um novo endpoint. A questão, do ponto de vista técnico, é que os endpoints anteriores eram invocados com um evento com conteúdo estático: era sempre o mesmo evento, com os mesmos parâmetros, só mudando a data e a body que deveria ser enviado na chamada REST do endpoint fornecedor.

Nessa nova demanda, o evento precisaria ser dinâmico: o conteúdo da body deriva de uma lista de informações resultantes de uma consulta no Data Lake.

O primeiro passo foi criar um serviço (chamado aqui no artigo de Lambda Athena) que encapsula toda a lógica de consulta no Athena, tratando questões de erro e retry. A ideia é que ele fosse genérico, atendendo qualquer necessidade de consulta. Então o ponto de entrada é uma fila no SQS e o ponto de saída é uma lista de filas informadas no evento.

O passo seguinte foi fazer as ligações entre esse serviço e o Lambda Orquestrador. Para cada registro retornado na consulta do Athena, é necessário fazer uma consulta na API. Ficou fácil implementar essa lógica criando uma nova classe a partir do padrão Strategy. Ela garante o respeito ao Princípio Aberto/Fechado do SOLID, ao mesmo tempo que evita que todo o código já existente tenha que ser alterado.

Por último, o Lambda Athena, por ser genérico, age como um serviço e pode ser usado por qualquer outra aplicação ou serviço, evitando código duplicado e retrabalho por parte dos times da TI.

V5 — A evolução futura

Na V2, foi mencionado que a arquitetura atende bem o problema, mas possui problemas. Lembre-se que vimos que o Lambda Athena é genérico e pode ser reusado. Reuso é uma propriedade quase sempre desejada quando pensamos no universo da tecnologia.

Que outros comportamentos o desenho tem que são habituais? Conversão e persistência de Parquet num destino é muito comum para times de dados, e consultas em API são comuns para vários times de tecnologia!

Mas os nossos serviços não podem ser reaproveitados. Isso porque eles possuem múltiplas funções fortemente acopladas:

  • O Lambda Orquestrador não só orquestra, mas também consulta o total de registros
  • O Lambda Filho consulta a API e é, ao mesmo tempo, responsável pela conversão e persistência de Parquet

Uma boa prática do Lambda é ter funções leves com propósito único, assim como o Princípio de Responsabilidade Única do SOLID. A solução Desacoplar as funções em mais partes, cada uma com seu objetivo bem definido:

  • O Lambda Orquestrador apenas recebe o evento que determina como será a estratégia de consulta.
  • Agora há um novo Lambda chamado Request, responsável por fazer chamadas em API e repassá-las para algum destino que é determinado por evento.
  • O Lambda Filho é trocado por um Lambda Persist, que converte os dados para Parquet e o armazena em algum destino especificado em seu evento.
  • Os Lambdas não precisam mais enviar a mensagem de erro ao SNS. Isso é tratado separado por outra função dedicada, o Lambda Notify, que também é genérico e usado como microsserviço. Dentro desse contexto, ele é disparado por uma Dead Letter Queue (DLQ)

Agora o Lambda Request tem apenas a responsabilidade de consultar num endpoint. Isso significa que ele pode ser usado por quaisquer aplicações que tenham essa necessidade, da mesma forma que o Lambda Persist. Eles deixam de ser específicos para um contexto de negócio, e passam a ser ferramentas globais dos times de tecnologia.

Alguns comentários adicionais

Sobre a arquitetura

A característica da arquitetura é ser orientada a eventos. Isso acompanha alguns benefícios:

  • Falha isolada e resiliência — os serviços são desacoplados. Eles reconhecem apenas o seu roteador de eventos — no nosso caso, o SQS/SNS. Se um serviço apresentar uma falha, os serviços restantes continuarão em execução. O uso do SQS promove resiliência já que em cenário de falha, a fila tenta reexecutar o evento após um determinado período de tempo.
  • Escalabilidade e independência — os serviços escalam isoladamente de acordo com a quantidade de mensagem que as filas SQS contém. Essa independência permite que cada serviço pode ser desenvolvido por times diferentes, usando linguagens diferentes.
  • Redução dos custos — numa arquitetura orientada a eventos, tudo acontece sob demanda: quando há um evento, seu serviço é iniciado e executado. E você só paga por esse tempo de execução e pela capacidade provisionada, que é elástica e escala automaticamente.

A principal desvantagem na orientação a eventos é relacionada a debugging e monitoração. Por exemplo, suponha um erro no Lambda Persist. Esse erro foi gerado no Lambda Persist, ou ele é fruto de um serviço distante que foi sendo propagado, como o Lambda Athena? Apesar disso, há mentalidades, ferramentas e padrões para endereçar essas questões quando pensamos nesse tipo de arquitetura.

Alternativas e possíveis melhorias

Embora o Lambda seja majoritariamente usado aqui, é importante deixar claro situações em que seu uso não faria sentido:

  • Orquestração síncrona/complexa — nosso Lambda Orquestrador coordena o trabalho, mas de forma assíncrona, ou seja, ele dispara os eventos e a instância em execução deixa de existir, não esperando resultados. Nos cenários em que a instância do Lambda continua executando de maneira ociosa enquanto aguarda resultados ou mesmo quando há orquestração complexa e que pode tornar o código difícil de ler, a sugestão da própria Amazon é o AWS Step Functions.
  • Dados são movidos de um lugar para outro sem que nenhuma regra seja aplicada — O Lambda faz sentido quando os dados precisam receber algum tratamento ou regra de negócio que só é possível em nível de código. Quando não há essa necessidade, os serviços podem ser integrados diretamente, quando houver tal possibilidade.

E como mencionado no início do artigo, um candidato forte a tomar o lugar na camada de computação seria um container: ECS Fargate, para manter a arquitetura serverless ou EKS, para tornar o desenho cloud agnostic (para isso, seria necessário também trocar os outros componentes proprietários da AWS por suas contrapartes de mercado e que são multi-cloud, como o SQS pelo Kafka).

Outro ponto de discussão é a lista de destinos informada no evento x um tópico SNS

Considere o trecho de evento abaixo, que contém a lista de destinos que o Lambda Athena deve enviar os resultados da consulta:

Os destinos são alguns SQS diferentes. Ela funciona porque são poucos destinos, e todos eles são do mesmo tipo. Mas se fosse necessário adicionar novos tipos de destino — por exemplo o Kafka, o RabbitMQ ou o PostgreSQL — seria necessário alterar a lógica de programação para suportar esses novos destinatários. O Produtor do evento também precisa ser responsável por passar todos os destinatários. Em uma arquitetura orientada a eventos, um Produtor não deveria ter esse tipo de conhecimento sobre os Consumidores.

Uma solução mais elegante recomendada pela AWS, é o padrão Fan-Out: um tópico SNS com filas plugadas a ele. Desta forma, o Produtor do Evento só precisa passar como destino o tópico SNS em questão e o código só precisa saber como se publica uma mensagem nesse serviço. Os Consumidores se plugam no tópico através de uma fila — e a fila é importante porque o SNS não tem garantias de persistência e retry.

Esse padrão existe porque o SQS atua em 1 para 1 — Um Produtor produz o evento e apenas um Consumidor pode acessá-lo. Outra solução, que foge do AWS-native mas se aproxima do mercado, é ter uma fila que suporta múltiplos consumidores como o Kafka.

A importância do código

As vantagens e desvantagens da arquitetura foram expostas, bem como os motivadores das escolhas e todo o processo de evolução que o desenho teve. Mas o fundamento que sustenta tudo isso é o código, e de nada adianta ter a arquitetura mais moderna e robusta do mercado se o código não se sustentar.

Por isso, esse trecho é para lembrar da importância de se ter um código limpo, seguindo práticas como SOLID, DRY, YAGNI, padrões, convenções da linguagem e ausência (ou redução) de code smells. E processos de qualidade, como Code Review e TDD, que reforcem as boas práticas.

CI/CD

Toda a stack foi escrita como Infraestrutura como Código através do Serverless Framework, com processo de deploy via Jenkins em ambientes de desenvolvimento, homologação e produção. Isso confere maior agilidade e evita erros operacionais. Um super agradecimento aqui à galera de DevOps/Engenharia Cloud por todo o apoio!

Comentários finais

Essa foi uma das minhas primeiras entregas no Pan. Começou simples e foi se tornando algo robusto e complexo — assim como a minha trajetória na companhia. Ter conduzido esse projeto integralmente — toda a parte da arquitetura, desenvolvimento, testes e sustentação, me permitiu aprender muito e colocar em prática diversas coisas relacionadas ao universo cloud. Espero aprender e entregar muito.

Então comenta aí! O que você achou? O que mudaria? O que teria feito diferente?

--

--