Tutorial: Building a Parking Data Pipeline with Stad Gent Open Data API, PostgreSQL, and Metabase
Follow me on LinkedIn if you like what you read here.
In this tutorial, I’ll walk you through how I used the Stad Gent Open Data API to fetch parking garage data, store it in a PostgreSQL database, and visualize it using Metabase.
Take a look at the result: parkinggent.xudo.be.
The system is designed to run on Hetzner Cloud servers with Ubuntu and includes automated data aggregation and cleanup to efficiently manage database storage. I wanted to explore tools that I don’t often encounter with my current clients, who primarily use QlikView and Qlik Sense. That’s why I built this small hobby project: to stay sharp and keep up with the fast-evolving landscape of data tools.
My name is Wouter Trappers, and I’m a freelance business intelligence expert at Xudo. Building data pipelines and dashboards is my day job, but even so, I ran into challenges with some of the steps outlined here. Keep in mind that these steps are the result of trial and error, and not everything goes as smoothly as a tutorial might suggest. If something doesn’t work right away, don’t get discouraged; keep tinkering until it does!
Overview of the System
- Data Collection: Python scripts fetch parking data from the Stad Gent API every 5 minutes and store it in a PostgreSQL database.
- Data Storage: PostgreSQL tables are created to store raw and aggregated data.
- Data Aggregation: A script aggregates data hourly, and a cleanup script deletes detailed data older than 48 hours.
- Visualization: Metabase is installed on a separate server, configured to run over HTTPS, and used to create dashboards for real-time and historical parking data.
Step 1: Setting Up the Hetzner Cloud Servers
Create Two Servers in the Hetzner cloud console.
- Server 1: For PostgreSQL and Python scripts.
- Server 2: For Metabase.
- Use Ubuntu 22.04 LTS for both servers.
I use the smallest servers for this, type CX22. They have 2 vCPUs, 4 GB RAM and a 40 GB SSD for €3.29 per month. I also take regular snapshots of my servers in Hetzner to make sure I have a recent version to restore when something goes wrong, which it did while setting up HTTPS on the Metabase server.
Why two servers, and not just one?
Using two servers instead of one is a design choice that brings several benefits, particularly in terms of scalability, security, and performance. For the sake of brevity, I’ll leave it at that for now, but if you’re interested in diving deeper into the reasons for using two servers — rather than just one, which is technically feasible — you can read more on Xudo’s website.
Step 2: Install and Configure PostgreSQL
1. Install PostgreSQL on the Ubuntu server
SSH into the server from your command line with your user and password and execute the following commands:
sudo apt update
sudo apt install postgresql
2. Create a Database and User:
sudo -u postgres psql
CREATE DATABASE parking_data;
CREATE USER parking_user WITH PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE parking_data TO parking_user;
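One thing the commands above do not create is the parking_data schema that the tables in the next step live in. If it does not exist yet on your server, a minimal sketch to create it and hand it to parking_user looks like this (still in the same psql session, connecting to the new database first):
\c parking_data
-- Schema used by the tables below; owned by parking_user so the scripts can create and write to its tables
CREATE SCHEMA IF NOT EXISTS parking_data AUTHORIZATION parking_user;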
3. Install pgAdmin4
You can create tables from the command line as well, but we installed pgAdmin4 to manage our databases more conveniently:
sudo curl -fsSL https://www.pgadmin.org/static/packages_pgadmin_org.pub | sudo gpg --dearmor -o /usr/share/keyrings/pgadmin-keyring.gpg
sudo sh -c 'echo "deb [signed-by=/usr/share/keyrings/pgadmin-keyring.gpg] https://ftp.postgresql.org/pub/pgadmin/pgadmin4/apt/$(lsb_release -cs) pgadmin4 main" > /etc/apt/sources.list.d/pgadmin4.list'
sudo apt update
sudo apt install pgadmin4-web
sudo apt install apache2
sudo a2enconf pgadmin4
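If pgAdmin is not reachable after these commands, the pgadmin4-web package also ships a web setup script that configures the Apache integration and creates the initial login; the path below is the default documented by the pgAdmin project, so treat it as an assumption for your install:
sudo /usr/pgadmin4/bin/setup-web.sh
sudo systemctl restart apache2
After that, pgAdmin should be available at http://your-server-ip/pgadmin4.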
4. Create Tables
Create the following tables using pgAdmin4.
Raw Data Table: parking_data
CREATE TABLE IF NOT EXISTS parking_data.parking_data (
id text COLLATE pg_catalog."default",
name text COLLATE pg_catalog."default" NOT NULL,
last_update timestamp with time zone NOT NULL,
total_capacity integer NOT NULL,
available_capacity integer NOT NULL,
occupation integer NOT NULL,
type text COLLATE pg_catalog."default" NOT NULL,
description text COLLATE pg_catalog."default",
external_id text COLLATE pg_catalog."default",
opening_times_description text COLLATE pg_catalog."default",
is_open_now integer NOT NULL,
temporarily_closed integer NOT NULL,
operator_information text COLLATE pg_catalog."default",
free_parking integer NOT NULL,
url_link_address text COLLATE pg_catalog."default",
occupancy_trend text COLLATE pg_catalog."default",
specific_access_information text COLLATE pg_catalog."default",
level text COLLATE pg_catalog."default",
road_number text COLLATE pg_catalog."default",
road_name text COLLATE pg_catalog."default",
latitude double precision NOT NULL,
longitude double precision NOT NULL,
text text COLLATE pg_catalog."default",
category text COLLATE pg_catalog."default",
dashboard text COLLATE pg_catalog."default" NOT NULL,
last_modified timestamp with time zone,
CONSTRAINT parkinggent_external_id_key UNIQUE (external_id) )
Aggregated Data Table: parking_hourly
CREATE TABLE IF NOT EXISTS parking_data.parking_hourly (
name text COLLATE pg_catalog."default" NOT NULL,
is_open_now integer,
temporarily_closed integer,
last_modified timestamp without time zone,
average_total_capacity double precision,
average_available_capacity double precision,
capacity_trend integer,
last_update timestamp with time zone )
Step 3: Writing the Python Scripts
1. Fetch Data from Stad Gent API
Install the required Python libraries on the database server:
pip install requests psycopg2-binary
2. Python Script to Fetch and Store Data:
I added extensive logging to the script to identify fields that weren’t being fetched correctly. I decided to comment out the fields I could not easily get to work, since I didn’t really need them for the dashboard I wanted to build.
The data in the API is refreshed every 5 minutes. To make sure I get the latest updates, I use a cron job to run this script every 5 minutes (see Step 4).
import requests
import psycopg2
import logging
from psycopg2 import sql

# Configure logging
logging.basicConfig(
    filename="parking_data_insertion.log",
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# API URL
API_URL = "https://data.stad.gent/api/explore/v2.1/catalog/datasets/bezetting-parkeergarages-real-time/records?limit=100"

# Database connection parameters
DB_CONFIG = {
    "dbname": "parking_data",
    "user": "parking_user",
    "password": "your_password",
    "host": "127.0.0.1",
    "port": 5432
}

def fetch_and_insert_data():
    try:
        logging.info("Fetching data from API.")
        response = requests.get(API_URL)
        response.raise_for_status()
        data = response.json()

        # Log raw response for debugging
        # logging.debug(f"Raw API response: {data}")

        # Check if 'results' is present and log its contents
        results = data.get("results", [])
        if not results:
            logging.warning("No results found in the API response.")
        else:
            logging.info(f"Fetched {len(results)} records from API.")
            # Insert records into database
            insert_data_to_db(results)
    except requests.RequestException as e:
        logging.error(f"Error fetching data from API: {e}")
    except Exception as e:
        logging.error(f"Unexpected error while fetching data: {e}")

def insert_data_to_db(records):
    logging.info("Starting the database insertion process.")
    connection = None
    try:
        connection = psycopg2.connect(**DB_CONFIG)
        cursor = connection.cursor()
        logging.info("Successfully connected to the database.")

        insert_query = """
            INSERT INTO parking_data.parking_data(
                id, name, last_update, total_capacity, available_capacity, occupation, type,
                description, opening_times_description, is_open_now, temporarily_closed, url_link_address,
                operator_information, free_parking, occupancy_trend, latitude, longitude,
                category, dashboard
            ) VALUES (
                %(id)s, %(name)s, %(lastupdate)s, %(totalcapacity)s, %(availablecapacity)s, %(occupation)s, %(type)s,
                %(description)s, %(openingtimesdescription)s, %(isopennow)s, %(temporaryclosed)s, %(urllinkaddress)s,
                %(operatorinformation)s, %(freeparking)s, %(occupancytrend)s, %(latitude)s, %(longitude)s,
                %(categorie)s, %(dashboard)s
            )
        """
        logging.debug("SQL INSERT query prepared.")

        for i, record in enumerate(records, start=1):
            # Extract data directly from the record dictionary
            name = record.get("name")
            lastupdate = record.get("lastupdate")
            totalcapacity = record.get("totalcapacity")
            availablecapacity = record.get("availablecapacity")
            occupation = record.get("occupation")
            type = record.get("type")
            description = record.get("description")
            id = record.get("id")
            openingtimesdescription = record.get("openingtimesdescription")
            isopennow = record.get("isopennow")
            temporaryclosed = record.get("temporaryclosed")
            operatorinformation = record.get("operatorinformation")
            freeparking = record.get("freeparking")
            urllinkaddress = record.get("urllinkaddress")
            occupancytrend = record.get("occupancytrend")

            # Extract nested data (if applicable)
            # specific_access_info = record.get("locationanddimension", {}).get("specificAccessInformation", [""])[0]
            # level = record.get("level")
            # roadnumber = record.get("roadnumber")
            # roadname = record.get("roadname")
            latitude = record.get("location", {}).get("lat")
            longitude = record.get("location", {}).get("lon")
            # text = record.get("text")
            categorie = record.get("categorie")
            dashboard = record.get("dashboard")

            # Check for missing essential data
            if not name or not totalcapacity or not latitude or not longitude:
                logging.warning(f"Record {i} skipped: Missing essential data.")
                continue

            data = {
                "name": name,
                "lastupdate": lastupdate,
                "totalcapacity": totalcapacity,
                "availablecapacity": availablecapacity,
                "occupation": occupation,
                "type": type,
                "description": description,
                "id": id,
                "openingtimesdescription": openingtimesdescription,
                "isopennow": isopennow,
                "temporaryclosed": temporaryclosed,
                "operatorinformation": operatorinformation,
                "freeparking": freeparking,
                "urllinkaddress": urllinkaddress,
                "occupancytrend": occupancytrend,
                # "specificAccessInformation": specific_access_info,
                # "level": level,
                # "roadnumber": roadnumber,
                # "roadname": roadname,
                "latitude": latitude,
                "longitude": longitude,
                # "text": text,
                "categorie": categorie,
                "dashboard": dashboard
            }

            # Check for missing keys before inserting
            missing_keys = [key for key in data if data[key] is None]
            if missing_keys:
                logging.warning(f"Record {i} skipped due to missing keys: {', '.join(missing_keys)}")
                continue

            try:
                cursor.execute(insert_query, data)
                logging.info(f"Record {i} inserted successfully.")
            except Exception as e:
                logging.error(f"Error inserting record {i}: {e}")
                logging.error(f"Data for record {i}: {data}")  # Log the data for inspection
                continue

        connection.commit()
        logging.info("All records successfully committed to the database.")
    except psycopg2.Error as e:
        logging.error(f"Database error: {e}")
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
    finally:
        if connection:
            cursor.close()
            connection.close()
            logging.info("Database connection closed.")

# Call the function to fetch data and insert into the database
fetch_and_insert_data()
Room for improvement
One improvement would be to figure out why some of the fields are not loading correctly. Another one has to do with the way I schedule this job: while writing this tutorial I found out I could also have used the Python library schedule and run the script as a service. This is certainly something I will look at the next time I build a hobby project like this.
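For reference, a minimal sketch of that alternative with the schedule library could look like the snippet below; the module name fetch_parking_data is hypothetical, and this is not what runs in production (cron is).
# Sketch: keep the fetch job running in-process instead of scheduling it with cron
import time
import schedule
from fetch_parking_data import fetch_and_insert_data  # hypothetical module holding the script above

schedule.every(5).minutes.do(fetch_and_insert_data)

while True:
    schedule.run_pending()
    time.sleep(1)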
3. Aggregation Script
To make it possible to analyse past trends without having to keep all the data that is created every five minutes, I aggregated the data per hour. I also added a trend: are people arriving or are people leaving? Because the trend is calculated as the last available capacity in the hour minus the first, a positive value means free spots increased (people leaving) and a negative value means they decreased (people arriving).
import psycopg2
import logging

def insert_parking_data():
    """
    Establishes a connection to the database and inserts the calculated parking data.
    """
    # Configure logging
    logging.basicConfig(filename='parking_data_hourly_aggregation.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

    # Database connection parameters
    DB_CONFIG = {
        "dbname": "parking_data",
        "user": "parking_user",
        "password": "your_password",
        "host": "127.0.0.1",
        "port": 5432
    }

    sql = """
    WITH sq_first AS (
        SELECT DISTINCT ON (name)
            name,
            FIRST_VALUE(available_capacity) OVER (PARTITION BY name ORDER BY ABS(EXTRACT(EPOCH FROM last_modified - (NOW() - INTERVAL '1 hour'))) ASC) AS first_available_capacity
        FROM
            parking_data.parking_data
        WHERE
            last_modified <= NOW() - INTERVAL '1 hour'
    ),
    sq_last AS (
        SELECT DISTINCT ON (name)
            name,
            FIRST_VALUE(available_capacity) OVER (PARTITION BY name ORDER BY last_modified DESC) AS last_available_capacity
        FROM
            parking_data.parking_data
        WHERE
            last_modified >= NOW() - INTERVAL '1 hour'
    )
    INSERT INTO parking_data.parking_hourly (name, is_open_now, temporarily_closed, last_update, last_modified, average_total_capacity, average_available_capacity, capacity_trend)
    SELECT
        parking_data.name,
        parking_data.is_open_now,
        parking_data.temporarily_closed,
        MAX(parking_data.last_update) AT TIME ZONE 'Europe/Brussels' AS last_update,
        MAX(parking_data.last_modified) AT TIME ZONE 'Europe/Brussels' AS last_modified,
        ROUND(AVG(parking_data.total_capacity), 0) AS average_total_capacity,
        ROUND(AVG(parking_data.available_capacity), 0) AS average_available_capacity,
        sq_last.last_available_capacity - sq_first.first_available_capacity AS capacity_trend
    FROM
        parking_data.parking_data
    LEFT JOIN sq_first ON parking_data.name = sq_first.name
    LEFT JOIN sq_last ON parking_data.name = sq_last.name
    WHERE
        parking_data.last_modified >= NOW() - INTERVAL '1 hour' -- Filter for data from the last hour
    GROUP BY
        parking_data.name,
        parking_data.is_open_now,
        parking_data.temporarily_closed,
        sq_first.first_available_capacity,
        sq_last.last_available_capacity;
    """

    connection = None  # Define it outside try-except
    try:
        connection = psycopg2.connect(**DB_CONFIG)
        cursor = connection.cursor()
        logging.info("Successfully connected to the database.")

        # Execute the INSERT query
        cursor.execute(sql)
        connection.commit()  # Commit the transaction
        logging.info("Data inserted successfully.")
    except (Exception, psycopg2.Error) as error:
        logging.error("Error while inserting data", exc_info=error)
    finally:
        if connection:
            cursor.close()
            connection.close()
            logging.info("Database connection closed.")

if __name__ == "__main__":
    insert_parking_data()
4. Delete Script
To make sure the server doesn’t fill up too quickly, I only keep the full detail of the data of the last 48 hours.
import psycopg2
import logging

def delete_old_parking_data():
    """
    Establishes a connection to the database and deletes records older than 48 hours.
    """
    # Configure logging
    logging.basicConfig(filename='parking_data_delete_old_data.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

    # Database connection parameters
    DB_CONFIG = {
        "dbname": "parking_data",
        "user": "parking_user",
        "password": "your_password",
        "host": "127.0.0.1",
        "port": 5432
    }

    delete_sql = """
    DELETE FROM parking_data.parking_data
    WHERE last_modified < NOW() - INTERVAL '48 hours';
    """

    connection = None  # Define it outside try-except
    try:
        connection = psycopg2.connect(**DB_CONFIG)
        cursor = connection.cursor()
        logging.info("Successfully connected to the database.")

        # Execute the DELETE query
        cursor.execute(delete_sql)
        connection.commit()  # Commit the transaction
        logging.info("Old data deleted successfully.")
    except (Exception, psycopg2.Error) as error:
        logging.error("Error while deleting data", exc_info=error)
    finally:
        if connection:
            cursor.close()
            connection.close()
            logging.info("Database connection closed.")

if __name__ == "__main__":
    delete_old_parking_data()
Step 4: Schedule the Scripts using cron jobs
Use cron to run the scripts on their schedules: the fetch script every five minutes, the aggregation script every hour, and the delete script every day. Edit the crontab with:
crontab -e
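My crontab ends up looking roughly like this; the paths and script names are placeholders for wherever you saved the three scripts:
# fetch the latest API data every 5 minutes
*/5 * * * * /usr/bin/python3 /home/parking/fetch_parking_data.py
# aggregate the past hour, every hour
0 * * * * /usr/bin/python3 /home/parking/aggregate_parking_data.py
# delete detail older than 48 hours, once a day at 03:00
0 3 * * * /usr/bin/python3 /home/parking/delete_old_parking_data.py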
Step 5: Configure the Postgres database to allow connections from the Metabase server
Add the IP address of the Metabase server (11.22.33.44/32 in this example) to the pg_hba.conf file on the database server to allow it to access the data, and restart the Postgres service for the change to take effect.
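The entries look roughly like this, assuming the default PostgreSQL 14 paths of Ubuntu 22.04 (adjust the version directory and the auth method to your setup):
# /etc/postgresql/14/main/pg_hba.conf -- allow the Metabase server to connect to parking_data
host    parking_data    parking_user    11.22.33.44/32    scram-sha-256
# /etc/postgresql/14/main/postgresql.conf -- make Postgres listen beyond localhost
listen_addresses = '*'
Running sudo systemctl restart postgresql afterwards applies both changes.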
Step 6: Install and Configure Metabase
1. Install Metabase:
sudo apt install openjdk-11-jre
wget https://downloads.metabase.com/v0.46.3/metabase.jar
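Before setting up HTTPS and the systemd service, you can sanity-check the download by starting Metabase in the foreground; it listens on port 3000 by default:
java -jar metabase.jar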
2. Configure HTTPS:
Obtain an SSL certificate (e.g., using Let’s Encrypt). In our case Metabase runs on its embedded Java Jetty web server, not behind nginx or Apache, so we use certbot’s --standalone parameter to create the certificate (the --nginx plugin would only apply if nginx were serving the site).
sudo apt update
sudo apt install certbot
sudo certbot certonly --standalone -d yourdomain.com -d www.yourdomain.com
3. Convert the certificate to a Java KeyStore (JKS):
openssl pkcs12 -export \
  -in /etc/letsencrypt/live/yourdomain/fullchain.pem \
  -inkey /etc/letsencrypt/live/yourdomain/privkey.pem \
  -out metabase.p12 \
  -name metabase
keytool -importkeystore \
  -deststorepass your_keystore_password \
  -destkeypass your_keystore_password \
  -destkeystore /opt/metabase/keystore.jks \
  -srckeystore metabase.p12 \
  -srcstoretype PKCS12 \
  -srcstorepass your_pkcs12_password \
  -alias metabase
Note that the destination keystore path matches the MB_JETTY_SSL_KEYSTORE path used in the service file below.
4. Run Metabase as a Service:
Create a systemd service file in this location using the nano text editor:
sudo nano /etc/systemd/system/metabase.service
[Unit]
Description=Metabase
After=syslog.target
[Service]
ExecStart=/usr/bin/java -jar /path/to/metabase.jar
EnvironmentFile=/etc/default/metabase
User=metabase
WorkingDirectory=/path/to/metabase
Restart=on-failure
#environment
Environment="MB_JETTY_SSL=true"
Environment="MB_JETTY_SSL_PORT=8443"
Environment="MB_JETTY_SSL_KEYSTORE=/opt/metabase/keystore.jks"
Environment="MB_JETTY_SSL_KEYSTORE_PASSWORD=your_keystore_password"
[Install]
WantedBy=multi-user.target
After modifying the service file, you need to reload the systemd manager configuration to apply the changes. Then you can start and enable the service:
sudo systemctl daemon-reload
sudo systemctl start metabase
sudo systemctl enable metabase
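To confirm the service actually came up, and to follow its logs while troubleshooting:
sudo systemctl status metabase
sudo journalctl -u metabase -f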
Step 7: Build Dashboards in Metabase
1. Connect Metabase to PostgreSQL:
2. Set the site URL to HTTPS:
3. Build the objects and visualizations you want to use in your dashboard.
Important to note here is that I prepared a couple of views in the PostgreSQL database to perform certain calculations and integrations up front, to avoid having to do them in Metabase. The Metabase dashboards are linked to the views, not to the tables.
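As an illustration of the kind of view I mean (the name and calculation below are a simplified sketch, not my exact production definition), a view on the hourly aggregates could look like this:
CREATE OR REPLACE VIEW parking_data.v_parking_hourly_trend AS
SELECT
    name,
    last_update,
    average_total_capacity,
    average_available_capacity,
    -- occupancy as a percentage, guarded against division by zero
    ROUND((100.0 * (average_total_capacity - average_available_capacity)
           / NULLIF(average_total_capacity, 0))::numeric, 1) AS occupancy_pct,
    capacity_trend
FROM parking_data.parking_hourly;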
For example the hourly trend:
Metabase allows you to put data on a map if you have the coordinates in your data, which we have:
Conclusion
Of course I skipped over some of the details here and there, but this should give you a solid understanding of what it takes to build your own Metabase dashboard connected to an open dataset API. Now I have a fully automated system that fetches, stores, aggregates, and visualizes parking data from Stad Gent. So, when my friends ask me where to park their car before we head out, I simply send them this link to my dashboard, and we’re good to go! I hope this inspires you to explore your own data. Happy building!