Automate the data pipeline using Lambda
This is the last section of the project Automating an end-to-end data pipeline. In the previous posts, we looked at how to use Python scripts to collect data using web scraping and APIs, and how to create a MySQL database and fill it with the data we collected. In this post, we are going to look at how to automate this project, i.e. setting a time schedule to automatically run the Python scripts that collect the data and insert it into the MySQL database.
The goals of this post are:
- setting up a database on AWS RDS,
- running the Python scripts for data collection on AWS Lambda, and
- scheduling the scripts to run automatically with AWS CloudWatch.
These goals may seem like too much to cover in one post. However, my intention is to give an overview of the big picture so that we won’t get lost in the little details and deviate from the ultimate goal. This is not to say that we will skip the required steps as we go on. If those steps are covered somewhere else, a link will be provided. I hope this helps in producing a cleaner and easier-to-follow structure.
To be able to follow along, you need to have an AWS account.
Set up a Database on AWS RDS
The database we created in the post Create a Database on MySQL Workbench lives locally on our computer. In this section we will set up a database on AWS RDS (Relational Database Service). From the AWS services, click on RDS, create a database with MySQL as the engine, and fill in the required fields (instance identifier, master username, and password).
Once the RDS MySQL instance is set up, we need to establish the connection to it. From MySQL workbench, create a new connection by clicking on the plus sign beside MySQL Connections and provide the required information:
- Connection name: any name of your choice.
- Hostname: the instance endpoint from the AWS Console.
- Port: 3306 by default, unless you changed it when creating the instance.
- Username: the default when creating the database is “admin”, unless you changed it!
- Password: the password you set when creating the AWS instance.
Now you can connect to the cloud instance you just created. At this point the connection is set up, but there is no database yet. You can either create a new database of your choice or copy a database that already exists in another connection. In our case, we can move the database that we created in the post Create a Database on MySQL Workbench to the new connection.
The only difference is that this database is now on the AWS cloud and not on the local host.
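If you prefer the first option, creating a new, empty database over the new connection only takes a few lines of Python. Here is a minimal sketch, assuming you replace the host and password placeholders with your own values (“gans” is the database name we use in this project):

import mysql.connector

# connect to the RDS instance itself (no database selected yet);
# host and password are placeholders for your own values
cnx = mysql.connector.connect(user='admin',
                              password='your_root_pwd',
                              host='your_aws_endpoint')
cursor = cnx.cursor()
cursor.execute("CREATE DATABASE IF NOT EXISTS gans;")  # create the database on the cloud instance
cursor.close()
cnx.close()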
Run Python Scripts on AWS Lambda
In our project, we created five tables. Three of them are static, meaning the data does not need to be updated over time. These tables include information about the names of cities, the countries they belong to, the names of their airports, and so on. The other two tables of our project are the flights and the weather forecasts tables. Those tables need to be updated over time. For that reason, we use Lambda functions so that these tables get updated automatically without manual intervention.
In the first two sections of this project, we already wrote Python scripts to collect the flights data and the weather forecasts and insert the values into the local database. In this section we take two additional steps.
We first need to edit the Python connector so that it points to our database on the cloud rather than the local one. To do so, use:
# import the library
import mysql.connector

# create a connection to the MySQL server through connect()
cnx = mysql.connector.connect(
    user='admin',              # "admin" is the default if you did not change it while creating the RDS instance
    password='your_root_pwd',  # the password you set while creating the RDS instance
    host=' ',                  # this is the instance endpoint from the AWS Console
    database='db_name'         # the name of the database you want to connect to
)
# cnx is a MySQLConnection object

# get a cursor to interact with the database
cursor = cnx.cursor()
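A side note before moving on: once this runs on Lambda, the credentials do not have to be hard-coded. Lambda lets you define environment variables in the function’s configuration, and the script can read them with os.environ. A minimal sketch, with variable names of my own choosing:

import os
import mysql.connector

# hypothetical variable names: define them in the Lambda console
# under Configuration > Environment variables
cnx = mysql.connector.connect(user=os.environ['DB_USER'],
                              password=os.environ['DB_PASSWORD'],
                              host=os.environ['DB_HOST'],
                              database=os.environ['DB_NAME'])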
Then, create the Lambda functions. These functions are going to be our Python scripts that collect the flights and weather forecasts information. Creating a Lambda function involves several technical steps that need to be followed. Here, I’ve provided links that serve the purpose.
Steps to create a Lambda function:
- Create a role: To create a role, follow this link: Creating a role to delegate permissions to an IAM user.
- Create a Lambda function: Follow the steps in this link: Create a Lambda function with the console. Since we have Python scripts, choose Python for Runtime.
- Upload a layer to your Lambda function: This is like creating a virtual environment for the modules that the Lambda function will be using. To upload a layer, from your local machine, create a zip file of the site-packages of your virtual environment. On the Lambda function console, click on Add a layer > Create a new layer, fill in the boxes, upload the zip file, and hit Create.
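Before wiring in the full script, it is worth checking that the layer is actually picked up. Here is a minimal sketch of a throwaway handler that does nothing but import the packaged modules; the return value is just a convention for the console test:

# a throwaway handler: if the imports succeed, the layer works
import mysql.connector
import requests

def lambda_handler(event, context):
    return {"statusCode": 200,
            "body": "layer modules imported successfully"}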
Now we are done setting up our Lambda function and it is ready to be executed. As I mentioned before, the Lambda function is nothing but the Python script that we created in the previous post. So, with the new modifications, the script to get the flights information looks like the following:
import mysql.connector
import requests
from datetime import datetime, timedelta

def lambda_handler(event, context):
    # query tomorrow's arrivals between 09:00 and 20:00
    tomorrow = (datetime.today() + timedelta(days=1)).strftime("%Y-%m-%d")
    from_time = tomorrow + "T09:00"
    to_time = tomorrow + "T20:00"
    url = ("https://aerodatabox.p.rapidapi.com/flights/airports/icao/EDDB/"
           + from_time + "/" + to_time)
    querystring = {"withLeg": "true",
                   "direction": "arrival",
                   "withCancelled": "false",
                   "withCodeshared": "false",
                   "withCargo": "false",
                   "withPrivate": "false"}
    headers = {'x-rapidapi-key': "your key",
               'x-rapidapi-host': "aerodatabox.p.rapidapi.com"}
    response = requests.request("GET", url,
                                headers=headers, params=querystring)
    arrivals = response.json()['arrivals']

    flight_num = [arrival['number'] for arrival in arrivals]
    arrival_time = [arrival['arrival']['scheduledTimeUtc'].replace('Z', '')
                    for arrival in arrivals]
    # the arrival airport is the one we queried in the URL
    arrival_icao = ['EDDB'] * len(flight_num)
    departure_icao = []
    for arrival in arrivals:
        try:
            departure_icao.append(arrival['departure']['airport']['icao'])
        except (KeyError, TypeError):
            departure_icao.append("nan")

    # inserting the values of the flights into MySQL:
    # connect to the database on the RDS instance
    cnx = mysql.connector.connect(
        user='admin',
        password='your password',
        host='your host',   # the instance endpoint from the AWS Console
        database='gans'     # the database you want to connect to
    )
    cursor = cnx.cursor()
    query = ('INSERT INTO flights (flight_num, departure_icao, '
             'arrival_icao, arrival_time) VALUES (%s, %s, %s, %s);')
    for i in range(len(flight_num)):
        value = (flight_num[i], departure_icao[i],
                 arrival_icao[i], arrival_time[i])
        cursor.execute(query, value)
    cnx.commit()
    cursor.close()
    cnx.close()
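As a small aside, the insert loop can also be expressed with executemany, which mysql.connector supports as a drop-in alternative; here is a sketch, reusing the query and the lists from the script above:

# the same insert expressed with executemany instead of an explicit loop
values = list(zip(flight_num, departure_icao, arrival_icao, arrival_time))
cursor.executemany(query, values)
cnx.commit()

Either way, the %s placeholders let the connector escape the values for us rather than building the SQL string by hand.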
You still need to click on Test to execute the function; the first time you do, the console asks you to configure a test event, and since our handler ignores the event argument, an empty JSON object {} is enough. This means we are not done yet! We need to perform one last step to let this function run at a time we specify. Let’s move to the next section.
Set up AWS CloudWatch
The last step of this project is to schedule the Lambda functions to run periodically. To set up AWS CloudWatch, follow the steps in this tutorial: Schedule AWS Lambda Functions Using CloudWatch Events.
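If you prefer to create the schedule from code rather than clicking through the console, here is a minimal boto3 sketch; the rule name, schedule expression, and function name are placeholders of my own choosing, and it assumes your AWS credentials are already configured:

import boto3

events = boto3.client('events')
aws_lambda = boto3.client('lambda')

# hypothetical rule: trigger every day at 08:00 UTC
rule = events.put_rule(Name='run-get-flights-daily',
                       ScheduleExpression='cron(0 8 * * ? *)',
                       State='ENABLED')

# allow CloudWatch Events to invoke the function ('get_flights' is a placeholder name)
aws_lambda.add_permission(FunctionName='get_flights',
                          StatementId='cloudwatch-invoke-get-flights',
                          Action='lambda:InvokeFunction',
                          Principal='events.amazonaws.com',
                          SourceArn=rule['RuleArn'])

# point the rule at the Lambda function
function_arn = aws_lambda.get_function(
    FunctionName='get_flights')['Configuration']['FunctionArn']
events.put_targets(Rule='run-get-flights-daily',
                   Targets=[{'Id': 'get-flights-target',
                             'Arn': function_arn}])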
Now, your functions will get executed on a regular basis and at the time you have specified.
Congrats! You’ve just automated an end-to-end data pipeline on the AWS cloud.