Processamento distribuído de dados com MapReduce utilizando Python, MRJob e EMR

Cícero Moura
Data Hackers
Published in
10 min readJun 21, 2020

Os algoritmos baseados em MapReduce podem resolver diversos problemas relacionados a processamento e análise de grandes volumes de dados. Neste universo a arquitetura Serverless surge como uma boa alternativa para diminuir a complexidade da infraestrutura necessária para esta atividade. Neste artigo iremos discutir como processar e analisar dados utilizando o MapReduce de forma Serverless, simples, direta e com custo gerenciado.

Processar uma grande quantidade de dados e extrair valor para o negócio sempre é um grande desafio desde a infraestrutura de hardware até o momento da análise e entendimento dos dados. Neste contexto existem diversas ferramentas, algoritmos e estratégias que podem ser aplicadas. Porém um algoritmo muito consolidado neste contexto é o MapReduce, nada mais nada menos que a base para frameworks como o Hadoop e Spark.

Levando em consideração o problema de processar dados de forma distribuída para análise, escrita de algoritmos para essa tarefa e infraestrutura necessária para execução desta atividade, vamos falar sobre os seguintes assuntos neste artigo:

  1. Conceitos sobre MapReduce (caso você não conheça ou queira entender um pouco mais).
  2. Conhecendo o Amazon EMR (Elastic MapReduce).
  3. MRJob: uma biblioteca para escrever os seus Jobs (algoritmos) MapReduce em Python de forma simples e bonita.
  4. Escrevendo um algoritmo de forma prática para processar dados não estruturados com MRJob e EMR.

MapReduce

O MapReduce nada mais é do que um modelo de programação, uma forma de escrever algoritmos que foi proposto pelo Google para processar grandes volumes de dados (Big Data). O MapReduce é baseado em conceitos da programação funcional.

O modelo MapReduce possui duas etapas. como o seu próprio nome sugere:

  • map: mapeamento dos dados em forma de chave e valor. Fase utilizada normalmente para encontrar algum padrão nos dados.
  • reduce: recebe um conjunto de dados no formato chave e valor, realiza a combinação e sumarização dos dados.

A imagem a seguir nos mostra de forma visual a arquitetura do MapReduce:

Na imagem fica claro que temos a entrada dos dados (input), o mapeamento dos dados (Map), a redução (Reduce) e sumarização dos dados (output). Porém e esse tal de Shuffle/Sort no meio do MapReduce?

O Shuffle ou Sort é uma fase intermediária onde os frameworks que executam o MapReduce como o Hadoop e Spark fazem de forma automática, onde os dados são agrupados e ordenados através da sua chave e valor que foram geradas pela fase de Map e assim facilita o trabalho do Reduce, que já encontra os dados ordenados para a sua sumarização.

Mas será que vale a pena? A grande vantagem do MapReduce é que o algoritmo é totalmente paralelizável e assim passível para implementação de tolerância a falhas, o que os frameworks já realizam de forma nativa.

Algumas linguagens de programação que suportam o modelo funcional como Java, JavaScript e Python já possuem uma implementação do MapReduce, porém bem limitada quando tratamos do cenário de Big Data e análise dados, assim se faz necessário a escrita dos nossos próprios algoritmos de mapeamento e redução para processar os dados.

A execução do nosso algoritmo será facilitada através de frameworks como Hadoop e Spark que se preocupam na forma de executar e paralelizar o nosso código.

Amazon Elastic MapReduce

O Amazon Elastic MapReduce ou simplesmente EMR é uma plataforma de arquitetura Serverless com o objetivo de executar tarefas para processamento e análise de dados através de algumas das ferramentas do Ecossistema Hadoop.

Uma definição da própria Amazon:

O Amazon EMR é a plataforma de big data em nuvem líder do setor para processar grandes quantidades de dados usando ferramentas de código aberto, como Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi e Presto. Com o EMR, você pode executar análises em escala de Petabytes.

Através do EMR é possível criar um cluster Hadoop com poucos cliques e sem a necessidade de instalar nada de forma manual, o que diminui muito a complexidade do seu ambiente de Big Data. O EMR possui uma estrutura de custo por processamento e tempo de execução, sendo ideal para atividades de processamento de dados por demanda, pois você só irá pagar pelo tempo que o cluster estiver executando.

No EMR é possível escolher a versão do Hadoop e as ferramentas desejadas, quantidade de máquinas no cluster e seus respectivos recursos de hardware, conforme a imagem abaixo:

Opções de configuração do cluster no console da Amazon EMR

Biblioteca MRJob

O MRJob é uma biblioteca open source para Python que se propõe a nos ajudar a escrever algoritmos no modelo MapReduce de forma simples e bonita. O intuito é diminuir a complexidade da escrita de algoritmos que muitas das vezes a linguagem Java e Scala trazem neste processo.

A biblioteca é simples e possui uma boa documentação, permitindo escrever a lógica do seu algoritmo de forma organizada e executar o código em diversos ambientes como: ambiente local, cluster Hadoop, EMR ou DataProc. Com apenas um código e poucas configurações você pode alterar o ambiente onde deseja executar o seu algoritmo.

Criando e executando o Job MapReduce

A seguir vamos utilizar o MRJob de forma prática juntamente com o Amazon EMR e Amazon S3.

Arquitetura do projeto

Neste artigo vamos trabalhar com um exemplo para processamento de arquivos textuais não estruturados, com o objetivo de agrupar palavras e contar quantas vezes elas ocorrem em nosso arquivo, criando um ranking de palavras e quantidades de ocorrências para cada uma. É um exemplo relevante no contexto de processamento de linguagem natural (NPL).

O arquivo de texto escolhido para este exemplo é o livro The Adventures Of Sherlock Holmes do autor Arthur Conan Doyle que pode ser encontrado gratuitamente no site do Projeto Gutenberg, aqui vamos utilizar a versão do livro no formato plain text.

Para construir esse mini projeto iremos utilizar a seguinte arquitetura:

Assim utilizaremos dois serviços da Amazon, o S3 onde vamos simular o nosso datalake com pastas para arquivos de dados, temporários e a saída dos resultados das nossas análises. O EMR que será o nosso cluster Hadoop para executar o algoritmo de MapReduce, que também são chamados de Jobs. E o algoritmo será escrito em Python com o auxílio da biblioteca MRJob.

Configuração do ambiente de desenvolvimento

Antes de começar a escrever o código de MapReduce é necessário configurar o ambiente de desenvolvimento, aqui vamos utilizar um ambiente virtual do Python com o virtualenv e instalar as bibliotecas necessárias, conforme os passos a seguir:

  1. Criando um ambiente virtual e ativando ele na linha de comando (Linux):

virtualenv venv — python=python3.6

source venv/bin/activate

2. Instalando a biblioteca do MRJob:

pip3 install mrjob

3. Instalando o SDK da Amazon para Python 3, o Boto3:

pip3 install boto3

Com esses passos executados podemos partir para a próxima etapa.

Configuração do ambiente na Amazon AWS

Primeiramente vamos criar a estrutura de armazenamento no S3 que vai conter um bucket chamado datalake-exemplo e três pastas dentro dele: a data para os dados, temp para arquivos temporários e logs e a outputs para os resultados dos nossos algoritmos. O bucket pode ser criado na região de sua preferência e pode estar privado (aliás é uma boa prática).

Agora o próximo passo é configurar o acesso ao EMR. Para isso é necessário criar um par de chaves (key pairs) no EC2 (isso mesmo! O EMR utiliza instâncias do EC2 para criar o cluster). Algo importante aqui é escolher a região em que você vai criar o par de chaves, pois será onde o cluster no EMR será criado no momento da execução do Job MapReduce.

Para este exemplo irei criar um par de chaves na região de Ohio (us-east-2) com o nome EMR-Ohio e fazer o download da chave EMR-Ohio.pem. Depois de fazer o download é importante dar permissão ao arquivo para que a chave seja lida e executada corretamente, o comando no Linux é o seguinte: chmod og-rwx EMR-Ohio.pem.

Exibição do par de chaves criados no console do EC2 na Amazon

O último passo de configuração na AWS é criar uma chave de acesso para os serviços dentro da plataforma, pois precisaremos acessar recursos do S3 e do EMR.

Para criar a chave basta acessar o console da AWS, clicar em seu nome de conta no lado direito superior, escolher a opção minhas credenciais de segurança e criar uma chave de acesso, conforme a imagem a seguir:

Exibição da chave de acesso criado dentro do console da AWS

No momento de gerar a chave é importante baixar o arquivo ou anotar o ID da chave de acesso e a chave de acesso secreta, pois será a única vez que terá acesso a esses dados completos.

Com todas as configurações realizadas podemos seguir para a escrita do nosso algoritmo.

Escrevendo o algoritmo de MapReduce

Como o nosso exemplo é para processar um arquivo texto é contar a quantidade de ocorrências de cada palavra, um Job MapReduce simples resolve o problema. Porém ao executar um Job simples teríamos a ordenação por palavras e aqui queremos ordenar pela quantidade de ocorrências, o que será necessário criar duas etapas de mapeamento e redução. E além disso, vamos fazer um pequeno tratamento em nosso texto para considerar apenas palavras e desconsiderar pontuações e números através de expressão regular.

O nosso algoritmo completo (e comentado) ficará da seguinte forma:

Como o nosso algoritmo tem dois passos de MapReduce, vamos utilizar o MRStep do MRJob para poder organizar e declarar quais serão os passos de mapeamento e redução. Assim o nosso primeiro MapReduce faz o mapeamento das palavras e a sumarização e o segundo faz a inversão da ordem de chave e valor que antes estava como word: count agora vai ficar count: word, assim podemos ordenar por quantidades de ocorrências e obter o resultado final.

Executando o Job MapReduce

Agora chegou a hora mais esperada, o momento de ver tudo funcionando e a mágica acontecendo.

Antes de executar o script do nosso Job, é necessário declarar as configurações do MRJob para que ele possa fazer o seu trabalho. O MRJob é configurado através de um arquivo chamado .mrjob.conf (não esqueça do ponto no início, é um arquivo oculto) onde declaramos as credenciais da AWS e diversos parâmetros para customizar o nosso cluster Hadoop.

O arquivo .mrjob.conf deve ficar na raiz de pastas do seu usuário (exemplo no linux ~/.mrjob.conf), este local é onde a biblioteca irá procurar automaticamente as configurações, mas caso você queira outro local ou utilizar vários arquivos de configuração diferentes, no momento de executar o algoritmo é possível passar o parâmetro --conf-path para indicar o local do arquivo de configuração desejado. A seguir o arquivo de configuração para o exemplo deste artigo:

Neste arquivo de configuração declaramos o par de chaves de acesso na AWS, o caminho da chave do EC2 para acesso via ssh, a região onde vamos criar o cluster (por padrão o MRJob cria em Oregon) e a quantidade de recursos que vamos alocar para executar o nosso Job MapReduce.

É importante destacar que a forma como vamos executar neste exemplo o MRJob vai criar um novo cluster somente para executar o nosso Job e depois irá encerrá-lo, caso você queira utilizar um cluster existente é possível passando um parâmetro a mais no momento da execução. Todas as configurações para o EMR podem ser encontrados na documentação.

Depois de todo o cenário montado e agora para fazer a mágica acontecer basta só executar o comando abaixo:

Para entender mais o que estamos executando:

  • -r emr: indica que o Job será executado no EMR e em seguida o arquivo a ser processado;
  • --output: indica onde será gravado os arquivos de saída do MapReduce;
  • --cloud-tmp-dir: indica onde ficará os arquivos temporários e logs do processo.

Após executar o script Python o cluster será criado e encerrado depois da execução de forma automática:

Os arquivos de resultados podem ser acessados na pasta outputs/job1:

Uma prévia do resultado do MapReduce logo abaixo:

Para saber um pouco mais…

Caso você tenha gostado do MRJob e queira se aprofundar um pouco mais, segue algumas configurações interessantes:

  • Para utilizar um cluster já existente no EMR basta utilizar o parâmetro --cluster-id e o id do cluster (exemplo: j-2CMSHJ3JRFZF1).
  • é possível instalar pacotes personalizados do Python no cluster que estejam sendo utilizados em seu algoritmo com o parâmetro bootstrap no arquivo de configuração (.mrjob.conf).
  • ainda é possível utilizar o Spark e suas bibliotecas para executar os seus Jobs de MapReduce.
  • se você precisar configurar uma imagem específica do EMR, do Hadoop ou Spark é possível configurar com parâmetros conforme a documentação.

Concluindo…

Os algoritmos de MapReduce escritos com o MRJob podem ser executados em sua máquina local retirando o parâmetro -r emr do comando de execução e também podem ser apontados para dados e diretórios locais. O objetivo de utilizar um cluster para executar MapReduce realmente é para ambientes de Big Data, onde a massa de dados é muito grande e há necessidades de paralelizar as execuções de Jobs para processamento dos dados de forma rápida e eficiente.

O MRJob atende bem às expectativas tanto no momento de escrita do algoritmo de MapReduce quanto no momento de personalizar o seu ambiente de execução, seja em cluster local ou em nuvem, com muita flexibilidade e várias opções para execução.

Você ainda pode acessar o código completo utilizado neste artigo no meu github.

--

--

Cícero Moura
Data Hackers

Arquiteto de Dados, pós-graduado em Big Data e Machine Learning. Palestrante em Big Data. Também sou AWS Community Builder e AWS Community Leader.