Olá Scio! API Scala para o Apache Beam — Parte2: Guia de instalação

Lucas Magalhães
Engenharia de Dados Academy
7 min readAug 30, 2022

Java 11

Antes de mais nada não podemos esquecer que o Scala é uma linguagem que roda pela JVM e por isso é obrigatório termos o java instalado. A galera do Scio recomenda até o momento o uso do java 11 para desenvolvimento com o framework. Para windows você pode tentar seguir o tutorial do CodeJava e a Documentação da Oracle para instalação no Linux. Para mac via HomeBrew você pode utilizar o seguinte comando:brew install java11

Scala 2.12+

E é claro que não podemos esquecer do próprio Scala. Você pode seguir o tutorial do site oficial da Linguagem para instalação de acordo com seu sistema operacional. Para este exemplo utilizaremos a versão 2.12.15 do Scala.

SBT (Scala Build Tool)

Assim como no java utilizamos o Maven para gerenciador de pacotes, no Scala temos o SBT, ele é uma ferramenta de gestão de pacotes e build essencial para qualquer projeto scala/java. Segue o tutorial do site oficial para instalação de acordo com seu sistema operacional.

Criando um projeto Scio

Para criar um projeto do Scio podemos usar o scio-template criado pelo próprio Spotify ou adicionar as dependências de Biblioteca no arquivo build.sbt usado pelo próprio SBT onde declaramos todas as configurações do projeto. Neste exemplo vamos criar um projeto usando o sbt e instalando as dependências manualmente. Criamos uma pasta para o projeto chamada scio-getting-started/

📚 Você pode seguir e baixar o projeto desse artigo acessando o repo: https://github.com/magal1337/scio-hands-on/tree/main/scio-getting-started

Nela vamos executar o comando sbt e vamos setar 2 parâmetros: O nome do projeto e a versão que utilizaremos do Scala. Ao final desses comandos e gerado uma estrutura básica de arquivos de um projeto sbt, assim como o build.sbt

sbtset name := "scio-getting-started"set scalaVersion := "2.12.15"session saveexit

Para usarmos o scio precisamos instalar alguns pacotes maven como o scio-core e o módulo de testes, assim como o apache-beam runner. Nesse exemplo vamos instalar o Direct Runner e o Dataflow Runner. Lembrando que todas essas dependências são configuradas com o arquivo build.sbt que geramos anteriormente. Vamos usar a versão 0.10.3 do Scio e a versão 2.29 do Apache Beam. Ao final desses ajustes vamos ter o seguinte arquivo build.sbt:

Pronto! Agora você já tem o básico para começar com o Scio!

🏀 Warming up! Demo NBA Players

Vamos criar um caso de uso para testarmos o Framework. Vamos analisar um dataset do Kaggle de Jogadores da NBA. Nele temos um Histórico de Jogadores com algumas informações como o Nome do Jogador, Altura, Peso, Universidade que cursou, Ano de nascimento, Cidade em que nasceu e Estado/País em que nasceu. É comum no Scio trabalharmos com uma mistura de Programacao Funcional e OOP. Assim podemos dizer que todo dataset ou tabela poderia ser uma Entidade. Vamos chamar essa Entidade de nbaPlayer e vamos criar uma nova pasta seguindo a estrutura do sbt:

 src/main/scala/entities/

Nela vamos criar um novo objeto chamado part2InstallEntities.scala. Nesse objeto vamos ter uma classe Scala para definir as informações dos jogadores em cada linha do dataset. O arquivo será do tipo csv, por isso usaremos a Biblioteca do Scio CsvIO para leitura e escrita do CSV. Criaremos um decoder que basicamente é uma instrução de como o scala deve parsear o csv e criar a case class do tipo nbaPlayer para cada jogador do nosso dataset. Além disso, precisamos dizer como o CsvIO deve ler o csv de origem e escrever também nosso resultado final. A ideia do nosso primeiro Job irá ser:

  • Ler o CSV com as infos dos jogadores
  • Parsear cada linha do csv como uma Scollection do tipo nbaPlayer (Nossa case class)
  • Calcular a altura máxima de cada jogador por estado
  • Filtrar apenas os estados com jogadores de altura máxima maior do que 2 metros
  • Gravar o resultado como um arquivo csv

Nosso Objeto part2InstallEntities.scala ficará dessa forma:

Agora com as Entidades e parâmetros de configuração extra criados, vamos definir nosso job do Scio, vamos criar um arquivo chamado:

 src/main/scala/part2Install/helloScio.scala

Todo job Scio é definido criando-se um objeto Scala e um método main que vai ser nosso entrypoint e pode receber argumentos. Para aproveitar nossa classe nbaPlayerExample ja criada em part2InstallEntities vamos fazer com que helloScio herde essa classe e seja serializavel para que o Scio possa usá-la. O corpo do job então ficaria dessa forma:

E é dentro do método main que vamos escrever nossa lógica! Vamos começar criando nosso ScioContext e lendo o arquivo csv. Vamos definir nosso job com 2 argumentos: O caminho de entrada do csv e o caminho aonde o Scio vai gravar o csv final. Um comando prático do scio para criar o contexto e conjuntamente obter os argumentos do job se chama ContextAndArgs. O retorno dele são 2 variáveis o ScioContext (vamos chamar de sc) e args que é um objeto do tipo Map (Semelhante ao dicionario do Python) em que cada elemento (chave/valor) é um argumento que passamos no início do Job.

def main(cmdlineArgs: Array[String]): Unit = {

val (sc, args) = ContextAndArgs(cmdlineArgs)
}

Com o contexto criado usando a variável sc vamos ler o arquivo csv com o método .csvFile do Scio, passando o caminho do csv que vai estar no argumento “input” e a configuração de leitura do csv que foi criada na classe pai nbaPlayerExample chamado csvReadConfig

val nbaDf: SCollection[nbaPlayer] = sc.csvFile(args("input"), csvReadConfig)

Agora temos uma nova SCollection chamada nbaDf em que cada elemento é uma case class do tipo nbaPlayer! Assim conseguimos acessar os parâmetros do csv com muito mais facilidade!

Agora para acharmos os jogadores mais altos por estado primeiramente, precisamos definir uma key para nossa SCollection para agruparmos posteriormente. Para isso podemos usar o método .keyBy do Scio e definir a key a partir do elemento da Scollection ( Que no nosso caso é a Case Class nbaPlayer). Sabemos que a informação do estado está na variavel birthState, então é ela que vamos acessar e colocar como key da nossa SCollection transformando de uma SCollection[T] para uma SCollection[(k,T)](Uma SCollection chave/valor e criada usando uma tupla do Scala). Bem, além disso, precisamos apenas da key estado e da altura dos jogadores para fazer nossa análise. A altura dos jogadores está na variavel height. No Scio podemos usar o método .map para fazer um select nos campos que queremos reter da nossa case class. Então até agora temos a seguinte configuração do nosso job no método main:

def main(cmdlineArgs: Array[String]): Unit = {

val (sc, args) = ContextAndArgs(cmdlineArgs)

val nbaDf: SCollection[nbaPlayer] = sc.csvFile(args("input"), csvReadConfig)

val tallestPlayers = nbaDf
.keyBy(_.birthState)
.map(x => (x._1,x._2.height))

Agora que já temos somente o campo que nos interessa e a key referenciada, podemos fazer um cálculo de agrupamento. No nosso caso será o método .maxByKey que vai pegar o maior inteiro por key da nossa SCollection (Vamos obter o maior inteiro da variável height para cada key distinta da SCollection). Além disso, em nossa SCollection podemos ter dados nulos tanto na altura como no estado. Vamos removê-los da nossa SCollection com o método .filter. Primeiro vamos acessar a key da nossa Scollection pela tupla criada e com o metodo .getOrElsedo Scala forçamos qualquer variável nula fique com o valor String “null” e assim dizemos que só queremos manter os elementos da SCollection que tem a key com valor diferente de “null”.

📚 Existem vários meios de se filtrar uma SCollection esse é apenas um deles.

def main(cmdlineArgs: Array[String]): Unit = {

val (sc, args) = ContextAndArgs(cmdlineArgs)

val nbaDf: SCollection[nbaPlayer] = sc.csvFile(args("input"), csvReadConfig)

val tallestPlayers = nbaDf
.keyBy(_.birthState)
.map(x => (x._1,x._2.height))
.maxByKey
.filter(_._1.getOrElse("null") != "null")

Muito Bem, agora vamos apenas manter os estados que tem jogadores com altura máxima maior ou igual a 2 metros! Mas antes disso vamos tratar os casos que podem vir nulos. No nosso caso vamos utilizar o método .mapde novo somente para forçar qualquer valor de height que seja nulo para se tornar zero com novamente o método .getOrElsee na sequência vamos filtrar para manter somente os estados com altura maior do que 2 com o método .filter do Scio.

def main(cmdlineArgs: Array[String]): Unit = {

val (sc, args) = ContextAndArgs(cmdlineArgs)

val nbaDf: SCollection[nbaPlayer] = sc.csvFile(args("input"), csvReadConfig)

val tallestPlayers = nbaDf
.keyBy(_.birthState)
.map(x => (x._1,x._2.height))
.maxByKey
.filter(_._1.getOrElse("null") != "null")
.map(x=> (x._1.get,x._2.getOrElse(0)))
.filter(_._2 >= 200)

Perfeito! E para fechar vamos “printar” no nosso Terminal os valores dos elementos da SCollection (podemos fazer isso usando o método .map e para cada elemento da SCollection vamos dar um println nos dados).

E também vamos novamente utilizar o CsvIO do Scio para salvar o resultado em um arquivo csv com o método .saveAsCsvFilepassando o argumento "output" que vai ser nosso caminho de gravação que inserimos la no início do job e também a variável csvWriteConfiguration que criamos na Classe nbaPlayerExample. Assim Fechamos nosso Job que fica dessa forma:

📚 Lembrando sempre de chamar o método .run() no final do seu job Scio com o ScioContext.

Para deployar o Job você precisa escolher um Runner, como esse e um experimento simples vamos executá-lo com o DirectRunner mas você poderia utilizar o DataFlowRunner para executá-lo em produção modificando apenas o parâmetro "runner" e passar paths por exemplo do Google Cloud Storage, para ler e gravar os arquivos csv. Pelo SBT podemos fazer da seguinte forma:

sbt "runMain part2Install.helloScio --runner=DirectRunner 
--input=data/input/players.csv --output=data/output/tallest_players"
state,biggest_heightRomania,231
South Sudan,231
China,229
Germany,229

A Romênia e o Sudão do Sul possuem os jogadores com as maiores alturas 231 cm! Seguidos da China e da Alemanha!

Manute Bol é o campeão do Sudão do Sul com 231 cm! Conhecido como a “Muralha Africana”.

Bem, isso é tudo pessoal! Fiquem tranquilos sobre os métodos de transformação, criação de classes e serialização no Scio. Vamos ver mais a fundo no artigo sobre processamento batch e mais outros métodos de agrupamento! Espero que tenham gostado e fiquem ligados que em breve vem mais análises da NBA usando o nosso poderoso Framework. See ya!

--

--