Gspread no Airflow
Read this post in English here
Imagine que você precise atualizar uma planilha todos os dias com dados de uma determinada fonte de forma manual. Já pensou em tentar automatizar a atualização desta planilha?
Sim, sabemos que usar uma planilha como banco de dados não é a forma mais recomendada de se guardar os seus dados, mas sabemos que isso pode acontecer.
Então escrevi este artigo pensando neste problema, uma forma de você automatizar a atualização de uma planilha seria criar uma DAG que estivesse configurada para ser executada a cada período e que tivesse uma task que atualizasse essa planilha.
Assim, dividi esse artigo em três partes:
1. Configuração do Service Account
2. Configuração do Airflow
3. Criação da DAG
1. Configuração do Service Account
Para conseguir utilizar a API do Google para manipular sua planilha, você deve criar um Service Account. Então, siga os passos abaixo para criar um Service Account no Google.
Primeiro, entre no site Google Cloud Platform, clique no projeto e depois em New Project, como mostrado no print abaixo:
Então dê um nome ao seu projeto e clique em Create.
Perceba que aqui ele vai mudar para o nome do seu projeto para o projeto que você acabou de criar:
O próximo passo é habilitar as APIs que queremos usar neste projeto, então vá em APIs & Services, e depois em Enable APIs & Services:
Aqui, vamos habilitar duas APIs:
- Google Drive API
- Google Sheets API
1.1 Criando Credenciais no Google Sheets API
Agora temos que criar as credenciais para usar a API do Google Sheets, então pesquise novamente por Google Sheets API e clique em Manage:
Então vá em Credentials e depois em Manage Service Accounts:
Então clique em Create Service Account:
Escolha um nome para o seu serviço e clique em Create and Continue:
Então clique em Actions e em Manage Keys:
Clique em Add Key e em Create New Key:
Escolha o formato JSON e clique em Create. O download de suas credenciais será feito de forma automática.
1.2 Compartilhando a Planilha com o seu Service Account
Vá em Service Account Details e copie o e-mail do seu Service Account:
Crie uma nova Planilha e compartilhe com o e-mail que copiamos:
2. Configuração do Airflow
A forma que eu escolhi para rodar o Airflow na minha máquina foi rodando a imagem que o Docker Hub disponibiliza do Airflow. Para usar desta forma você precisa ter o Docker Compose instalado na sua máquina conforme o seu sistema operacional.
Aqui você vai encontrar o docker-compose.yml
se quiser executar o Airflow por meio do docker.
A biblioteca Gspread não está no docker-compose.yml
, então como instalamos bibliotecas novas na imagem do docker? Criando um arquivo Dockerfile. Assim, serão instaladas as bibliotecas de acordo com o meu sistema operacional. Crie um arquivo Dockerfile com a configuração abaixo:
FROM apache/airflow:2.5.1
USER airflow
RUN pip install gspread
Se você estiver usando algum sistema operacional baseado no Debian, como o ubuntu, provavelmente você precisará passar o USER root
antes de passar as bibliotecas que você deseja instalar com o apt-get
e depois voltar para o USER airflow
. Se quiser configurar o seu Dockerfile de outra forma, basta seguir o que está na documentação.
Seguindo a documentação, para conseguir instalar tudo que está no meu Dockerfile, o próximo passo é comentar a linha do nome da imagem e descomentar a linha que faz o build
no meu docker-compose.yml
:
# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1}
build: .
Feito isso, podemos executar docker-compose up -d
.
2.1 Adicionando a conexão do Google no seu Airflow
Abra no seu browser localhost:8080
para acessar a UI do Airflow. O nome de usuário e senha padrão são airflow
.
Vá em admin, connections e então clique para adicionar uma nova conexão:
- Connection Id: Aqui você vai colocar o nome que você preferir para a sua conexão
- Connection Type: Você deve selecionar a opção Google Cloud
- Project Id: Você vai informar o id do seu projeto
- Keyfile JSON: Aqui você vai colar as credencias que você fez o download
- Scopes: cole https://www.googleapis.com/auth/drive
E pronto! Pode clicar em salvar.
Depois clique em Test para verificar se sua conexão está ok:
2.2 Erros ao testar a conexão com a API
Ao testar a minha conexão com a API, eu tive o erro abaixo:
('invalid_grant: Invalid JWT: Token must be a short-lived token (60 minutes) and in a reasonable timeframe. Check your iat and exp values in the JWT claim.', {'error': 'invalid_grant', 'error_description': 'Invalid JWT: Token must be a short-lived token (60 minutes) and in a reasonable timeframe. Check your iat and exp values in the JWT claim.'})
Ao pesquisar sobre o erro, vi que estava relacionado à diferença de timezones. Depois de checar se o horário da minha máquina estava sincronizado com o meu timezone, encontrei que eu deveria adicionar no docker-compose.yml
a informação indicando para usar o meu timezone, então adicionei essas duas linhas para sincronizar o horário da imagem com o meu timezone:
volumes:
- /etc/timezone:/etc/timezone:ro
- /etc/localtime:/etc/localtime:ro
Depois de ter feito essa modificação no docker-compose.yml
consegui testar a minha conexão com sucesso.
3. Criação da DAG
Então eu criei uma DAG simples apenas com o objetivo de testar o envio de dados de um dataframe para o Gsheets usando o Airflow.
O primeiro passo foi criar um dataframe com preços de produtos:
data = {
"products": ["product_1", "product_2", "product_3"],
"price": [50, 40, 45],
}
df = pd.DataFrame(data)
Para usar a conexão da API que eu adicionei no Airflow, eu usei o GoogleBaseHook
:
hook = GoogleBaseHook(gcp_conn_id="google_conn_id")
credentials = hook.get_credentials()
google_credentials = gspread.Client(auth=credentials)
E por último, já usando o gspread, a gente informa o nome da Planilha que estamos usando, o nome do worksheet e depois enviamos os dados do dataframe para o worksheet.
sheet = google_credentials.open("Products - Data")
worksheet = sheet.worksheet("products-data")
worksheet.update([df.columns.values.tolist()] + df.values.tolist())
Esse é o código completo da DAG:
from airflow import DAG
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
import pandas as pd
import gspread
from datetime import datetime
with DAG(
dag_id="dataframe_to_spreadsheet",
start_date=datetime.now(),
schedule_interval="@daily",
) as dag:
@dag.task
def dataframe_to_spreadsheet_task():
data = {
"products": ["product_1", "product_2", "product_3"],
"price": [50, 40, 45],
}
df = pd.DataFrame(data)
# Hook to Google Sheets in order to get connection from Airflow
hook = GoogleBaseHook(gcp_conn_id="google_conn_id")
credentials = hook.get_credentials()
google_credentials = gspread.Client(auth=credentials)
# Reading a spreadsheet by its title
sheet = google_credentials.open("Products - Data")
# Defining the worksheet to manipulate
worksheet = sheet.worksheet("products-data")
# Sending data from df to the worksheet
worksheet.update([df.columns.values.tolist()] + df.values.tolist())
dataframe_to_spreadsheet_task()
Então temos a worksheet atualizada:
Se você quiser explorar outras funcionalidades da biblioteca gspread, pode acessar a documentação aqui. Você pode encontrar todo o projeto no meu repositório do GitHub.