Qualidade de Dados em Larga Escala com Great Expectations, Spark e Airflow no EMR (Case 1)
Neste artigo o objetivo é mostrar de forma prática como testar a qualidade dos seus dados em Big Data utilizando o Great Expectations com Spark, Airflow e EMR
O código utilizado neste artigo pode ser encontrado neste repositório.
O link da data docs gerado pelo Great Expectations também pode ser acessada aqui =)
A qualidade de dados é um dos maiores desafios enfrentados pelas empresas atualmente, pois é necessário garantir que os dados sejam precisos, confiáveis e relevantes para que as decisões tomadas com base nesses dados sejam bem-sucedidas.
Neste aspecto, temos visto surgir diversas tendências, como o Modern Data Stack que traz a qualidade dos dados como uma das principais práticas.
O Modern Data Stack (MDS) é um conjunto de ferramentas e tecnologias que ajuda as empresas a armazenar, gerenciar e aprender com seus dados de forma rápida e eficiente. Conceitos como Data Quality e Data Observability são destaques do MDS.
Neste artigo, objetivo é explorar o Great Expectations, uma ferramenta de validação de dados contida dentro do MDS, que pode ser usada em conjunto com Spark para garantir a qualidade dos dados em larga escala.
Great Expectations
O Great Expectations (GE) é uma ferramenta de validação de dados open source que ajuda a garantir a qualidade dos dados.
Com o Great Expectations, é possível definir expectativas sobre seus dados e verificar se elas atendem ou não.
Algumas das funcionalidades existentes incluem a capacidade de validar o esquema dos dados, garantir a integridade referencial, verificar a consistência e detectar anomalias.
O GE é muito flexível e escalável, permitindo assim a integração nas nossas pipelines de dados, seja para validar, gerar relatórios ou até mesmo impedir que a pipeline avance gravando dados inconsistentes nas camadas mais “curadas” do Data Lake.
Alguns pontos que podemos destacar:
- é possível criar testes de dados diretamente de dataframes criados com Pandas ou Spark;
- é possível gerar uma documentação dos dados em HTML de forma automática, que traz diversas informações sobre os testes executados;
- é possível salvar um conjunto de testes (suite) para serem utilizados posteriormente (checkpoints);
- podemos utilizar uma grande quantidade de expectativas já prontas, ou criar facilmente expectativas personalizadas que atendam os nossos casos de testes;
- possui uma CLI que facilita a criação dos casos de testes, ou simplemente, podemos criar codificando em Python;
- é possível conectar diretamente a fontes de origem de dados, assim validar os dados de forma mais rápida.
Case prático com Great Expectations
O cenário a ser trabalhado neste artigo é o mais próximo do que encontramos no dia a dia, então vamos trabalhar com o seguinte case:
Temos dados armazenados em um Data Lake que se encontra no S3 da AWS. Precisamos verificar a qualidade dos dados antes que o negócio tome decisões críticas em cima deles.
Os dados são sobre vendas de produtos de um e-commerce e nos diz muito sobre o comportamento dos clientes dessa loja.
Os dados utilizados para este artigo são abertos e se tratam de vendas do site da Amazon que podem ser encontrados no kaggle, neste link.
Utilizando o Great Expectations com Spark no EMR
Neste artigo iremos utilizar o Great Expectations com Spark para executar os casos de testes.
O ambiente do Spark será no EMR e o Airflow será o meio de orquestração dos jobs que irão executar.
Para facilitar o entendimento do processo, analisaremos o desenho da arquitetura logo abaixo:
Podemos destacar os seguintes pontos:
- o código Spark contendo toda a lógica para executar o GE com Spark será armazenado no S3;
- os dados estão também armazenados no S3 no formato CSV;
- a data docs gerada como resultado também será armazenada no S3, em um bucket configurado para um site estático;
- o Airflow fará toda a orquestração do EMR e controlará o ciclo de vida dos jobs.
[EXTRA] Um Caso de Uso mais completo com Great Expectations
Você pode ter se perguntado em algum momento: Onde encaixar a qualidade de dados em minha pipeline de dados?
A ideia desta seção é só trazer mais um pouco de insight para a conversa, destacando através da minha experiência onde vejo mais casos de uso para qualidade de dados e seus frameworks, seja Great Expectations ou outro.
O diagrama de arquitetura abaixo mostra um pouco mais de como podemos encaixar a qualidade dados em nossas pipelines de Big Data:
- no diagrama, temos um Data Lake montado nas camadas raw, staged, curated e analytics, que á algo muito comum nas boas práticas de Data Lake;
- o GE foi encaixado aqui no momento de passar os dados da Curated para a Analytics;
- um ponto importante, é que o quanto mais perto do negócio os testes de qualidade estiverem, melhor será a garantia dos dados, assim os testes irão refletir as regras de negócio, do que apenas “um simples testes de tipos dos dados”;
- depois dos testes de qualidade, vejo dois cenários muito comuns: caso a qualidade seja muito crítica, é possível falhar a pipeline se os testes não atingirem a qualidade esperada. O segundo cenário é que mesmo se os testes falharem, a pipeline segue normalmente e é apenas gerado o relatório ou alertas para serem analisados
Agora vamos voltar ao assunto principal.
1. Criação do script Spark com Great Expectations
Para a criação do script Spark que contém os casos de testes, iremos dividi-lo em alguns passos, conforme a seguir:
Configuração do contexto
O contexto do GE indica as principais configurações a serem consideradas para executar os testes.
O código a seguir faz a configuração do contexto através de um YAML criado por um objeto do próprio Python.
Na configuração deste contexto basicamente é informar que o Spark é utilizado para realizar os testes, pois poderia ser outro cenário, como utilização do Pandas.
Configurando a Data Docs
Um ponto importante aqui é a configuração do local em que será salva a nossa Data Docs. Por padrão a documentação HTML é gerada no disco local, porém para este artigo, a data docs será armazenada e hosteada pelo S3. Nas configurações precisamos indicar um bucket especifico para salvar a documentação.
No código a seguir, o bucket de destino (output_path) está sendo passado como parâmetro, assim o script fica mais dinâmico e personalizável.
Criação de um Validator
Antes de adicionar os casos de testes, é necessário configurar um Validator, nele indicamos os testes em forma de Batch Request.
O Validator já incorpora as funções de validação dos dados de forma built-in conforme veremos mais adiante, o que facilita e deixa muito mais intuitivo a criação dos casos de testes.
O código abaixo configura e cria o Validator utilizando o contexto dos nossos testes e também o dataframe que contém os dados para validação.
Criando os Casos de Testes
Chegou o momento mais esperado, criar os casos de testes.
Nesta etapa o objetivo é trabalhar com dois cenários de casos de testes: o primeiro é executar um perfil dos dados (profile) e o outro é adicionar os casos de testes personalizados, conforme a necessidade do negócio.
- Profile dos dados
Um Data Profile refere-se ao processo de examinar, analisar, revisar e resumir conjuntos de dados para obter informações sobre a qualidade dos dados.
O GE permite criar um profile dos dados de forma automática e muito simples.
Nesse profile será gerada informações sobre todas as colunas dos dados e testes para checar valores nulos, tipos dos dados, padrão mais recorrente em cada coluna e outros.
Para criar um profile dos dados e adicionar ao contexto de testes, basta ter o código a seguir:
Um ponto importante é que o profile é executado através de um objeto Spark criado pelo GE (df_ge), como será visto posteriormente, o que difere dos demais casos de testes que serão adicionados a seguir, pois são feitos em cima do objeto Validator (criado no passo anterior).
Outro ponto a destacar é que foi utilizado um nome para a suite de testes do profile e outro para os testes dos validators, assim na data docs ficarão separados, isso ajuda na organização da documentação.
- Casos de testes
Agora basta adicionar os casos de testes conforme a necessidade de validação dos dados.
O código a seguir adiciona os seguintes testes:
- validar se todas as colunas desejadas estão no dataset;
- validar se o campo product_id tem valores únicos e não nulos;
- validar se o campo discount_percentage contém apenas valores entre 0 e 100;
- validar se o campo rating contém apenas valores entre 0 e 5;
- validar se o campo product_link contém apenas dados com um formato válido de um link, isso utilizando um regex para validar o padrão.
Após adicionar todos os casos de testes desejados, basta salvar as expectativas na suite de testes.
Executando os testes
Agora chegou o momento de ligar todos os pontos.
O código abaixo é a função principal que será chamada pelo Spark, ela faz a leitura dos dados que desejamos e invoca as outras funções que discutimos anteriormente para configurar as suites de testes e assim executá-las.
Gostaria de destacar dois pontos do código acima:
- O primeiro ponto é para a linha de código abaixo, pois é nesse momento que o GE realmente executa as validações. Até então ele estava apenas adicionando ao Grafo de Execução e não executou de forma concreta. Ao executar o método validate, os testes são executados e os resultados retornados de forma estatística.
- O segundo ponto é que depois da execução dos testes, conseguimos gerar a data docs, indicando qual configuração utilizar. No cenário aqui, o nome da configuração é s3_site.
Encerramos o script do Great Expectations e agora é partir para a criação da DAG do Airflow, podendo assim executar os casos de testes no EMR.
2. Criação da DAG no Airflow
Neste passo é o momento de criar uma DAG no Airflow para executar os testes com o GE dentro do EMR com Spark.
Teremos as seguintes tasks na nossa DAG:
- create_emr: task responsável pela criação do EMR para execução dos jobs. Não se esqueça de configurar a conexão com a AWS (aws_default) ou IAM se estiver executando o Airflow dentro da própria AWS. As configurações do EMR se encontram no repositório do projeto.
- add_step: responsável por adicionar um job no EMR (step). As configurações desse job (spark-submit) veremos mais adiante.
- watch_step: é um sensor do Airflow responsável por monitorar os status do job anterior até que ele seja encerrado, com sucesso ou falha.
- terminate_emr: depois que o job for finalizado, essa task finaliza a instância do EMR alocada para a execução dos testes.
São essas as tasks necessárias para executar os testes, logo abaixo temos o código da DAG:
Agora vou detalhar um pouco mais a configuração do job que será adicionado no EMR para processar os testes, que basicamente é um spark-submit.
No código abaixo podemos verificar todas as configurações, inclusive os parâmetros que são passados para o script.
Importante destacar, que o código que será executado no Spark está armazenado no S3, tanto o arquivo main.py que faz a chamada das demais funções, quanto o arquivo modules.zip que contém toda a lógica para os testes executarem.
Esse modelo de código foi adotado para ser escalável e com uma melhor manutenção, além de permitir executarmos o Spark no modo cliente ou cluster facilmente.
3. Execução do Script no EMR
Com todo o script desenvolvido e a DAG no Airflow criada, podemos executar os casos de testes.
A seguir temos um exemplo da DAG no Airflow que foi executada com sucesso.
Também temos a seguir uma imagem que mostra em mais detalhes o job executado com sucesso no EMR.
4. Resultados
Agora é o momento de analisar os dois resultados dos testes executados.
O primeiro são os arquivos da data docs salvos no bucket do S3, conforme abaixo:
E o segundo resultado é ao acessar a data docs, conforme abaixo:
Lembrando que a data docs criada neste artigo pode ser acessada neste link.
Ao acessar a suite com o profile dos dados temos o resultado a seguir:
E ao acessar a suite com o casos testes criados temos o resultado abaixo:
Conclusão
O Great Expectation é a ferramenta para qualidade de dados open source que mais cresce em adoção atualmente, com uma comunidade muito ativa, em constante atualização e com várias grandes empresas no mundo todo utilizando.
Com o GE conseguimos criar casos de testes para os mais variados cenários que atendem diversos conjuntos de dados diferentes de forma fácil e com a possibilidade de personalizar testes para os nossos casos de uso.
Além de trazer os resultados estatísticos dos testes que podemos salvar e utilizarmos de forma personalizada conforme desejado, ainda traz a data docs prontinha em HTML com muitas informações úteis sobre a qualidade dos dados.
É uma excelente ferramenta, com fácil integração e gestão, que utiliza os conceitos que já conhecemos no mundo de Big Data, assim vale muito testar e utilizar no seu dia a dia para amadurecer a sua área de Governança e Monitoramento da Qualidade do Dados.
Lembre-se:
Mais do que ter os dados disponíveis para análise, é preciso garantir a qualidade deles.
Este artigo possui uma outra versão, trazendo um case de arqutietura serverless na AWS utilizando Great Expectations com Spark no Glue ETL.
LinkedIn: https://www.linkedin.com/in/cicero-moura/
Github: https://github.com/cicerojmm