Sincronizando dados do banco relacional com o ElasticSearch usando Logstash
Manter o sincronismo de seus dados em tempo real entre diferentes ferramentas pode ser um grande desafio, mas não para o Logstash.
Afinal pra que serve o Logstash?
Logstash é uma ferramenta muito versátil, open-source, que permite criar pipelines para extrair, transformar e enviar seus dados para um outro destino em tempo real.
Inputs -> Filters -> Outputs
Os dados trafegam dentro da pipeline do Logstash em forma de eventos.
Inputs geram eventos, filters os transformam, e outputs os enviam.
— Humm, mas como eu crio esses eventos para serem processados na minha pipeline?
Input plugins: São plugins built-in que abstraem a lógica de criação de eventos para diferentes fontes de dados, meus dados poderiam ter origem num banco relacional, num tópico Kafka, numa fila do RabbitMQ, num webhook HTTP, ou até mesmo no Twitter, entre muitos outros.
Portanto independente da fonte, bastaria configurar o input plugin para capturar suas informações e gerar eventos de acordo com a origem. No caso do nosso exemplo, usaremos o Jdbc Input Plugin para gerar eventos através das rows de uma query de banco de dados relacional.
Considere para o exemplo abaixo a pré-existência de 3 tabelas: posts, authors, e comments num banco relacional. Queremos transformar nossos posts em eventos, onde cada evento conterá além de dados do próprio post, dados do autor(1 post ->1 autor) e dos comentários(1 post->N comentários):
E que queremos desnormalizar nossos dados que estão nessas 3 tabelas para inserir no ElasticSearch num index de posts chamado idx_posts, essas informações:
Usamos assim o plugin jdbc:
No exemplo acima estamos conectando a um banco de dados relacional, usando o jdbc driver.
Na linha #16, definimos que vamos usar um cursor retornado pela própria query, e toda vez que essa query rodar esse valor será injetado em uma variável interna do plugin chamada :sql_last_value. Portanto sempre que a query rodar podemos comparar com o cursor da iteração anterior para filtrar os resultados recentes, conforme o statement da linha #26.
O Logstash cacheia essas informações de cursor em um arquivo que pode ser configurado na linha #10.
Simples assim e já estamos transformando cada uma das rows que nossa query retorna em eventos dentro da nossa pipeline!
Mas, temos um problema: devido ao JOIN com a tabela de comments, se um post tiver 2 comentários, a query retornará 2 rows, gerando 2 eventos para o mesmo post, certo? Nós queremos gerar apenas um evento para cada post, e agregar as informações do mesmo post nesse único evento, e agora José??
Filter Plugins: Usando filtros podemos realizar transformações no nosso evento num segundo step da pipeline, no nosso exemplo, vamos usar um filter do tipo aggregate para transformar eventos do mesmo post em apenas 1 evento, além de agregar as informações dos textos dos comentários para mandar para frente na pipeline:
Na linha #5 agrupamos os eventos por id de post, e dentro do code na linha #7, definimos o novo mapeamento do nosso evento.
Nas linhas #12 e #13, criamos um array de text_comments, para guardar as informações contidas do comment_text da nossa query.
Usamos também um event.cancel() para ignorar os eventos que vieram do input, pois vamos disparar nosso próprio evento customizado a partir de agora.
O push_previous_map_as_event da linha #17 indica que ao chegar um novo post, eu posso mandar o anterior para frente na pipeline. E o timeout =>1 da linha #18, indica que se ficar 1 segundo sem chegar novos eventos do input também posso mandar o evento para frente.
Legal agora tenho meu evento abaixo, o que eu faço com isso?!
Output Plugins: Um plugin de output literalmente define o que fazer com o evento que chegou para ele. No nosso exemplo vamos usar o Elasticsearch Output Plugin.
Pronto então a cada 1 segundo iremos consultar o banco e entender se algum novo post foi criado ou modificado com base na data de modificação, e faremos o upsert dele lá no ElasticSearch, top demais né?
É legal dizer aqui que poderíamos ter vários plugins de output, portanto eu poderia além de mandar esse evento para gravar no index la no ElasticSearch, poderia mandar o mesmo para uma fila, printar no console, disparar um email e chamar um webhook http, tudo a partir do mesmo evento, legal né?
E por hoje é só, valeu!
Samir Varandas é desenvolvedor na unico.
Venha trabalhar com a gente, temos vagas abertas!