Photo by Soumya Ranjan from Pexels

Idea: write a data pipeline using Kafka, PostgreSQL, and Python libraries such as Flask, Faust, asyncio, Psycopg2, and kafka-python.

The first part is available here.

Prerequisites

Please install the components below for your OS:

  • Docker.io
  • Python ≥ 3.8
  • Resources: at least 8 GB of RAM, 2 CPU cores, and 20 GB of disk space

Use Case

The use case is to build a data pipeline in Python that extracts and transforms football match event data stored as JSON files. The raw layer should be transformed into a set of tables that fit a relational database. The data samples come from https://developer.sportradar.com.

The whole project is assembled in a GitHub repository, feel free to clone it: https://github.com/BermanDS/football_data_pipeline_v1.git

Modeling data

The JSON structure is nested and has the following top-level keys: ‘generated_at’, ‘sport_event’, ‘sport_event_status’, ‘statistics’, ‘timeline’. For this case I’ll use only ‘sport_event’ and ‘timeline’. A sample of the structure:

{'generated_at': '2023-01-05T19:24:44+00:00',
 'sport_event': {'id': 'sr:sport_event:27751090',
                 'start_time': '2021-08-22T13:00:00+00:00',
                 'start_time_confirmed': True,
                 'sport_event_context': {'sport': {'id': 'sr:sport:1', 'name': 'Soccer'},
                                         'category': {'id': 'sr:category:1',
                                                      'name': 'England',
                                                      'country_code': 'ENG'},
                                         'competition': {'id': 'sr:competition:17',
                                                         'name': 'Premier League',
                                                         'gender': 'men'},
                                         'groups': [{'id': 'sr:league:56620', 'name': 'Premier League 21/22'}]}},
 'timeline': [{'id': 959188430,
               'type': 'match_started',
               'time': '2021-08-22T13:00:23+00:00'},
              {'id': 959188432,
               'type': 'period_start',
               'time': '2021-08-22T13:00:23+00:00',
               'period': 1,
               'period_type': 'regular_period',
               'period_name': 'regular_period'},
              {'id': 959188892,
               'type': 'throw_in',
               'time': '2021-08-22T13:01:00+00:00',
               'match_time': 1,
               'match_clock': '0:37',
               'competitor': 'home',
               'x': 79,
               'y': 95,
               'period': 1,
               'period_type': 'regular_period'},
              {'id': 959189288,
               'type': 'free_kick',
               'time': '2021-08-22T13:01:31+00:00',
               'match_time': 2,
               'match_clock': '1:07',
               'competitor': 'away',
               'x': 74,
               'y': 79,
               'period': 1,
               'period_type': 'regular_period'},
              {'id': 959190116,
               'type': 'goal_kick',
               'time': '2021-08-22T13:02:40+00:00',
               'match_time': 3,
               'match_clock': '2:16',
               'competitor': 'away',
               'x': 95,
               'y': 50,
               'period': 1,
               'period_type': 'regular_period'}]}

The raw data contains the following attributes: ‘id’, ‘type’, ‘time’, ‘period’, ‘period_name’, ‘match_time’, ‘match_clock’, ‘competitor’, ‘outcome’, ‘home_score’, ‘away_score’, ‘method’, ‘stoppage_time’, ‘stoppage_time_clock’, ‘description’, ‘injury_time_announced’, ‘break_name’, ‘event_id’, ‘start_time’, ‘competition_id’, ‘competition_name’, ‘competition_gender’.

I’ll split the raw data into several fact tables, some dimension tables, and glossary tables for data normalization. The following ERD (Entity-Relationship Diagram) describes the structure:
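To make the attribute list concrete, here is a minimal sketch of how one raw message could be flattened into one row per timeline entry. The function name `flatten_timeline` and the constant `EVENT_COLUMNS` are hypothetical and are not taken from the repository; the sketch assumes that ‘event_id’ is the match id from ‘sport_event’ and that attributes missing from an entry are stored as nulls.

```python
from typing import Any, Dict, List

# Per-event columns from the attribute list above (assumption: absent keys become None).
EVENT_COLUMNS = (
    "id", "type", "time", "period", "period_name", "match_time",
    "match_clock", "competitor", "outcome", "home_score", "away_score",
    "method", "stoppage_time", "stoppage_time_clock", "description",
    "injury_time_announced", "break_name",
)


def flatten_timeline(message: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn one raw match message into flat rows, one per timeline entry."""
    sport_event = message["sport_event"]
    competition = sport_event["sport_event_context"]["competition"]
    # Match-level values that are repeated on every row.
    shared = {
        "event_id": sport_event["id"],
        "start_time": sport_event["start_time"],
        "competition_id": competition["id"],
        "competition_name": competition["name"],
        "competition_gender": competition.get("gender"),
    }
    rows = []
    for entry in message.get("timeline", []):
        row = {column: entry.get(column) for column in EVENT_COLUMNS}
        row.update(shared)
        rows.append(row)
    return rows


# A shortened version of the sample message shown earlier.
sample = {
    "sport_event": {
        "id": "sr:sport_event:27751090",
        "start_time": "2021-08-22T13:00:00+00:00",
        "sport_event_context": {
            "competition": {"id": "sr:competition:17",
                            "name": "Premier League",
                            "gender": "men"},
        },
    },
    "timeline": [
        {"id": 959188430, "type": "match_started",
         "time": "2021-08-22T13:00:23+00:00"},
        {"id": 959188892, "type": "throw_in",
         "time": "2021-08-22T13:01:00+00:00", "match_time": 1,
         "match_clock": "0:37", "competitor": "home", "period": 1},
    ],
}

rows = flatten_timeline(sample)
for row in rows:
    print(row["id"], row["event_id"], row["type"], row["match_clock"])
```

Every row carries the pair (‘id’, ‘event_id’), so it can be addressed independently of the message it came from.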

Image by Author

The fact tables share the same composite primary key, ‘id, event_id’, and their data comes from the ‘timeline’ area. I’ll define the following entities: [1] action data (all actions within a match), [2] goal data (all actions that change the score, with their context), [3] missed-goal data (information about dangerous moments), [4] break data (all breaks during a match). I believe these entities are useful for BI dashboards, for fast analysis by other services, and for ML.

The dimension tables (competitions and competitions_matches) are filled from the ‘sport_event’ area.
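The split into four fact tables can be sketched as a simple routing step. This is only an illustration: the sample above shows just a few event types, so the type names in `GOAL_TYPES`, `MISSED_GOAL_TYPES`, and `BREAK_TYPES` are assumptions, and the table names and the function `route_events` are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List

# Assumed event type names; adjust them to the types present in the real feed.
GOAL_TYPES = {"score_change"}
MISSED_GOAL_TYPES = {"shot_off_target", "shot_saved"}
BREAK_TYPES = {"break_start"}


def route_events(rows: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Distribute flat event rows over the four fact tables."""
    tables: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for row in rows:
        # Every event is an action; some events also belong to a narrower table.
        tables["actions"].append(row)
        event_type = row.get("type")
        if event_type in GOAL_TYPES:
            tables["goals"].append(row)
        elif event_type in MISSED_GOAL_TYPES:
            tables["missed_goals"].append(row)
        elif event_type in BREAK_TYPES:
            tables["breaks"].append(row)
    return dict(tables)


events = [
    {"id": 1, "event_id": "sr:sport_event:27751090", "type": "throw_in"},
    {"id": 2, "event_id": "sr:sport_event:27751090", "type": "score_change"},
    {"id": 3, "event_id": "sr:sport_event:27751090", "type": "shot_saved"},
    {"id": 4, "event_id": "sr:sport_event:27751090", "type": "break_start"},
]

routed = route_events(events)
for table, table_rows in routed.items():
    print(table, [r["id"] for r in table_rows])
```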
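As a rough sketch, the two dimension rows could be derived from ‘sport_event’ like this. The column names and the function `build_dimension_rows` are assumptions made for illustration; the real table layout is defined by the ERD and the initialization script.

```python
from datetime import datetime
from typing import Any, Dict, Tuple


def build_dimension_rows(sport_event: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (competition_row, match_row) built from the 'sport_event' area."""
    competition = sport_event["sport_event_context"]["competition"]
    competition_row = {
        "competition_id": competition["id"],
        "competition_name": competition["name"],
        "competition_gender": competition.get("gender"),
    }
    match_row = {
        "event_id": sport_event["id"],
        "competition_id": competition["id"],
        # Parse the ISO 8601 string into a timezone-aware datetime.
        "start_time": datetime.fromisoformat(sport_event["start_time"]),
    }
    return competition_row, match_row


sport_event = {
    "id": "sr:sport_event:27751090",
    "start_time": "2021-08-22T13:00:00+00:00",
    "sport_event_context": {
        "competition": {"id": "sr:competition:17",
                        "name": "Premier League",
                        "gender": "men"},
    },
}

competition_row, match_row = build_dimension_rows(sport_event)
print(competition_row)
print(match_row)
```

The competition row repeats for every match of the same competition, so writing it needs the same kind of duplicate handling as the fact tables.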

Consuming Kafka and saving to DB

As the database I’ll use PostgreSQL together with the pgAdmin tool.

Let’s run the PostgreSQL and pgAdmin instances

First of all, add all required variables to .env: PG__PORT, PG__HOST, PG__DATA (mounted directory for data), PG__LOGS (mounted directory for logs), PG__USER, PG__PASSW (master credentials), PG__USER_BOT, PG__PASSW_BOT (non-superuser credentials), PG__DBNAME, PG__SCHEMA (database and schema for the project), PG__INIT_PATH (path to the script init_conf.sh, which initializes the tables and relations for the project).

This is just a sample, not the full initialization script. Other variables for tuning the database can be copied as defaults from .env.example: PG__MAX_CONNECTIONS, PG__SHARED_BUFFERS, PG__WORK_MEM, PG__EFFECTIVE_CACHE_SIZE, PG__MAINTENANCE_WORK_MEM, PG__TEMP_FILE_LIMIT. The variables for pgAdmin are PGADMIN__EMAIL, PGADMIN__PASSWD, PGADMIN__SERVER_MODE, PGADMIN__DATA (mounted volume for the pgAdmin instance), and PGADMIN__PORT.

After that, create the directories to be mounted, grant the required permissions on them, and check the docker-compose.yml file.

Then run the following command in the terminal:

_node: ~/work/football_data_pipeline_v1$ docker-compose -f app__db_postgresql/docker-compose.yml up --build -d

As a simple check, I open http://<PG_ADMIN_HOST>:<PGADMIN__PORT>/browser/ in a web browser and see the pgAdmin start page. The database server has to be registered only once, after which the database shows the following structure:

Image by Author

Run the Python Faust application as a Kafka consumer

Faust is a stream processing library that ports the ideas of Kafka Streams to Python.

In this project I use Faust to consume messages from a Kafka topic and transform them. The clean data is then stored in the database according to its schema with Psycopg2, a Python library for connecting to PostgreSQL.

The subproject has the following structure:
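The consume, transform, and store flow can be sketched with a Faust agent. This is a minimal sketch and not the code from the repository: the app name, the topic name `football_events`, the broker URL, and the helpers `extract_rows` and `build_app` are all assumptions. Faust is imported inside `build_app` so that the pure transformation helper can be used and tested without Faust installed.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def extract_rows(message: Dict[str, Any]) -> List[Row]:
    """Attach the match id to every timeline entry of one raw message."""
    event_id = message["sport_event"]["id"]
    return [dict(entry, event_id=event_id) for entry in message.get("timeline", [])]


def build_app(save_rows: Callable[[List[Row]], None],
              broker: str = "kafka://localhost:9092",  # assumed broker address
              topic_name: str = "football_events"):    # assumed topic name
    """Create a Faust app whose agent transforms messages and hands rows to save_rows."""
    import faust  # lazy import: only needed when the worker is actually built

    app = faust.App("football-kafka-to-db", broker=broker)
    # Messages are assumed to be JSON documents like the sample shown earlier.
    topic = app.topic(topic_name, value_serializer="json")

    @app.agent(topic)
    async def consume(stream):
        async for message in stream:
            save_rows(extract_rows(message))

    return app


# The transformation step can be tried without Kafka or Faust:
demo_message = {
    "sport_event": {"id": "sr:sport_event:27751090"},
    "timeline": [{"id": 959188430, "type": "match_started"}],
}
demo_rows = extract_rows(demo_message)
print(demo_rows)
```

In a worker module, a module-level `app = build_app(save_rows=...)` would give the `faust` command line an app to run. Because Psycopg2 calls are blocking, a long write inside the agent pauses message processing for its duration.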

app__faust_kafka_to_db
├── __init__.py
├── log
├── requirements.txt
├── src
│ ├── football_events.py
│ ├── metadata.py
│ ├── settings.py
│ └── utils.py
├── test
│ ├── __init__.py
│ ├── pytest.ini
│ └── test_football_events.py
└── update_maps.py

First of all, in a separate terminal create a new Python environment and install requirements.txt. If .env is correct, settings.py provides the necessary credentials and paths. In utils.py I defined helper routines for the main processes, such as a PostgreSQL connector based on the Psycopg2 library.

The main part of this application is an object that processes the raw data and transforms it into several entities according to the data model.

Before launching the application I’ll run the tests:
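A minimal sketch of how a settings module could turn the .env variables into connection parameters is shown below. It is an illustration rather than the actual settings.py: the class `PostgresSettings`, the function `load_settings`, and the example values are hypothetical, and the sketch assumes the variables have already been loaded into the process environment.

```python
import os
from dataclasses import dataclass
from typing import Any, Dict, Mapping

# Variables the application needs; the non-superuser account is used for writes.
REQUIRED = ("PG__HOST", "PG__PORT", "PG__DBNAME", "PG__SCHEMA",
            "PG__USER_BOT", "PG__PASSW_BOT")


@dataclass(frozen=True)
class PostgresSettings:
    host: str
    port: int
    dbname: str
    schema: str
    user: str
    password: str

    def connect_kwargs(self) -> Dict[str, Any]:
        """Keyword arguments in the form accepted by psycopg2.connect()."""
        return {
            "host": self.host,
            "port": self.port,
            "dbname": self.dbname,
            "user": self.user,
            "password": self.password,
            # Make the project schema the default one for the session.
            "options": f"-c search_path={self.schema}",
        }


def load_settings(env: Mapping[str, str] = os.environ) -> PostgresSettings:
    """Read the PostgreSQL settings and fail early if something is missing."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing variables in .env: {', '.join(missing)}")
    return PostgresSettings(
        host=env["PG__HOST"],
        port=int(env["PG__PORT"]),
        dbname=env["PG__DBNAME"],
        schema=env["PG__SCHEMA"],
        user=env["PG__USER_BOT"],
        password=env["PG__PASSW_BOT"],
    )


# Example values only; real values come from .env.
example_env = {
    "PG__HOST": "localhost", "PG__PORT": "5432", "PG__DBNAME": "football",
    "PG__SCHEMA": "football_events", "PG__USER_BOT": "bot", "PG__PASSW_BOT": "secret",
}
settings = load_settings(example_env)
print(settings.host, settings.port, settings.dbname, settings.schema)
```

Failing at start-up with the list of missing variables is usually easier to debug than a connection error later in the worker.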

_node: ~/work/football_data_pipeline_v1$ pytest app__faust_kafka_to_db/test/test_football_events.py
pyTest passing

These tests cover all important stages of the data transformation, including checks of the data types of critical values.

After the tests, the glossary tables for the main data entities need to be updated: football_events.map_period, football_events.map_action, football_events.map_competitor, football_events.map_breaks, football_events.map_method.
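A type check for critical values could look like the following pytest-style sketch. The helper `check_row_types` and the sample rows are hypothetical and only illustrate the idea; they are not the tests from test_football_events.py.

```python
from datetime import datetime
from typing import Any, Dict, List


def check_row_types(row: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in the critical values of one row."""
    problems = []
    row_id = row.get("id")
    # bool is a subclass of int, so it is rejected explicitly.
    if isinstance(row_id, bool) or not isinstance(row_id, int):
        problems.append("id must be an integer")
    if not isinstance(row.get("event_id"), str) or not row["event_id"]:
        problems.append("event_id must be a non-empty string")
    try:
        datetime.fromisoformat(row.get("time"))
    except (TypeError, ValueError):
        problems.append("time must be an ISO 8601 timestamp")
    return problems


GOOD_ROW = {"id": 959188892, "event_id": "sr:sport_event:27751090",
            "time": "2021-08-22T13:01:00+00:00"}
BAD_ROW = {"id": "959188892", "event_id": "", "time": "yesterday"}


def test_good_row_has_no_problems():
    assert check_row_types(GOOD_ROW) == []


def test_bad_row_reports_every_problem():
    assert len(check_row_types(BAD_ROW)) == 3
```

Returning a list of problems instead of raising on the first one makes a failing test report everything that is wrong with a row at once.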

_node: ~/work/football_data_pipeline_v1$ python app__faust_kafka_to_db/update_maps.py
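The idea behind such glossary (map) tables can be sketched as follows: every distinct text value gets an integer id, and ids that already exist are never changed. The function `extend_glossary` is a hypothetical illustration and does not reproduce update_maps.py.

```python
from typing import Dict, Iterable, Optional


def extend_glossary(existing: Dict[str, int],
                    values: Iterable[Optional[str]]) -> Dict[str, int]:
    """Keep existing ids and assign the next free ids to values not seen before."""
    glossary = dict(existing)
    next_id = max(glossary.values(), default=0) + 1
    new_values = {value for value in values if value is not None} - set(glossary)
    # Sorting makes the assignment deterministic for a given batch.
    for value in sorted(new_values):
        glossary[value] = next_id
        next_id += 1
    return glossary


map_action = extend_glossary({}, ["throw_in", "free_kick", "throw_in", None])
print(map_action)   # {'free_kick': 1, 'throw_in': 2}

map_action = extend_glossary(map_action, ["goal_kick", "free_kick"])
print(map_action)   # {'free_kick': 1, 'throw_in': 2, 'goal_kick': 3}
```

With such a map, a fact row can store the compact id instead of the repeated text value, which is the normalization the glossary tables provide.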

Now I’m ready to start the Faust worker, which applies the transformation object (class FootballEvents) and the DB connector (DBpostgreSQL) to implement the required logic.

Run the following command in the terminal:

_node: ~/work/football_data_pipeline_v1$ faust -A app__faust_kafka_to_db worker -l info
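On the database side, writing a batch of rows can be sketched as below. This is an assumption-laden illustration, not the DBpostgreSQL class: the helpers `build_insert`, `rows_to_params`, and `save_rows` and the table name `football_events.actions` are hypothetical. The statement relies on the composite primary key (‘id’, ‘event_id’), so a message that is delivered twice does not create duplicate rows.

```python
from typing import Any, Dict, List, Sequence, Tuple


def build_insert(table: str, columns: Sequence[str],
                 key_columns: Sequence[str] = ("id", "event_id")) -> str:
    """Build an idempotent INSERT with psycopg2-style %s placeholders.

    Table and column names must come from trusted code, never from message data.
    """
    column_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    conflict_target = ", ".join(key_columns)
    return (f"INSERT INTO {table} ({column_list}) VALUES ({placeholders}) "
            f"ON CONFLICT ({conflict_target}) DO NOTHING")


def rows_to_params(rows: List[Dict[str, Any]],
                   columns: Sequence[str]) -> List[Tuple[Any, ...]]:
    """Order the values of every row like the column list; missing keys become None."""
    return [tuple(row.get(column) for column in columns) for row in rows]


def save_rows(connection, table: str, rows: List[Dict[str, Any]],
              columns: Sequence[str]) -> None:
    """Write rows through an open psycopg2 connection (not called in this sketch)."""
    statement = build_insert(table, columns)
    with connection.cursor() as cursor:
        cursor.executemany(statement, rows_to_params(rows, columns))
    connection.commit()


columns = ["id", "event_id", "type"]
statement = build_insert("football_events.actions", columns)
params = rows_to_params(
    [{"id": 959188892, "event_id": "sr:sport_event:27751090", "type": "throw_in"}],
    columns,
)
print(statement)
print(params)
```

The values travel as query parameters, so Psycopg2 handles their quoting; only the identifiers are formatted into the statement text.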

Then I go to pgAdmin and expect to see the following picture:

pgAdmin with results

Conclusion

This is just one of many possible variations of a data pipeline built with Kafka, an RDBMS, and a wide assortment of Python libraries.

TO DO

One component is still uncovered: storing the raw data layer in distributed storage.

The whole project is assembled in a GitHub repository, feel free to clone it: https://github.com/BermanDS/football_data_pipeline_v1.git
