Automating an end-to-end Data Pipeline on AWS Cloud

Lina Haidar
8 min read · Aug 8, 2021



In this post, I present the outcome of a two-week project that I did at a boot camp. I am going to take you through the steps I followed to complete it.

Since the project is broad and combines several fields, it is split into four posts. Each post provides a step-by-step approach to achieving its objective. Here are the links to the posts:

1- Collect Data Using Web-scraping and APIs

2- Create a Database on MySQL Workbench

3- Connect Python with MySQL

4- Automate the Data Pipeline Using Lambda Functions

We are going to start with the first step.

1- Collect Data Using Web-scraping and APIs

The first step of this project is to collect the data. The type of data, of course, varies from one project to another. In this project, we are going to collect the following information:

1. names of European cities
2. flight information for each city's airports
3. the weather forecasts in each city

We will be using three different sources in the collection process. Rather than collecting the data manually, we are going to use web scraping and APIs. Depending on the method, a different Python library is used. Based on that, this article is divided into three parts.

Web Scraping to Collect Names of Cities

Web scraping is a technique used to extract data from a website. Whether this method is legal depends on how you are going to use the data. In our case, we are only collecting names of cities for the sake of practicing how web scraping works; it is not a real project that we will profit from.

Basically, collecting such data can be done manually, but we are going to use Python libraries that take care of scraping the information we are interested in from the website we specify, namely Requests and BeautifulSoup.

Requests is a Python library for making HTTP requests. The following lines of code show how to use it:

import requests

url = "https://worldpopulationreview.com/continents/cities/europe"
response = requests.get(url)
response.content

To get the content of the response, use response.content (an attribute, not a method). The output is the HTML of the website, which is hard to read or extract information from. Our goal is to collect the names of the cities and the countries they belong to. This is where BeautifulSoup comes into play.

BeautifulSoup is a Python library used for pulling data out of the content of our response object.

soup = BeautifulSoup(response.content, "html.parser")

soup is a BeautifulSoup object that represents the HTML as a nested data structure. It has methods that allow us to navigate through the content of the website. Among them is the select method, which lets us select exactly the information we are interested in. To see how to choose these selectors, visit the URL provided above, right-click on the page, and choose Inspect. Then follow the steps explained in the image:

Web Scraping Steps

In the Python script, paste what you've copied and pass it as an argument to the select method:

soup = BeautifulSoup(response.content, "html.parser")
Moscow = soup.select("#__next > div > div > div.inside-body-content-container.clearfix > div:nth-child(2) > div > div > div > div > div > div > div.jsx-2642336383.table-container.undefined > table > tbody > tr:nth-child(1) > td:nth-child(1)")
print(Moscow)

The lines above successfully collect the name of the first city. However, our goal is to select the entire column of cities. To do so, we are going to edit the argument of the select method. Before that, let's copy the selector of another city and see how the two differ:

Moscow = soup.select("#__next > div > div > div.inside-body-content-container.clearfix > div:nth-child(2) > div > div > div > div > div > div > div.jsx-2642336383.table-container.undefined > table > tbody > tr:nth-child(1) > td:nth-child(1)")
London = soup.select("#__next > div > div > div.inside-body-content-container.clearfix > div:nth-child(2) > div > div > div > div > div > div > div.jsx-2642336383.table-container.undefined > table > tbody > tr:nth-child(2) > td:nth-child(1)")

We can see that the only difference is in tr:nth-child(2): the selector walks down the column by changing the row number. By editing the argument, we can get the entire column of cities:

cities =  soup.select("#__next > div > div > div.inside-body-content-container.clearfix > div:nth-child(2) > div > div > div > div > div > div > div.jsx-2642336383.table-container.undefined > table > tbody > tr td:nth-child(1)")

The output is a list of tags, each containing a city name. To get the text out of a tag, use the BeautifulSoup method get_text().
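As a minimal illustration, assuming cities holds the list of tags returned by select() above (the name city_names is just for this example):

# Pull the visible text out of each tag returned by select()
city_names = [tag.get_text() for tag in cities]
print(city_names[:5])  # e.g. the first five city names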

Following the same steps, we can collect the other data. Here is a complete Python script to do so:

from bs4 import BeautifulSoup
import requests

url = "https://worldpopulationreview.com/continents/cities/europe"
response = requests.get(url)
response.status_code  # check that the request succeeded (200)

soup = BeautifulSoup(response.content, "html.parser")

cities_tags = soup.select("#__next > div > div > div.inside-body-content-container.clearfix > div:nth-child(2) > div > div > div > div > div > div > div.jsx-2642336383.table-container.undefined > table > tbody > tr td:nth-child(1)")
countries_tags = soup.select("#__next > div > div > div.inside-body-content-container.clearfix > div:nth-child(2) > div > div > div > div > div > div > div.jsx-2642336383.table-container.undefined > table > tbody > tr td:nth-child(2) > a")

num = 10  # number of entries to keep
cities = []
countries = []
for i in range(num):
    cities.append(cities_tags[i].get_text())
    countries.append(countries_tags[i].get_text())

After executing the above code, we have our data in the two lists. The values are aligned by index, i.e. the city at index i in the cities list belongs to the country at index i in the countries list. This is true only if the original HTML is structured in this manner.

On some websites, that is not the case: the data is not arranged as we expect it to be. For example, if we extract the same information from Wikipedia and inspect the resulting cities and countries lists, there is a mismatch of values at certain indices. It would be a good exercise to find where this mismatch is. Try it out!
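One quick way to try it is to zip whatever cities and countries lists you scraped and print the pairs; any misalignment shows up immediately:

# Pair each city with its country by index and print the pairs
for city, country in zip(cities, countries):
    print(city, "-", country)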

Weather API to Collect Weather Forecasts

The objective of this section is to collect weather forecasts for the cities we collected earlier. We are going to use the OpenWeather API, which provides free weather forecasts. To be able to use it, you need to create an account. Once done, right-click on your profile name and click My API Key. You should be able to see the key there. With this key, we are ready to make requests to the API.

We are going to use PyOWM, a Python wrapper that allows for easier access to the data.

from datetime import datetime  # needed for utcfromtimestamp below
from pyowm import OWM

OWM_key = "paste your key here"  # pass in the key as a string
owm = OWM(OWM_key)
mgr = owm.weather_manager()
forecast = mgr.forecast_at_place('Berlin, DE', '3h').forecast

values = ()
city_id = []
rain_prob = []
forecast_time = []
for w in forecast.weathers:
    forecast_time.append(datetime.utcfromtimestamp(w.reference_time('unix')).strftime('%Y-%m-%d %H:%M:%S'))
    rain_prob.append(w.precipitation_probability)
    city_id.append(forecast.location.id)
values = (city_id, rain_prob, forecast_time)

For the sake of simplicity, we collect the weather forecast for only one city. At this step the data gets bigger: the API provides a forecast every 3 hours for five days. One thing to notice here is that city_id is a list repeating the same value (here the city ID of Berlin) rather than a single value, because every row of the data belongs to the city we chose. Once this step works, we can add a for loop to include more cities.

This code can be further developed by wrapping the forecast object in a for loop so that each iteration sends an API request for one city, as sketched below.
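A rough sketch of such a loop, assuming cities is the list of city names from the first section and mgr is the weather manager created above (appending the country code to each name, as in 'Berlin, DE', makes the lookup less ambiguous):

city_id = []
rain_prob = []
forecast_time = []
for city in cities:
    # one API request per city
    forecast = mgr.forecast_at_place(city, '3h').forecast
    for w in forecast.weathers:
        forecast_time.append(datetime.utcfromtimestamp(w.reference_time('unix')).strftime('%Y-%m-%d %H:%M:%S'))
        rain_prob.append(w.precipitation_probability)
        city_id.append(forecast.location.id)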

AeroDataBox API to Collect Information of Flights

The last information we are interested in collecting for this project is the in-
formation of the flights in each airport of the cities that we collected in the
first section. The API we are going to use here is AeroDataBox API through
RapidAPI, which is the world’s largest API Marketplace. To start with:

  1. Create an account at RapidAPI,
  2. Go to the AeroDataBox API and click "Subscribe to Test",
  3. Choose the Basic (free) plan, and finally
  4. Go to Endpoints.

You should be able to see the following:

AeroDataBox API Endpoints

How to make the API request:

  1. On the left side are the endpoints. From there, choose Flights API > Departures and Arrivals (FIDS) by airport ICAO.
  2. In the "Test Endpoint" panel, specify the required information.
  3. On the right side, choose "Code Snippets", then click the arrow inside the box > Python > Requests.
  4. Copy the code to your text editor.

All the information we need is included in the response object. After the request succeeds, we read and parse the data using json(), which returns a Python dictionary. To access the data, it is helpful to print response.json() and see what keys this dictionary has. From there you can go further and pick the information you are interested in.
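For example, with the response object created in the script below, a single line is enough to list the top-level keys:

# Inspect the top-level keys of the parsed JSON response
print(response.json().keys())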

import requests
from datetime import datetime, timedelta

tomorrow = (datetime.today() + timedelta(days=1)).strftime("%Y-%m-%d")
from_time = tomorrow + "T09:00"
to_time = tomorrow + "T20:00"

# ICAO code of the airport whose arrivals we request (EDDB = Berlin Brandenburg)
url = "https://aerodatabox.p.rapidapi.com/flights/airports/icao/EDDB/" + from_time + "/" + to_time

querystring = {"withLeg": "true", "direction": "arrival", "withCancelled": "false",
               "withCodeshared": "false", "withCargo": "false", "withPrivate": "false"}
headers = {
    'x-rapidapi-key': "your key",
    'x-rapidapi-host': "aerodatabox.p.rapidapi.com"
}
response = requests.request("GET", url, headers=headers, params=querystring)

arrivals = response.json()['arrivals']
flight_num = [arrival['number'] for arrival in arrivals]
arrival_time = [arrival['arrival']['scheduledTimeUtc'].replace('Z', '') for arrival in arrivals]
arrival_icao = ["EDDB"] * len(flight_num)  # ICAO of the arrival airport, repeated for every flight

departure_icao = []
for arrival in arrivals:
    try:
        departure_icao.append(arrival['departure']['airport']['icao'])
    except KeyError:
        departure_icao.append("nan")

Notice that here we include only one city to keep things simple. Once this step works, you can create a for loop over the cities we collected in the first section, as sketched below.
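A hedged sketch of what that loop could look like, assuming a list icao_codes holding the airport ICAO code of each city (building that list is not covered in this post) and reusing the headers and querystring defined above:

all_arrivals = []
for icao in icao_codes:
    # one request per airport, same time window as above
    url = ("https://aerodatabox.p.rapidapi.com/flights/airports/icao/"
           + icao + "/" + from_time + "/" + to_time)
    response = requests.request("GET", url, headers=headers, params=querystring)
    all_arrivals.extend(response.json().get('arrivals', []))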

By now, we are done collecting the data:

  1. names of European cities
  2. flight information for each city's airports
  3. the weather forecasts in each city

So far, this data is stored in lists. In the next post (Create a Database on MySQL Workbench), we are going to set up a database on MySQL Workbench that will be used to store this data.
