Asyncworker — Microframework para consumers assíncronos em Python

Diogo Magalhães Machado
3 min readNov 4, 2018

Na Sieve, as formas mais comuns de integração entre serviços são através de chamadas síncronas via requisições HTTP, ou de chamadas assíncronas a workers AMQP. Para HTTP, clientes ou servidores, utilizamos aiohttp para tudo. Atualmente, utilizamos o asyncworker na construção de novos workers e no processo de modernização de aplicações existentes.

Nesse post, vou tentar mostrar através de exemplos práticos por que criamos e utilizamos o asyncworker. Pra isso, vamos considerar como exemplo uma aplicação que consome mensagens de uma fila no rabbitmq e indexa essas mensagens em um Elasticsearch.

Primeiro, nós precisamos de uma instância do rabbitmq rodando. Vamos fazer isso com docker:

docker run -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_VHOST=/ --name rabbitmq rabbitmq:3.7.8-management-alpine

No rabbit, a porta 5672 é utilizada para AMQP e a 15672 para interface de gerenciamento HTTP. Essa imagem tem usuário e senha padrão como guest/guest. Você consegue acompanhar o processo de start da aplicação através dos logs:

docker logs -f rabbitmq

Producer

Como queremos escrever um consumer com o asyncworker, agora que já temos uma instância do rabbitmq rodando, precisamos criar e alimentar uma fila, escrevendo um producer para dar algum trabalho para o nosso consumer.

Fila words_to_index com o binding: exchange=default e routing_key=words

Nosso producer vai publicar uma mensagem nesta fila, contendo uma palavra a ser indexada no Elasticsearch pelo nosso consumer. Como input, vamos utilizar uma lista de palavras: https://s3.amazonaws.com/diogo.martins/public/portuguese-brazil.txt

Esse arquivo contém uma palavra por linha, ~1 milhão de palavras e 11,4mb. Como não queremos ler todo o arquivo de uma vez e nem precisamos de todas as linhas pra esse exemplo, vamos fazer o streaming da parte do corpo que precisamos e vamos usar aiohttp pra isso.

pip install aiohttp==3.4.4

Agora que já conseguimos ler linha a linha a lista de palavras, tudo que precisamos é nos conectar ao rabbitmq e publicar cada uma das mensagens na fila words_to_index.

Após rodarmos o script, podemos observar que 10000 mensagens foram produzidas.

Consumer

Vamos começar instalando as dependências:

pip install async-worker==0.5.0 aioelasticsearch==0.5.2

Agora vamos subir uma instância de Elasticsearch, para que o nosso consumer consiga indexar as mensagens:

docker run -d -p 9200:9200 -p 9300:9300 -e “discovery.type=single-node” --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:6.4.2

O código é bem simples, mas tem bastante coisa acontecendo por debaixo dos panos:

  • ack / reject automático de mensagens. Nesse caso, se o handler rodar com sucesso, todas as mensagem sofrerão ACK. Caso uma exceção não tratada seja capturada pelo asyncworker, todas as mensagens sofrerão REJECTe voltarão para a fila;
  • Reconexão automática em caso de perda de conexão;
  • Um consumer é criado para cada rota do handler e mensagens são empurradas pelo rabbitmq, neste caso, com prefetch de 512 mensagens;
  • Recebemos dados em lote. Configuramos nossa rota com a opção de bulk de 256 mensagens, ou seja, o nosso handler será chamado com uma lista de 256 mensagens, o que é perfeito para o nosso caso em que queremos fazer um insert em bulk no elasticsearch.

Neste exemplo, estamos consumindo mensagens somente da fila words_to_index, mas existem casos onde queremos tratar mensagens de filas diferentes da mesma forma. Pra isso, basta adicionarmos mais elementos à lista de rotas deste handler:

@app.route(["queue_name1", "queue_name2", "queue_name3"], vhost="/")

Já utilizarmos o asyncworker de forma estável em diversos serviços em produção e, comprovadamente, atende o seu principal objetivo: Ser simples e fazer com que o desenvolvedor se foque nas regras de negócio da aplicação e abstraia as peculiaridades de comunicação.

O que mostrei aqui é só uma parte do que é possível fazer com o asyncworker hoje, e ainda vamos adicionar muitas funcionalidades ao projeto, então recomendo à todos que acompanhem o projeto. Se você gostou, nos ajude colaborando no projeto ou dando uma estrela no github =)

--

--