Data Processing in AWS

Carlos Cruz
Published in NicaSource
8 min read · Sep 29, 2022

It's estimated that more than 94 zettabytes of data (about 94 billion terabytes, or 85,492,501,966 tebibytes) will be created and consumed in 2022.

We create data faster than we can process and analyze it. This is closely tied to the growth in internet users and the ease of internet access through cell phones, IoT devices, and more. As a result, a specialized set of tools for working with big data has been developed. Combining these tools with a Data Lake is a common way to process the enormous amount of data created daily.

Following up with the foundation laid during part I:

Getting started with Data Lakes in AWS

For this article, we will focus on how to process the data in our data source (the MySQL Sakila database) using AWS services that fit a dataset of this size.

As always, here’s the GitHub repository with the resources and a more beginner-friendly guide:

https://github.com/carloscruzns/datalakepoc#datalakepoc

Before diving deeper, we need to set up different services to help us process our data. Let’s go over the ones we will focus on in this article:

  • Lambda: it will provide the processing power for Extract, Load, and Transform (ELT) operations.
  • Glue Data Catalog: it will contain our data’s location and schema.
  • Secrets Manager: it will store the credentials for our data source.
  • Athena: it allows us to run interactive queries on our data in Amazon S3 using standard SQL.

Secrets Manager

We will start with Secrets Manager, a service for storing sensitive data. To set it up, we will save the credentials for our MySQL database as a secret. In the end, our secret will look like this.
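As a rough sketch of the secret's shape: the snippet below builds the JSON payload, assuming the key names AWS uses for RDS-style database secrets (`username`, `password`, `engine`, `host`, `port`, `dbname`). Every value is a placeholder, and the user name is hypothetical.

```python
import json

# Placeholder values only; replace them with your own database details.
# The key names are an assumption based on the RDS-style secret layout.
secret_payload = {
    "username": "sakila_reader",   # hypothetical database user
    "password": "<your-password>",
    "engine": "mysql",
    "host": "<your-mysql-host>",
    "port": 3306,                  # default MySQL port
    "dbname": "sakila",
}

# Secrets Manager stores the secret value as a JSON string.
secret_string = json.dumps(secret_payload)
print(secret_string)
```

The resulting string is what would be pasted into the console's plaintext view or passed as `SecretString` when creating the secret programmatically.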

IAM Role

Now that our credentials are securely stored, we need to set up an execution role for Lambda with the necessary permissions. The policies we will attach are the following:

Needless to say, this role doesn’t follow the principle of least privilege. We must tweak it if we want to follow that principle. Security should be job 0, and there are tools to ensure our Data Lake is secure (for example, AWS Lake Formation). It might be worth exploring this topic in detail in a future article.
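One possible first step toward least privilege is to let the role read only the single secret it needs. The sketch below builds such a policy document; the ARN (account id, region, and secret name) is a made-up placeholder, and the statement covers Secrets Manager only, not the S3, Glue, or Athena permissions the functions also need.

```python
import json

# Hypothetical ARN; substitute your own account id, region, and secret name.
# The trailing wildcard covers the random suffix Secrets Manager appends.
SECRET_ARN = (
    "arn:aws:secretsmanager:us-east-1:123456789012:"
    "secret:data-lake/sakila-mysql-secret-*"
)


def read_secret_policy(secret_arn: str) -> dict:
    """Return an IAM policy document that only allows reading one secret."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadSakilaSecret",
                "Effect": "Allow",
                "Action": "secretsmanager:GetSecretValue",
                "Resource": secret_arn,
            }
        ],
    }


policy = read_secret_policy(SECRET_ARN)
print(json.dumps(policy, indent=2))
```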

Glue Data Catalog

Once the previous steps have been completed, we should be able to run our scripts to extract and enhance our data. Before that, we need to understand that the result of the extraction will be stored in columnar files, more precisely .parquet files. Columnar formats store each column’s values together, so queries that read only a few columns of a large dataset scan far less data.

Since we’re working with AWS as our cloud provider, we will use the Glue Data Catalog, an index to the location and the schema of our parquet files that can be used with our ELT jobs. We will set up the databases for this project, one for each layer. The final result will be the following:
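The databases can also be created from code instead of the console. This is a minimal sketch, assuming the database names used in the snippets later in this article and a recent AWS Data Wrangler 2.x release in which `wr.catalog.create_database` accepts `exist_ok`. The function name `create_layer_databases` is only illustrative.

```python
# Database names taken from the code snippets later in this article.
LAYER_DATABASES = ["sakila_coal", "sakila_pressure", "sakila_diamond"]


def create_layer_databases(names=LAYER_DATABASES):
    """Create one Glue Data Catalog database per layer (needs AWS credentials)."""
    # Imported lazily so this module can be loaded without the library installed.
    import awswrangler as wr

    for name in names:
        # exist_ok=True makes the call safe to repeat (assumed to be available).
        wr.catalog.create_database(name=name, exist_ok=True)


print(LAYER_DATABASES)
```

Calling `create_layer_databases()` from a machine or Lambda with Glue permissions should leave three empty databases in the catalog.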

Athena Setup

Now we can use the Glue Data Catalog for data stored in the Data Lake. At this point, we only have databases, but that will change soon. Next, we will set up Amazon Athena so we can use it in the ELT process. In the previous article, we created three buckets for our layers and one for resources. It is time to use this last bucket to store Athena’s query results. We will edit the primary workgroup and set a path in our bucket as its query result location. (It is possible to create more than one workgroup, so storing each workgroup’s results under a different prefix keeps them easy to tell apart.) And with that, we are done with Athena for this article.
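The same workgroup setting can be expressed in code. The sketch below only builds the arguments for Athena's `UpdateWorkGroup` API; the bucket name and the `athena-results/<workgroup>/` prefix layout are assumptions made for illustration.

```python
def workgroup_output_update(bucket: str, workgroup: str = "primary") -> dict:
    """Build keyword arguments for Athena's UpdateWorkGroup API call."""
    # One prefix per workgroup keeps query results from different workgroups apart.
    output_location = f"s3://{bucket}/athena-results/{workgroup}/"
    return {
        "WorkGroup": workgroup,
        "ConfigurationUpdates": {
            "ResultConfigurationUpdates": {"OutputLocation": output_location}
        },
    }


kwargs = workgroup_output_update("my-datalake-resources")  # hypothetical bucket
print(kwargs["ConfigurationUpdates"]["ResultConfigurationUpdates"]["OutputLocation"])

# With AWS credentials configured, the update itself would be:
#   boto3.client("athena").update_work_group(**kwargs)
```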

AWS Data Wrangler

In this case, our data source doesn’t require big data technology, like Spark, to be processed. We will use an open-source data manipulation library instead, and if you have experience in the area, you already know what I’m talking about: pandas for Python. In addition, we will take advantage of another library developed by AWS themselves, AWS Data Wrangler, which helps us integrate all the data-related services conveniently. Data Wrangler is already available as a layer option for Lambda functions, and we will take a careful look at this in a second.

Processing Coal Layer

With all the prep work done, let’s start building our ELT process. First, we must create a new Lambda function with Python 3.8 and select the previously created role. Make sure to set a name that conveys the layer. AWS Lambda functions come with a list of pre-installed packages that we can use. AWS Data Wrangler is not among them and must be added via Lambda Layers.

In summary, our new lambda function must have the following settings:

  • Python 3.8
  • Previously created role
  • AWS Layers — Lambda layer: AWSDataWrangler-Python38
  • Basic setting: 128 MB Memory
  • Basic setting: 512 MB Storage
  • Basic setting: 5 min timeout
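For readers who prefer code over the console, the settings above map onto the Lambda `CreateFunction` API roughly as follows. This is a sketch, not a full deployment script: the function name and both ARNs are placeholders, and a real call would also need `Handler` and `Code`.

```python
def lambda_settings(function_name: str, role_arn: str, wrangler_layer_arn: str) -> dict:
    """Translate the console settings listed above into CreateFunction arguments."""
    return {
        "FunctionName": function_name,
        "Runtime": "python3.8",
        "Role": role_arn,                    # the previously created role
        "Layers": [wrangler_layer_arn],      # AWSDataWrangler-Python38 layer
        "MemorySize": 128,                   # MB
        "EphemeralStorage": {"Size": 512},   # MB of /tmp storage
        "Timeout": 5 * 60,                   # seconds
    }


settings = lambda_settings(
    function_name="sakila-coal-extract",      # hypothetical name
    role_arn="<your-lambda-role-arn>",        # placeholder
    wrangler_layer_arn="<your-layer-arn>",    # placeholder
)
print(settings["Timeout"], settings["MemorySize"])
```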

We will copy the code for the coal layer from the GitHub repository and paste it into our new Lambda function. All that’s left is to replace the S3 bucket names with your own and set the id of the previously created secret. We will go through a few key points in the code.

import awswrangler as wr

# Connect using the credentials stored in Secrets Manager (example secret id)
con = wr.mysql.connect(secret_id='data-lake/sakila-mysql-secret')

#### EXTRACTING PAYMENT #####
df_payment = wr.mysql.read_sql_table(
    table="payment",
    schema="sakila",
    con=con
)

We pass our connection to the read_sql_table method, along with the target table and schema. It returns a pandas data frame with all the data in our target so we may explore and modify it as needed. Since we are focusing on the extraction process, there is no need to alter it.

wr.s3.to_parquet(
    df=df_payment,
    path=s3_path_payment,
    dataset=True,
    database='sakila_coal',
    table='payment'
)

The final step for extracting this table is to specify our database in the Glue Data Catalog, set our new table name, and fill in the rest of the parameters. This call creates a new table schema if one doesn’t exist. As a result, we will have saved the data from the MySQL database to the data lake and created a schema for it in the process. Thanks to AWS Data Wrangler, the process was simple and painless. We will repeat these steps for the rest of the tables we would like to extract. When we run our new Lambda function manually, we should see the files loaded in our S3 coal layer bucket.
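Repeating the two calls for every table can be wrapped in a loop. This is a sketch rather than the repository's code: the table list is taken from the snippets in this article, the `s3://<bucket>/sakila/<table>/` layout is an assumption, and `mode="overwrite"` is an added choice so that re-running the function replaces the data instead of appending duplicates.

```python
# Tables referenced by the snippets in this article.
TABLES = ["payment", "film", "film_category", "category", "inventory", "rental"]


def coal_path(bucket: str, table: str) -> str:
    """Return the S3 prefix for one table (hypothetical layout)."""
    return f"s3://{bucket}/sakila/{table}/"


def extract_tables(secret_id: str, bucket: str, tables=TABLES) -> None:
    """Copy each MySQL table to the coal layer as a Parquet dataset."""
    # Imported lazily; available inside Lambda through the Data Wrangler layer.
    import awswrangler as wr

    con = wr.mysql.connect(secret_id=secret_id)
    try:
        for table in tables:
            df = wr.mysql.read_sql_table(table=table, schema="sakila", con=con)
            wr.s3.to_parquet(
                df=df,
                path=coal_path(bucket, table),
                dataset=True,
                mode="overwrite",        # replace earlier runs instead of appending
                database="sakila_coal",
                table=table,
            )
    finally:
        con.close()


print(coal_path("my-coal-bucket", "payment"))  # hypothetical bucket name
```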

In this project, we will have only one Lambda function with the task of extracting all the required tables. Depending on the complexity of the extraction process or the amount of data, it might be necessary to set up a separate Lambda for each table. Separate Lambdas can be executed in parallel or in a custom order with services like Step Functions. If the volume of data increases drastically, we might need to start looking at alternative services like EMR, Glue, MWAA (Amazon Managed Workflows for Apache Airflow), etc.
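One way to prepare for per-table execution is to let the invoking event name the table, so the same function can be invoked once per table by an orchestrator. The event shape (`{"table": ...}`) and the helper name `table_from_event` are assumptions for this sketch, and the extraction call itself is left out.

```python
# Tables this function is allowed to extract (from the snippets in this article).
ALLOWED_TABLES = {"payment", "film", "film_category", "category", "inventory", "rental"}


def table_from_event(event: dict) -> str:
    """Read and validate the table name carried by the invocation event."""
    table = event.get("table")
    if table not in ALLOWED_TABLES:
        raise ValueError(f"Unknown or missing table: {table!r}")
    return table


def lambda_handler(event, context):
    table = table_from_event(event)
    # The extraction code for `table` would run here.
    return {"table": table}


print(lambda_handler({"table": "payment"}, None))
```

Validating the name against a fixed set keeps an unexpected event from triggering a read of an arbitrary table.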

Processing Pressure Layer

We have our raw data in the coal layer, waiting to be cleansed and improved. We will create a new lambda function with the same settings as the previous one and paste its corresponding code from the GitHub repository. Make sure to update the S3 bucket’s name and if you have changed any names for the data catalog, update those as well.

We will focus on enhancing the film table from our data source in this function. The objective is to de-normalize the film, film_category, and category tables. The expected result is a data frame with the information of our films and a field with the value of the category.

#### LOADING DATAFRAMES #####
df_film = wr.athena.read_sql_table(table="film", database="sakila_coal")
df_film_category = wr.athena.read_sql_table(table="film_category", database="sakila_coal")
df_category = wr.athena.read_sql_table(table="category", database="sakila_coal")

Thanks to the work we did with the Glue Data Catalog, we only need to specify our table and database, and Athena returns a data frame with the requested information. We repeat this call for each of our target tables.

#### MERGING DATAFRAMES #####
df_film = pd.merge(df_film, df_film_category, on='film_id')
df_film = pd.merge(df_film, df_category, on='category_id')

Now we can proceed to use Pandas with the merge method to get a single data frame with our film information and the category value.
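To see what the two merges do, here is a small self-contained sketch with a few illustrative rows standing in for the real tables. Only the key columns and one descriptive column per table are included, and the final rename to `category` is an added step for readability.

```python
import pandas as pd

# Tiny stand-ins for the three tables; the rows are illustrative only.
df_film = pd.DataFrame({"film_id": [1, 2], "title": ["ACADEMY DINOSAUR", "ACE GOLDFINGER"]})
df_film_category = pd.DataFrame({"film_id": [1, 2], "category_id": [6, 11]})
df_category = pd.DataFrame({"category_id": [6, 11], "name": ["Documentary", "Horror"]})

# pd.merge defaults to an inner join on the given key column.
df = pd.merge(df_film, df_film_category, on="film_id")
df = pd.merge(df, df_category, on="category_id")

# Give the category column a clearer name than the generic "name".
df = df.rename(columns={"name": "category"})
print(df[["film_id", "title", "category"]])
```

Because the default is an inner join, a film without a matching row in film_category would be dropped from the result. When both sides share a non-key column name, pandas adds `_x` and `_y` suffixes to keep them apart.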

wr.s3.to_parquet(
    df=df_film,
    path=s3_path_film_category,
    dataset=True,
    database='sakila_pressure',
    table='film'
)

The last step is the same as in the coal layer lambda. With the transformation completed, we can proceed to save our data frame in the pressure layer. We should be able to see the result there.

Processing Diamond Layer

We finally have our data clean and with some added information. In this final step, we will add fields from other sources to enhance it further, so we get a more complete dataset.

#### MERGING DATAFRAMES #####
df_film = pd.merge(df_film, df_inventory, on='film_id')
df_film = pd.merge(df_film, df_rental, on='inventory_id')
df_film = pd.merge(df_film, df_payment, on='rental_id')

wr.s3.to_parquet(
    df=df_film,
    path=s3_path_film_sales,
    dataset=True,
    database='sakila_diamond',
    table='film_sales'
)

We proceed to save it like in the previous steps and store it in the diamond layer. And with that, we will have a data model to help us create some insights.
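It helps to keep in mind what the chained merges produce: each film row is repeated once per matching payment. The sketch below uses made-up rows to show this, then sums the payments per film as one example of an insight. The column selection and the `groupby` step are illustrative and not part of the repository code.

```python
import pandas as pd

# Made-up rows: film 1 has two inventory copies, film 2 has one.
df_film = pd.DataFrame({"film_id": [1, 2], "title": ["FILM A", "FILM B"]})
df_inventory = pd.DataFrame({"inventory_id": [10, 11, 20], "film_id": [1, 1, 2]})
df_rental = pd.DataFrame({"rental_id": [100, 101, 102, 103], "inventory_id": [10, 10, 11, 20]})
df_payment = pd.DataFrame({"rental_id": [100, 101, 102, 103], "amount": [2.99, 0.99, 4.99, 2.99]})

# Same merge chain as above: film -> inventory -> rental -> payment.
df_sales = pd.merge(df_film, df_inventory, on="film_id")
df_sales = pd.merge(df_sales, df_rental, on="inventory_id")
df_sales = pd.merge(df_sales, df_payment, on="rental_id")

# One row per payment, so the frame is longer than df_film.
print(len(df_sales))

# Example insight: total revenue per film.
revenue = df_sales.groupby("title")["amount"].sum().round(2)
print(revenue.to_dict())
```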

Final Thoughts

If you have been following these steps, you have already realized that we have managed to set up an ELT process for our data. We accomplished this thanks to all our prep work in different AWS services. We can now safely store our credentials for different data sources with Secrets Manager. We also have a role for the Lambda functions that run the different data processing steps. Athena and Glue allow us to read our stored columnar files easily, and AWS Data Wrangler ties everything together. Improvements can still be made, like applying least privilege to our Lambda roles or automating the execution of the ELT process. But the objective of this article was to show how to take advantage of current AWS services to build an ELT process tailored to this use case. I hope this goal has been achieved, and I will see you in the next article, my dear fellows.
