EMR Serverless e Airflow: Processando dados de forma escalável com Spark na AWS

Gerenciar cluster Spark? Esperar o tempo de bootstrap do EMR? Ter um servidor para configurar? Tudo isso é coisa do passado, agora a moda é processar dados sem servidor

Cícero Moura
Data Hackers
8 min readSep 23, 2022

--

A AWS levantou uma hype muito grande na área de dados no final de 2021 quando anunciou a nova opção para processamento e análise de dados com o EMR (Elastic Map Reduce), a versão Serverless.

No meio deste ano (2022) a versão foi lançada de forma definitiva e aqui vamos discutir um pouco mais sobre como rodar aplicações Spark, Hive, Presto e outras sem a necessidade de gerenciar servidores.

Assim o objetivo deste artigo é apresentar um pouco mais sobre a versão sem servidor do EMR e fazer um case prático para processamento de dados junto com Airflow e Spark.

Clique no link abaixo para acessar o código completo utilizado neste artigo:

EMR e sua versão Serverless

Para quem já utiliza o serviço do EMR, com certeza já se deparou com alguns problemas que incomodam bastante, como por exemplo:

  • Diversas configurações que precisam ser passadas para provisionar um servidor, o tipo de máquina, quantidade de máquinas (master, workers) e outras…
  • Tempo de provisionamento que é de 10–15 minutos todas as vezes que é preciso de um novo servidor;
  • Resolução de bugs que podem acontecer no servidor, como as vezes o DNS do EMR falhar e não conseguir acessar outras aplicações do ambiente.

Por esses problemas e outros, além do custo de manter uma máquina ligada, outras opções de ter uma versão do Spark e aplicações de Big Data em outros serviços da AWS como o ECS e EKS não são incomuns.

Quando a AWS anunciou uma versão Serverless para o EMR, a comunidade viu com bons olhos as possibilidades de aplicações. E essa boa esperança em volta do serviço tem começado a ser suprida.

Mas afinal como funciona…

Como qualquer outro serviço Serverless, o grande ganho do EMR é não precisar provisionar servidores, pagar apenas pelo o que utilizar e ter a liberdade de escolher e escalar recursos computacionais sem dificuldades.

A versão Serverless do EMR possibilita executar aplicações Spark, Hive e Presto. O serviço pode determinar e provisionar automaticamente os recursos de memória e computação necessários para o processamento das solicitações e aumentar ou reduzir esses recursos nas diferentes etapas com base nas mudanças dos requisitos das aplicações.

Com isso o EMR pode escalar sua aplicação de forma automática conforme a demanda das suas cargas de trabalho.

E como você só paga pelos recursos que utiliza, o EMR Serverless é econômico na execução de análises em escala de petabytes.

Conceitos

Alguns conceitos que a versão Serverless do EMR utiliza:

  • EMR Studio: é onde podemos verificar o status dos trabalhos em execução, consultar o histórico e ferramentas como o Spark UI para fazer debug dos scripts.
  • Applications: basicamente é o cluster que estamos provisionando, podemos configurar a quantidade de recursos computacionais mínimos e também até onde permitimos ele escalar. Um cluster pode ter um estado de Created, Stopped, Deleted e outros. Quando pausado pode ser reativado posteriormente para processar outras cargas de trabalho.
  • Jobs: os jobs basicamente são as nossas aplicações em execução, onde cada aplicação pode utilizar uma quantidade de recurso especificada, além de ter permissões de segurança diferentes. Um job pode assumir vários status, como por exemplo: Submitted, Running, Failed, Success e outros.

EMR Serverless + Airflow: Um case prático

Para a utilização da versão do serverless do EMR ficar mais clara, iremos tratar aqui um exemplo prático, algo muito comum em um arquitetura de Data Lake.

O case é o seguinte:

Temos um Data Lake com as seguintes camadas, raw, staged e curated que são construídas em buckets diferentes no Amazon S3. Os dados que vamos tratar estão no formato CSV e são dos datasets de exemplos do movielens. Os CSVs estão armazenados na camada raw e precisamos processá-los, de forma que sejam convertidos para parquet na camada staged, depois algumas transformações e junções de datasets serão realizadas e os resultados gravados na camada curated.

Os datasets para este exemplo são os seguintes:

  • movies: contém dados dos filmes, como nome, ano de lançamento, categorias e outras informações;
  • ratings: contém as notas que foram pontuadas por vários usuários em relação aos filmes do dataset movies.

O nosso código será dividido em duas etapas para ficar mais didático:

  1. PySpark: explicação dos scripts para processamento dos dados conforme os requisitos explicados anteriormente;
  2. Airflow: criação da DAG para orquestrar a execução dos scripts PySpark com a utilização do EMR Serverless.

Para visualizar melhor o nosso projeto, o diagrama de arquitetura abaixo mostra como será configurado a integração entre as tecnologias.

Desenho de arquitetura para utilização do EMR Serverless com o Airflow

O Airflow para este exemplo estará sendo executado em uma instância simples no EC2, ele será o responsável por iniciar a aplicação no EMR Serverless e adicionar os jobs Spark que irá processar os dados entre as camadas do Data Lake no S3.

Para que os scripts Spark fiquem acessíveis para qualquer aplicação, os mesmos serão armazenados no S3 e referenciados ao executar uma aplicação Spark pelo Airflow.

PySpark: processamento dos dados no Data Lake

O script para processamento dos dados será dividido em duas operações principais e uma que controla a execução de cada etapa. Iremos verificar os detalhes a seguir.

  1. Processamento dos dados da raw para staged

Neste passo basicamente o que será feito é abrir os datasets de movies e ratings em formato CSV que estão na camada raw e salvar os mesmos no formato parquet na camada staged.

2. Processamento dos dados da staged para curated

Nessa etapa, iremos abrir os dados que foram salvos em parquet na camada staged no passo anterior, fazer a junção (join) dos dados de filmes com suas respectivas notas, converter os campos do formato timestamp para data/hora e depois salvar apenas um dataset na camada curated.

3. Execução dos scripts de forma genérica

Para os scripts explicados anteriormente serem executados, os mesmos são chamados através de uma função genérica que recebe como parâmetro principal (job_name) qual a camada que será executado (raw_to_staged ou staged_to_curated), conforme o código abaixo:

Além de qual etapa será executada, os scripts esperam os seguintes parâmetros para configurar o processamento de cada camada do Data Lake:

  • input_path: o caminho do bucket do S3 que contém os dados de origem, basicamente qual camada do Data Lake.
  • output_path: o caminho do bucket do S3 que será salvos os dados, também se refere a camada do Data Lake de destino dos dados naquele momento.

Airflow: orquestrando os jobs Spark no EMR Serverless

Para este exemplo, o Airflow está sendo executado em um máquina EC2 através de contêineres do Docker.

Basicamente teremos duas tasks para serem executadas no Airflow: raw_to_staged e staged_to_curated.

Para cada uma das tasks os parâmetros serão configurados e passados para o job Spark conforme mencionado na seção anterior.

As tasks para processamento dos dados serão montadas de forma dinâmica, assim o código fica escalável para adicionar novas etapas caso necessário.

Além das tasks para processamento dos dados, teremos uma task para iniciar a aplicação do EMR Serverless (create_spark_app) e uma para deletar a aplicação ao final de todos os processamentos (delete_app).

O código abaixo mostra como ficou a DAG do Airflow:

Obs: no momento da escrita deste artigo, o Airflow ainda não possui um operador (operator) oficial para o EMR Serverless, assim estou utilizando um Operator que foi compartilhado pela AWS para executar DAGs com o EMR Serverless.

De forma visual, teremos a DAG da seguinte forma na interface do Airflow:

Importante destacar que ao criar uma application na task create_spark_app o EMR Serverless retorna um application_id que será utilizado em todas as outras tasks da DAG, utilizando assim o mesmo cluster para processamento dos dados. E isso também explica a dependência entre todas as tasks da DAG no desenho acima.

Para finalizar a parte de configuração da DAG do Airflow, precisamos criar ainda duas variáveis:

  • JOB_ROLE_ARN: são as permissões de configuração para execução dos jobs dentro da application, para criar uma role do IAM corretamente, essa documentação da AWS pode ser consultada.
  • S3_LOGS_BUCKET: caminho de um bucket para o S3 onde serão salvos os logs do Spark.

A imagem a seguir mostra como ficou as variáveis na UI do Airflow:

Executando EMR Serverless com o Airflow

Ao executar a DAG, podemos verificar o progresso no Airflow e acompanhar os status do cluster EMR em execução.

Na imagem abaixo temos uma application do EMR Serverless já no estado de criada executando a versão 6.6.0 da imagem do EMR.

Ao acessar a application do EMR, podemos verificar na imagem a seguir que temos dois jobs, um que já foi executado com sucesso e outro que está começando o estado de execução.

Acessando um job específico, ainda conseguimos visualizar os detalhes de sua execução, como os parâmetros recebidos, o local do script e as suas permissão de execução (IAM role).

Obs: o tempo médio atualmente para um job ser criado no cluster é começar a ser executado (sair do status de Submitted para Running) está entre 2–5 minutos nos testes realizados.

Como resultado da execução dos jobs, temos os dados sendo salvos nas camadas do Data Lake:

  • Dados na camada staged:
  • Dados na camada curated:

Conclusão

O serviço do EMR Serverless ainda está em evolução na AWS, porém já se mostra com grande potencial para se utilizar nas aplicações voltadas para Big Data.

Para cargas de dados que são executadas em batch e sob demanda (como a maior parte dos dados em Big Data) é uma boa solução que junta escalabilidade, disponibilidade, performance e custo, vale muito a pena testar e validar essa solução.

Assim o EMR Serverless já é uma alternativa viável não somente para o EMR clássico, mas para soluções de Big Data que utilizam serviços como o Kubernetes para gerenciar e manter um cluster Spark.

Redes sociais

Referências

--

--

Cícero Moura
Data Hackers

Arquiteto de Dados, pós-graduado em Big Data e Machine Learning. Palestrante em Big Data. Também sou AWS Community Builder e AWS Community Leader.