ETL — Generate fake data in a JSON file, extract the data from the JSON file, transform the values, load them into a MySQL database with Python, and schedule the tasks with a Linux cron job
Index
Introduction
Goal
1. Let's start with Docker Playground:
2. Setting things up inside Docker Labs:
2.1 Create and populate the initial database schema
3. Building an attendance data generator with the Faker lib and Python
4. Creating and running the Python ETL script
5. Finally, configuring the daily scheduled ETL script
Conclusion
Introduction
In this post we will present some of the most used techniques for syncing data from a JSON file to a MySQL database using the Python programming language.
Goal
Create a data generator, run it, and consume its output with an ETL (Extract, Transform, Load) mechanism written in Python, sending the data to a MySQL database with a pre-loaded schema.
1. Let's start with Docker Playground:
PWD (Play with Docker) is a playground which allows users to run Docker commands in a matter of seconds. It gives you the experience of a free Alpine Linux virtual machine in the browser, where you can build and run Docker containers and even create clusters in Docker Swarm mode.
It's simple to use…
2. Setting things up inside Docker Labs:
# MySQL 5.7 #
docker run -d -p 3306:3306 -v /data/mysql:/var/lib/mysql --name banco_mysql -e MYSQL_ROOT_PASSWORD=123123 -e MYSQL_DATABASE=brunow -e MYSQL_USER=brunow -e MYSQL_PASSWORD=brunow mysql:5.7.33

# PHPMyAdmin #
docker run --name phpmyadmin -d --link banco_mysql:db -p 8081:80 phpmyadmin/phpmyadmin

# Install apk packages and Python 3 #
apk update
apk upgrade
apk add nano
apk add python3
apk add openrc --no-cache
apk --no-cache add openjdk11 --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community
python3 -m ensurepip
pip3 install --upgrade pip
pip3 install faker mysql-connector-python
2.1 Create and populate the initial database schema
Now we have to load the SQL script into MySQL. The following lines do the job:
# SQL script to create the initial schema #
touch /data/mysql/db_at.sql
echo '
SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO";
START TRANSACTION;
SET time_zone = "+00:00";

CREATE TABLE `num_at_not_recept_monthly_by_midia` (
`created_at` varchar(7)
,`nm_midia` varchar(50)
,`qtd_ligacoes` bigint(21)
);

CREATE TABLE `num_at_recept_daily_by_status_by_media` (
`created_at` varchar(10)
,`nm_status` varchar(50)
,`nm_midia` varchar(50)
,`qtd_ligacoes` bigint(21)
);

CREATE TABLE `tb_call` (
`id` bigint(20) NOT NULL COMMENT "id da ligacao",
`midia_id` int(11) NOT NULL COMMENT "id da midia",
`user_id` int(11) NOT NULL COMMENT "id do usuario",
`customer_id` int(11) NOT NULL COMMENT "id do cliente",
`tipo_id` int(11) NOT NULL COMMENT "id do tipo de ligacao",
`project_id` int(11) NOT NULL COMMENT "id do projeto",
`status_id` int(11) NOT NULL COMMENT "id do status da ligacao",
`nm_protocol` varchar(255) NOT NULL COMMENT "protocolo da ligacao",
`qtd_minutos_call` int(11) DEFAULT NULL COMMENT "tempo de duracao da ligacao em minutos",
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "data de criacao do registro",
`updated_at` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT "data de atualizacao do registro"
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT="Tabela de ligações, receptivas e não receptivas.";

CREATE TABLE `tb_call_status` (
`id` int(11) NOT NULL COMMENT "id status ligacao",
`nm_status` varchar(50) NOT NULL COMMENT "nome do status",
`st_status` int(1) NOT NULL DEFAULT "1" COMMENT "situacao do status - 1 = ativo, 0 = inativo",
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "data de criacao",
`updated_at` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT "data de atualizacao"
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT="Tabela de status do atendimento" ROW_FORMAT=DYNAMIC;

INSERT INTO `tb_call_status` (`id`, `nm_status`, `st_status`, `created_at`, `updated_at`) VALUES
(1, "EM_ATENDIMENTO", 1, "2022-02-09 23:14:36", NULL),
(2, "FINALIZADO_SUCESSO", 1, "2022-02-09 23:14:36", NULL),
(3, "FINALIZADO_ERRO", 1, "2022-02-09 23:14:36", NULL),
(4, "ERRO", 1, "2022-02-09 23:14:36", NULL);

CREATE TABLE `tb_call_tipo` (
`id` int(11) NOT NULL COMMENT "id do tipo da ligacao",
`nm_tipo` varchar(50) NOT NULL COMMENT "nome do tipo da ligacao",
`st_tipo` int(11) NOT NULL DEFAULT "1" COMMENT "status do tipo da ligacao. 1=ativo, 0=inativo",
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "data de criacao",
`updated_at` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT "data de atualizacao"
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT="Tabela de tipos de atendimento";

INSERT INTO `tb_call_tipo` (`id`, `nm_tipo`, `st_tipo`, `created_at`, `updated_at`) VALUES
(1, "ENTRADA", 1, "2022-02-09 23:15:34", NULL),
(2, "SAIDA", 1, "2022-02-09 23:15:34", NULL),
(3, "DESCONHECIDO(A)", 1, "2022-02-10 02:51:02", NULL);

CREATE TABLE `tb_customer` (
`id` int(11) NOT NULL COMMENT "id do cliente",
`nm_customer` varchar(255) NOT NULL COMMENT "nome do cliente",
`nm_sexo` varchar(1) DEFAULT NULL COMMENT "genero do cliente. M, F ou I",
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "data de criacao",
`updated_at` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT "data de atualizacao"
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT="Tabela de clientes";

INSERT INTO `tb_customer` (`id`, `nm_customer`, `nm_sexo`, `created_at`, `updated_at`) VALUES
(1, "FULANO DA SILVA", "M", "2022-02-09 23:17:21", NULL),
(2, "FULANA DA SILVA", "F", "2022-02-09 23:17:21", NULL),
(3, "FULAN DA SILVA", "I", "2022-02-09 23:17:21", NULL);

CREATE TABLE `tb_midia` (
`id` int(11) NOT NULL COMMENT "id da midia",
`nm_midia` varchar(50) NOT NULL COMMENT "tipo da midia (nome)",
`nm_campanha` varchar(50) NOT NULL COMMENT "nome da campanha",
`nm_fonte` varchar(50) NOT NULL COMMENT "nome da fonte",
`nm_pagina` varchar(50) NOT NULL COMMENT "nome da pagina",
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "data de criacao",
`updated_at` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT "data de atualizacao"
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT="Tabela de mídias";

INSERT INTO `tb_midia` (`id`, `nm_midia`, `nm_campanha`, `nm_fonte`, `nm_pagina`, `created_at`, `updated_at`) VALUES
(1, "WEB", "Promoção - Planos a partir de $0,99 por mês.", "MKT", "www.emergeit.com.br", "2022-02-09 23:20:22", NULL),
(2, "PORTAL SAUDE", "Campanha do Agasalho", "IE", "www.emergeit.com.br", "2022-02-09 23:20:22", NULL);

CREATE TABLE `tb_project` (
`id` int(11) NOT NULL COMMENT "id do projeto",
`nm_project` varchar(50) NOT NULL COMMENT "nome do projeto",
`st_project` int(11) NOT NULL DEFAULT "1" COMMENT "status do projeto. 1=ativo, 0=inativo",
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "data de criacao",
`updated_at` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT "data de atualizacao"
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT="Tabela de projetos" ROW_FORMAT=DYNAMIC;

INSERT INTO `tb_project` (`id`, `nm_project`, `st_project`, `created_at`, `updated_at`) VALUES
(1, "ATENDIMENTO_HOSP_A", 1, "2022-02-09 23:21:48", NULL),
(2, "ATENDIMENTO_HOSP_B", 1, "2022-02-09 23:21:48", NULL),
(3, "ATENDIMENTO_CLINICA_A", 1, "2022-02-09 23:21:48", NULL),
(4, "ATENDIMENTO_CLINICA_B", 1, "2022-02-09 23:21:48", NULL);

CREATE TABLE `tb_user` (
`id` int(11) NOT NULL COMMENT "id do usuario",
`nm_user` varchar(255) NOT NULL COMMENT "nome abreviado do usuario",
`nm_completo` varchar(255) NOT NULL COMMENT "nome completo do usuario",
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "data de criacao",
`updated_at` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT "data de atualizacao"
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT="Tabela de usuarios" ROW_FORMAT=DYNAMIC;

INSERT INTO `tb_user` (`id`, `nm_user`, `nm_completo`, `created_at`, `updated_at`) VALUES
(1, "bruno.farias", "Bruno Cardoso Farias", "2022-02-09 23:22:21", NULL),
(2, "teste.silva", "Teste da Silva", "2022-02-09 23:59:18", NULL);

DROP TABLE IF EXISTS `num_at_not_recept_monthly_by_midia`;
CREATE ALGORITHM=UNDEFINED DEFINER=`root`@`%` SQL SECURITY DEFINER VIEW `num_at_not_recept_monthly_by_midia` AS
SELECT date_format(`c`.`created_at`,"%Y-%m") AS `created_at`, `mid`.`nm_midia` AS `nm_midia`, count(0) AS `qtd_ligacoes`
FROM ((`tb_call` `c` join `tb_midia` `mid` on((`c`.`midia_id` = `mid`.`id`))) join `tb_call_tipo` `ct` on((`c`.`tipo_id` = `ct`.`id`)))
WHERE (`ct`.`nm_tipo` <> "ENTRADA")
GROUP BY date_format(`c`.`created_at`,"%Y-%m"), `mid`.`nm_midia`;

DROP TABLE IF EXISTS `num_at_recept_daily_by_status_by_media`;
CREATE ALGORITHM=UNDEFINED DEFINER=`root`@`%` SQL SECURITY DEFINER VIEW `num_at_recept_daily_by_status_by_media` AS
SELECT date_format(`c`.`created_at`,"%Y-%m-%d") AS `created_at`, `cs`.`nm_status` AS `nm_status`, `mid`.`nm_midia` AS `nm_midia`, count(0) AS `qtd_ligacoes`
FROM (((`tb_call` `c` join `tb_call_status` `cs` on((`c`.`status_id` = `cs`.`id`))) join `tb_midia` `mid` on((`c`.`midia_id` = `mid`.`id`))) join `tb_call_tipo` `ct` on((`c`.`tipo_id` = `ct`.`id`)))
WHERE (`ct`.`nm_tipo` = "ENTRADA")
GROUP BY date_format(`c`.`created_at`,"%Y-%m-%d"), `cs`.`nm_status`, `mid`.`nm_midia`;

ALTER TABLE `tb_call`
ADD PRIMARY KEY (`id`),
ADD KEY `fk_midia_id` (`midia_id`),
ADD KEY `fk_user_id` (`user_id`),
ADD KEY `fk_customer_id` (`customer_id`),
ADD KEY `fk_tipo_id` (`tipo_id`),
ADD KEY `fk_project_id` (`project_id`),
ADD KEY `fk_status_id` (`status_id`);

ALTER TABLE `tb_call_status`
ADD PRIMARY KEY (`id`) USING BTREE;

ALTER TABLE `tb_call_tipo`
ADD PRIMARY KEY (`id`);

ALTER TABLE `tb_customer`
ADD PRIMARY KEY (`id`);

ALTER TABLE `tb_midia`
ADD PRIMARY KEY (`id`);

ALTER TABLE `tb_project`
ADD PRIMARY KEY (`id`) USING BTREE;

ALTER TABLE `tb_user`
ADD PRIMARY KEY (`id`) USING BTREE;

ALTER TABLE `tb_call`
MODIFY `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT "id da ligacao";

ALTER TABLE `tb_call_status`
MODIFY `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "id status ligacao", AUTO_INCREMENT=5;

ALTER TABLE `tb_call_tipo`
MODIFY `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "id do tipo da ligacao", AUTO_INCREMENT=4;

ALTER TABLE `tb_customer`
MODIFY `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "id do cliente", AUTO_INCREMENT=4;

ALTER TABLE `tb_midia`
MODIFY `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "id da midia", AUTO_INCREMENT=3;

ALTER TABLE `tb_project`
MODIFY `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "id do projeto", AUTO_INCREMENT=5;

ALTER TABLE `tb_user`
MODIFY `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "id do usuario", AUTO_INCREMENT=3;

ALTER TABLE `tb_call`
ADD CONSTRAINT `fk_customer_id` FOREIGN KEY (`customer_id`) REFERENCES `tb_customer` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION,
ADD CONSTRAINT `fk_midia_id` FOREIGN KEY (`midia_id`) REFERENCES `tb_midia` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION,
ADD CONSTRAINT `fk_project_id` FOREIGN KEY (`project_id`) REFERENCES `tb_project` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION,
ADD CONSTRAINT `fk_status_id` FOREIGN KEY (`status_id`) REFERENCES `tb_call_status` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION,
ADD CONSTRAINT `fk_tipo_id` FOREIGN KEY (`tipo_id`) REFERENCES `tb_call_tipo` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION,
ADD CONSTRAINT `fk_user_id` FOREIGN KEY (`user_id`) REFERENCES `tb_user` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION;
COMMIT;' > /data/mysql/db_at.sql
Now we can load this data, either through PHPMyAdmin or straight through the container's MySQL client:
# Load the SQL into the Docker container #
docker exec -i banco_mysql mysql -u root -p123123 brunow < /data/mysql/db_at.sql
Let's check if everything is fine:
3. Building an attendance data generator with the Faker lib and Python
In this section we will build the script that generates all the attendance data:
# Create the data generator with Python #
mkdir -p /root/code
mkdir -p /root/data/raw/calls
touch /root/code/fake-call-generator.py
chmod 0777 /root/code/fake-call-generator.py
echo 'import os
import random
import time
import uuid
from time import sleep
import json
from faker import Faker
from datetime import datetime
import logging as logger

# Initializing ..
logger.basicConfig(
    format="%(asctime)s (%(levelname)s) %(message)s",
    datefmt="[%Y-%m-%d %H:%M:%S]",
    level=logger.DEBUG
)
logger.info("Iniciando a inserção de dados fakes...")

## DEFINE THE OUTPUT FILE
fileName = "/root/data/raw/calls/calls-"+datetime.today().strftime("%Y-%m-%d")+".json"

## GENERATE THE DATA
faker = Faker(["pt_BR"])
quantidade = 0
while quantidade < 30000:
    with open(fileName, "a") as file:
        data = json.loads("{}")
        #data["id"] = random.getrandbits(32)
        data["tipo_midia"] = random.choice(["WEB", "PORTAL SAUDE"])
        data["user_id"] = random.choice(["1", "2"])
        data["customer_id"] = random.choice(["1", "2", "3"])
        data["call_type"] = random.choices(["INBOUND", "OUTBOUND", "UNKNOWN"], weights=(80, 19.9, 0.1), k=1)[0]
        data["project_name"] = random.choice(["ATENDIMENTO_HOSP_A", "ATENDIMENTO_HOSP_B", "ATENDIMENTO_CLINICA_A", "ATENDIMENTO_CLINICA_B"])
        data["status_atendimento"] = random.choices(["EM_ATENDIMENTO", "FINALIZADO_SUCESSO", "FINALIZADO_ERRO", "ERRO"], weights=(1, 98.8, 0.1, 0.1), k=1)[0]
        data["protocolo"] = str(uuid.uuid4())
        data["segundos_ligacao"] = random.randint(60, 10800) ## from 1 minute up to 3 hours...
        data["created_at"] = str(datetime.now())
        #data["created_at"] = str(faker.date_time_between(start_date="-1y", end_date="now"))
        data["updated_at"] = None

        quantidade = quantidade + 1
        logger.info("[Registro: "+str(quantidade)+"]")
        try:
            logger.info(("---" * 20))
            #print("ID: " + str("Gerado automaticamente..."))
            logger.info("Midia: " + str(data["tipo_midia"]))
            logger.info("ID Usuario: " + str(data["user_id"]))
            logger.info("ID Cliente: " + str(data["customer_id"]))
            logger.info("Tipo At: " + str(data["call_type"]))
            logger.info("Projeto: " + str(data["project_name"]))
            logger.info("Status: " + str(data["status_atendimento"]))
            logger.info("Protocolo: " + str(data["protocolo"]))
            logger.info("Qtd Sec At.: " + str(data["segundos_ligacao"]))
            logger.info("Criado em: " + str(data["created_at"]))
            logger.info("Atualiz. : " + str(data["updated_at"]))
            logger.info(("---" * 20))
            # write the record to the file.
            json.dump(data, file)
            file.write("\n")
        except Exception as error:
            logger.error(error)' > /root/code/fake-call-generator.py
This script generates 30,000 lines of JSON attendance data, placed in /root/data/raw/calls, in one file per DAY. Let's execute it and check what happens:
# Run the script created above #
python3 /root/code/fake-call-generator.py
Now, let's check what is inside the folder:
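Before moving on, it helps to see the file format: the generator appends one JSON object per line (newline-delimited JSON) to the daily file. Below is a minimal, self-contained sketch of that round trip; Faker is left out and only a few of the fields are shown, and the path `/tmp/calls-sample.json` is just a stand-in for the real /root/data/raw/calls location:

```python
import json
import random
import uuid
from datetime import datetime

# Stand-in path; the real generator writes under /root/data/raw/calls.
fileName = "/tmp/calls-sample.json"

# Append one JSON object per line, exactly like the generator does.
with open(fileName, "w") as file:
    for _ in range(3):
        data = {
            "tipo_midia": random.choice(["WEB", "PORTAL SAUDE"]),
            "call_type": random.choices(["INBOUND", "OUTBOUND", "UNKNOWN"], weights=(80, 19.9, 0.1), k=1)[0],
            "protocolo": str(uuid.uuid4()),
            "segundos_ligacao": random.randint(60, 10800),
            "created_at": str(datetime.now()),
        }
        json.dump(data, file)
        file.write("\n")

# Read it back line by line, the same way the ETL script will later.
with open(fileName) as f:
    records = [json.loads(line) for line in f]

print(len(records))  # 3
```

Since each line is a complete JSON document, the daily file can be appended to safely and consumed incrementally without parsing the whole file as one object.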
4. Creating and running the Python ETL script
In this section we will create the Python ETL script, which reads, transforms, and loads the data.
# Create the Python ETL script #
mkdir -p /root/etl
touch /root/etl/calls-etl.py
chmod 0777 /root/etl/calls-etl.py
echo 'import os
from time import time
import json
from datetime import datetime
import mysql.connector
import logging as logger

# SCRIPT START
logger.basicConfig(
    format="%(asctime)s (%(levelname)s) %(message)s",
    datefmt="[%Y-%m-%d %H:%M:%S]",
    level=logger.DEBUG
)
logger.info("Iniciando o processo de EXTRACT - Lendo arquivos JSON")

## JSON LOAD - EXTRACT
jsonFileToExtract = "/root/data/raw/calls/calls-"+datetime.today().strftime("%Y-%m-%d")+".json"
logger.info("Arquivo: "+jsonFileToExtract+"...")

finalData = []
with open(jsonFileToExtract) as f:
    jsonData = [json.loads(line) for line in f]

for line in jsonData:
    ## TRANSFORMATION
    tData = {}
    tData["midia_id"] = 1 if line["tipo_midia"] == "WEB" else 2
    tData["user_id"] = int(line["user_id"])
    tData["customer_id"] = int(line["customer_id"])

    if line["call_type"] == "INBOUND":
        tData["tipo_id"] = 1
    elif line["call_type"] == "OUTBOUND":
        tData["tipo_id"] = 2
    else:
        tData["tipo_id"] = 3

    if line["project_name"] == "ATENDIMENTO_HOSP_A":
        tData["project_id"] = 1
    elif line["project_name"] == "ATENDIMENTO_HOSP_B":
        tData["project_id"] = 2
    elif line["project_name"] == "ATENDIMENTO_CLINICA_A":
        tData["project_id"] = 3
    else:
        tData["project_id"] = 4

    if line["status_atendimento"] == "EM_ATENDIMENTO":
        tData["status_id"] = 1
        line["segundos_ligacao"] = None
    elif line["status_atendimento"] == "FINALIZADO_SUCESSO":
        tData["status_id"] = 2
    elif line["status_atendimento"] == "FINALIZADO_ERRO":
        tData["status_id"] = 3
    else:
        tData["status_id"] = 4
        line["segundos_ligacao"] = None

    tData["nm_protocol"] = str(line["protocolo"])
    tData["qtd_minutos_call"] = int(round(line["segundos_ligacao"] / 60)) if line["segundos_ligacao"] is not None else None # minutes
    tData["created_at"] = line["created_at"]
    finalData.append(tuple(tData.values()))

## MYSQL CONNECTION - LOAD TO MYSQL
conn = mysql.connector.connect(user="root", password="123123",
                               host="localhost",
                               database="brunow")
conn.autocommit = False
cursor = conn.cursor()
query = """
    INSERT INTO tb_call (
        midia_id,
        user_id,
        customer_id,
        tipo_id,
        project_id,
        status_id,
        nm_protocol,
        qtd_minutos_call,
        created_at
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

try:
    cursor.executemany(query, finalData)
    conn.commit()
    # Success message
    logger.info(str(cursor.rowcount)+" registros inseridos...")
except Exception as e:
    # Error message
    logger.error(e)
finally:
    if conn.is_connected():
        cursor.close()
        conn.close()
        logger.debug("Removendo instancia da Database... OK")
' > /root/etl/calls-etl.py
This script has a section that transforms the data. For example: in the JSON data, the field called ‘segundos_ligacao’ holds seconds, while the table expects minutes, so we catch these cases and convert them in the transformation section. Some fields must also be cast to the correct type, such as Integer, String or Date.
Now, let's run the script and look at the output.
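The conversion described above can be isolated in a small sketch. The ID mappings mirror the rows seeded in section 2.1; `transform` is a hypothetical helper for illustration, the real script does this work inline:

```python
# A standalone sketch of the TRANSFORMATION step: map the JSON fields
# to the integer IDs seeded in section 2.1 and convert seconds to minutes.
TIPO_IDS = {"INBOUND": 1, "OUTBOUND": 2, "UNKNOWN": 3}

def transform(line):
    tData = {}
    # WEB -> midia 1, PORTAL SAUDE -> midia 2 (see tb_midia seed data)
    tData["midia_id"] = 1 if line["tipo_midia"] == "WEB" else 2
    tData["tipo_id"] = TIPO_IDS.get(line["call_type"], 3)
    # seconds -> minutes, keeping NULL (None) when the call has no duration
    segundos = line["segundos_ligacao"]
    tData["qtd_minutos_call"] = round(segundos / 60) if segundos is not None else None
    return tData

row = transform({"tipo_midia": "WEB", "call_type": "INBOUND", "segundos_ligacao": 150})
print(row)  # {'midia_id': 1, 'tipo_id': 1, 'qtd_minutos_call': 2}
```

Note that Python's `round` uses round-half-to-even, so 150 seconds (2.5 minutes) becomes 2 minutes; if the business rule requires rounding up, swap in `math.ceil` instead.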
We can now check the data in PHPMyAdmin on port 8081:
We can also check the two main views, created for fast data visualization:
5. Finally, configuring the daily scheduled ETL script
We are at the end of this post. Everything we did in the previous sections will now be automated via cron job, the simple native Linux scheduler. It is not the only option: we could orchestrate this pipeline with Apache Airflow or a similar tool, but cron gets the job done here with the least effort.
# Create cron jobs so the scripts run automatically #
chmod -R 0777 /root/*
echo '30 22 * * 1-5 python3 /root/code/fake-call-generator.py >> /var/log/fake-call-generator.log' >> /etc/crontabs/root
echo '59 23 * * 1-5 python3 /root/etl/calls-etl.py >> /var/log/calls-etl.log' >> /etc/crontabs/root

touch /var/log/fake-call-generator.log
touch /var/log/calls-etl.log

cat /etc/crontabs/root
About the scheduler configuration:
- The first entry runs the data generator at 10:30pm (22:30) every weekday (Mon-Fri). It creates the daily JSON document and populates it with fake data;
- The second entry performs the ETL mechanism: it extracts the JSON file produced above, applies the data transformations and curations, and finally loads it into the MySQL db. It runs every weekday (Mon-Fri) at 11:59pm (23:59).
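For readers less familiar with crontab syntax, the five leading fields of the first entry can be unpacked like this (a plain-Python illustration of what the fields mean, not how cron itself parses the line):

```python
# The five cron fields are: minute, hour, day of month, month, day of week.
entry = "30 22 * * 1-5 python3 /root/code/fake-call-generator.py"

fields = entry.split()
minute, hour, day_of_month, month, day_of_week = fields[:5]
command = " ".join(fields[5:])

print(f"runs at {hour}:{minute}")       # runs at 22:30
print(f"on days of week {day_of_week}") # 1-5 = Monday through Friday
print(command)
```

The second entry is identical except for the 23:59 time and the ETL script path, which is what guarantees the generator has already written the daily file before the ETL consumes it.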
We can also check the logs in /var/log, named after each script.
Conclusion
And that concludes it. The data generator script runs before the ETL script, which guarantees the data is in place when the cron job sends it to the database. This pipeline was built with a lot of love. The scripts could still be made more flexible to get better ETL results. Thanks for reading.
If I can answer any question, just reply! Thank you!
PS: This environment is strictly for test use cases or proofs of concept. Do not expose it to the internet, because it lacks security hardening. You can find more about security in each application's docs.