Como usar AWS Lambda e EMR para processamento de dados

Lucas Mindelo
A3data
Published in
13 min readApr 12, 2023

Uma alternativa para processamento de dados com uma grande volumetria, utilizando ferramentas atuais, como o Apache Spark .

O que vamos fazer?

No nosso exemplo, vamos criar um script em Python que usa o Boto3 para enviar um trabalho para o cluster EMR. Em seguida, executar o script como uma função na AWS. Segue imagem abaixo com todo o processo:

Para submeter um trabalho no cluster pela Lambda podemos usar a API do Amazon EMR ou o AWS SDK para Python (Boto3). Isso permite que você use a capacidade de processamento escalável do EMR sem comprometer a capacidade limitada da AWS Lambda. E para isso vamos utilizar o Apache Spark.

O que é o Apache Spark?

O Apache Spark é um framework de computação distribuída de código aberto, desenvolvido para processamento e análise de grandes volumes de dados (big data). Ele oferece velocidade, facilidade de uso e flexibilidade, permitindo a execução de tarefas em diversos ambientes e com várias linguagens de programação. Documentação completa aqui.

Segue o código pyspark que vamos utilizar para nosso exemplo:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Definir os paths S3
INPUT_PATH = "landing path s3"
OUTPUT_PATH = "bronze path s3"

# Criar uma sessão spark
print("start spark session")
spark = (
SparkSession
.builder
.appName('landing-to-bronze')
.getOrCreate()
)

# Ler o arquivo csv
df = (
spark
.read
.format("csv")
.options(header = 'False',delimiter = ';')
.load(INPUT_PATH)
)

# Escrever dados no formato Parquet
(
df
.write
.format("parquet")
.mode("overwrite")
.save(OUTPUT_PATH)
)

# Parar a sessão Spark
spark.stop()

Como segue nos comentários do código, vamos carregar os dados do S3 que estão no formato csv e salvar eles em outro bucket dentro do S3 no formato parquet.

Separando dados

Precisaremos de um ou mais arquivos de dados para nosso exemplo. Estarei usando arquivos com informações de CNPJ de empresas brasileiras disponibilizados no site de dados públicos dados.gov.

Temos algumas formas de pegar essas informações:

1. Podemos utilizar a biblioteca requests ou urllib do python para pegar diretamente no endereço web usando a lambda ou script local e enviar para o S3 utilizando a biblioteca boto3.

2. Podemos baixar os arquivos do site e subir diretamente pelo console AWS, porém essa é a maneira menos recomendada em questão de segurança das informações.

3. Podemos baixar os arquivos diretamente do site, e enviar para o S3 usando AWS CLI(Command-Line Interface). Que esse o método utilizado neste artigo.

Introdução AWS

A AWS (Amazon Web Services) é uma plataforma de serviços em nuvem fornecida pela Amazon que oferece uma ampla gama de serviços de computação em nuvem, armazenamento, banco de dados, análise, inteligência artificial, Internet das Coisas e muito mais. Com a AWS, os usuários podem pagar apenas pelos serviços que usam, em vez de investir em infraestrutura de TI de alto custo e de longo prazo.

Observações:

>> Conta AWS(necessário adicionar cartão de crédito para criar a conta).

>> Os serviços utilizados no nosso exemplo não são 100% gratuitos.

>> A cobrança é referente a quantidade de recurso utilizado na AWS.

Configurações prévias para uso AWS

  • Criar conta AWS (link)
  • Criar chaves de acesso a AWS(link)
  • Configurar acessos da conta AWS (IAM)
  • Instalar o AWS CLI (Ubuntu):
sudo apt install awscli
  • Configurar credenciais da AWS CLI
aws configure
AWS Access Key ID [None]: ANOTREALACCESSKEYID
AWS Secret Access Key [None]: ANOTREALSECRETACCESSKEY
Default region name [None]: eu-west-1
Default output format [None]: json
  • Criar um bucket com nome “mybucket” na região us-east-1(Norte da Virgínia)) :

>> Estaremos criando nessa região, pois o custo também depende da região e essa é uma das mais baratas

aws s3api create-bucket --bucket mybucket --region us-east-1
  • Enviar todos os arquivos da pasta local para o bucket “mybucket”:
aws s3 sync . s3://mybucket
  • Verificar se os arquivos estão no bucket:
aws s3 ls s3://mybucket

O que é e por que usar AWS Lambda?

Lambda é um serviço de computação sem servidor (severless) da AWS que permite que você execute código sem precisar gerenciar ou provisionar servidores.

Com a Lambda, você pode criar funções que são executadas em resposta a eventos, como solicitações de API, uploads de arquivos, alterações em bancos de dados, entre outros. Quando uma função Lambda é acionada, a AWS automaticamente provisiona o ambiente de execução, executa o código e, em seguida, desativa o ambiente de execução quando a função é concluída.

Uma das principais vantagens da Lambda é que você só paga pelo tempo em que sua função está em execução. Isso significa que, ao contrário dos servidores tradicionais, você não precisa pagar por capacidade ociosa e pode escalar facilmente suas funções para lidar com aumentos de tráfego ou cargas de trabalho. Além disso, a Lambda suporta várias linguagens de programação, incluindo Python, Node.js, Java, C#, Go, entre outros.

O que é e por que usar o AWS EMR?

EMR (Elastic MapReduce) é um serviço de computação em nuvem da AWS que permite processar grandes volumes de dados distribuídos em um cluster. Com o EMR, é possível executar tarefas de processamento de dados, como análise de dados, mineração de dados, processamento de logs e processamento de dados em tempo real.

O EMR é baseado no Apache Hadoop e no Apache Spark, oferecendo escalabilidade, integração com outros serviços da AWS, suporte a várias linguagens de programação, flexibilidade e análise em tempo real.

Boto3

O Boto3 é o SDK (Software Development Kit) oficial da AWS para Python. Ele permite que os desenvolvedores interajam e gerenciem os serviços da AWS diretamente de seus aplicativos Python. Com o Boto3, você pode automatizar tarefas, provisionar recursos e gerenciar serviços da AWS de forma programática, tornando mais fácil integrar os serviços AWS em seus projetos. Documentação completa aqui.

Configurando ambiente

Vamos deixar o ambiente preparado para trabalhar com as lambdas e o EMR.

Primeiramente, se atentar que para utilizar algumas bibliotecas python é necessário a pré-instalação de uma ou mais camadas (Layers) para alocar elas à lambda.

Criando Camada (Layer)

  • Criar pasta onde vai ficar as dependências (bibliotecas)
  • Instalar bibliotecas dentro da pasta :
pip install [dependencia] -t .

>> Caso dê erro de user and target inserir — no-user no final do código

pip install [dependencia] -t . --no-user
  • Transformar a pasta em um arquivo .zip
  • Criar camada
  • Verificar se a lambda nao tem camadas ja alocadas
  • Atribuindo a camada á lambda

Criando par de chaves

Para criar a instância EMR, devemos antes criar uma par de chaves para que o master node consiga se conectar com os workers e para acessar o cluster EC2. Vamos para o recurso de EC2 dentro do console aws e criamos a par de chaves na aba rede e segurança, como segue na imagem:

Políticas de acesso

Em seguida, será preciso configurar as políticas de acesso que vamos utilizar na lambda, no EMR e nas máquinas EC2.

EC2

Se já não estiver criada, vamos criar uma função e alocar as políticas necessárias

  • Criando função
  • Escolhendo entidade da função (no nosso caso, serviço da AWS)
  • Atribuindo políticas
  • Adicione um nome e crie a função

Lambda

Quando se é criado uma função lambda, a função IAM dela tambem é criada. Então vamos criar uma política para alocar ela a função já existente.

  • Criando política
  • Clique em json:
  • Estou usando as políticas que segue no json abaixo:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:RunJobFlow",
"elasticmapreduce:ListClusters",
"elasticmapreduce:DescribeCluster",
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:ListSteps",
"elasticmapreduce:TerminateJobFlows",
"ec2:DescribeAvailabilityZones",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSubnets",
"ec2:DescribeVpcs",
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": [
"arn:aws:iam::543505234161:role/EMR_DefaultRole_V2",
"arn:aws:iam::543505234161:role/EMR_EC2_DefaultRole"
]
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents"
],
"Resource": "*"
}

]
}
  • Atribua um nome a política
  • Vamos adicionar a política para a função IAM
  • Adicione a política que acabamos de criar

EMR

Vamos criar uma função e alocar as políticas necessárias.

Seguindo o mesmo processo de criação citado anteriormente na lambda, crie uma política e atribua o seguinte json a ela:

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject",
"s3:GetObjectVersion",
"s3:PutObjectAcl",
"s3:GetBucketAcl",
"s3:GetBucketLocation"
],
"Resource": [
"arn:aws:s3:::elasticmapreduce*",
"arn:aws:s3:::us-east-1.elasticmapreduce*"
]
},
{
"Sid": "ListActionsForEC2Resources",
"Effect": "Allow",
"Action": [
"ec2:DescribeAccountAttributes",
"ec2:DescribeCapacityReservations",
"ec2:DescribeDhcpOptions",
"ec2:DescribeImages",
"ec2:DescribeInstances",
"ec2:DescribeLaunchTemplates",
"ec2:DescribeNetworkAcls",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribePlacementGroups",
"ec2:DescribeRouteTables",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSubnets",
"ec2:DescribeVolumes",
"ec2:DescribeVolumeStatus",
"ec2:DescribeVpcAttribute",
"ec2:DescribeVpcEndpoints",
"ec2:DescribeVpcs",
"ec2:RunInstances",
"ec2:TerminateInstances"
],
"Resource": "*"
},
{
"Sid": "PassRoleForEC2",
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": "arn:aws:iam::*:role/EMR_EC2_DefaultRole",
"Condition": {
"StringLike": {
"iam:PassedToService": "ec2.amazonaws.com*"
}
}
}
]
}
  • Adicionar a política para a função IAM

Função Lambda

Variáveis de ambiente

Agora vamos para onde estará presente todas as nossas ações, na função lambda. Por questões de segurança, alguns casos será necessário a criação de variáveis de ambiente, para não deixar exposto chaves ou conteúdos sensíveis que vão ser necessários na utilização do código, usei o bucket como exemplo como vemos abaixo.

  • Adicionar variáveis de ambiente na lambda (se for usar):

As variáveis de ambiente fornecem uma forma segura e flexível de gerenciar informações sensíveis e configurações da função Lambda, permitindo que você ajuste facilmente as configurações da sua função sem precisar modificar o código.

Segue exemplo de como chamar no código:

import os

s3_bucket = os.environ.get('S3_BUCKET')
s3_key = os.environ.get('S3_KEY')

Para executar a lambda, vou criar um evento com nome teste.

Bootstrap

Vamos entender o conceito de bootstrap, algo necessario para entendimento do processo e que vamos utilizar.

Ações de bootstrap são scripts personalizados que são executados no Amazon EMR antes da instalação das aplicações e antes da etapa do cluster começar. Esses scripts são executados em todas as instâncias do cluster, incluindo instâncias principais, core e de tarefas. As ações de bootstrap podem ser usadas para instalar software adicional, configurar o sistema operacional, ajustar configurações do sistema ou realizar qualquer outra tarefa de configuração necessária para preparar o ambiente do cluster.

Um exemplo de scrip dentro de um arquivo .sh para inserir na BootstrapActions:

#!/bin/bash

sudo pip3 install boto3
sudo pip3 install awswrangler

# #create app folder
sudo mkdir -p /app/

# #copy spark app
sudo aws s3 cp s3://my-bucket/scripts/landing_to_bronze.py /app/

#get jars from internet
sudo wget -P /usr/lib/spark/jars https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar
sudo wget -P /usr/lib/spark/jars https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/10.2.0.jre8/mssql-jdbc-10.2.0.jre8.jar

Script

Agora a parte principal do nosso exemplo, a criação do cluster e das etapas relacionadas a esse cluster, temos varias possibilidades de configurações diferentes na criação do nosso cluster EMR através do boto3, estou usando apenas algumas delas.

  • Vamos começar importando as bibliotecas que serão usadas, no nosso caso apenas duas.
import boto3
import os

Criando cluster EMR

Segue alguns parametros para a função run_job_flow, todos os :

  1. Name: Nome do cluster EMR.
  2. ReleaseLabel: A versão do Amazon EMR a ser usada.
  3. Instances: Define a configuração das instâncias do cluster, como o tipo e o número de instâncias.
  4. Applications: Aplicações a serem instaladas no cluster, como Spark, Hadoop, Hive, etc.
  5. JobFlowRole: Função do IAM a ser assumida pelas instâncias do cluster.
  6. ServiceRole: Função do IAM que permite ao Amazon EMR acessar os recursos necessários em seu nome.
  7. VisibleToAllUsers: Se o cluster EMR será visível a todos os usuários.
  8. Steps: Etapas a serem executadas no cluster, como etapas do Spark, Hadoop ou Hive.
  9. LogUri: O caminho do bucket S3 onde os logs do cluster serão armazenados.
  10. Configurations: Lista de configurações adicionais para aplicativos e componentes do cluster.
  11. Tags: Lista de tags a serem aplicadas ao cluster.
  12. BootstrapActions: Lista de ações de bootstrap a serem executadas antes da instalação das aplicações e antes da etapa do cluster começar.
def create_emr_cluster(emr_client,s3_bucket):
response = emr_client.run_job_flow(
Name='EMR-Cluster',

#Versão do EMR
ReleaseLabel='emr-6.5.0',

#configurações de inicio do cluster
BootstrapActions=[
{
'Name': 'string',
'ScriptBootstrapAction': {
'Path': f's3://{s3_bucket}/config/setup.sh',
'Args': [
'git'
]
}
},
],

# Estabelecer as maquinas EC2 que farão parte do cluster
Instances={
'Ec2SubnetId': 'subnet-0072df50bc02986ca',
'KeepJobFlowAliveWhenNoSteps': False, # O cluster EMR continuará viva mesmo sem etapas?
'TerminationProtected': False, # Proteção de termino do Cluster EMR
'InstanceGroups': [
{
'Name': 'Master',
'InstanceRole': 'MASTER',
'InstanceType': 'm4.large',
'InstanceCount': 1,
},
{
'Name': 'Workers',
'InstanceRole': 'CORE',
'InstanceType': 'm4.large',
'InstanceCount': 2,
},
],
},

# Aplicações que serão instaladas no cluster EMR
Applications=[
{'Name' : 'Spark'},
{'Name' : 'JupyterEnterpriseGateway'},
{'Name' : 'JupyterHub'},
{'Name' : 'Hadoop'}
],
Configurations = [],
JobFlowRole='EMR_EC2_DefaultRole', # Atribui função IAM que será utilizada nas maquinas EC2
ServiceRole='EMR_DefaultRole_V2', # Atribui função IAM que será utilizada no EMR
VisibleToAllUsers=True, # Se será visivel a todos os usuarios
LogUri=f's3://{s3_bucket}/logs/'
)

# Retorna o ID do cluster EMR criado
return response['JobFlowId']

Criar etapas para adicionar ao cluster:

Vamos utilizar o parâmetro steps para adicionar etapas ao cluster.

  1. Name: O nome da etapa, que pode ser qualquer string descritiva.
  2. ActionOnFailure: A ação a ser tomada se a etapa falhar. Neste exemplo, estamos usando TERMINATE_CLUSTER, o que significa que o cluster será encerrado se a etapa falhar, temos tambem:TERMINATE_JOB_FLOW’|’CANCEL_AND_WAIT’|’CONTINUE
  3. HadoopJarStep: Um dicionário que contém informações sobre o programa JAR ou script a ser executado:
  • Jar: O caminho para o JAR a ser executado. Neste exemplo, estamos usando o command-runner.jar, que é um JAR especial que permite executar comandos do sistema.
  • Args: Uma lista de argumentos de linha de comando passados ​​para a função principal do arquivo JAR quando executado.
def add_step_to_cluster(emr_client, cluster_id, s3_bucket, s3_key):
step_response = emr_client.add_job_flow_steps(
JobFlowId=cluster_id,
Steps=[
{
'Name': 'Execute-Python-Script',
'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode',
'cluster',
f's3://{s3_bucket}/{s3_key}'
]
}
}
]
)

return step_response['StepIds'][-1]

Função handler da lambda

Um parâmetro importante que vamos utilizar é a função wait. A função wait no Boto3 é usada para verificar periodicamente o status de um recurso ou operação até que o recurso atinja o estado desejado. Os waiters são úteis quando você deseja aguardar a conclusão de uma operação ou a transição de um recurso para um estado específico antes de prosseguir com a execução do script.

No nosso caso vamos usar o wait para aguardar a conclusão da tarefa usando o parâmetro do EMR e então encerrar o cluster.

O cluster EMR necessita ser encerrado, pois nós somos cobrados pelo tempo em que ele se encontra ativo.

Aqui estão os parâmetros que você pode usar com a função wait:

  1. ClusterId: O ID do cluster EMR.
  2. StepId: O ID da etapa que você deseja acompanhar.
  3. WaiterConfig: Configurações opcionais para personalizar o comportamento do waiter, incluindo:
  • Delay: O tempo de espera entre as verificações de status, em segundos. O padrão é 30 segundos.
  • MaxAttempts: O número máximo de tentativas de verificação de status antes de o waiter falhar. O padrão é 60 tentativas.
def lambda_handler(event, context):
emr_client = boto3.client('emr')
s3_bucket = os.environ.get('S3_BUCKET')
s3_key = os.environ.get('S3_KEY')

cluster_id = create_emr_cluster(emr_client, s3_bucket)
step_id = add_step_to_cluster(emr_client, cluster_id, s3_bucket, s3_key)


# Espera até que a etapa do script Spark termine de ser executada
waiter = emr_client.get_waiter('step_complete')
waiter.wait(
ClusterId=cluster_id,
StepId=step_id,
WaiterConfig={
'Delay': 30,
'MaxAttempts': 60
}
)

# Finaliza o cluster EMR
emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
  • Clicar em deploy para inserir as alterações que foram feitas na lambda
  • Em seguida clicaremos em test, para rodar nosso script

Podemos acompanhar o processo indo para o serviço EMR no console AWS:

  • Status do cluster
  1. STARTING: O cluster está sendo provisionado e configurado. As instâncias estão sendo lançadas, e o software está sendo instalado e configurado.
  2. BOOTSTRAPPING: As ações de bootstrap estão sendo executadas. Neste estado, os scripts de bootstrap são executados nas instâncias do cluster para configurar adicionalmente o ambiente.
  3. RUNNING: O cluster está pronto para executar etapas. Todas as instâncias estão prontas e o cluster aceita etapas para processamento.
  4. WAITING: O cluster está ativo e aguardando a adição de etapas. Este estado ocorre apenas quando não há etapas atualmente em execução e a configuração ‘KeepJobFlowAliveWhenNoSteps’ está definida como True.
  5. TERMINATING: O cluster está sendo encerrado. As instâncias são encerradas e os recursos são liberados. Isso pode acontecer automaticamente após a conclusão de todas as etapas, ou manualmente se o usuário solicitar o encerramento do cluster.
  6. TERMINATED: O cluster foi encerrado e todos os recursos associados foram liberados. Um cluster neste estado não pode ser reiniciado ou ter etapas adicionais.
  7. TERMINATED_WITH_ERRORS: O cluster foi encerrado e todos os recursos associados foram liberados, mas ocorreram erros durante a execução de uma ou mais etapas. Verifique os logs do cluster para obter detalhes sobre os erros.
  • Clicamos no cluster e vamos até a aba etapas para verificar o estado da nossas etapas
  • Cluster finalizado com todas as etapas concluídas e dados já na bronze em formato parquet 🥳.

Essa é uma das minhas experiências com AWS e alguns recursos que ela disponibiliza.

Meu contato: linkedin

Referências

--

--

A3data
A3data

Published in A3data

Use inteligência artificial para resolver problemas complexos. A A3Data organiza os dados de seus clientes para gerar valor para o negócio.