Building a Data Pipeline with Python libraries (part I)

Anton Salyaev
4 min read · Jan 5, 2023


Photo by Tembela Bohle from Pexels

Idea: build a data pipeline using Kafka, PostgreSQL, and Python libraries such as Flask, Faust, asyncio, psycopg2, and kafka-python.

Prerequisites

Please install the components below according to your OS:

  • Docker.io
  • Python ≥ 3.8
  • Provisioning: at least 8 GB RAM, 2 CPU cores, and 20 GB of disk space

Use Case

The use case is to build a data pipeline in Python that extracts and transforms football match event data delivered as JSON files. The raw layer should be transformed into a set of tables representative of a relational database. As the source of data samples, I will use data from https://developer.sportradar.com. The following architecture accomplishes that:

Image by Author

The whole project is assembled in a GitHub repository, feel free to clone it: https://github.com/BermanDS/football_data_pipeline_v1.git

Let’s look at the tree of our project:

football_data_pipeline_v1
├── .env.example
├── app__db_postgresql
│   ├── docker-compose.yml
│   └── init_conf.sh
├── app__faust_kafka_to_db
│   ├── __init__.py
│   ├── log
│   ├── main.py
│   ├── requirements.txt
│   ├── src
│   │   ├── football_events.py
│   │   ├── settings.py
│   │   └── utils.py
│   ├── test
│   │   ├── __init__.py
│   │   └── test_football_events.py
│   └── update_maps.py
├── app__kafka_broker
│   └── docker-compose.yml
├── app__streaming_api_to_kafka
│   ├── __main__.py
│   ├── requirements.txt
│   └── src
│       ├── settings.py
│       ├── streamapi.py
│       └── utils.py
├── data
│   ├── log
│   └── tmp
│       ├── event_match_21514521.json
│       ├── event_match_27751052.json
│       ├── event_match_27751054.json
│       ├── event_match_27751056.json
│       ├── event_match_27751060.json
│       ├── event_match_27751062.json
│       ├── event_match_27751064.json
│       ├── event_match_27751066.json
│       ├── event_match_27751068.json
│       ├── event_match_27751070.json
│       ├── event_match_27751074.json
│       ├── event_match_27751076.json
│       ├── event_match_27751078.json
│       ├── event_match_27751080.json
│       └── event_match_27751082.json
├── docker-compose_stream.yml
└── README.md

In this part, I will cover processing the streaming API and publishing to Kafka.

Data Source

I’ll define the data source as a streaming API: a small Flask application that serves the sample JSON files from a mounted directory as a streaming service.

Let’s use the file docker-compose_stream.yml from the repository root to bring this service up.

In the working directory, define a .env file with the necessary credentials, ports, and directories for mounting: SAMPLE__DATA is the volume with data samples (data/tmp in this example), and PORT is the host port of our streaming service (5556 in this example).
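
For example, the streaming-related part of .env could look like this (illustrative values matching the ones above; adjust the path to your machine):

# .env (excerpt), illustrative values
# directory with the sample JSON files to be mounted into the container
SAMPLE__DATA=./data/tmp
# host port of the streaming service
PORT=5556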

Try the following command in a terminal:

_node: ~/work/football_data_pipeline_v1$ docker-compose -f docker-compose_stream.yml up --build -d

After that, we can check it in the browser: http://localhost:5556/streaming_example
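
We can also smoke-test the endpoint from Python with the requests library. This is only a hedged sketch that assumes the service emits newline-delimited JSON with the payload under the 'data' key:

import json

import requests

URL = "http://localhost:5556/streaming_example"

# Read a few records from the stream and print the 'data' part of each one.
with requests.get(URL, stream=True, timeout=30) as resp:
    resp.raise_for_status()
    for i, line in enumerate(resp.iter_lines()):
        if not line:
            continue                      # skip keep-alive blank lines
        event = json.loads(line)
        print(event.get("data", event))
        if i >= 2:                        # a few records are enough for a check
            break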

Parsing streaming API and publishing to Kafka

This is a simple application that reads the streaming API, extracts the part of each input JSON under the ‘data’ key, and publishes it to Kafka without further processing, using a Python connector.

Let’s run the Kafka instance

First of all, copy the critical variables (credentials and paths) from .env.example into .env in the project’s working directory: KAFKA__HOST, KAFKA__PORT, ZOOKEPER__PORT, KAFKA__DATA, KAFKA__CONFIGS. The necessary topic (KAFKA__TOPIC) will be created during startup of the Kafka instance. Make sure the directories (volumes) exist and that you have permission to use them. I will use the Kafka images from https://hub.docker.com/r/confluentinc.

Using Kafka in this case gives us necessary features such as fault tolerance, durability, and scalability. We can restore or roll back data transformations and create a dedicated consumer for storing the raw layer of data.

Just run the following command in a terminal:

_node: ~/work/football_data_pipeline_v1$ docker-compose -f app__kafka_broker/docker-compose.yml up --build -d
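
Once the broker is up, a quick way to confirm it is reachable and that the topic exists is a few lines of kafka-python. The host, port, and topic name below are placeholders mirroring the .env variables, not values taken from the repository:

from kafka import KafkaConsumer

# Placeholders mirroring KAFKA__HOST, KAFKA__PORT and KAFKA__TOPIC from .env
KAFKA__HOST = "localhost"
KAFKA__PORT = 9092
KAFKA__TOPIC = "football_events"

consumer = KafkaConsumer(bootstrap_servers=f"{KAFKA__HOST}:{KAFKA__PORT}")
topics = consumer.topics()          # set of topics known to the cluster
print(KAFKA__TOPIC in topics, topics)
consumer.close()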

Run the Python application as an API parser and Kafka producer

So, to conclude this part, let’s study the application that parses data from the API and publishes it to Kafka.

It has the following structure:

app__streaming_api_to_kafka
├── __main__.py
├── requirements.txt
└── src
    ├── settings.py
    ├── streamapi.py
    └── utils.py

First of all, in a separate terminal, create a new Python environment and install requirements.txt. If .env is correct, settings.py gives us the necessary credentials and paths. In utils.py I defined procedures that support the main process, such as a Kafka connector based on the kafka-python library.
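
As a rough idea of what such a connector can look like, here is a minimal sketch based on kafka-python. The class and method names (KafkaConnector, publishing, reading_que) are illustrative, so check utils.py in the repository for the real implementation:

import json

from kafka import KafkaConsumer, KafkaProducer


class KafkaConnector:
    """Thin wrapper around kafka-python for publishing and reading JSON messages."""

    def __init__(self, host: str, port: int, topic: str):
        self.bootstrap = f"{host}:{port}"
        self.topic = topic
        self.producer = KafkaProducer(
            bootstrap_servers=self.bootstrap,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        )

    def publishing(self, payload: dict) -> None:
        # Serialize the dict to JSON and send it to the topic.
        self.producer.send(self.topic, value=payload)
        self.producer.flush()

    def reading_que(self, limit: int = 5) -> list:
        # Read messages from the beginning of the topic (used for checks).
        consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap,
            auto_offset_reset="earliest",
            consumer_timeout_ms=10_000,
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        )
        messages = [record.value for record in consumer]
        consumer.close()
        return messages[:limit]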

The main part of this application is an object that processes the streaming API and publishes to Kafka using the asyncio library. Python’s asyncio is a coroutine-based concurrency model that provides elegant constructs for writing concurrent Python code without using threads.
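
A sketch of such an object follows. I use aiohttp as the async HTTP client here, which is an assumption on my side; streamapi.py in the repository may be built differently:

import asyncio
import json

import aiohttp


async def stream_to_kafka(url: str, connector) -> None:
    """Consume newline-delimited JSON from `url` and publish every 'data' payload to Kafka."""
    loop = asyncio.get_running_loop()
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            resp.raise_for_status()
            async for raw_line in resp.content:       # resp.content is an aiohttp StreamReader
                line = raw_line.strip()
                if not line:
                    continue
                event = json.loads(line)
                payload = event.get("data")
                if payload is not None:
                    # kafka-python is blocking, so hand the publish off to a worker thread
                    await loop.run_in_executor(None, connector.publishing, payload)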

To launch the application, I use __main__.py.
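
A minimal __main__.py could wire the pieces together roughly like this (the imported names mirror the sketches above, not necessarily the identifiers used in the repository):

import asyncio

from src.settings import KAFKA__HOST, KAFKA__PORT, KAFKA__TOPIC, STREAM__URL
from src.streamapi import stream_to_kafka
from src.utils import KafkaConnector

if __name__ == "__main__":
    # Build the connector from settings and run the async consumer until the stream ends.
    connector = KafkaConnector(KAFKA__HOST, KAFKA__PORT, KAFKA__TOPIC)
    asyncio.run(stream_to_kafka(STREAM__URL, connector))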

Type the following command in a terminal:

_node: ~/work/football_data_pipeline_v1$ python app__streaming_api_to_kafka

To check that the data was published to Kafka, I will use an instance of the Kafka connector and its reading_que() method.
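
Based on the connector sketch above, such a check could look like this (again, a hedged usage example; the real reading_que() may have a different signature):

from src.settings import KAFKA__HOST, KAFKA__PORT, KAFKA__TOPIC
from src.utils import KafkaConnector

connector = KafkaConnector(KAFKA__HOST, KAFKA__PORT, KAFKA__TOPIC)
for message in connector.reading_que(limit=3):
    print(message)   # each message should be the 'data' payload of one match event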

Conclusion

In the next part, I’ll show the components that consume data from Kafka and store it in the database.
