Data Pipelines: Transitioning from Local Environments to the Cloud with GCP
In our project, we executed two sequential phases. Phase 1 focused on building a robust local data pipeline, while Phase 2 migrated that pipeline to the cloud. This order is crucial: debugging in the cloud can be significantly more challenging, so the code should run flawlessly in the local environment before it is deployed to the cloud.
Phase 1: Local Pipeline
Data Sources:
- Cities Data: Obtained by web scraping publicly accessible websites, in this case Wikipedia.
- Weather Forecasts: Retrieved using the OpenWeather API.
- Flight Information: Retrieved using the AeroDataBox API.
Phase 2: Cloud Pipeline
Cloud Migration:
Once the local pipeline is up and running, it will be migrated to the cloud using Google Cloud Platform (GCP).
GCP offers numerous advantages for data pipelines, including:
- Scalability: Easily scale resources up or down based on demand.
- Flexibility: Adapt to changing project requirements with a wide range of services.
- Automation: Streamline processes with automated tools and services.
- Maintenance: Benefit from managed services that reduce the overhead of infrastructure maintenance.
MySQL Cloud Database (Cloud SQL)
Key Features:
- Deployment and Management: Easily deploy, manage, and scale MySQL databases.
- Automated Backups: Ensures data safety with automatic backup solutions.
- High Availability: Provides robust performance and reliability for applications.
- Seamless Scaling: Effortlessly scale resources based on application needs.
- Security: Includes built-in security features to protect data.
- Integration: Seamlessly integrates with other Google Cloud services.
Benefits:
- Allows developers to focus on application development without the complexities of database management.
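To make this concrete, here is a minimal connection sketch. It assumes a Cloud SQL for MySQL instance reachable over its public IP; the host, password, and schema values are placeholders, not the project's real settings.
import sqlalchemy

# Placeholder connection details for a Cloud SQL for MySQL instance (public IP).
schema = "gans_global"                # database created on the Cloud SQL instance
host = "xx.xx.xxx.x"                  # public IP address of the instance
user = "root"
password = "your_cloud_sql_password"  # in practice, load this from keys.py
port = 3306

# The PyMySQL driver must be installed for the mysql+pymysql dialect.
engine = sqlalchemy.create_engine(
    f"mysql+pymysql://{user}:{password}@{host}:{port}/{schema}"
)

# Quick connectivity check.
with engine.connect() as connection:
    connection.execute(sqlalchemy.text("SELECT 1"))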
Cloud Functions
Key Features:
- Event-Driven: Runs code in response to events without requiring infrastructure management.
- Automatic Scaling: Automatically scales based on demand to handle varying workloads.
- Lightweight Functions: Supports execution of single-purpose functions triggered by Google Cloud services, HTTP requests, or cloud events.
Benefits:
- Simplified Development: Enables quick deployment of functions, accelerating application development.
- Reduced Operational Overhead: Minimizes infrastructure management, allowing developers to focus on code.
- Seamless Integration: Easily integrates with other Google Cloud services and APIs for enhanced functionality.
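As a minimal illustration of this pattern, the sketch below shows the skeleton of an HTTP-triggered function written against the functions-framework; the function name and response text are just examples, and the real pipeline functions appear later in this article.
import functions_framework

@functions_framework.http
def hello_pipeline(request):
    # Any HTTP request (for example, one sent by Cloud Scheduler) triggers this entry point.
    return "Pipeline function is alive"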
Cloud Scheduler
Key Features:
- Task Automation: Automates the scheduling of tasks and jobs.
- Event Triggers: Executes HTTP-based workloads, Cloud Functions, and App Engine services.
- Flexible Scheduling: Supports cron job syntax for precise scheduling.
- Scalability: Automatically handles the execution of scheduled tasks based on demand.
Benefits:
- Simplifies the management of recurring tasks, enhancing operational efficiency.
- Reduces the need for manual intervention, allowing for more focus on application development.
Database Schema
The schema used for the data pipeline consists of four tables: cities, weather_forecasts, airports, and flights.
Here are some tips for creating tables:
- Consistency in Naming: Use either plural or singular table names, and stick to one convention throughout.
- Comments on Columns: Decide whether you want to comment on table columns, as shown in the code snippets. This is optional.
- Efficient Use of Data Types: Use only the necessary size for VARCHAR fields. For example, the International Civil Aviation Organisation (ICAO) code requires only 4 characters by definition.
cities
-- Create the 'cities' table
CREATE TABLE cities (
    city_id INT AUTO_INCREMENT, -- Automatically generated ID for each city
    city VARCHAR(255) NOT NULL, -- Name of the city
    country VARCHAR(255) NOT NULL, -- Name of the country
    latitude FLOAT NOT NULL, -- Latitude of city
    longitude FLOAT NOT NULL, -- Longitude of city
    PRIMARY KEY (city_id) -- Primary key to uniquely identify each city
);
weather_forecasts
-- Create the 'weather_forecasts' table
CREATE TABLE weather_forecasts (
    weather_id INT AUTO_INCREMENT, -- Automatically generated ID for each weather forecast
    city_id INT NOT NULL, -- ID of the city
    forecast_time DATETIME, -- Time the forecast applies to
    outlook VARCHAR(255), -- Weather outlook (e.g. clear sky, light rain)
    temperature FLOAT, -- Temperature in °C
    rain_in_last_3h FLOAT, -- Rain in the last 3 hours
    wind_speed FLOAT, -- Wind speed
    rain_prob FLOAT, -- Probability of rain
    data_retrieved_at DATETIME, -- Time the data was retrieved
    PRIMARY KEY (weather_id), -- Primary key to uniquely identify each weather forecast
    FOREIGN KEY (city_id) REFERENCES cities(city_id) -- Foreign key to connect weather_forecasts with cities
);
airports
-- Create the 'airports' table
CREATE TABLE airports (
    city_id INT NOT NULL,
    icao VARCHAR(4),
    municipality_name VARCHAR(255),
    PRIMARY KEY (icao),
    FOREIGN KEY (city_id) REFERENCES cities(city_id)
);
flights
-- Create the 'flights' table
CREATE TABLE flights (
    flight_id INT AUTO_INCREMENT,
    arrival_airport_icao VARCHAR(4),
    departure_airport_icao VARCHAR(4),
    departure_airport_name VARCHAR(255),
    scheduled_arrival_time DATETIME,
    flight_number VARCHAR(255),
    data_retrieved_at DATETIME,
    PRIMARY KEY (flight_id),
    FOREIGN KEY (arrival_airport_icao) REFERENCES airports(icao)
);
Loading Data into the Database
Here are some tips for creating scripts:
- Use Functions for Structure: Organize your code into functions that are called from a main function, which serves as the entry point. This method makes it easy to comment out functions that are used only locally and not in the cloud.
- Include connection details for both local and cloud environments: This allows for quick and easy switching between local and cloud setups; the scripts below toggle by commenting one block in or out, and an alternative sketch follows this list.
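As a small alternative to commenting blocks in and out, the sketch below reads an environment variable instead; the variable name PIPELINE_ENV is an assumption made for illustration, and the credentials are imported from keys.py as in the rest of the article.
import os
from keys import MySQL_pass, GCP_pass

def create_connection_string():
    # PIPELINE_ENV is a hypothetical environment variable: set it to "cloud"
    # in the Cloud Function and leave it unset (default "local") on your machine.
    if os.getenv("PIPELINE_ENV", "local") == "cloud":
        schema, host, password = "gans_global", "xx.xx.xxx.x", GCP_pass
    else:
        schema, host, password = "gans_local", "127.0.0.1", MySQL_pass
    user, port = "root", 3306
    return f"mysql+pymysql://{user}:{password}@{host}:{port}/{schema}"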
1. Static import, executed from the local environment
cities
# !pip install lat_lon_parser
import pandas as pd
import requests
from keys import MySQL_pass # LOCAL
from keys import GCP_pass # GLOBAL
from bs4 import BeautifulSoup
from lat_lon_parser import parse # for decimal coordinates
import sqlalchemy
def retrieve_data():
    list_of_cities = ["Berlin", "Hamburg", "Munich"]
    cities_df = generate_cities_dataframe(list_of_cities)
    create_csv(cities_df)
    store_data(cities_df)
    cities_df_from_db = load_data()
    return cities_df_from_db
def create_connection_string():
    # # LOCAL
    # schema = "gans_local"
    # host = "127.0.0.1"
    # user = "root"
    # password = MySQL_pass
    # port = 3306
    # GLOBAL
    schema = "gans_global"
    host = "xx.xx.xxx.x"
    user = "root"
    password = GCP_pass
    port = 3306
    return f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'
def generate_cities_dataframe(cities):
    city_data = []
    for city in cities:
        url = f"https://www.wikipedia.org/wiki/{city}"
        response = requests.get(url)
        city_soup = BeautifulSoup(response.content, 'html.parser')
        # extract the relevant information
        city_latitude = city_soup.find(class_="latitude").get_text()
        city_longitude = city_soup.find(class_="longitude").get_text()
        country = city_soup.find(class_="infobox-data").get_text()
        # keep track of data per city
        city_data.append({"city": city,
                          "country": country,
                          "latitude": parse(city_latitude),   # latitude in decimal format
                          "longitude": parse(city_longitude), # longitude in decimal format
                          })
    return pd.DataFrame(city_data)
def create_csv(cities_df):
    # keep a local CSV copy of the scraped data
    cities_df.to_csv('cities_df.csv')
def store_data(cities_df):
    connection_string = create_connection_string()
    cities_df.to_sql('cities',
                     if_exists='append',
                     con=connection_string,
                     index=False)
def load_data():
    connection_string = create_connection_string()
    return pd.read_sql("cities", con=connection_string)
# --------------------------------------------------------------------------------------------------------------------------------
cities_df = retrieve_data()
cities_df
airports
...
from keys import AeroDatabox
from pytz import timezone
from datetime import datetime, timedelta
from IPython.display import JSON
import sqlalchemy
def retrieve_data():
    connection_string = create_connection_string()
    cities_df = fetch_cities_data(connection_string)
    airports_df = fetch_airport_data(cities_df)
    create_csv()
    store_data(airports_df)
    airports_df_from_db = load_data()
    return airports_df_from_db
...
def fetch_cities_data(connection_string):
    return pd.read_sql("cities", con=connection_string)
def fetch_airport_data(cities_df):
    list_for_df = []
    for _, city in cities_df.iterrows():
        latitude = city["latitude"]
        longitude = city["longitude"]
        city_id = city["city_id"]

        url = "https://aerodatabox.p.rapidapi.com/airports/search/location"
        querystring = {"lat": latitude, "lon": longitude, "radiusKm": "50", "limit": "5", "withFlightInfoOnly": "true"}
        headers = {
            "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com",
            "X-RapidAPI-Key": AeroDatabox  # USE THE PROVIDED KEY
        }
        response = requests.get(url, headers=headers, params=querystring)

        for item in response.json()['items']:
            airports_data = {
                "city_id": city_id,
                "icao": item.get("icao", None),
                "municipality_name": item.get("municipalityName", None)
            }
            list_for_df.append(airports_data)

    airports_df = pd.DataFrame(list_for_df)
    return airports_df
...
2. Import via Cloud Function
requirements.txt
functions-framework==3.*
SQLAlchemy
PyMySQL
pandas
requests
pytz
keys.py
Define your passwords and API keys here ...
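For example, keys.py could look like the sketch below, with placeholder values only; the variable names match the imports used in the scripts, and real credentials should never be committed to version control.
# keys.py - placeholder values for illustration only
MySQL_pass = "your_local_mysql_password"   # local MySQL root password
GCP_pass = "your_cloud_sql_password"       # Cloud SQL root password
OpenWeather = "your_openweather_api_key"   # OpenWeather API key
AeroDatabox = "your_aerodatabox_api_key"   # AeroDataBox (RapidAPI) key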
weather_forecasts
main.py
import functions_framework
import pandas as pd
import requests
from keys import GCP_pass # GLOBAL
from keys import OpenWeather
from pytz import timezone
from datetime import datetime
@functions_framework.http
def retrieve_data(request):
    connection_string = create_connection_string()
    cities_df = fetch_cities_data(connection_string)
    weather_df = fetch_weather_data(cities_df)
    # create_csv()
    store_data(weather_df)
    # weather_df_from_db = load_data()
    # return weather_df_from_db
    return "Data successfully updated"
...
def fetch_weather_data(cities_df):
    berlin_timezone = timezone('Europe/Berlin')
    API_key = OpenWeather
    weather_items = []

    for _, city in cities_df.iterrows():
        latitude = city["latitude"]
        longitude = city["longitude"]
        city_id = city["city_id"]
        print(city_id, latitude, longitude)

        # Reference the parameters in the url.
        url = (f"https://api.openweathermap.org/data/2.5/forecast?lat={latitude}&lon={longitude}&appid={API_key}&units=metric")
        response = requests.get(url)
        weather_json = response.json()

        retrieval_time = datetime.now(berlin_timezone).strftime("%Y-%m-%d %H:%M:%S")

        for item in weather_json['list']:
            dt = item['dt']
            temp = item["main"].get("temp", None)
            weather_description = item["weather"][0].get("description", None)
            wind_speed = item["wind"].get("speed", None)
            rain_3h = item.get("rain", {}).get("3h", 0)
            pop = item.get("pop", None)
            dt_txt = item.get("dt_txt", None)

            # Fill the dataframe with weather data
            weather_item = {
                "city_id": city_id,
                "forecast_time": dt_txt,
                "outlook": weather_description,
                "temperature": temp,
                "rain_in_last_3h": rain_3h,
                "wind_speed": wind_speed,
                "rain_prob": pop,
                "data_retrieved_at": retrieval_time
            }
            weather_items.append(weather_item)

    weather_df = pd.DataFrame(weather_items)
    weather_df['forecast_time'] = pd.to_datetime(weather_df['forecast_time'])
    weather_df['data_retrieved_at'] = pd.to_datetime(weather_df['data_retrieved_at'])
    # weather_df.info()

    return weather_df
...
flights
main.py
import functions_framework
import pandas as pd
import requests
from keys import GCP_pass # GLOBAL
from keys import AeroDatabox
from pytz import timezone
from datetime import datetime, timedelta
@functions_framework.http
def retrieve_data(request):
    connection_string = create_connection_string()
    flights_df = fetch_flight_data()
    # create_csv()
    store_data(flights_df)
    # flights_df_from_db = load_data()
    # return flights_df_from_db
    return "Data successfully updated"
...
def fetch_flight_data():
    icao_list = ["EDDB", "EDDH"]
    api_key = AeroDatabox

    berlin_timezone = timezone('Europe/Berlin')
    today = datetime.now(berlin_timezone).date()
    tomorrow = (today + timedelta(days=1))

    flight_items = []

    for icao in icao_list:
        # The API only accepts time windows of up to 12 hours,
        # so two calls (morning and afternoon) cover a full day.
        times = [["00:00", "11:59"],
                 ["12:00", "23:59"]]

        for time in times:
            url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{icao}/{tomorrow}T{time[0]}/{tomorrow}T{time[1]}"
            querystring = {"withLeg": "true",
                           "direction": "Arrival",
                           "withCancelled": "false",
                           "withCodeshared": "true",
                           "withCargo": "false",
                           "withPrivate": "false"}
            headers = {
                'x-rapidapi-host': "aerodatabox.p.rapidapi.com",
                'x-rapidapi-key': api_key
            }
            response = requests.get(url, headers=headers, params=querystring)
            flights_json = response.json()

            retrieval_time = datetime.now(berlin_timezone).strftime("%Y-%m-%d %H:%M:%S")

            for item in flights_json["arrivals"]:
                flight_item = {
                    "arrival_airport_icao": icao,
                    "departure_airport_icao": item["departure"]["airport"].get("icao", None),
                    "departure_airport_name": item["departure"]["airport"].get("name", None),
                    "scheduled_arrival_time": item["arrival"]["scheduledTime"].get("local", None),
                    "flight_number": item.get("number", None),
                    "data_retrieved_at": retrieval_time
                }
                flight_items.append(flight_item)

    flights_df = pd.DataFrame(flight_items)
    # drop the trailing timezone offset so the value fits a naive DATETIME column
    flights_df["scheduled_arrival_time"] = flights_df["scheduled_arrival_time"].str[:-6]
    flights_df["scheduled_arrival_time"] = pd.to_datetime(flights_df["scheduled_arrival_time"])
    flights_df["data_retrieved_at"] = pd.to_datetime(flights_df["data_retrieved_at"])

    return flights_df
...
3. Scheduled import via Cloud Scheduler
To automate the pipeline, we used Cloud Scheduler, enabling the system to fetch and store weather and flight data in the database without manual input.
The cron expression used is 0 0 * * * (every day at midnight).
Conclusion
In this article, we embarked on the journey of building a local ETL (Extract, Transform, Load) pipeline, focusing on effective data extraction, transformation, and loading techniques. We started by scraping city data from Wikipedia, then progressed to gathering weather and flight data through APIs, demonstrating the advantages of integrating diverse data sources. Along the way, we learned how to clean, transform, and store this data in a SQL database, resulting in a coherent and reusable pipeline.
Next, we transitioned the data pipeline to a cloud environment using Google Cloud Platform (GCP). This phase involved utilizing a MySQL cloud database, cloud functions, and the cloud scheduler to automate our processes with cron jobs. This automation ensured a seamless data flow, eliminating manual tasks and keeping our data up to date.
An organized and automated data pipeline is vital for optimizing workflows and realizing the maximum value of your data. With the appropriate infrastructure, organizations can enhance their data operations, boost efficiency, and facilitate more strategic decision-making.
I hope this article provides valuable insights and inspires you to take on similar projects while encouraging further exploration of data automation.