Sitemap

Processamento paralelo sequencial com Python e Celery

4 min readMar 16, 2020

--

Olá pessoal!

Em resumo, vou falar como gerenciar processos em background com Celery e Python. Com o diferencial de que não se trata de processos completamente independentes e que devem ser processados sequencialmente.

O título

Eu fiquei na dúvida se colocava ‘Processamento paralelo’ ou ‘Processamento distribuído’. Acho que tecnicamente o melhor título seria ‘Processamento distribuído’, mas eu acredito que para o público alvo (leigos e/ou iniciantes) desse post, o melhor seria ‘paralelo’ mesmo.

O desafio

Teremos um script que deve coletar dados de um sistema de terceiros (vamos chamá-lo de ‘portal’). Esse site de terceiros possui dados privados e respectivos aos seus proprietários (vamos chama-los ‘clientes’).

Cada ‘cliente’possui dados de acesso exclusivos ao ‘portal’. Então além de coletar as informações dos clientes, vamos precisar autentica cada conta antes de ter acesso aos dados.

O desafio é coletar dados de todos os ‘clientes’ de forma igualitária, onde devemos garantir e alocar recursos de processamento e rede de forma que todos os ‘clientes’ sejam coletados de tempos em tempos.

Se existisse uma integração real com o ‘portal’ onde ele me informasse quando algum dado novo fosse gerado ou recebido pelo cliente, essa rotina de capturar dados não seria necessária. Mas não é o meu caso.

O problema

Pra quem está acostumado a processos em paralelo, sabe que as vezes temos alguns problemas de prioridades e sequência dentro dos workers.

Pra quem ainda não trabalhou com processos em paralelo, workers são processos que podem estar na mesma máquina ou em máquinas separadas que serão responsáveis por receber uma solicitação e executar o que lhe foi pedido.

Como os workers são responsáveis apenas por executar o que lhes foram solicitados, fica a cargo de algum gerenciador de filas e tarefas falar quando ele deve entrar em ação. Em nosso caso, aí entra o http://celeryproject.org.

O script em questão, deve garantir que nenhum ‘cliente’ seja deixado pra trás e que nenhum ‘cliente’ tenha prioridade sobre o outro.

Se simplesmente colocássemos cada cliente em uma fila e quando terminasse ele voltasse novamente para a fila. Uma ‘cliente’ poderia terminar mais rápido que outro e sequência depois de alguns ciclos (ou loops) poderia ser prejudicada. Vejamos um exemplo:

Ciclo 01:
* Cliente A
* Cliente B
* Cliente C
Ciclo 02:
* Cliente B
* Cliente C
* Cliente A
Ciclo 03:
* Cliente B
* Cliente A
* Cliente C
Ciclo 04:
* Cliente B
* Cliente C
* Cliente A
...

Em nosso exemplo anterior, no primeiro ciclo temos 3 ‘clientes’ iniciando a execução. No segundo ciclo o cliente ‘A’ terminou e os clientes ‘B’ e ‘C’ continuam em execução. No terceiro ciclo o cliente ‘B’ ainda não terminou seu processamento, mas o cliente ‘C’ já e o mesmo já foi novamente pro final da fila.

Como o intuito é garantir um sequência, quando o cliente ‘A’ executou 2 vezes, o cliente ‘C’ uma vez e o cliente ‘B’ pro sua vez nenhuma, nosso exemplo mostrou que “algo errado não está certo”.

Talvez se a fonte de informação fosse diversificada, a coleta sequenciada fosse completamente desnecessária. Mas como se trata de um único ‘portal’, se um ‘cliente’ está demorando mais do que o outro, pode ser que estamos alocando recursos do nosso sistema ou do ‘portal’ que devia estar sendo alocado de outra forma. E como já foi dito, não podemos deixar nenhum cliente pra trás.

A solução

Eu não sou especialista em processamento paralelo e muito menos em http://docs.celeryproject.org. Mas gostaria de compartilha minha solução com você leitor interessado.

Esse foi meu primeiro projeto com http://docs.celeryproject.org e meu maior projeto em Python até então. Antes desse desafio, meu conhecimento nesse assunto estava restrito a Ruby e https://sidekiq.org/.

Dito isso, vamos colocar a mão na massa.

No meu projeto eu tenho duas tasks:

  1. loadCompanies: responsável por carregar todas os ‘clientes’ e suas respectivas credencias de acesso.
  2. getLeadsFromCredential: responsável por autenticar o ‘cliente’ no ‘portal’ e coletar todas a informações.

Vejamos isso por dentro:

A lógica principal no script pode ser listada da seguinte forma:

  1. Carrego todos os ‘clientes’
  2. Para cada cliente eu inicializo uma nova task
  3. Quando todas os cliente tiverem seu dados coletados, o processo recomeça do ponto 1 novamente.

Para garantir que o ‘portal’ não sofra tanto com os acessos simultâneos do script, eu optei por limitar o processamento paralelo de ‘clientes’ para no máximo 20. Assim eu tenho acesso ao dados do cliente em tempo abio e não sobrecarrego o portal.

Em meu projeto esto utilizando https://www.docker.com/. Então para instanciar meus workers eu tenho o seguinte:

Para manter tudo como previsto, eu tenho 2 workers e cada um fica responsável por uma queuee.

A primeira queuee (defaults) fica com o worker limitado a apenas um processo por vez. Já a segunda queuee (company_tasks) pode coletar dados de até 20 clientes ao mesmo tempo.

Agora se você reparar na minha configuração do Celerey (celeryconfig.py) vai notar que existe um agendamento que diz para a cada 10 segundos a task ‘tasks.loadCompanies’ seja executada. Esse processo já pôde ser visto no código ‘tasks.py’. Mas percorrendo o código novamente, você se depara com o seguinte trecho:

total = conn.default_channel.client.llen('company_tasks')i = app.control.inspect()
total += countItemsOnWorker(i.active())
total += countItemsOnWorker(i.scheduled())
total += countItemsOnWorker(i.reserved())

Nele você vai reparar que antes de serem inicializadas novamente as tasks para coletar os dados dos clientes, é feita uma contagem do número de processos agendados ou executando no worker ‘celery-worker_2’. Isso me garante que só após todos os clientes já terem sido consultados é que eu farei uma nova consulta.

--

--

Responses (1)