Accelerate Your ETL Data Processing with Python and AWS

Discover how to leverage the power of Python and AWS to create efficient and effective data pipelines.

yashramani
Simform Engineering
8 min read · Mar 7, 2023


In today’s data-driven world, businesses rely on data pipelines to collect, process, and analyze vast amounts of data. A popular approach to building data pipelines is ETL (extract, transform, load), which extracts data from multiple sources, transforms it into a usable format, and loads it into a target system for analysis.

With the power of cloud computing, it is now easier than ever to build scalable and efficient ETL pipelines using AWS (Amazon Web Services). In this blog post, we examine the key components of an ETL-based pipeline using AWS DynamoDB and Python and provide a step-by-step guide to building your own pipeline.

Whether you’re a data analyst, data scientist, or business owner, this blog will give you the knowledge and tools you need to create ETL pipelines that turn raw data into actionable insights.

What is ETL?

ETL stands for “Extract, Transform, Load.”

It is the process used
1. to get (Extract) the data from various sources,
2. to perform some operations (Transform) on that data, and
3. to store (Load) that data in a meaningful way.

  • Here, the “pipeline” is the flow that gets the data from a particular source, processes it, and finally stores it in a database. Because the flow is built around data, an ETL pipeline is also called a “data pipeline.”
  • Today, most organizations collect raw data through online websites, cloud platforms, surveys, etc.
  • After collecting the raw data, organizations need data analysts who can perform operations or calculations to clean and transform the unstructured data according to the requirements. Now, let’s understand “Transform” with an example.
  1. Let’s say my client/stakeholder asks me to apply transformation logic to raw data that comes with 50 columns, but they want only 10 of them, with some specific conditions.
  2. My first step is to understand the requirements. Next, I check those 10 columns and try to find an optimal way to transform the raw data. (This is a critical step in ETL; a minimal pandas sketch of this kind of column selection follows this list.)
  3. The last but equally important step is to store the transformed data in a database (SQL or NoSQL) with an efficient database schema design. (We will look into this shortly.)
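As a rough illustration of that kind of transformation, here is a minimal pandas sketch; the file name, column names, and conditions are hypothetical placeholders, not from a real project.
import pandas as pd

# hypothetical raw extract with ~50 columns
raw_df = pd.read_csv('raw_data.csv')

# keep only the 10 columns the stakeholder asked for (names made up for illustration)
wanted_columns = [
    'channel_id', 'channel_name', 'country', 'created_at', 'view_count',
    'subscriber_count', 'video_count', 'category', 'language', 'is_verified',
]
transformed_df = raw_df[wanted_columns]

# apply the stakeholder's specific conditions, e.g. only verified channels with uploads
transformed_df = transformed_df[
    (transformed_df['is_verified']) & (transformed_df['video_count'] > 0)
]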

What is Python?

  • Python is a scripting, general-purpose, high-level, and interpreted programming language.
  • Python is popular for ETL because of its clean syntax, concise code, and great library and community support.
  • We can write one function that does everything from getting data from an API to storing it in a particular database, or separate these steps into functions that work like modules/sub-tasks of the whole pipeline (a skeleton of this modular approach is sketched below).
  • We can also leverage Python’s optimization techniques to deal efficiently with time & space complexity because, when dealing with Big Data, all of these things matter.
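As a sketch of that modular style (the function names and the extract/transform/load bodies below are placeholders, not the actual pipeline from this post):
# a minimal skeleton of an ETL pipeline split into modular functions
def extract():
    """Fetch raw records from the source (API, file, etc.)."""
    return [{'channel_id': '123456789', 'view_count': '1000'}]  # placeholder data


def transform(raw_records):
    """Cast types and derive new fields from the raw records."""
    return [
        {'channel_id': r['channel_id'], 'view_count': int(r['view_count'])}
        for r in raw_records
    ]


def load(records):
    """Write the transformed records to the target database."""
    for record in records:
        print('storing', record)  # placeholder for a real database write


def run_pipeline():
    load(transform(extract()))


if __name__ == '__main__':
    run_pipeline()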

Overview of AWS:

  • Amazon Web Services (AWS) is one of the most comprehensive and broadly adopted cloud platforms, offering more than 200 fully featured services from data centers globally.
  • AWS is helpful for setting up infrastructure in a cloud environment where we don’t have to manage servers, networking & other resources ourselves, as most of the services are “serverless,” and we pay only for what we use.
  • As mentioned above, AWS has many services to offer, but here we’ll look only at the ones I’ve used and found especially useful for getting ETL tasks done.

Extract:

  • Now that we have a basic understanding of what Extract does, let’s dive deeper into a real-world use case.
  • Suppose we have a requirement to collect data from a third-party API, the YouTube Data API (part of the Google API client library). First, we have to install the required Python package to set up and initialize the API.

Link for installing library: google-api-python-client

  • After installing the package, we define a client object that initializes the above API with credentials (a developer/API key, or access & secret keys). Publicly available APIs restrict direct, unauthenticated use, so we need credentials.
import os

import googleapiclient.discovery

# allow OAuth over plain HTTP for local testing (this must be set as an
# environment variable, not a plain Python variable)
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

# API configuration and credentials are read from environment variables
api_service_name = os.environ["API_SERVICE_NAME"]
api_version = os.environ["API_VERSION"]
DEVELOPER_KEY_1 = os.environ["DEVELOPER_KEY_1"]

# build the YouTube API client
youtube = googleapiclient.discovery.build(
    api_service_name, api_version, developerKey=DEVELOPER_KEY_1, cache_discovery=False
)
  • After setting up, things are pretty simple to understand and implement.
  • We use Python functions to fetch or collect the data from the API; the data these APIs return is generally in JSON format.
  • The response contains around 100 fields (different values); for example, to look up one YouTube channel, we can pass the channel ID as input to a function and get the metadata of that particular channel.
  • But if we want to store only 15 fields out of those 100, we should extract only the selected fields for transformation.
# get total channel view count
def fetch_channel_view_count(channel_id):
    """
    i/p: channel_id (type: str)
    o/p: channel name, view count, channel id, upload_id (type: str)
    description: takes channel_id as input and returns the channel name,
    view count, channel id, and upload_id
    """
    # youtube channel API call
    channel_request = youtube.channels().list(
        part='statistics,snippet,contentDetails',
        id=channel_id,
    )
    # getting the particular channel's data
    channel_data_response = channel_request.execute()

    # extracting the required fields from the YouTube API response
    for channel_info in channel_data_response.get('items', []):
        ChannelName = channel_info['snippet'].get('title', None)
        ChannelViewCount = channel_info['statistics'].get('viewCount', None)
        ChannelID = channel_info['id']
        ChannelUploadID = channel_info['contentDetails']['relatedPlaylists'].get('uploads', None)

        # returning the 4 required fields (for the first and only matching channel)
        return ChannelName, ChannelViewCount, ChannelID, ChannelUploadID
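For example, we can call the function with a channel ID (the ID below is a placeholder, not a real channel) and unpack the four returned fields:
# placeholder channel ID for illustration
name, view_count, channel_id, upload_id = fetch_channel_view_count('UCxxxxxxxxxxxxxxxxxxxxxx')
print(name, view_count, channel_id, upload_id)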
  • We can use an AWS Lambda function to run our Python code serverless, or leverage AWS ECS (Elastic Container Service) to run our code as a task on dedicated compute.
  • Here, we’ll have to use ECS, as a Lambda function can run for at most 15 minutes, while ECS doesn’t have any time limitation.
  • If we know our task or piece of code will not exceed 15 minutes of execution time, it’s advisable to use Lambda over ECS because Lambda is fully serverless, so we don’t have to worry about CPU or processing power (a minimal handler sketch follows this list).
  • The code snippet above shows how to initialize the YouTube API in Python with the API credentials.
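As a rough sketch of the Lambda option (the event shape, and the assumption that fetch_channel_view_count and the youtube client from the snippets above are packaged with the deployment, are mine, not part of the original pipeline):
import json


def lambda_handler(event, context):
    # the channel ID is assumed to arrive in the Lambda event payload
    channel_id = event["channel_id"]
    name, view_count, channel_id, upload_id = fetch_channel_view_count(channel_id)
    # return the extracted fields as a JSON response
    return {
        "statusCode": 200,
        "body": json.dumps({
            "channel_name": name,
            "view_count": view_count,
            "channel_id": channel_id,
            "upload_id": upload_id,
        }),
    }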

Transform:

  • Now comes the interesting part of ETL, the Transformation. With the selected fields, we first check the data type of each field, as sometimes we may want to store data as a “float/decimal” type while the API response returns it as an “integer/numeric” type.
  • After casting fields to the data types matching our requirements, we may also do manual calculations or combine two fields to create a new field/column, for example:
# combining two existing fields to create a new unique field
new_field = f'{channel_id}_{channel_name}'

# e.g. new_field = '123456789_RandomChannel'
  • Great. We have successfully transformed our raw data into a new field, and we can use it as a reference.
  • After transformation, we generally hold the data as a Python dictionary or a list of dictionaries.
  • Let me explain: to store data correctly in any database, we rely on key-value pairs, where the key is your field/column and the value is the value of that field. JSON data is also key-value paired data.
  • In Python, a dictionary stores values as key-value pairs, and a list can contain any type of object inside it, including dictionaries. Let’s understand this with the example below.
[
    {
        'channel_id': 123456789,
        'channel_name': 'RandomChannel',
        'new_field': '123456789_RandomChannel',
        'total_videos': 50
    },
    {
        'channel_id': 123456789,
        'channel_name': 'RandomChannel',
        'new_field': '123456789_RandomChannel',
        'total_videos': 50
    },
]
  • As we can see, the list contains multiple dictionary objects, and each dictionary holds key-value pairs. We will use this list of dictionaries to store the data in the database. In our pipeline, we build it by iterating over a pandas DataFrame of the extracted channel data:
import pandas as pd

# empty list that will collect the transformed records
c_list = []

# iterating over rows/records of the pandas dataframe of extracted channel data
for index, row in channel_df.iterrows():
    try:
        datetime_format = pd.to_datetime(
            row['platformdate_id'].replace('yt#', ''), format='%d%m%Y'
        ).strftime('%Y-%m-%d %H:%M:%S')
        c_dict = {
            'artist_id': str(row['artist_id']),
            'platformdate_id': str(row['platformdate_id'].replace('yt#', '')),
            'created_at': str(datetime_format),
            'artist_name': str(row['artist_name']),
            'channel_id': str(row['channel_id']),
            'data': row['data'],
            'youtube_channel_name': str(row['youtube_channel_name']),
            'vevo_channel_name': str(row['vevo_channel_name'])
        }
        # appending the dictionary of required fields to "c_list" for the load step
        c_list.append(c_dict)
    except Exception as e:
        print(f"Exception occurred as {e}")
        continue

Load:

  • When storing or loading the transformed data into the database, we have two options or formats available:
  1. SQL type
  2. NoSQL type

  • Let’s begin with SQL (Structured Query Language) databases, where data is stored in tabular format with pre-defined column names.
  • Here, we have to specify the column names while creating a table in the database, known as the “table schema design.” The schema can be modified after creation, but that is not recommended when dealing with Big Data.
  • In our case, however, the number of columns or keys is not predictable, so we will go with a NoSQL database. NoSQL databases allow us to add fields or columns after the table has been created.
  • We only have to take care of 2 fields/keys, known as the “partition key” and the “sort key.”
  • When defined alone, the partition key acts as the primary key; when both a partition key and a sort key are included in the table schema design, their combination is referred to as the table’s “primary key.”
  • AWS DynamoDB is a NoSQL database. We’ll use DynamoDB to store the data in tables by defining a partition key & sort key (a minimal boto3 sketch follows the screenshots below).
  • The DynamoDB table schema design is shown in the picture below.
(Image: the “Create table” option in the DynamoDB console)
  • The image below shows how the data is actually stored in the DynamoDB table, where video_id_date is the partition key and the other columns are regular fields/attributes of the table.
(Image: result of querying the DynamoDB table)
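To make the load step concrete, here is a rough boto3 sketch. The table name, sort-key name, and region are assumptions for illustration; video_id_date as the partition key comes from the screenshot above, and c_list is the list of dictionaries built in the Transform step.
import boto3

dynamodb = boto3.resource('dynamodb', region_name='us-east-1')  # region is an assumption

# creating a table with a composite primary key: partition key + sort key
# (the table name and sort-key name here are hypothetical)
table = dynamodb.create_table(
    TableName='youtube_channel_data',
    KeySchema=[
        {'AttributeName': 'video_id_date', 'KeyType': 'HASH'},   # partition key
        {'AttributeName': 'channel_id', 'KeyType': 'RANGE'},     # sort key
    ],
    AttributeDefinitions=[
        {'AttributeName': 'video_id_date', 'AttributeType': 'S'},
        {'AttributeName': 'channel_id', 'AttributeType': 'S'},
    ],
    BillingMode='PAY_PER_REQUEST',
)
table.wait_until_exists()

# loading the transformed records (c_list from the Transform step) in batches;
# every item must include the table's key attributes
with table.batch_writer() as batch:
    for item in c_list:
        batch.put_item(Item=item)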

Takeaways:

  1. Data pipelines are crucial elements of any workflow involving data handling. Large amounts of data can be gathered, transformed, and analyzed with a data pipeline.
  2. Python is a popular programming language for data processing, thanks to its extensive library support, ease of use, and flexibility.
  3. AWS provides a range of powerful services that are ideal for building and deploying data pipelines, including Amazon S3, AWS Lambda, AWS Glue, and Amazon EMR.
  4. It’s a good idea to stick to some standard practices when creating data pipelines. These include utilizing a modular and reusable design, including error handling and logging, and streamlining your process by using automation and monitoring tools.
  5. Python and AWS can be used to create data pipelines, which can simplify your data processing workflows, boost productivity, and give you greater insights into your data. You can turn unstructured data into actionable business insights that lead to better decisions and results with the proper tools and methods.

Conclusion:

Creating data pipelines with Python and AWS helps simplify data processing and analysis workflows, allowing you to unlock the full potential of your data and gain valuable insights about your business. This can improve your bottom line and drive better outcomes for your organization. With the right tools, techniques, and guidance, anyone can learn to build powerful, scalable data pipelines that transform raw data into actionable insights.

Stay tuned and follow Simform Engineering for important updates on various tools and technologies.
