Criando uma aplicação com Structured Streaming em PySpark usando Dataproc

André Sousa
google-cloud-brasil
8 min readAug 29, 2022
Dataproc é um serviço totalmente gerenciado e altamente escalável para executar Apache Spark, Apache Flink, Presto e + de 30 frameworks.

Introdução

Nesse artigo vamos criar uma aplicação de streaming estruturado em PySpark, que:

  • Irá consumir dados sintéticos de compras de uma instância de Kafka (em GCP);
  • Processar e fazer transformação de demonstração;
  • Salvar os dados particionados em Google Cloud Storage em formato parquet;

Ao todo vamos usar 2 repositórios, em um deles tem o exemplo da aplicação streaming que vamos submeter no Dataproc, e no outro repositório vamos usar para produzir dados sintéticos para o kafka. O resultado final é ver os dados salvos e particionados no Cloud Storage.

Structured Streaming

O structured streaming é um mecanismo de processamento de fluxo escalável de dados e tolerante a falhas. O motor Spark SQL executa em forma incremental e contínua e atualiza o resultado final à medida que os dados de streaming continuam chegando.

Nesse artigo usaremos a API Dataset/DataFrame em Python para expressar agregações de streaming e fazer uma transformação básica de demonstração.

Internamente, por padrão, as consultas de streaming estruturado são processadas usando um mecanismo de processamento de micro-batch, que processa fluxos de dados como uma série de pequenos jobs, alcançando assim latências de ponta de até 100 milissegundos.

Um exemplo prático

Este é um exemplo de uma compra que vamos gerar sinteticamente para o kafka.

Exemplo de uma mensagem sintética gerada para Kafka

Aqui nesse link você pode ver detalhes de como criar um cluster kafka em GCP. Para essa aplicação exemplo, foi instanciado um kafka em servidor único e sem autenticação usando o marketplace, esse aqui. Nesse caso você só necessita configurar o servidor em /opt/kafka/config/server.properties, e iniciar o zookeeper (sudo /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties) e logo em seguida o kafka (sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties), e você terá um kafka rodando na porta 9092.

Nesse repositório tem um passo a passo de como gerar os dados sintéticos usando uma biblioteca que gera dos sintéticos randômicos chamado Faker.

Veja aqui o código que gera os dados sintéticos:

Classe para produzir um mensagem de compras — purchase.py [kafka-producer]

Uma aplicação streaming

Propusemos estruturar a nossa aplicação conforme abaixo e iremos usar um gerenciador de dependências conhecido como Poetry, onde:

  • dist : Diretório de distribuição (dist), no momento do build, onde será gerado os pacotes em 2 formatos: ​​Python wheel (*.whl) e formato unix (tar.gz)
  • jobs: Criamos um ponto de entrada chamado aqui de sample_job para chamar o nosso streaming job. Você poderá gerar outros jobs que utilizam algumas classes que criamos para consumir e salvar os dados.
  • poetry.lock, etc.: Poetry cria alguns arquivos para gerenciar as dependências e promover o empacotamento da sua aplicação, veja aqui mais detalhes.
  • scripts: Criamos esse script para inicializar o Dataproc. O Dataproc é um serviço gerenciado do Spark e do Hadoop que permite que você aproveite as ferramentas de dados de código aberto para processamento em batch, consultas, streaming e aprendizado de máquina.
  • structured_streaming/stream_processor: Aqui ficam as classes/funções que irão consumir alguma fonte de dados, nesse caso o kafka.
  • structured_streaming/data_transformation: Aqui ficam as classes/funções de transformação
  • structured_streaming/storage: Aqui ficam as classes/funções para armazenar os dados
  • tests: Testes unitários e de integração
Estrutura de uma aplicação da exemplo desse artigo

Nesse repositório tem a aplicação completa e detalhes de como executar e submeter.

Consumindo dados do Kafka

Esse código mostra como criar uma fonte Kafka para um streaming dataframe, há muitas opções de configuração, e nossa sugestão para essa aplicação foi:

Classe para consumir as mensagens de compras — kafka_consumer.py [structured-streaming-job-pyspark]

Basicamente iremos consumir de um tópico específico 5️⃣, nesse caso “com.google.sample.purchases” de um servidor kafka 1️⃣. E iremos buscar as mensagens mais novas neste tópico 2️⃣(starting Offsets: earliest), no caso no máximo 100000 mil mensagens por gatilho 4️⃣(maxOffsetsPer Trigger), e deixar configurado não falhar 3️⃣ caso a retenção do Kafka destrua mensagens a cada hora (failOnDataLoss==false).

Para esse kafka de exemplo, configuramos a retenção de apenas 1 (uma) hora, daí se o job fica parado por mais de uma hora, e se failOnDataLoss está setado como true, é possível que haja erros de tentar recuperar um offset de mensagens que Kafka já deletou, por isso estamos setando como false.

Uma breve explicação, sobre esse código é considerar o fluxo de dados de entrada como uma “Tabela de Entrada”. Cada item de dados que está chegando no fluxo é como uma nova linha sendo anexada à tabela de entrada, chamada de dataframe. Esses dados serão adicionados até que uma ação seja definida, que vamos mostrar nas próximas sessões. Aqui nesse link pode ver os conceitos básicos de como os dados são adicionados.

Transformação

Para deixar a nossa aplicação mais simplificada, vamos apenas adicionar novas colunas no nosso dado que se refere à data de processamento, então acrescentamos: ano (year), mês (month), dia (day), hora (hour), conforme demonstrado a seguir:

Função para adicionar as colunas ano, mês, dia e hora — tranformations.py [structured-streaming-job-pyspark]

Vale lembrar que é possível fazer agregações, joins e muitas outras transformações antes de armazenar os dados. Para seleção, projeção, agregação, veja aqui. Para operações envolvendo janela de tempo, veja aqui. Para operações de junção, veja aqui as operações suportadas.

Salvando os dados no Google Cloud Storage

Agora vamos armazenar a saída transformada em um bucket no Google Cloud Storage (GCS). Como vamos usar o Dataproc, não será necessário instalar o conector GCS, pois ele é instalado por padrão em todos os nós do cluster do Dataproc. Aqui está o código que salva os dados:

Classe para escrever no Cloud Storage — write_storage.py [structured-streaming-job-pyspark]

Basicamente, depois de definir as operações de transformações, tudo o que resta é armazenar ou definir alguma opção saída do DataStreamWriter. Você poderá especificar:

  • Detalhes de saída: formato de dados, localização, etc. 1️⃣ Formato: parquet e 3️⃣ Localização da saída: Diretório no Google Cloud Storage
  • Intervalo de disparo: 2️⃣ processingTime, trigger_time: A cada 60 segundos salvar os dados
  • Checkpointing: 4️⃣ Localização dos metadados: Diretório no Google Cloud
  • partitionBy: 5️⃣ Nesse caso, vamos particionar por ano, mês, dia, e hora: “year”, “month”, “day”, “hour”

Pontos importantes

Recuperando de falhas

No streaming Estruturado, se você habilitar o ponto de verificação (4️⃣ checkpointLocation) para uma consulta de streaming, poderá reiniciar a consulta após uma falha e a consulta reiniciada continuará de onde a falha parou, garantindo tolerância a falhas e garantias de consistência de dados.

Configurar jobs para reiniciar consultas de streaming em caso de falha

Para tornar seus jobs tolerantes a falhas, você também pode configurar os jobs para reiniciar suas consultas automaticamente após uma falha.

Por padrão, os jobs do Dataproc não serão reiniciados automaticamente em caso de falha. Usando configurações opcionais, você pode definir jobs para reiniciar em caso de falha. Ao definir um job para reiniciar, você especifica o número máximo de tentativas por hora (o valor máximo é 10 tentativas por hora) e/ou o número máximo de tentativas totais (o valor máximo é 240 tentativas no total). Veja aqui detalhes sobre como configurar jobs reinicializáveis.

Usando Dataproc

O Dataproc é um serviço totalmente gerenciado e altamente escalável para executar o Apache Spark, para submeter seu job, antes você precisa empacotar sua aplicação, criar o cluster com as dependências da sua aplicação, e só então submeter.

Empacotando sua aplicação

Para empacotar a sua aplicação, rode o comando abaixo, e você poderá notar que será gerado 2 pacotes, um em formato *.whl e outro em tar.gz. Vamos usar um desses arquivos para submeter nosso job.

>> poetry build
Building structured-streaming (0.1.0)
— Building sdist
— Built structured-streaming-0.1.0.tar.gz
— Building wheel
— Built structured_streaming-0.1.0-py3-none-any.whl

Exportando as suas dependências antes de criar o cluster Dataproc

Esse comando irá gerar o arquivo requirements.txt com as dependências necessárias para rodar o job. Vamos usar esse arquivo para criar o nosso cluster Dataproc.

>> poetry export -f requirements.txt — output requirements.txt

Faça o upload para um diretório acessível pela seu projeto GCP, nesse caso vamos fazer o upload para um bucket no Google Cloud Storage

>> gsutil cp requirements.txt gs://<PROJECT_ID>-scripts

Criar um cluster Dataproc

Agora, finalmente vamos criar um cluster. Previamente fizemos o upload do script para criar o cluster:

Vamos a criação do cluster, rode esse comando no seu terminal. Antes, se você ainda não configurou Google CLI, veja aqui como fazer. Lembre-se também de habilitar a API do Dataproc no seu projeto, se tiver dúvidas siga esse passo a passo.

>> gcloud dataproc clusters create cluster-sample \
— region=${REGION} \
— image-version 2.0-debian10 \
— initialization-actions=gs://<PROJECT_ID>-scripts/initialize-cluster.sh
Waiting on operation
Waiting for cluster creation operation…done.
Created
Cluster placed in zone [us-central1-a].

Em GCP >> Dataproc >> Cluster você verá o cluster criado, conforme a seguir:

Cluster criado com sucesso e disponível

Submetendo o job dessa aplicação

>> gcloud dataproc jobs submit pyspark \
— cluster=cluster-sample \
— region=us-central1 \
— properties=^#^spark.jars.packages=’org.apache.spark:spark-streaming-kafka-0–10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0–10_2.12:3.1.3' \
— py-files dist/structured_streaming-*.whl \
— max-failures-per-hour=10 \
— max-failures-total=20 \
jobs/sample_job.py \
— <IP_KAFKA_SERVER>:9092 com.google.sample.purchases.2 gs://andresousa-experimental-streaming-test/output gs://andresousa-experimental-checkpoints/checkpoint ’60 seconds’
Job [0cb2f64f528f4c6cb27968d6f1c23f63] submitted.
Waiting for job output…
(…)

Em GCP >> Dataproc >> Jobs você verá o job com status “Running”, como é um streaming, o job não será finalizado até você cancele, dê um stop. Se o job atingir um determinado número de falhas por hora (10) ou no total (20), ele será restaurado.

Dados sintéticos

Agora que nosso job está no ar, usando o outro projeto e com o kafka disponível, vamos produzir pelo menos 1000 mensagens com dados sintéticos, com o comando:

>> poetry run producer PLAINTEXT://<IP_KAFKA_SERVER>:9092 com.google.sample.purchases 1000
exiting production loop
produced 100 messages
produced 200 messages
produced 300 messages
produced 400 messages
produced 500 messages
produced 600 messages
produced 700 messages
produced 800 messages
produced 900 messages
produced 1000 messages

Monitorando job

Em GCP >> Dataproc >> Job >> ID Job você poderá verificar os logs do job. Note que ele está consumindo o tópico com.google.sample.purchases.2, e informando sobre os offsets das mensagens.

Verificando os logs do job streaming

Google Cloud Storage

Os dados a cada 60 segundos são persistidos continuamente já particionados no bucket passado por parâmetro no job:

Arquivos em formato parquet que aterrissaram no cloud storage

Resultado

O job salvará os dados particionados por ano, mês, dia, e hora. É possível consultar uma fonte de dados externa do Cloud Storage usando Bigquery. Assim você pode criar um dataset e uma tabela com fonte externa, e executar consultas nos dados, veja a seguir:

Janela para criar uma tabela com fonte externa no BigQuery

E assim executar consultas:

Executando consultas nos dados recentemente importados

Pronto! Você, nesse artigo você aprendeu a criar uma aplicação streaming usando Pyspark e submeter ao Dataproc, e sem se preocupar em gerenciar o seu cluster. Agora, tente você mesmo clonar o repositório, alterar a fonte de dados e submeter o seu job no Dataproc usando a sua conta GCP.

Se tiver dúvidas, me mande uma mensagem ou comentem que respondo assim que possível.

--

--