Encapsulando Transformações em DataFrames com Pyspark

Rubens Sôto
Data Hackers
Published in
2 min readApr 18, 2020

Hoje iremos realizar algumas transformações simples em um dataframe e veremos uma forma de deixar nosso código muito mais legível.

Considerando que você esta em ambiente com Spark configurado, vamos criar a sessão para começar nossas transformações:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('EtlCovid').getOrCreate()

Sessão criada, vamos importar nosso data source para um Spark Dataframe.

df = spark.read.csv('us-counties.txt', header = True)

Criando Funções de Transformação

Nós vamos criar 3 funções simples, a primeira irá adicionar um campo fictício ao dataframe.

def add_column(df):

return df.withColumn('Contry', lit('USA'))

A segunda irá filtrar o dataframe.

def filter_state(df):
return df.where(col('state') == 'Washington')

Nossa última função, irá agrupar nosso dataframe por dia.

def group_by_date(df):
return df.groupBy('date', 'Contry').agg(sum('cases').alias('Cases'), sum('deaths').alias('Deaths'))

Utilizando Nossas Funções — Forma 1

Nossas funções estão criadas, agora vamos aplicar ao nosso dataframe “df” que foi criado lá em cima.

df1 = add_column(df)

df2 = filter_state(df1)

df3 = group_by_date(df2)

Será que funcionou? Vamos dar uma olhada no dataframe final o “df3”.

Resultado do método show()

Maravilha!!!!! Todas as nossas transformações funcionaram, tudo certo então? Claro que não!

Vocês conseguem perceber como o nosso código esta feio? df1, df2, df3…Imagina se fossem 10 transformações, mesmo se você fosse a pessoa mais criativa do mundo em criar nomes, ia ficar uma confusão.

Será que tem um jeito melhor de fazer isso? Claro que tem!!!

Utilizando Nossas Funções — Forma 2

Temos que conseguir um jeito, que não seja necessário em cada transformação, ter que criar um novo dataframe.

Vamos criar uma função para nos ajudar nessa tarefa.

from pyspark.sql.dataframe import DataFrame

def transform(self, function):
return function(self)

DataFrame.transform = transform

A função recebe dois parâmetros, self e function, self será o próprio dataframe a quem estamos querendo realizar a transformação e function, será a função que fará a transformação.

Agora o nosso código ficará:

df_transformed = df.transform(add_column)\
.transform(filter_state)\
.transform(group_by_date)

Ficou bem mais simples né?? Se o número de transformações crescer, nosso código ficará muito mais simples e organizado.

O que vocês acharam de realizar as transformações dessa forma? Comentem ai!!

Clique Aqui para Acessar meu Linkedin

Fonte: https://mungingdata.com/pyspark/chaining-dataframe-transformations/

Imagem Docker Utilizada nos Testes: https://hub.docker.com/r/jupyter/all-spark-notebook/

--

--

Rubens Sôto
Data Hackers

Sou apaixonado por tecnologia, especialmente pela área de dados. Os assuntos que mais gosto de escrever e ler são relacionados a Big Data e Cloud.