Como rodar pyspark scripts no EMR com o Airflow

Tutorial do zero desde a criação dos recursos na AWS até a implementação da DAG no Airflow com uma imagem do Docker

Matheus Vasconcellos
Data Hackers
5 min readMar 20, 2022

--

Fonte: elaboração própria

Se você é Engenheiro de Dados ou é responsável por criar pipelines de Big Data em Cloud Computing, provavelmente já passou horas estudando em como integrar softwares open-source, como Airflow e Spark, com as soluções da cloud, seja ela GCP, Azure ou AWS (que é o foco deste artigo).

Nesse sentido, a AWS hoje oferece o serviço chamado Amazon Managed Workflows for Apache Airflow (MWAA), Airflow gerenciado que conta com as integrações feitas de forma bem mais simples.

No entanto, para muitas empresas, esse recurso pode ser caro demais ou representar um grande lock-in. Com isso em mente, e buscando trazer uma alternativa, escrevo este artigo com EMR, Airflow, S3 e Spark.

Assim, o objetivo deste post é desenvolver uma solução para o Airflow orquestrar a execução de jobs em pyspark por meio de clusters do EMR, tendo, por fim o armazenamento dos dados no bucket do S3. A imagem abaixo ilustra a arquitetura citada.

Fonte: elaboração própria

Assim sendo, o artigo está divido em 5etapas:

  1. Pré-requisitos
  2. Setup AWS
  3. Configuração do Airflow com o Docker
  4. Criação da DAG
  5. Deploy do código

Caso você queira acompanhar os códigos completos também pelo GitHub, clique aqui.

1. Pré-requisitos

1.1 Docker

De acordo com a versão do Airflow 2.2.4 do projeto, é preciso no mínimo da versão do Docker Compose v1.29.1 e do Docker Community Edition (CE) com 4.00 GB de memória.

1.2 Conta na AWS

Como o projeto é todo utilizando a AWS, é claro que você precisa de uma conta para desenvolver a sua solução.

Vale lembrar que, esta implementação não é gratuita, ou seja, você será cobrado pelo tempo em que o cluster do EMR estiver ligado. Para mais informações de preços, clique aqui.

2. Setup AWS

2.1 AWS CLI

Primeiro de tudo, para executar comandos da AWS no prompt de comando, é necessário configurar o CLI, conforme explicado aqui, na pasta AWS Configuration.

2.2 EMR Roles

Existem dois roles necessários para execução do código, visto que eles concedem permissão ao EMR para acessar outros recursos da AWS. São eles: EMR_DefaultRole e EMR_EC2_DefaultRole.

Para criá-los, basta executar o seguinte comando:

2.3 Criar bucket no S3

Caso você já tenha um bucket e queira usá-lo, pode pular está etapa. Mas caso queira criar um do zero, é possível via CLI com este comando abaixo:

3. Configuração do Airflow com o Docker

A customização foi feita com base na imagem oficial do Airflow versão 2.2.4. Aproveito a oportunidade para mencionar o artigo do Start Data Engineer https://www.startdataengineering.com/post/how-to-submit-spark-jobs-to-emr-cluster-from-airflow/ que traz uma solução para o mesmo problema levantado neste artigo, no entanto com a imagem do Airflow bem diferente.

No case aqui proposto, foi adicionado o AWS_SHARED_CREDENTIALS_FILE no environment. Além disso, foram incluídos também os seguintes volumes, que compartilham as pastas locais com os containers da imagem do Airflow:

  • ~/.aws:/usr/local/airflow/.aws/
  • ./pyspark_script:/opt/airflow/pyspark_script
  • ./data:/opt/airflow/data

Com isso, o arquivo yaml (somente com as partes modificadas) da imagem oficial do Airflow com as modificações mencionadas acima, ficou conforme o código abaixo:

Com isso, terminamos a parte de configuração de ambiente tanto da AWS quanto do Airflow.

4. Criação da DAG

Após a configuração da AWS e do Airflow no Docker, vamos à implementação da DAG. Conforme a arquitetura exposta no início do artigo, segue o detalhamento na DAG abaixo:

Fonte: elaboração própria

Olhando a DAG detalhadamente, cada caixa é uma task. Do início ao fim, temos as seguintes execuções:

1. De modo paralelo, nós criamos o cluster e enviamos o script, assim como o arquivo de dados para o S3;

2. Em seguida, adicionamos os steps (scripts em pyspark) ao cluster do EMR;

3. Depois, temos a checagem se todos os steps foram executados;

4. Por fim, terminamos o cluster criado na primeira etapa.

Para a criação e configuração do cluster do EMR, temos o código abaixo. Nele, definimos o tamanho da instância, quantidade e também os roles criados no passo 2.2.

Obs.: Vale lembrar que o tempo de criação do cluster é, em média, de 10 minutos.

Com relação às tasks send_data_to_s3 e send_script_to_s3, utilizam a mesma função:

Após a execução dessas três em paralelo, adicionamos os scripts em pyspark como steps no cluster do EMR criado, com este código:

Neste caso, só temos um step, no entanto, caso você tenha mais de uma execução, basta adicionar outro dicionário na lista SPARK_STEPS.

A penúltima task é referente ao check das execuções de todos os SPARK_STEPS e a última para terminar o cluster, conforme código abaixo:

5. Deploy do código

Anteriormente, passamos pelos detalhes do código. Agora, vamos para a parte da execução, que se torna bem simples com o Docker: basta copiar o código no GitHub através da linha de comando abaixo:

Verifique se todos os pré-requisitos das seções 1 e 2 estão preenchidos.

Feito isso, altere a constante BUCKET_NAME do arquivo pyspark_scripts/modeling_profile_user_data.py e dags/run_script_emr.py.

Em seguida, digite o código abaixo na linha de comando para subir o container com o Airflow:

Feito isso, digite na url do seu navegador: http://localhost:8080/home

Conclusão

Neste artigo criamos uma pipeline de dados de jobs de pyspark no EMR Cluster orquestradas pelo Airflow.

Com base no código do GitHub é possível criar suas próprias pipelines utilizando a estrutura de pastas definida e já configuradas no Docker.

Um ponto de atenção que merece ser ressaltado é o tempo de 10 minutos para ativação cluster, que pode ser alto dependendo da velocidade que o dado precisa ser entregue. Uma alternativa é criar uma DAG que ative previamente clusters e que esses jobs sejam executados em cima do cluster já criado.

Por fim, seguem as minhas redes sociais:

LinkedIn: https://www.linkedin.com/in/matheus-lins-vasconcellos/

Github: https://github.com/matheusvclls

Referências:

Documentação Airflow: https://airflow.apache.org/

Documentação Amazon Managed Workflows for Apache Airflow (MWAA): https://docs.aws.amazon.com/mwaa/index.html

Documentação EMR: https://aws.amazon.com/pt/emr/

Precificação EMR: https://aws.amazon.com/pt/emr/pricing/?nc=sn&loc=4

Start Data Engineer EMR + Airflow: https://www.startdataengineering.com/post/how-to-submit-spark-jobs-to-emr-cluster-from-airflow/

--

--

Matheus Vasconcellos
Data Hackers

I love how the data could be useful and solve problems.