Serverless Data Pipeline Using AWS Glue and AWS CDK (Python)

Serverless data pipeline with Slack notifications

Anna Pastushko
CodeX
5 min read · Jan 16, 2023


Architecture diagram

Problem

Data providers upload raw data into an S3 bucket. Then data engineers complete data checks and perform simple transformations before loading the processed data to another S3 bucket, namely:

  1. Ensure “Currency” column contains only “USD”.
  2. Ensure “Currency” column has no missing values.
  3. Drop “Currency” column as there is only one value given — “USD”.
  4. Add a new “Average” column based on “High” and “Low” columns.
  5. Save processed data to S3 bucket in parquet format.

Solution

Here is the solution I propose.

To trigger the process on a raw file upload event, you (1) enable S3 Event Notifications to send event data to an SQS queue and (2) create an EventBridge Rule to send event data and trigger the Glue Workflow. Both event handlers are needed because they have different ranges of targets and different event JSON structures.

Once the new raw file is uploaded, Glue Workflow starts.

The first component of the Glue Workflow is the Glue Crawler. It polls the SQS queue to get information on newly uploaded files and crawls only them instead of performing a full bucket scan. If a file is corrupted, the process stops and an error event is generated. Handling error events is not in the scope of this solution because it varies based on business needs, e.g. an error event can be sent to Slack, or it might trigger an entirely new workflow.

The second component of the Glue Workflow is the Glue Job. It completes the business logic (data transformation and end-user notification) and saves the processed data to another S3 bucket. Like the Glue Crawler, it generates an error event in case of failure, which can be handled separately.

The solution diagram is given in the header of this article.

Implementation

Local Development Environment Bootstrapping

  1. Install the AWS CLI and set up credentials.
  2. Install Node.js to be able to use CDK.
  3. Install CDK using the command sudo npm install -g aws-cdk.
  4. Create a new directory for your project and change your current working directory to it.
  5. Run cdk init --language python to initialize the CDK project.
  6. Run cdk bootstrap to bootstrap the AWS account with CDK resources.
  7. Install Docker, which CDK needs to bundle assets that run inside Lambda containers.

CDK Stack

This is the final layout of the project. Below is a step-by-step guide, so that you will eventually understand each part of it.

DataPipeline
├── assets
│   └── glue_job.py
├── cdk.out
│   └── ...
├── stacks
│   ├── __init__.py
│   └── glue_pipeline_stack.py
├── app.py
├── cdk.json
└── requirements.txt

Our starting point is the stacks directory. It contains glue_pipeline_stack.py and an empty __init__.py file, which is required to define a Python package.

In glue_pipeline_stack.py, you import the required libraries and constructs and define the GluePipelineStack class (any name is valid), which inherits the cdk.Stack class.

Next, you create three S3 buckets for raw data, processed data, and Glue scripts using the Bucket construct. The Glue scripts, in turn, are deployed to the corresponding bucket using the BucketDeployment construct.

Typically, raw data is accessed only within the first few days after upload, so you may want to add lifecycle_rules to transition files from S3 Standard to S3 Glacier after 7 days to reduce storage costs.

⚠️ NB. When the stack is destroyed, the buckets and their files are deleted. You can prevent this by removing the removal_policy and auto_delete_objects arguments.
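As a minimal sketch (construct IDs, bucket names and the assets path are illustrative, not the exact code from the repository), the bucket definitions could look like this. Note that event_bridge_enabled is turned on for the raw bucket, because the EventBridge Rule created later relies on it:

from aws_cdk import (
    Duration,
    RemovalPolicy,
    Stack,
    aws_s3 as s3,
    aws_s3_deployment as s3deploy,
)
from constructs import Construct


class GluePipelineStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Raw data bucket: emits EventBridge events and moves objects
        # to Glacier after 7 days to reduce storage costs.
        self.raw_bucket = s3.Bucket(
            self, "RawDataBucket",
            event_bridge_enabled=True,
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True,
            lifecycle_rules=[
                s3.LifecycleRule(
                    transitions=[
                        s3.Transition(
                            storage_class=s3.StorageClass.GLACIER,
                            transition_after=Duration.days(7),
                        )
                    ]
                )
            ],
        )

        # Processed (parquet) data bucket.
        self.processed_bucket = s3.Bucket(
            self, "ProcessedDataBucket",
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True,
        )

        # Bucket for Glue scripts, populated from the local assets directory.
        self.scripts_bucket = s3.Bucket(
            self, "GlueScriptsBucket",
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True,
        )
        s3deploy.BucketDeployment(
            self, "DeployGlueScripts",
            sources=[s3deploy.Source.asset("./assets")],
            destination_bucket=self.scripts_bucket,
        )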

Next, you create an SQS queue and enable S3 Event Notifications to target it. This combination allows you to crawl only the files from the event instead of recrawling the whole S3 bucket, improving the Glue Crawler’s performance and reducing its cost.
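Continuing the sketch inside the stack’s __init__ (names are illustrative, and the imports belong at the top of glue_pipeline_stack.py), the queue and notification wiring could look like this:

from aws_cdk import (
    aws_s3 as s3,
    aws_s3_notifications as s3n,
    aws_sqs as sqs,
)

# Queue that receives an S3 event message for every newly uploaded raw file.
queue = sqs.Queue(self, "RawDataQueue")

# The Glue Crawler later reads object keys from this queue
# instead of scanning the whole bucket.
self.raw_bucket.add_event_notification(
    s3.EventType.OBJECT_CREATED,
    s3n.SqsDestination(queue),
)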

After that, you create a Glue Database using the CfnDatabase construct and set up an IAM role and Lake Formation permissions for the Glue services.

⚠️ NB. Access to AWS Glue Data Catalog and Amazon S3 resources is managed not only with IAM policies but also with AWS Lake Formation permissions. You get an Insufficient Lake Formation permission(s) error when the IAM role associated with the AWS Glue Crawler or Job doesn’t have the necessary Lake Formation permissions.
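Continuing the sketch, the database, role and Lake Formation permissions could be wired up roughly as follows (the database name and the broad ALL permission are illustrative choices):

from aws_cdk import (
    aws_glue as glue,
    aws_iam as iam,
    aws_lakeformation as lakeformation,
)

database_name = "raw_data_db"
glue.CfnDatabase(
    self, "GlueDatabase",
    catalog_id=self.account,
    database_input=glue.CfnDatabase.DatabaseInputProperty(name=database_name),
)

# Single role assumed by both the Glue Crawler and the Glue Job.
glue_role = iam.Role(
    self, "GlueRole",
    assumed_by=iam.ServicePrincipal("glue.amazonaws.com"),
    managed_policies=[
        iam.ManagedPolicy.from_aws_managed_policy_name(
            "service-role/AWSGlueServiceRole"
        )
    ],
)
self.raw_bucket.grant_read(glue_role)
self.processed_bucket.grant_read_write(glue_role)
self.scripts_bucket.grant_read(glue_role)
queue.grant_consume_messages(glue_role)

# Lake Formation permissions on the database, to avoid the
# "Insufficient Lake Formation permission(s)" error mentioned above.
lakeformation.CfnPermissions(
    self, "GlueDatabasePermissions",
    data_lake_principal=lakeformation.CfnPermissions.DataLakePrincipalProperty(
        data_lake_principal_identifier=glue_role.role_arn
    ),
    resource=lakeformation.CfnPermissions.ResourceProperty(
        database_resource=lakeformation.CfnPermissions.DatabaseResourceProperty(
            name=database_name
        )
    ),
    permissions=["ALL"],
)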

Next, you create Glue Crawler and Glue Job using CfnCrawler and CfnJob constructs.

In this case, the recrawl_policy argument has a value of CRAWL_EVENT_MODE, which instructs the Glue Crawler to crawl only changes identified by Amazon S3 events, so only new or updated files are in its scope, not the entire S3 bucket.

Also, in this example I use the awswrangler library, so the python_version argument must be set to 3.9, because that version comes with pre-installed analytics libraries. If you don’t need them, check the documentation to see which version suits your needs.
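A rough sketch of both resources, continuing from the previous snippets (resource names, the capacity value and the target_bucket job argument are illustrative):

from aws_cdk import aws_glue as glue

crawler = glue.CfnCrawler(
    self, "GlueCrawler",
    name="raw-data-crawler",
    role=glue_role.role_arn,
    database_name=database_name,
    targets=glue.CfnCrawler.TargetsProperty(
        s3_targets=[
            glue.CfnCrawler.S3TargetProperty(
                path=f"s3://{self.raw_bucket.bucket_name}/",
                # Object keys come from the SQS queue filled by S3 notifications.
                event_queue_arn=queue.queue_arn,
            )
        ]
    ),
    # Crawl only changes identified by S3 events, not the whole bucket.
    recrawl_policy=glue.CfnCrawler.RecrawlPolicyProperty(
        recrawl_behavior="CRAWL_EVENT_MODE"
    ),
)

job = glue.CfnJob(
    self, "GlueJob",
    name="data-transformation-job",
    role=glue_role.role_arn,
    command=glue.CfnJob.JobCommandProperty(
        name="pythonshell",
        python_version="3.9",  # version with pre-installed analytics libraries
        script_location=f"s3://{self.scripts_bucket.bucket_name}/glue_job.py",
    ),
    # Hypothetical custom argument consumed by glue_job.py below.
    default_arguments={"--target_bucket": self.processed_bucket.bucket_name},
    max_capacity=0.0625,  # smallest Python shell capacity
)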

In order to automate Glue Crawler and Glue Job runs based on the S3 upload event, you need to create a Glue Workflow and Triggers using CfnWorkflow and CfnTrigger.

glue_crawler_trigger waits for EventBridge Rule to trigger Glue Crawler.

glue_job_trigger launches Glue Job when Glue Crawler shows “success” run status.
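The workflow and both triggers could be declared roughly as follows (trigger and workflow names are illustrative):

from aws_cdk import aws_glue as glue

workflow = glue.CfnWorkflow(self, "GlueWorkflow", name="data-pipeline-workflow")

# EVENT trigger: started by the EventBridge Rule defined in the next step.
glue_crawler_trigger = glue.CfnTrigger(
    self, "GlueCrawlerTrigger",
    name="glue-crawler-trigger",
    type="EVENT",
    workflow_name=workflow.name,
    actions=[glue.CfnTrigger.ActionProperty(crawler_name=crawler.name)],
)

# CONDITIONAL trigger: starts the job once the crawler run succeeds.
glue_job_trigger = glue.CfnTrigger(
    self, "GlueJobTrigger",
    name="glue-job-trigger",
    type="CONDITIONAL",
    start_on_creation=True,
    workflow_name=workflow.name,
    predicate=glue.CfnTrigger.PredicateProperty(
        conditions=[
            glue.CfnTrigger.ConditionProperty(
                crawler_name=crawler.name,
                crawl_state="SUCCEEDED",
                logical_operator="EQUALS",
            )
        ]
    ),
    actions=[glue.CfnTrigger.ActionProperty(job_name=job.name)],
)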

The final step in the GluePipelineStack class definition is creating an EventBridge Rule to trigger the Glue Workflow, using the CfnRule construct.

Usually I prefer second-level constructs like Rule, but here you need the first-level CfnRule construct because it allows adding custom targets such as a Glue Workflow. The documentation lists the targets supported by the Rule construct; the list may grow in the future, but Glue Workflow is not among them for now.

The parameters are fairly self-explanatory, so this part should not give you a hard time.
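A sketch of the rule, including an IAM role that lets EventBridge call glue:NotifyEvent on the workflow (the role name and the event pattern details are illustrative):

from aws_cdk import aws_events as events, aws_iam as iam

workflow_arn = (
    f"arn:aws:glue:{self.region}:{self.account}:workflow/{workflow.name}"
)

# Role that EventBridge assumes to start the Glue Workflow.
eventbridge_role = iam.Role(
    self, "EventBridgeGlueRole",
    assumed_by=iam.ServicePrincipal("events.amazonaws.com"),
)
eventbridge_role.add_to_policy(
    iam.PolicyStatement(actions=["glue:NotifyEvent"], resources=[workflow_arn])
)

events.CfnRule(
    self, "S3UploadRule",
    # Fires on "Object Created" events from the raw bucket
    # (requires event_bridge_enabled=True on that bucket).
    event_pattern={
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {"bucket": {"name": [self.raw_bucket.bucket_name]}},
    },
    targets=[
        events.CfnRule.TargetProperty(
            id="GlueWorkflowTarget",
            arn=workflow_arn,
            role_arn=eventbridge_role.role_arn,
        )
    ],
)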

Next, go to the assets directory, where you need to create glue_job.py with data transformation logic.

First, you create a Utils class to separate the business logic from the technical implementation. It contains two functions: get_data_from_s3 and send_notification.

Alas, it is not possible to get the file name directly from the EventBridge event that triggered the Glue Workflow, so the get_data_from_s3 method finds all NotifyEvents generated during the last several minutes and compares the fetched event IDs with the one passed to the Glue Job in the Glue Workflow’s run properties. Once a match is found, the method locates the file using the object key from the event and loads it into a pandas DataFrame.

Also, don’t forget to replace _url with your own Slack hook.
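A condensed sketch of the Utils class is shown below. It assumes CSV input and uses CloudTrail to look up recent NotifyEvent calls; the exact payload structure and the aws:eventIds run-property key should be verified against your account, and the webhook URL is a placeholder:

import json
from datetime import datetime, timedelta, timezone

import awswrangler as wr
import boto3
import pandas as pd
import urllib3

# Placeholder: replace with your own Slack incoming-webhook URL.
_url = "https://hooks.slack.com/services/XXX/YYY/ZZZ"


class Utils:
    """Technical helpers used by the Glue Job (illustrative sketch)."""

    def __init__(self, workflow_name: str, run_id: str) -> None:
        self.glue = boto3.client("glue")
        self.cloudtrail = boto3.client("cloudtrail")
        self.workflow_name = workflow_name
        self.run_id = run_id

    def get_data_from_s3(self) -> pd.DataFrame:
        # Event-triggered workflow runs carry the EventBridge event id
        # in their run properties (key name to be verified).
        run = self.glue.get_workflow_run_properties(
            Name=self.workflow_name, RunId=self.run_id
        )
        event_id = run["RunProperties"]["aws:eventIds"].strip("[]")

        # Find recent glue:NotifyEvent calls and pick the one whose payload
        # carries the same event id; its body holds the S3 bucket and key.
        events = self.cloudtrail.lookup_events(
            LookupAttributes=[
                {"AttributeKey": "EventName", "AttributeValue": "NotifyEvent"}
            ],
            StartTime=datetime.now(timezone.utc) - timedelta(minutes=5),
        )["Events"]
        for event in events:
            payload = json.loads(event["CloudTrailEvent"])["requestParameters"][
                "eventPayload"
            ]
            if payload["eventId"] == event_id:
                body = json.loads(payload["eventBody"])
                bucket = body["detail"]["bucket"]["name"]
                key = body["detail"]["object"]["key"]
                return wr.s3.read_csv(f"s3://{bucket}/{key}")  # CSV input assumed
        raise ValueError("No matching NotifyEvent found for this workflow run")

    @staticmethod
    def send_notification(message: str) -> None:
        # Post a plain-text message to the Slack incoming webhook.
        http = urllib3.PoolManager()
        http.request(
            "POST",
            _url,
            body=json.dumps({"text": message}),
            headers={"Content-Type": "application/json"},
        )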

Next, you initialize the Utils class and define the data transformation and validation steps.
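A sketch of that part of glue_job.py, assuming the hypothetical target_bucket argument defined on the CfnJob above (WORKFLOW_NAME and WORKFLOW_RUN_ID are passed automatically to jobs started by a workflow):

import sys

import awswrangler as wr
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["WORKFLOW_NAME", "WORKFLOW_RUN_ID", "target_bucket"])

utils = Utils(args["WORKFLOW_NAME"], args["WORKFLOW_RUN_ID"])
df = utils.get_data_from_s3()

# Validation: "Currency" must be complete and contain only USD.
if df["Currency"].isna().any():
    utils.send_notification("Validation failed: missing values in Currency column")
    raise ValueError("Missing values in Currency column")
if not (df["Currency"] == "USD").all():
    utils.send_notification("Validation failed: Currency column contains non-USD values")
    raise ValueError("Unexpected currency values")

# Transformation: drop the constant Currency column and add the Average column.
df = df.drop(columns=["Currency"])
df["Average"] = (df["High"] + df["Low"]) / 2

# Save processed data as parquet and notify the end user in Slack.
wr.s3.to_parquet(df, path=f"s3://{args['target_bucket']}/processed/data.parquet")
utils.send_notification("New processed file is available")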

Now move back to the parent directory and open the app.py file, where you use the App construct to declare the CDK app and the synth() method to generate the CloudFormation template.
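A minimal app.py, assuming the stack class lives in stacks/glue_pipeline_stack.py as shown in the project layout:

import aws_cdk as cdk

from stacks.glue_pipeline_stack import GluePipelineStack

app = cdk.App()
GluePipelineStack(app, "GluePipelineStack")
app.synth()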

Now you can deploy the stack to AWS with the cdk deploy command and feel the power of deployment automation.

🎉 🎉🎉 Congratulations, you have just deployed your stack and the workload is ready to be used.

Account cleanup

You can delete all resources created in your account during development with the following steps:

  1. Run the following command to delete stack resources: cdk destroy
  2. Clean up the ECR repository and S3 buckets created for CDK bootstrapping, because they can incur costs.

Conclusion

AWS CDK provides you with an extremely versatile toolkit for application development. It can be challenging at first, but your efforts will pay off in the end because you will be able to manage and transfer your application with one command.

CDK resources and full code can be found in the GitHub repository.

🎀 Thank you for reading till the end. I do hope it was helpful, please let me know in the comments if you spot any mistakes.
