Asyncworker — Microframework para consumers assíncronos em Python

Na Sieve, as formas mais comuns de integração entre serviços são através de chamadas síncronas via requisições HTTP, ou de chamadas assíncronas a workers AMQP. Para HTTP, clientes ou servidores, utilizamos aiohttp para tudo. Atualmente, utilizamos o asyncworker na construção de novos workers e no processo de modernização de aplicações existentes.

Nesse post, vou tentar mostrar através de exemplos práticos por que criamos e utilizamos o asyncworker. Pra isso, vamos considerar como exemplo uma aplicação que consome mensagens de uma fila no rabbitmq e indexa essas mensagens em um Elasticsearch.

Primeiro, nós precisamos de uma instância do rabbitmq rodando. Vamos fazer isso com docker:

docker run -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_VHOST=/ --name rabbitmq rabbitmq:3.7.8-management-alpine

No rabbit, a porta 5672 é utilizada para AMQP e a 15672 para interface de gerenciamento HTTP. Essa imagem tem usuário e senha padrão como guest/guest. Você consegue acompanhar o processo de start da aplicação através dos logs:

docker logs -f rabbitmq

Producer

Como queremos escrever um consumer com o asyncworker, agora que já temos uma instância do rabbitmq rodando, precisamos criar e alimentar uma fila, escrevendo um producer para dar algum trabalho para o nosso consumer.

Fila words_to_index com o binding: exchange=default e routing_key=words

Nosso producer vai publicar uma mensagem nesta fila, contendo uma palavra a ser indexada no Elasticsearch pelo nosso consumer. Como input, vamos utilizar uma lista de palavras: https://s3.amazonaws.com/diogo.martins/public/portuguese-brazil.txt

Esse arquivo contém uma palavra por linha, ~1 milhão de palavras e 11,4mb. Como não queremos ler todo o arquivo de uma vez e nem precisamos de todas as linhas pra esse exemplo, vamos fazer o streaming da parte do corpo que precisamos e vamos usar aiohttp pra isso.

pip install aiohttp==3.4.4

Agora que já conseguimos ler linha a linha a lista de palavras, tudo que precisamos é nos conectar ao rabbitmq e publicar cada uma das mensagens na fila words_to_index.

Após rodarmos o script, podemos observar que 10000 mensagens foram produzidas.


Consumer

Vamos começar instalando as dependências:

pip install async-worker==0.5.0 aioelasticsearch==0.5.2

Agora vamos subir uma instância de Elasticsearch, para que o nosso consumer consiga indexar as mensagens:

docker run -d -p 9200:9200 -p 9300:9300 -e “discovery.type=single-node” --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:6.4.2

O código é bem simples, mas tem bastante coisa acontecendo por debaixo dos panos:

  • ack / reject automático de mensagens. Nesse caso, se o handler rodar com sucesso, todas as mensagem sofrerão ACK. Caso uma exceção não tratada seja capturada pelo asyncworker, todas as mensagens sofrerão REJECTe voltarão para a fila;
  • Reconexão automática em caso de perda de conexão;
  • Um consumer é criado para cada rota do handler e mensagens são empurradas pelo rabbitmq, neste caso, com prefetch de 512 mensagens;
  • Recebemos dados em lote. Configuramos nossa rota com a opção de bulk de 256 mensagens, ou seja, o nosso handler será chamado com uma lista de 256 mensagens, o que é perfeito para o nosso caso em que queremos fazer um insert em bulk no elasticsearch.

Neste exemplo, estamos consumindo mensagens somente da fila words_to_index, mas existem casos onde queremos tratar mensagens de filas diferentes da mesma forma. Pra isso, basta adicionarmos mais elementos à lista de rotas deste handler:

@app.route(["queue_name1", "queue_name2", "queue_name3"], vhost="/")

Já utilizarmos o asyncworker de forma estável em diversos serviços em produção e, comprovadamente, atende o seu principal objetivo: Ser simples e fazer com que o desenvolvedor se foque nas regras de negócio da aplicação e abstraia as peculiaridades de comunicação.

O que mostrei aqui é só uma parte do que é possível fazer com o asyncworker hoje, e ainda vamos adicionar muitas funcionalidades ao projeto, então recomendo à todos que acompanhem o projeto. Se você gostou, nos ajude colaborando no projeto ou dando uma estrela no github =)

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade