Create Serverless Data Pipeline Using AWS CDK (Python)

Anna Pastushko · Published in CodeX · Jul 9, 2022 · 8 min read
Architecture diagram: the data pipeline, Athena queries and CI/CD process, built from S3, Lambda, SQS, Glue, CodePipeline, CloudFormation and Athena, implemented as a Python CDK (IaC) project.

Context

The data science team is about to start a research study, and they requested a solution on the AWS cloud. Here is what I offered them:

Main process (Data Processing):

  • A device uploads a .mat file with ECG data to the S3 bucket (Raw).
  • The upload triggers an event, which is sent to the SQS queue.
  • Lambda polls the SQS queue (event source mapping invocation) and starts processing the event. Lambda's runtime is a Python Docker container because the libraries' size exceeds the 250 MB layer size limit. If any error occurs during processing, you get a notification in Slack.
  • Once processing is complete, the data is saved in Parquet format to the S3 bucket (Processed).
  • To enable data scientists to query the data, a Glue Crawler job creates a schema in the Data Catalog. Athena is then used to query the Processed bucket.

Secondary process (CI/CD):

When a developer wants to change the processing job logic, they simply prepare the changes and commit them to the CodeCommit repository. Everything else is handled automatically by the CI/CD process.

The CodeBuild service converts the CDK code into a CloudFormation template and deploys it to your account. In other words, it creates all infrastructure components automatically. Once the deployment completes, the group of deployed resources (the stack) is visible in the CloudFormation web console. To simplify these two steps and to let the CI/CD process update itself, the CodePipeline abstraction is used. You'll also get Slack notifications about the progress.

Preparation

To prepare your local environment for this project, you should follow the steps described below:

  1. Install AWS CLI and set up credentials.
  2. Install NodeJS to be able to use CDK.
  3. Install CDK using the command sudo npm install -g aws-cdk.
  4. Create a new directory for your project and change your current working directory to it.
  6. Run cdk init --language python to initialize the CDK project.
  7. Run cdk bootstrap to bootstrap your AWS account with CDK resources.
  7. Install Docker to run Docker container inside Lambda.

Project Structure

This is the final look of the project. I will provide a step-by-step guide so that you’ll eventually understand each component in it.

DataPipeline
├── assets
│   └── lambda
│       ├── dockerfile
│       └── processing.py
├── cdk.out
│   └── ...
├── stacks
│   ├── __init__.py
│   ├── data_pipeline_stack.py
│   ├── cicd_stack.py
│   └── data_pipeline_stage.py
├── app.py
├── cdk.json
└── requirements.txt

Our starting point is the stacks directory. It contains the mandatory empty __init__.py file that defines a Python package, plus three other files:

  • data_pipeline_stack.py
  • cicd_stack.py
  • data_pipeline_stage.py

First, we open data_pipeline_stack.py and import all libraries and constructs needed for further development. We also define a stack class that inherits from cdk.Stack.
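A minimal sketch of that skeleton, assuming CDK v2 and the import aliases used later in this article (_s3, _s3n and friends); the class name matches the DataPipelineStack referenced in the cleanup commands at the end:

# stacks/data_pipeline_stack.py -- minimal skeleton of the application stack
import aws_cdk as cdk
from aws_cdk import (
    aws_s3 as _s3,
    aws_s3_notifications as _s3n,
    aws_sqs as _sqs,
    aws_lambda as _lambda,
    aws_lambda_event_sources as _event_sources,
    aws_iam as _iam,
)
from constructs import Construct


class DataPipelineStack(cdk.Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        # SQS queue, S3 buckets and the Lambda function are defined here (see below)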

After that, we use the SQS Queue construct to connect the S3 bucket and Lambda. The arguments are simple: the stack element id ('raw_data_queue'), the queue name ('data_pipeline_queue') and the time a message stays invisible after Lambda picks it up for processing (cdk.Duration.seconds(200)). Note that the visibility timeout should exceed your processing time: if processing takes 30 seconds, it is better to set it to about 60 seconds. Here I set it to 200 seconds because processing takes ~100 seconds.
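A sketch of that queue definition (the data_queue variable name matches the one used for the event notification below):

# Queue that buffers S3 "object created" events for the Lambda consumer
data_queue = _sqs.Queue(
    self,
    "raw_data_queue",
    queue_name="data_pipeline_queue",
    # Message stays invisible to other consumers while Lambda processes it
    visibility_timeout=cdk.Duration.seconds(200),
)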

Next, we create S3 buckets for raw and processed data using the Bucket construct. Since raw data is usually only accessed within the first few days after upload, we add lifecycle_rules to transition objects from S3 Standard to S3 Glacier after 7 days and reduce storage costs.
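A sketch of the two bucket definitions; the construct ids are placeholders, and the removal_policy and auto_delete_objects values are the ones the warning below refers to:

# Raw data bucket: objects move to Glacier after 7 days to save on storage
raw_bucket = _s3.Bucket(
    self,
    "raw_data_bucket",                 # construct id is an assumption
    removal_policy=cdk.RemovalPolicy.DESTROY,
    auto_delete_objects=True,
    lifecycle_rules=[
        _s3.LifecycleRule(
            transitions=[
                _s3.Transition(
                    storage_class=_s3.StorageClass.GLACIER,
                    transition_after=cdk.Duration.days(7),
                )
            ]
        )
    ],
)

# Processed data bucket, queried later with Athena
processed_bucket = _s3.Bucket(
    self,
    "processed_data_bucket",           # construct id is an assumption
    removal_policy=cdk.RemovalPolicy.DESTROY,
    auto_delete_objects=True,
)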

We also need to connect the raw bucket to the SQS queue, so the events generated by the bucket have a destination. For that, we use the add_event_notification method with two arguments: the event we want the queue to be notified about (_s3.EventType.OBJECT_CREATED) and the destination queue (_s3n.SqsDestination(data_queue)).
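Wiring the two together is a single call:

# Send an SQS message every time a new object lands in the raw bucket
raw_bucket.add_event_notification(
    _s3.EventType.OBJECT_CREATED,
    _s3n.SqsDestination(data_queue),
)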

⚠️ When the stack is destroyed, the bucket and all the data inside it will be deleted. You can change this behaviour by removing (or setting to their defaults) the removal_policy and auto_delete_objects arguments.

The next step is to create the Lambda function using the DockerImageFunction construct. The arguments I define are shown below; they are fairly self-explanatory and follow the same pattern as the previous examples. If anything is unclear, please refer to the documentation.
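A sketch of the function definition; the construct id, memory size and environment variables are illustrative choices, while the 180-second timeout and the SQS polling follow the description above:

# Lambda packaged as a Docker image (libraries exceed the 250 MB layer limit)
processing_lambda = _lambda.DockerImageFunction(
    self,
    "processing_lambda",                      # construct id is an assumption
    code=_lambda.DockerImageCode.from_image_asset("assets/lambda"),
    timeout=cdk.Duration.seconds(180),        # must stay below the queue's visibility timeout
    memory_size=1024,                         # assumption, tune to your workload
    environment={                             # assumption: values consumed by processing.py
        "PROCESSED_BUCKET": processed_bucket.bucket_name,
        "QUEUE_URL": data_queue.queue_url,
    },
)

# Let the function poll the queue (event source mapping invocation)
processing_lambda.add_event_source(_event_sources.SqsEventSource(data_queue))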

⚠️ The one thing worth highlighting is the timeout parameter of the Lambda: it should always be less than the visibility_timeout of the Queue (180 vs 200 seconds here).

Then, we attach a policy to the automatically created Lambda role using the attach_inline_policy method, so the function can read files from S3 and write results back. You can tune the actions/resources parameters to grant Lambda more granular access to S3.
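A sketch of that policy; the construct id and the exact set of actions are examples that you can tighten further:

# Grant the Lambda role read access to raw data and write access to processed data
processing_lambda.role.attach_inline_policy(
    _iam.Policy(
        self,
        "s3_access_policy",                   # construct id is an assumption
        statements=[
            _iam.PolicyStatement(
                actions=["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
                resources=[
                    raw_bucket.bucket_arn,
                    f"{raw_bucket.bucket_arn}/*",
                    processed_bucket.bucket_arn,
                    f"{processed_bucket.bucket_arn}/*",
                ],
            )
        ],
    )
)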

Now we move to the assets directory.

There we create the dockerfile and processing.py with the data transformation logic, which is fairly simple: first we parse the event from SQS to get information about the file and bucket, then we parse the .mat file with the ECG data, clean it and save it in .parquet format to the Processed bucket. The script also includes logging and Slack error messages. At the end, we delete the message from the queue so the file is not processed again. A simplified sketch of the handler is shown below.
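This sketch omits the actual ECG transformation and the Slack webhook call; the QUEUE_URL environment variable is an assumption carried over from the Lambda definition above:

# assets/lambda/processing.py -- simplified sketch of the handler logic
import json
import os

import boto3

s3 = boto3.client("s3")
sqs = boto3.client("sqs")

# Replace with your own Slack webhook URL
_url = "https://hooks.slack.com/services/..."


def handler(event, context):
    for record in event["Records"]:
        try:
            # The SQS message body carries the original S3 event
            s3_event = json.loads(record["body"])["Records"][0]["s3"]
            bucket = s3_event["bucket"]["name"]
            key = s3_event["object"]["key"]

            # 1. download the .mat file from the Raw bucket
            # 2. clean the ECG signal
            # 3. write the result as .parquet to the Processed bucket
            # (actual transformation logic omitted here)
            ...

            # Delete the message so the file is not processed again
            sqs.delete_message(
                QueueUrl=os.environ["QUEUE_URL"],        # assumption: passed via environment
                ReceiptHandle=record["receiptHandle"],
            )
        except Exception as error:
            # Log the failure and post the error text to the Slack webhook (_url)
            print(f"Processing of {key if 'key' in dir() else record} failed: {error}")
            raise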

For your own pipeline, you can change the processing logic and replace _url with your own Slack webhook.

Let's quickly go through the logic of the Dockerfile: first we pull the special Lambda base image from the AWS ECR repository, then install all Python libraries, copy our processing.py script into the container, and set the command that launches the handler function from the script.
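A sketch of that dockerfile; the Python version and the library list are examples, adjust them to your own imports:

# assets/lambda/dockerfile -- sketch; pin versions as needed
FROM public.ecr.aws/lambda/python:3.9

# Libraries used by processing.py (example list, adjust to your own imports)
RUN pip install boto3 scipy pandas pyarrow requests

# Copy the processing script into the image
COPY processing.py ${LAMBDA_TASK_ROOT}

# Launch the handler function from the script
CMD ["processing.handler"]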

⚠️ Do not forget to add the libraries you used in processing.py to the dockerfile.

At this stage the Data pipeline stack is finished, and we can move on to developing the CI/CD stack.

CI/CD stack

For the CI/CD process we will use the CodePipeline service, which makes the deployment process easier. Each time we change the Data pipeline or the CI/CD stack via a CodeCommit push, CodePipeline automatically re-deploys both stacks. In short, the application stack is added to a CodePipeline stage, the stage is added to the CodePipeline, and the app is then synthesised for the CI/CD stack rather than for the application stack. A more detailed description of how the files connect and of the logic behind the CodePipeline construct is shown below.

Steps for CodePipeline stack creation

First, we open cicd_stack.py and import all libraries and constructs we will use. We will create the CodeCommit repository manually later; for now we only need a reference to it, so we can add it as the source for CodePipeline.

We use the CodePipeline construct to create the CI/CD process. The self_mutation parameter allows the pipeline to update itself; it is True by default. The docker_enabled_for_synth parameter should be set to True because we use Docker in our application stack. After that, we add the stage with the application deployment and call build_pipeline() to construct the pipeline. The latter step is necessary to set up Slack notifications afterwards.
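A sketch of cicd_stack.py under these assumptions (the construct ids, branch name and synth commands are illustrative):

# stacks/cicd_stack.py -- minimal sketch of the CI/CD stack
import aws_cdk as cdk
from aws_cdk import aws_codecommit as codecommit, pipelines
from constructs import Construct

from stacks.data_pipeline_stage import DataPipelineStage


class CodePipelineStack(cdk.Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Reference to the (manually created) CodeCommit repository
        repository = codecommit.Repository.from_repository_name(
            self, "repository", "data_pipeline_repository"
        )

        pipeline = pipelines.CodePipeline(
            self,
            "code_pipeline",                   # construct id is an assumption
            self_mutation=True,                # pipeline can update itself (default)
            docker_enabled_for_synth=True,     # required because the app stack builds a Docker asset
            synth=pipelines.ShellStep(
                "Synth",
                input=pipelines.CodePipelineSource.code_commit(repository, "master"),  # branch is an assumption
                commands=[
                    "pip install -r requirements.txt",
                    "npm install -g aws-cdk",
                    "cdk synth",
                ],
            ),
        )

        # Add the application stage, then build the pipeline so notifications
        # can be attached to the underlying pipeline object
        pipeline.add_stage(DataPipelineStage(self, "DataPipelineDeploy"))
        pipeline.build_pipeline()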

The next step is to configure Slack notifications for CodePipeline, so developers can monitor deployments. For that we use the SlackChannelConfiguration construct. You can get the slack_channel_id value by right-clicking the channel name and copying the last 9 characters of the URL; to get the slack_workspace_id value, use the AWS Chatbot Guide. To define which types of notifications we want, we use the NotificationRule construct. If you want to define the notification events more granularly, use Events for notification rules.

ℹ️ The .pipeline property refers to the CodePipeline pipeline that deploys the CDK app. It is available only after the pipeline has been constructed with the build_pipeline() method. For the source argument we must pass the pipeline object, not the construct.
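A sketch of that configuration; the configuration name, workspace and channel ids are placeholders, and the two event ids are examples of pipeline-level notifications:

# Added near the top of cicd_stack.py:
from aws_cdk import aws_chatbot as chatbot, aws_codestarnotifications as notifications

# Slack channel that AWS Chatbot posts to (ids below are placeholders)
slack_channel = chatbot.SlackChannelConfiguration(
    self,
    "slack_channel",
    slack_channel_configuration_name="data-pipeline-deployments",
    slack_workspace_id="YOUR_WORKSPACE_ID",
    slack_channel_id="YOUR_CHANNEL_ID",
)

# Notify on pipeline success/failure; pipeline.pipeline is available
# only after build_pipeline() has been called
notifications.NotificationRule(
    self,
    "notification_rule",
    source=pipeline.pipeline,
    events=[
        "codepipeline-pipeline-pipeline-execution-succeeded",
        "codepipeline-pipeline-pipeline-execution-failed",
    ],
    targets=[slack_channel],
)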

After defining the pipeline, we add the stage that deploys the Data pipeline. To keep the project clean, the stage is defined in a separate file, using cdk.Stage as the parent class.
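The whole data_pipeline_stage.py can be as small as this; the stage and stack ids match the names used in the destroy command later:

# stacks/data_pipeline_stage.py -- wraps the application stack in a deployable stage
import aws_cdk as cdk
from constructs import Construct

from stacks.data_pipeline_stack import DataPipelineStack


class DataPipelineStage(cdk.Stage):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # The stack deployed by this stage of the pipeline
        DataPipelineStack(self, "DataPipelineStack")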

For those of you who use CDKv1, one additional step is to modify the cdk.json configuration file by adding the following expression to context:

"context": {"@aws-cdk/core:newStyleStackSynthesis": true}

At this point, we have created all constructs and files for both stacks. The only thing left is app.py with the final steps: we import the CI/CD stack we created in cicd_stack.py, instantiate it and add tags to all stack resources.
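A sketch of app.py; the tag keys and values are examples:

# app.py -- entry point: synthesise the CI/CD stack and tag every resource
import aws_cdk as cdk

from stacks.cicd_stack import CodePipelineStack

app = cdk.App()
CodePipelineStack(app, "CodePipelineStack")

# Tags propagate to every resource in the app (keys/values are examples)
cdk.Tags.of(app).add("project", "data-pipeline")
cdk.Tags.of(app).add("team", "data-science")

app.synth()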

Congratulations, we have finished creating our stacks. Now we can finally create a CodeCommit repository called data_pipeline_repository and push our files to it.

CodeCommit repository creation (manual, with tags)

We can manually add the same tags as we used in the stack, so that all resources created for this task are grouped together in cost reports.

⚠️ Check limitations for CodeBuild in Service Quotas before deployment.

Congratulations, we can now finally deploy our stack to AWS with the cdk deploy command and watch all resources being set up automatically.

Athena queries

Let's start by creating the Glue Crawler: go to the Data Catalog section in the Glue console and click Crawlers, then click Add crawler and go through all the steps. I added the same tags as for the other Data pipeline resources, so I can track them together.

Glue Crawler creation (manual, with tags)

Leave the crawler source type unchanged, add an S3 data store and specify the path to your bucket in Include path. After that, create a new IAM role or choose an existing one, and specify how often you want the crawler to run. Then create a database; in my case I created the ecg_data database. Once all steps are completed and the crawler is created, run it.

That is all we need to query the processed_ecg_data table with Athena. A simple example query is shown below.

Athena query on the processed_ecg_data table
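For reference, the query from the screenshot boils down to something like this (the LIMIT is my addition):

-- Preview the processed data registered by the Glue Crawler
SELECT *
FROM ecg_data.processed_ecg_data
LIMIT 10;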

Account cleanup

In case you want to delete all resources created in your account during development, you should perform the following steps:

  1. Run the following command to delete all stacks resources:
    cdk destroy CodePipelineStack/DataPipelineDeploy/DataPipelineStack CodePipelineStack
  2. Delete the CodeCommit repository.
  3. Clean the ECR repository and the S3 buckets created for Athena and CDK, as they can incur costs.
  4. Delete the Glue Crawler and the database with its tables.

ℹ️ The cdk destroy command only destroys the CodePipeline (CI/CD) stack and the stacks that depend on it. Since the application stacks don't depend on the CodePipeline stack, they won't be destroyed automatically, so the Data pipeline stack has to be destroyed separately; there is a discussion on how to delete them both.

Deleting some resources manually is not very convenient, and there are several open discussions with AWS developers about fixing this.

Conclusion

CDK provides a toolkit for developing applications based on AWS services. It can be challenging at first, but the effort pays off: you end up being able to manage and move your whole application with a single command.

The CDK resources and the full code can be found in the GitHub repository.

Thank you for reading to the end. I hope it was helpful; please let me know in the comments if you spot any mistakes.
