Olá Scio! API Scala para o Apache Beam — Parte 1: Teoria

Lucas Magalhães
Engenharia de Dados Academy
11 min readJul 18, 2022

Apache Beam

O Apache Beam é hoje um dos principais frameworks para Processamento Distribuído de Big Data. Se você usa a nuvem da Google (Google Cloud Platform) já deve ter ouvido falar do Google Cloud Dataflow, que nada mais é do que uma aplicação serverless para jobs do Apache Beam. Beam tem uma sintaxe única, você consegue criar jobs batch ou streaming com o mesmo vocabulário, além de abstração de várias camadas de uma aplicação comum de processamento distribuído. Coisas como otimização e tunning de subprocessos que você não precisa se preocupar, como consumo de cpu, memória ou gerenciamento do garbage collector (coisas que não acontece com outros frameworks como MapReduce e Apache Spark).

Modelo de desenvolvimento do Apache Beam

Falaremos mais sobre o modelo de programação do Apache Beam, mas antes…

E esse tal de “Scio”. Aonde entra?

Embora o Apache Beam tenha enormes vantagens como ser um serviço totalmente gerenciado (noOps) com auto scale nativo e uma sintaxe universal, você hoje consegue um maior aproveitamento desses recursos trabalhando dentro do Google Cloud Dataflow, ou seja GCP, então acaba atraindo uma gama menor de desenvolvedores e resultando numa comunidade menor se comparado a outros projetos apache como o Kafka ou o próprio Spark. Sendo assim, muitas APIs ainda não estão muito maduras como a de Python e Go. Forçando muitas vezes o desenvolvedor a trabalhar com a API tradicional do Beam, o Java. O que acaba trazendo uma barreira muito grande de entrada para novos desenvolvedores, já que o Java tem uma curva de aprendizado maior do que outras linguagens mais populares de dados como Python e R. Além do fato que, acaba sendo mais difícil para desenvolvedores que trabalham com spark por exemplo (Que é uma das maiores comunidades de Processamento distribuído) migrarem seus jobs para o Beam, já que a sintaxe é muito diferente dos demais Frameworks. E é exatamente aí que entra o Scio.

Criado em 2016 pelo Spotify, Scio é uma api em Scala para o Apache Beam feito justamente com o objetivo de simplificar o desenvolvimento (Scala por si só é menos verboso do que o Java) e aproximar devs que trabalham com outros Frameworks em scala como Spark, Scalding e Flink ao Apache Beam.

Spotify cita o Scio com algumas vantagens em cima da tradicional API Java do Apache Beam:

- Ao contrario de Java e Python, Scala é uma Domain Specific Language feito especificamente para aplicações distribuídas de Dados.

- Maior Familiaridade com outros Frameworks Tradicionais (ex. Apache Spark).

- A possibilidade de usar programação funcional que é perfeita para trabalhar com dados.

- Usar bibliotecas numéricas do Scala como Breeze e Algebird

- Macros para maior dinamismo e geração de código.

// Scio Scala API exemplo Word Countval sc = ScioContext()
sc.textFile("shakespeare.txt")
.flatMap { _
.split(“[^a-zA-Z’]+”)
.filter(_.nonEmpty))
.countByValue
}
.map(t => t._1 + “: “ + t._2)
.saveAsTextFile(args(“output”))
// Java API exemplo Word CountPipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("shakespeare.txt"))
.apply(FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))
)
.apply(Count.<String>perElement())
.apply(MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
)
.apply(TextIO.write().to("output"));

Ufa! Parece que agora o Scala não é tão ruim assim, concorda? risos.

Beam Guide

Bem, precisamos agora conhecer um pouco da teoria do Beam, até para entender como o Scio funciona, vamos dar uma pincelada nos principais conceitos. Para um maior aprofundamento sugiro dar uma olhada na documentacao oficial.

Palavras chave

Pipeline

É o objeto chave no Apache Beam, ele encapsula todos os passos que seu dado vai dar (Algo que chamamos de Transform), nele podemos setar várias opções, como o runner que vai executar, se é um job do tipo streaming ou não e outros parâmetros como ID do Projeto da GCP.

📚 Podemos comparar o Objeto Pipeline como o SparkContext no Apache Spark e no scio como ScioContext

PCollection

Outro Objeto chave no Beam, ele é a representação do seu dado de forma distribuída para os workers. Toda leitura e transformação de dados no Beam recebe e gera uma PCollection. É um elemento obrigatório para trabalharmos com esse Framework. Ela apresenta algumas características como:

  • não-compartilhamento entre pipelines (Cada Pipeline tem sua própria PCollection)
  • Cada Elemento dentro de uma PCollection precisa ser de um mesmo tipo. Ex. PCollection[String], PCollection[Int]
  • Imutabilidade ( Cada passo de Transformação precisa receber uma PCollection e criar um novo objeto PCollection e não apenas modificar o objeto já existente).
  • Uma PCollection pode ser do tipo Bounded (Tamanho conhecido e fixo) ou do tipo Unbounded (Tamanho ilimitado normalmente visto em Jobs Streaming).

📚 Podemos comparar o Objeto PCollection como o RDD no Apache Spark e no scio como SCollection

Connectors I/O

O Beam possui diversos conectores para leitura e escrita de dados, desde arquivos existentes em um object storage (GCS) até dados recebidos em tempo real com uma ferramenta de mensageria (Pub/Sub e Kafka). Mas hoje para o Scio que é um framework muito focado em Google Cloud e DataFlow temos um número um pouco menor de Connectors (Basicamente fica focado ao tipo de demanda de dados que o pessoal do Spotify trabalha no dia a dia, que já é bastante coisa para nos beneficiar):

  • File Storage (CSV, Avro, Parquet, txt…)
  • BigQuery
  • BigTable
  • Pub/Sub
  • CloudSQL
  • Datastore
  • ElasticSearch

📚 Mas é aí que entra a mágica do Scio, você pode completar a falta de um connector que não existe default com o que existe na API Java do Beam. Lembra que estamos falando de um framework que é em Scala e também roda na JVM! Você pode muito bem misturar partes do código do Beam com partes de código do Scio! Então vale tudo!

Transforms

É o Step dentro do Beam que provê o processamento do dado em si ( O nome já é auto-explicativo). Você provê uma lógica de processamento na forma de uma função e sua “função transformação” é aplicada a cada elemento da sua PCollection. Cada sub-operação dessa, será dividida entre grupos de tarefas que serão executadas pelos workers. O grau do Paralelismo está intrinsecamente ligado ao Runner escolhido e o Back-end do Cluster. Elas podem ser chamadas com o método genérico apply . Você pode chamá-lo de forma encadeada também. Cada applypara cada transformação que você quer aplicar em sua PCollection.

Exemplo genérico de um Pipeline com Transformações encadeadas

Existem varias Transformações Built-in no Beam as principais são :

  • ParDo: É a transformação mais genérica do Beam. Para cada elemento da PCollection de entrada, o método DoFn vai aplicar uma função de processamento (função que você define anteriormente) naquele elemento que pode emitir nenhuma, uma (Algoritmo Map) ou múltiplos elementos para a PCollection de saída.

📚 No Scio podemos usar as funções padrão em Scala ao invés de um ParDo para processamento genérico como map(1:1) ou o flatMappor exemplo (1:N).

  • GroupByKey: É uma transformação de uma PCollection, aonde queremos agrupar dados a partir de uma chave única comum entre os elementos. Similar ao algoritmo GROUP BYdo SQL.
  • CoGroupByKey: Uma transformação bem interessante de agrupamento, onde é possível unir 2 PCollections a partir de um campo chave em comum. O resultado é uma nova PCollection com cada elemento sendo uma junção de uma chave específica e os valores que tinham para essa determinada chave em cada uma das PCollections de Entrada.
  • Flatten: É uma transformação semelhante ao algoritmo Flatten. Ele pega 2 ou mais PCollections e combina em uma única PCollection. Obs. Todas as PCollections precisam ser do mesmo tipo.
  • Partition: Transformação inversa ao Flatten, a partir de uma função que você define, Partition vai separar uma PCollection em múltiplas PCollections menores.

📚 Keep Calm. Veremos essas e outras Transforms na Parte 3 (Hands-on Batch Processing) desta série de artigos sobre o Scio.

Side Inputs

Esse é outro recurso extremamente interessante no Beam. Além da PCollection principal o Transform ParDo pode receber outros inputs na forma de SideInput. O SideInput é um input adicional que sua função dentro do ParDo vai acessar sempre que ela for processar um elemento da sua PCollection principal. Quando você especifica um side input você está criando uma view de algum dado para ser usado pelo ParDo juntamente com sua PCollection. É uma técnica muito usada para fazer joins entre Unbounded e Bounded PCollections (Ex. juntar dados do Pub/Sub com uma Tabela Estática do BigQuery).

Schemas

Beam traz um conceito mágico sobre schemas, basicamente quanto temos estruturas mais complexas de dados como JSON,Avro e Protobuf, nós podemos usar um pouco do conceito de OOP e criar abstrações na forma de classes para representar aquela determinada PCollection. O Java e por tabela Scala, possuem excelentes bibliotecas para serializar e desserializar classes. Assim, por exemplo, se você recebe do Pub/Sub uma mensagem no formato JSON, e você tem uma estrutura lógica de dados, pode-se criar uma classe que representa esse JSON e usá-lo para converter o tipo da sua PCollection em tipo Classe.

Imagine que voce receba do Pub/Sub a seguinte mensagem:

{
"user": "Thomas Shelby",
"age" : 43,
"role" : "Gangster Lead"
}

Podemos Criar uma case class(Scala) para representar esse dado:

case class User(user: String, age: Int, role: String)

E usar um middleware para fazer o Parse. Por exemplo no Scala, existe um framework excelente para trabalhar com Json chamado Circe.

Podemos por exemplo criar uma função middleware para parsear nosso Json para a classe que criamos User:

def decodeJson(json: String): User = {
val decoded = parser.decode[User](json) match {
case Right(x) => x
case Left(x) => throw new ValidationException(x.getMessage)
}
return decoded
}

E passá-la como um Transform utilizando o método mappor exemplo:

val stringMessage: SCollection[String] = ??? // read from Pub/Sub
val messageDecoded: SCollection[User] = stringMessage.map(x => decodeJson(x)
)

Windows (Janelas de Execução)

O Conceito de Janelas é amplamente utilizado em Jobs Streaming. Transforms que fazem agregações como os métodos GroupByKey e Combine trabalham implicitamente com janelas. Basicamente como jobs streaming tem tamanho ilimitado e para agregações nós precisamos definir um tamanho de amostra para tal, utilizamos uma janela de tempo (um intervalo para chegada de eventos a partir de um timestamp para cada elemento da PCollection). Existem vários tipos de Janelas, dentre elas, as principais são:

  • Fixed Time Window: Setamos um intervalo fixo para fazer agregações a partir de um campo normalmente do tipo Timestamp já embutido nos elementos da PCollection, no exemplo abaixo, um processo com um intervalo de 30 segundos, a Window 0 terá os elementos que chegaram com timestamp entre 0:00:00 e 0:00:30(não incluso), a Window 1 terá os elementos que chegaram entre 0:00:30 e 0:01:00(não incluso) e assim em diante:
Fixed Window com duração de 30s.
  • Sliding Time Window: Parecido com a Fixed, entretanto as janelas podem se sobrepor, por exemplo imagine uma janela de 60 segundos sendo que cada nova janela começa a cada 30 segundos:
Sliding Window 60–30s
  • Session window: Um conceito um pouco diferente de janela, a partir de um período mínimo de inatividade de entrada de novos elementos na PCollection, que você escolhe, é criada uma nova janela:
Session Window exemplo.
  • Single Global Window: Aqui é onde dizemos que Batch é apenas um caso especial de streaming no Beam. Todo job Batch na verdade pode ser representado como um job Streaming utilizando a Single Global Window, ou seja todos os dados existentes vão ser direcionados para essa janela única (apenas usado para dados com um tamanho fixo, porém você pode utilizar também para Jobs Streaming criando um Trigger customizado).

Watermarks e dados “atrasados”

Em sistemas distribuídos streaming, pode existir um gap entre a chegada do dado da fonte x e a hora em que ele entra para ser processado. Por exemplo, podemos estar usando uma estratégia de janelas com 30 segundos de intervalo fixo. Imagina que estamos na janela 5:00–5:30 e após chegar um evento com timestamp 5:35 essa janela inicial é fechada. Qualquer dado que chegar após isso com Timestamp abaixo de 5:30 é considerado late data ou dado atrasado. No Beam você pode controlar isso com watermarks, que é um período a mais que você pode esperar para atribuir aquele dado a uma janela específica utilizando o método .withAllowedLateness .

📚 Veremos mais sobre esses conceitos em um artigo futuro do Scio Parte 4 (Hands-on Streaming Processing) fique de olho!!!

Triggers

Quando estamos trabalhando com janelas, Beam usa Triggers para determinar quando ele vai emitir os resultados agregados de cada window. Por padrão Beam usa o Default Trigger, ou seja ele vai “outputar” os resultados agregados quando terminar de estimar todo o dado que chegou, e descartar toda o dado que chegar atrasado dentro daquela janela específica. Você pode mudar esse comportamento utilizando os diferentes tipos de Triggers, são eles:

  • Event Time Triggers: Eles operam com o timestamp do evento em si (Chegada do dado) e utilizam o timestamp existente em cada elemento da PCollection. O Trigger Default do Beam é baseado nesse timestamp.
  • Processing Time Triggers: Utilizam o Timestamp de Processamento, que é o tempo quando o Beam processa aquele elemento específico durante qualquer estágio do Pipeline.
  • Data-Driven Triggers: Eles operam examinando o dado que chega em cada Window, e é acionado quando o dado entra em uma condição específica, criada pelo usuário. Por enquanto, esse tipo de trigger só é acionado depois da chegada de uma certa quantidade de elementos na PCollection.
  • Composite Triggers: São triggers que combinam vários outros triggers citados anteriormente e de muitas maneiras diferentes.

E quais são as vantagens dos Triggers?

  • Triggers permitem ao Beam entregar resultados mais cedo. Antes de todo o dado de uma determinada janela chegar. Por exemplo o Data-Driven Trigger que pode “outputar” uma agregação após talvez uma determinada quantidade de dados chegar dentro de uma janela específica.
  • Triggers permitem processamento do dado atrasado trigando-o depois que o watermark finaliza a espera em cima daquela janela alvo.
  • Com Triggers podemos ter um melhor controle do fluxo de dados respondendo as seguintes perguntas: O quão importante é ter todo o dado antes de computar aquele resultado específico? Quanto tempo você quer esperar pelo dado? Por exemplo, você vai esperar até achar que tem todo o dado? Ou você vai processar o dado assim que ele chegar? Quanto de Processamento/Memória você está disposto a pagar para diminuir a latência do seu dado?

Bem pessoal é isso! Vimos um pouco do básico do modelo Beam e como ele se relaciona com o Scio. Basicamente, nos próximos artigos vamos ver que conseguimos adaptar todos esses conceitos para os Jobs de Batch e Streaming. No próximo artigo vamos ver como criar um novo projeto Scio, o que é preciso instalar localmente e como rodar o seu primeiro HelloWorld.Scala com essa incrível Framework. Espero que tenham gostado! Fiquem atentos para mais em breve!! See ya!

--

--