Building a Cloud-based Data Pipeline for a Fictive E-Scooter Company

Craig Nicolay
8 min read · Aug 17, 2023


Hello, fellow data scientists! I’m thrilled to share my experience in building a cloud-based data pipeline for a fictive e-scooter company called Gans as part of a project for the Data Science Bootcamp at WBS Coding School. Gans is a startup developing an e-scooter-sharing system. It aspires to operate in the most populous cities all around the world. In each city, the company will have hundreds of e-scooters parked on the streets and allow users to rent them by the minute.

Fictive e-scooter Company Logo (Gans)

The company wants to anticipate where e-scooters should be placed. A study conducted by Gans identified a need for e-scooters at airports. To make smart decisions about how many e-scooters to dispatch, Gans decided to employ a data engineer (me) to tackle this question.

The first step is to collect data on some of the largest airports in Europe, along with the corresponding flight data, to anticipate how many e-scooters should be dispatched to each airport. The second step is to assemble and automate a data pipeline in the cloud.

The project outlined here provides valuable insights into constructing an efficient data pipeline. So, let’s get started!

Data Collection

Web Scraping

My first task was to collect data from external sources to help anticipate e-scooter demand at airports. To achieve this, I wrote a Python web-scraping script using the powerful Beautiful Soup library, which let me extract key information (country, population, and coordinates) from Wikipedia for five European cities with busy airports.

from bs4 import BeautifulSoup
import requests
import pandas as pd
import re
import sqlalchemy

# the cities we want to gather information for
cities = ['Berlin', 'Hamburg', 'London', 'Manchester', 'Barcelona']

# empty list that will be filled with one dictionary of information per city
list_for_df = []

for city in cities:
    # we can use the universal nature of Wikipedia's URLs to our advantage here
    # all of the URLs are the same besides the city name
    url = f'https://en.wikipedia.org/wiki/{city}'

    # here we make our soup for the city
    r = requests.get(url)
    soup = BeautifulSoup(r.content, 'html.parser')

    # here we initialise our empty dictionary for the city
    response_dict = {}

    # here we fill the dictionary with information using the ids, classes, and selectors that we found in the html
    response_dict['city'] = soup.select(".firstHeading")[0].get_text()
    response_dict['country'] = soup.select(".infobox-data")[0].get_text()

    # the population info is not in the same spot for every city
    if soup.select_one('th.infobox-header:-soup-contains("Population")'):
        response_dict['population'] = soup.select_one('th.infobox-header:-soup-contains("Population")').parent.find_next_sibling().find(text=re.compile(r'\d+'))
    response_dict['lat'] = soup.select(".latitude")[0].get_text()
    response_dict['long'] = soup.select(".longitude")[0].get_text()

    # add our dictionary for the city to list_for_df
    list_for_df.append(response_dict)

# make the DataFrame
cities_df = pd.DataFrame(list_for_df)

# fixing latitude
cities_df['lat'] = cities_df['lat'].str.split('″').str[0].str.replace('°', '.', regex=False).str.replace('′', '', regex=False).str.replace('N', '00', regex=False)

# fixing longitude
cities_df['long'] = cities_df['long'].str.split('″').str[0].str.replace('°', '.', regex=False).str.replace('′', '', regex=False).str.replace('E', '00', regex=False)

# change data type
cities_df['population'] = cities_df['population'].str.replace(',', '', regex=False)
cities_df['lat'] = cities_df['lat'].astype(float)
cities_df['long'] = cities_df['long'].astype(float)
cities_df['population'] = cities_df['population'].astype(int)

If you print cities_df, you should get the table below.

APIs

Additionally, I incorporated APIs into the data collection process. Using Python's requests library, I assembled API calls to OpenWeatherMap for weather data and to AeroDataBox for airport and flight data. This real-time information played a crucial role in improving the accuracy of the estimated e-scooter demand.
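For illustration, a single weather request looks roughly like this (a minimal sketch; the coordinates and the API key are placeholders you would replace with your own):

import requests

# placeholders: your own coordinates and OpenWeatherMap API key
lat, lon, API_key = 52.52, 13.40, 'use your key'

url = f"http://api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={API_key}&units=metric"
response = requests.get(url)
weather_json = response.json()  # the forecast, parsed into a Python dictionary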

JSON Parsing

The collected data often came in JSON format, requiring careful navigation to extract the necessary information. Python’s built-in JSON capabilities proved instrumental in efficiently parsing the JSON files and locating the relevant data points. This step ensured that we had the right information for our analysis.
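As a small example of what that navigation looks like, here is how individual data points can be pulled out of the weather_json dictionary from the request above (the key names follow OpenWeatherMap's forecast response):

# drill into the nested structure to reach the relevant data points
first_slot = weather_json['list'][0]        # first 3-hour forecast slot
forecast_time = first_slot['dt_txt']        # e.g. '2023-08-18 09:00:00'
temperature = first_slot['main']['temp']    # temperature in °C (units=metric)
wind_speed = first_slot['wind']['speed']    # wind speed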

Data Cleaning

Data cleaning was a crucial step to maintain the quality and consistency of our collected data. I utilized various techniques, including Python’s string operations, the powerful string methods offered by the Pandas library, and regular expressions, to clean and transform the data effectively. This process ensured that the data was in a usable format for further analysis.
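The population column is a typical example: the scraped values arrive as strings with thousands separators, so they are stripped with Pandas' string methods before being cast to integers, just like in the scraping script above (the values here are made up for illustration):

import pandas as pd

# hypothetical raw values, as they might come back from the scrape
raw_population = pd.Series(['3,850,809', '1,853,935'])

# remove the thousands separators and convert to integers
clean_population = raw_population.str.replace(',', '', regex=False).astype(int)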

Pythonic Iteration

To handle the processing of large datasets efficiently, I harnessed the power of Python by using for loops and list comprehensions. These iterative techniques allowed me to process the data quickly and effectively, enabling us to scale our operations seamlessly.
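As a small illustration, a list comprehension condenses the kind of per-city work done in the scraping loop into a single expression:

# build the Wikipedia URL for every city in one pass
cities = ['Berlin', 'Hamburg', 'London', 'Manchester', 'Barcelona']
urls = [f'https://en.wikipedia.org/wiki/{city}' for city in cities]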

Function Structuring

To enhance code organization and reusability, I structured my Python code into functions. This modular approach made it easier to manage specific tasks and improved the maintainability and scalability of our codebase. By encapsulating functionality within functions, we could reuse them across different parts of the pipeline, saving time and effort.
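For example, the body of the scraping loop above could be wrapped in a small helper, which is the same pattern the get_weather and get_flights functions in the Lambda section follow (a sketch that reuses the imports from the scraping script; only two fields are shown here):

from bs4 import BeautifulSoup
import requests
import pandas as pd

def get_city_info(city):
    # scrape the city name and country from the city's Wikipedia page
    url = f'https://en.wikipedia.org/wiki/{city}'
    soup = BeautifulSoup(requests.get(url).content, 'html.parser')
    return {
        'city': soup.select(".firstHeading")[0].get_text(),
        'country': soup.select(".infobox-data")[0].get_text(),
    }

cities = ['Berlin', 'Hamburg', 'London', 'Manchester', 'Barcelona']
cities_df = pd.DataFrame([get_city_info(city) for city in cities])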

Data Storage

MySQL Database

Once I had collected and processed the data, it was crucial to store it appropriately. To achieve this, we set up a MySQL database. We designed a well-defined SQL data model, carefully crafting the relationships between tables to ensure data integrity.

MySQL schema

Creating MySQL tables with the appropriate data types, constraints, and keys played a pivotal role in maintaining data consistency and reliability. With a robust database structure in place, we could store and retrieve data efficiently, meeting Gans’ real-time data requirements.

-- DROP DATABASE gans_local;
CREATE DATABASE gans_local;

USE gans_local;


-- 0 city_id 324 non-null int64
-- 1 city 100 non-null object
-- 2 country 100 non-null object
-- 3 population 324 non-null int64
-- 4 lat 324 non-null float64
-- 5 long 324 non-null float64

CREATE TABLE IF NOT EXISTS cities (
city_id INT AUTO_INCREMENT,
city VARCHAR(100),
country VARCHAR(100),
population INT,
lat FLOAT,
`long` FLOAT,
PRIMARY KEY (city_id)
);

-- 0 city_id 324 non-null int64
-- 1 time 324 non-null object
-- 2 temp 324 non-null float64
-- 3 wind_speed 324 non-null float64

CREATE TABLE IF NOT EXISTS weather(
weather_id INT AUTO_INCREMENT,
city_id INT NOT NULL,
`time` DATETIME,
temp FLOAT,
wind_speed FLOAT,
PRIMARY KEY (weather_id),
FOREIGN KEY (city_id) REFERENCES cities(city_id)
);

-- 0 city_id 22 non-null int64
-- 1 icao 22 non-null object

-- DROP TABLE airports;
CREATE TABLE IF NOT EXISTS airports(
city_id INT NOT NULL,
icao VARCHAR(10),
PRIMARY KEY (icao),
FOREIGN KEY (city_id) REFERENCES cities(city_id)
);


-- 0 ICAO 20 non-null object
-- 1 departure_airport 20 non-null object
-- 2 local_time 20 non-null object

CREATE TABLE IF NOT EXISTS flight(
flight_id INT AUTO_INCREMENT,
ICAO VARCHAR(10),
departure_airport VARCHAR(10),
local_time DATETIME,
PRIMARY KEY (flight_id),
FOREIGN KEY (ICAO) REFERENCES airports(icao)
);

Now that we have created the backbone of the database, we can push the cities table to MySQL.
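A minimal sketch of that push, assuming cities_df from the scraping script and a local MySQL instance (the schema name, user, and password are placeholders):

import sqlalchemy  # pandas uses SQLAlchemy (with the PyMySQL driver) under the hood

# local connection details (placeholders)
schema, host, port = 'your_schema_name', '127.0.0.1', 3306
user, password = 'your_user_name', 'your_password'
con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

# append the scraped cities to the empty cities table created above
cities_df.to_sql('cities', con=con, if_exists='append', index=False)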

Let’s create the airports table in a new Python file. First, let’s connect to the MySQL database to retrieve the cities table.

import pandas as pd
import sqlalchemy
import requests

# connect to the local MySQL database
schema = 'your_schema_name'   # name of your database (schema) might be different
host = '127.0.0.1'            # host of your local MySQL server
user = 'your_user_name'       # user name of your MySQL
password = 'your_password'    # your MySQL password
port = 3306
con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

#read the cities_df table
cities_df = pd.read_sql('cities',con=con)
cities_df

If it is working, you should get this:

cities_df

# extract the list of airports near each city's coordinates using the AeroDataBox API
list_for_df_airports = []

for i, city in enumerate(cities_df['city_id']):
    url = "https://aerodatabox.p.rapidapi.com/airports/search/location"

    querystring = {"lat": cities_df.iloc[i]["lat"], "lon": cities_df.iloc[i]['long'], "radiusKm": "50", "limit": "10", "withFlightInfoOnly": "true"}

    headers = {
        "X-RapidAPI-Key": "use your key",  # your AeroDataBox API key
        "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com"
    }

    response = requests.request("GET", url, headers=headers, params=querystring)

    list_for_df_airports.append(response.json()['items'])

# create the dataframe
airports_dict = {'icao': []}

for item in list_for_df_airports:
    if item:  # skip cities where the search returned no airport
        airports_dict['icao'].append(item[0]['icao'])

airports_df = pd.DataFrame(airports_dict)

# add the city_id to the table (cities whose search returned no airport are skipped)
airports_df['city_id'] = [1, 2, 3, 5]

# print airports_df
airports_df

You should get this table:

airports_df

At this point, we have the cities_df data and the airports_df data. In the next step, we will push those two tables to the cloud and create a Lambda function for the weather_df and the flights_df.

Leveraging the Cloud

Creating a cloud-based data pipeline allows for scalability, flexibility, automation, and easy maintenance. Therefore, we set up an RDS instance on AWS (Amazon Web Services) and established connections between MySQL and the cloud instance.

# connect to the cloud-based MySQL instance
host = 'wbs-pipeline-db.cbfhrvtwfxua.eu-north-1.rds.amazonaws.com'  # the endpoint of the RDS instance
schema = 'your_schema_name'   # name of your database (schema) might be different
user = 'your_user_name'       # user name of your MySQL
password = 'your_password'    # your MySQL password
port = 3306
con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

#push cities_df to the cloud
cities_df.to_sql('cities',con=con,if_exists='append',index=False)

#push airports_df to the cloud
airports_df.to_sql('airports',con=con,if_exists='append',index=False)

By utilizing AWS Lambda, a serverless computing service, we could execute our Python code in the cloud without worrying about server management. Deploying our code as Lambda functions allowed for automation and scalability, ensuring that the pipeline ran reliably and efficiently.

import json
import pandas as pd
import sqlalchemy
import requests
import datetime


def get_weather(city_df):
    API_key = 'use your key'  # your OpenWeatherMap API key

    weather_dict = {'city_id': [],
                    'time': [],
                    'temp': [],
                    'wind_speed': []}

    for i, city in enumerate(city_df['city_id']):
        url = f"http://api.openweathermap.org/data/2.5/forecast?lat={city_df.iloc[i]['lat']}&lon={city_df.iloc[i]['long']}&appid={API_key}&units=metric"
        response = requests.get(url)
        weather_json = response.json()
        # keep the next 9 forecast slots (3-hour steps)
        for time in weather_json['list'][:9]:
            weather_dict['city_id'].append(city)
            weather_dict['time'].append(time['dt_txt'])
            weather_dict['temp'].append(time['main']['temp'])
            weather_dict['wind_speed'].append(time['wind']['speed'])

    weather_df = pd.DataFrame(weather_dict)
    return weather_df

def get_flights(airports_df):
    querystring = {"withLeg": "true", "direction": "Arrival", "withCancelled": "false", "withCodeshared": "true", "withCargo": "false", "withPrivate": "false"}
    headers = {
        "X-RapidAPI-Key": "use your key",
        "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com"
    }

    today = datetime.datetime.now().date()
    tomorrow = (today + datetime.timedelta(days=1))

    # split tomorrow into two 12-hour windows (one API call each)
    times = [["00:00", "11:59"], ["12:00", "23:59"]]

    flights_dict = {'ICAO': [], 'departure_airport': [], 'local_time': []}
    icao_list = airports_df['icao'].to_list()

    for time in times:

        for icao in icao_list[1:2]:  # only queries one airport here; use icao_list to query all of them
            url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{icao}/{tomorrow}T{time[0]}/{tomorrow}T{time[1]}"
            response = requests.request("GET", url, headers=headers, params=querystring)
            print(response)  # log the response status
            flight_json = response.json()

            for flight in flight_json['arrivals']:
                flights_dict['ICAO'].append(icao)
                try:
                    flights_dict['departure_airport'].append(flight['departure']['airport']['icao'])
                except:
                    flights_dict['departure_airport'].append('unknown')
                try:
                    flights_dict['local_time'].append(flight['arrival']['scheduledTimeLocal'])
                except:
                    flights_dict['local_time'].append(pd.NaT)

    flights_df = pd.DataFrame(flights_dict)
    flights_df['local_time'] = pd.to_datetime(flights_df['local_time'])
    return flights_df

def lambda_handler(event, context):
    host = 'wbs-pipeline-db.cbfhrvtwfxua.eu-north-1.rds.amazonaws.com'  # the endpoint of the RDS instance
    schema = 'your_schema_name'   # name of your database (schema) might be different
    user = 'your_user_name'       # user name of your MySQL
    password = 'your_password'    # your MySQL password
    port = 3306
    con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

    city_df = pd.read_sql('cities', con=con)
    weather_df = get_weather(city_df)
    weather_df.to_sql('weather', con=con, if_exists='append', index=False)

    airports_df = pd.read_sql('airports', con=con)
    flights_df = get_flights(airports_df)
    flights_df.to_sql('flight', con=con, if_exists='append', index=False)

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

Scheduling and Automation

To ensure the data pipeline ran consistently and delivered up-to-date data, we scheduled the Lambda function to run automatically at fixed times. This automated the data collection and storage steps, keeping the pipeline current without manual intervention.
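One way to set this up (a sketch, not the only option) is a scheduled Amazon EventBridge rule that triggers the Lambda function once a day; the rule name, run time, and function ARN below are placeholders:

import boto3

events = boto3.client('events')

# run every day at 18:00 UTC (EventBridge cron: minute hour day-of-month month day-of-week year)
events.put_rule(
    Name='gans-daily-run',
    ScheduleExpression='cron(0 18 * * ? *)',
)

# point the rule at the Lambda function (replace the ARN with your own)
events.put_targets(
    Rule='gans-daily-run',
    Targets=[{'Id': 'gans-lambda',
              'Arn': 'arn:aws:lambda:eu-north-1:123456789012:function:your_lambda_function'}],
)

EventBridge also needs permission to invoke the function; adding the trigger through the Lambda console sets that up automatically.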

Conclusion

Constructing an efficient and scalable data pipeline is crucial for anticipating e-scooter demand. Although Gans is not a real company and was merely used as a fictional case study, the project outlined here provides valuable insights into building a cloud-based data pipeline. From web scraping and API integration to data cleaning, Pythonic iteration, MySQL database setup, and leveraging AWS and serverless architecture, we explored the key components needed for success. So, go ahead and apply these learnings to your own projects, and happy data engineering!
