Orchestration and Automation with Managed Apache Airflow (MWAA)
Welcome back to the third part of our series on “Building an End-to-End ML Pipeline for Malware Detection.” In the previous two posts, we covered data and model development, and model tracking. In today’s post, we’ll bring everything together, connecting the dots to create an end-to-end machine learning pipeline — from data to deployment. We’ll use Amazon EMR Serverless and Amazon Managed Workflows for Apache Airflow (MWAA), and by the end of this post, you’ll have a working pipeline built on AWS technologies.
First, let’s take a look at Amazon EMR Serverless for running our PySpark data processing pipeline code. We built this code to transform the raw data into a feature dataset during the data wrangling phase. Amazon EMR Serverless provides a serverless runtime environment that abstracts infrastructure management. It automatically provisions resources based on application needs, allowing you to run PySpark jobs without creating a cluster or managing individual instances. This makes it easier to manage workloads and reduces the operational overhead of managing resources. The serverless model also enables more dynamic workload management and can help you operate more cost-effectively and at scale compared to traditional EMR clusters.
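To make this concrete, here’s a rough sketch of the `jobDriver` payload an EMR Serverless PySpark job run might use. The S3 URIs, argument names, and helper function are illustrative placeholders, not the exact configuration this repo’s DAG builds; the `spark.archives` and `PYSPARK_PYTHON` settings point Spark at a packaged virtual environment like the one we build later in this post.

```python
def build_spark_job_driver(script_s3_uri, venv_s3_uri, args):
    """Assemble a jobDriver payload for EMR Serverless's start_job_run API.

    The sparkSubmitParameters tell Spark to unpack a packaged virtualenv
    archive and use its Python interpreter for the driver and executors.
    """
    spark_submit = " ".join([
        f"--conf spark.archives={venv_s3_uri}#environment",
        "--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python",
        "--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python",
        "--conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python",
    ])
    return {
        "sparkSubmit": {
            "entryPoint": script_s3_uri,          # S3 path to the PySpark script
            "entryPointArguments": list(args),    # CLI args passed to the script
            "sparkSubmitParameters": spark_submit,
        }
    }
```

In a live setup, this dictionary would be passed as the `jobDriver` argument to `boto3.client("emr-serverless").start_job_run(...)`, along with an application ID and execution role ARN.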
Before diving into the nitty-gritty, let’s take a moment to appreciate Amazon Managed Workflows for Apache Airflow (MWAA), a crucial piece of the puzzle. MWAA is your Airflow wingman, offering a managed service that makes orchestrating data workflows a breeze. Picture Apache Airflow with a squad of handy features like automatic scaling, built-in authentication, and top-notch security at your service, all without the headache of manual setup and upkeep. MWAA lets you define your workflows using a directed acyclic graph (DAG) — the perfect tool for sequencing your ML tasks and building that dream machine learning pipeline. Plus, it’s coder-friendly, supporting both the classic Airflow syntax and the shiny new Taskflow API for those who like to keep it fresh.
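The core guarantee of a DAG is that every task runs only after its upstream dependencies finish. The toy sketch below illustrates that scheduling property in plain Python; the stage names mirror this series’ pipeline conceptually but are not the actual task IDs from the repo.

```python
# Hypothetical task graph; names are illustrative, not the repo's task IDs.
DEPENDENCIES = {
    "preprocess": [],
    "train": ["preprocess"],
    "evaluate": ["train"],
    "deploy": ["evaluate"],
}

def topological_order(deps):
    """Return tasks so each follows its upstream dependencies (Kahn's
    algorithm) — the ordering guarantee Airflow's scheduler provides."""
    remaining = {task: set(upstream) for task, upstream in deps.items()}
    order = []
    while remaining:
        # Tasks whose dependencies have all completed are ready to run.
        ready = sorted(t for t, d in remaining.items() if not d)
        if not ready:
            raise ValueError("cycle detected: not a DAG")
        for task in ready:
            order.append(task)
            del remaining[task]
        for d in remaining.values():
            d.difference_update(ready)
    return order
```

In Airflow itself, you would express the same structure declaratively, e.g. `preprocess >> train >> evaluate >> deploy` in classic syntax, or via return-value wiring in the Taskflow API.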
Setting up your pipeline
Here, we will go step-by-step in setting up our pipeline. Go to the blog series’ GitHub repository to follow along. You will specifically want to navigate to the `airflow/` directory.
AWS S3 configuration
- Log in to the AWS console and navigate to S3.
- Create a new bucket for Airflow:
  – Click “Create Bucket”.
  – Name the bucket `airflow-<username>`.
  – Keep other settings as default and click “Create bucket”.
- Upload the `requirements.txt`:
  – Place the `requirements.txt` file at the root of your new bucket.
- Upload DAGs:
  – Navigate to the `airflow` directory of the GitHub repository you’ve cloned.
  – Upload the entire `dags` directory, including all subdirectories and files, to your new S3 bucket under `airflow-<username>/dags/`. This includes `classic_syntax.py` and `utils/` (utility scripts and the configuration file).
- Upload pipeline scripts:
  – Additionally, upload the `pipeline` directory containing the `preprocess.py` script to `airflow-<username>/pipeline/`. Ensure it mirrors the structure in your repository: `pipeline/preprocess.py` (script for data preprocessing).
- Create a secondary S3 bucket for datasets and outputs:
  – Name the secondary bucket `airflow-<username>-data`.
  – Create the following directories inside this bucket: `benchmark/`, `groundtruth/`, `predictions/`, `raw/`, `test/`, `train/`, `validation/`.
  – Upload `benchmark.txt` from the repository to the `benchmark/` directory.
  – Download the Kaggle dataset “Malware Detection in Network Traffic Data” and upload its CSV files to the `raw/` directory.¹
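If you’d rather script the bucket layout than click through the console, the helper below derives the expected names and prefixes from your username. It is a sketch under the naming convention above; `johndoe` is a hypothetical username.

```python
def bucket_layout(username):
    """Return the two bucket names and the empty prefixes ("directories")
    the pipeline expects, matching the manual console steps above."""
    airflow_bucket = f"airflow-{username}"
    data_bucket = f"airflow-{username}-data"
    prefixes = [
        "benchmark/", "groundtruth/", "predictions/",
        "raw/", "test/", "train/", "validation/",
    ]
    return airflow_bucket, data_bucket, prefixes

# With AWS credentials configured, you could then create them via boto3:
#   s3 = boto3.client("s3")
#   s3.create_bucket(Bucket=data_bucket)
#   for prefix in prefixes:
#       s3.put_object(Bucket=data_bucket, Key=prefix)  # zero-byte "folder"
```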
Notes for customization and understanding dependencies
Training script from GitHub:
- The `train.py` script is referenced from a GitHub repository in the `prepare_configs` function defined in `airflow/dags/utils/helper_functions.py`. This function configures a SageMaker SKLearn estimator with the script sourced directly from the repo:
```python
estimator = SKLearn(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    source_dir="airflow/pipeline",
    entry_point="train.py",
    git_config={
        "repo": "https://github.com/JamesFCoffey/malware-detection-ml-pipeline.git"
    },
    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
    framework_version="1.2-1",
    py_version="py3",
)
```
- If you need to use a different version of `train.py` or a different repository, modify the `git_config` parameter accordingly.
Requirements file dependency:
- The `requirements.txt` file references a `constraints.txt` file located in a GitHub repository. This is specified within the `requirements.txt` as:

```text
--constraint "https://raw.githubusercontent.com/JamesFCoffey/malware-detection-ml-pipeline/main/airflow/constraints.txt"
```
- This ensures that all Python packages are installed with versions compatible with those pinned in `constraints.txt`. If you wish to customize or update the constraints, you may edit this URL or modify the `constraints.txt` file directly in the repository.
Setting up Managed Apache Airflow (MWAA)
- Navigate to Managed Apache Airflow in the AWS Console and create a new environment:
  – Set the environment name to “MyAirflowEnvironment”.
  – Select Airflow version “2.8.1”.
  – Schedule a weekly maintenance window at a convenient time.
- DAG code in Amazon S3:
  – Set the S3 bucket to `airflow-<username>`.
  – Set the DAGs folder to `airflow-<username>/dags`.
  – Set the requirements file to `airflow-<username>/requirements.txt`.
  – Click “Next” to proceed to “Configure Advanced Settings”.
- Networking:
  – In the MWAA setup interface, select “Create MWAA VPC”. This action opens AWS CloudFormation in a new tab.
  – Proceed with the default settings in CloudFormation and click “Create stack”. This process typically takes about 2.5 minutes.
  – Once the stack creation completes, navigate to the “Resources” tab within the CloudFormation stack.
  – Scroll to find the Logical ID labeled “VPC” and note down the value of its Physical ID.
  – Return to the MWAA environment configuration tab in your browser.
  – Click the refresh button under “Virtual private cloud (VPC)” and set the VPC to the noted Physical ID. Subnet 1 and Subnet 2 should be set automatically by the CloudFormation stack.
  – Set web server access to “Public network (Internet accessible)”.
  – Select “Create new security group”.
  – Leave “Service managed endpoints” selected.
Note: If you do not already have a dedicated VPC for MWAA, I recommend selecting “Create MWAA VPC”. This choice ensures the correct configuration of your Airflow resources within AWS. Also, while “Public network (Internet accessible)” is convenient, it may not be suitable for all security requirements. Depending on the sensitivity of your workflows and your organization’s security policies, consider restricting access to a more controlled network setting.
- Defaults:
  – Keep “Environment class” set to `mw1.small` with the following settings: Maximum worker count = `10`, Minimum worker count = `1`, Scheduler count = `2`.
  – Under “Encryption”, keep “Customize encryption settings (advanced)” unselected.
  – Keep “Monitoring” set to “Airflow task logs” at log level “INFO”.
  – Leave “Airflow configuration options” and “Tags” empty.
- Permissions:
  – Select “Create a new role”. It should be of the format “AmazonMWAA-MyAirflowEnvironment-<xxxxxx>”.
  – Click “Next” to proceed to “Review and create”.
- Review and create the environment:
  – Ensure all settings are correct.
  – Click “Create environment”.
  – Continue with the instruction steps below, as the environment setup takes about 26 minutes.
Setting up EMRServerlessS3RuntimeRole in IAM
Create an IAM policy
- Navigate to IAM: In the AWS Console, navigate to the Identity and Access Management (IAM) service.
- Access management: On the left-hand menu, click on Policies under “Access management”.
- Create a new policy:
  – Click Create policy.
  – Select the JSON tab to start with a blank policy template.
  – Open the `emr-access-policy.json` file found in your cloned repo under `airflow/iam/`.
  – Modify the resource ARNs in the JSON file to match the names of your S3 buckets: replace `"arn:aws:s3:::<AIRFLOW_S3_BUCKET_NAME>"` and `"arn:aws:s3:::<DATASET_S3_BUCKET_NAME>"` with your actual bucket names.
  – Copy the modified JSON content and paste it into the policy editor in the AWS console.
  – Click Next.
  – Enter a meaningful name for this policy, e.g., `EMRServerlessS3AccessPolicy`.
  – Optionally, provide a description for the policy.
  – Click Create policy.
Create an IAM role
- Create role:
  – In the IAM service, click Roles under “Access management” on the left-hand menu.
  – Click Create role.
  – Select Custom trust policy under “Trusted entity type”.
  – Open the `emr-serverless-trust-policy.json` file from your `airflow/iam/` directory.
  – Copy the content of the trust policy and paste it into the policy editor in the AWS console.
  – Click Next.
- Attach permissions:
  – Under “Permissions policies”, search for the policy you created earlier (`EMRServerlessS3AccessPolicy`).
  – Select the checkbox next to the policy to attach it to the role.
  – Click Next.
- Finalize role creation:
  – Enter a meaningful name for the role, such as `EMRServerlessS3RuntimeRole`.
  – Provide a description, e.g., “Role with policies for using EMR Serverless with an Airflow pipeline.”
  – Click Create role.
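For reference, a trust policy that lets EMR Serverless assume a role generally has the shape below. This is a minimal sketch, not a copy of the repo’s `emr-serverless-trust-policy.json`; verify against that file before using it.

```python
def emr_serverless_trust_policy():
    """A minimal trust policy allowing the EMR Serverless service
    principal to assume the role via STS."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "emr-serverless.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }
```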
Setting up Amazon SageMaker
Create a SageMaker domain
- Navigate to Amazon SageMaker:
  – Log into the AWS Console.
  – Open the Amazon SageMaker service.
- Set up domain:
  – Click Domains under “Admin configurations” on the left-hand menu.
  – Click Create domain.
  – Select Set up for single user (Quick setup).
  – Click Set up.
- Domain configuration:
  – Once the domain has been created, click on it to view the Domain details.
  – Navigate to the Domain settings tab.
  – Note the Execution role listed there, which follows the format `AmazonSageMaker-ExecutionRole-<xxxxxxxxxxxx>`. This role will be modified in the next steps to grant additional permissions.
Modify the SageMaker execution role
- Access IAM for role configuration:
  – Navigate to Identity and Access Management (IAM) in the AWS Console.
  – Click Roles under “Access management” in the left-hand menu.
- Locate and modify the role:
  – Use the search function to find the SageMaker execution role noted earlier. You might need to click the refresh button if the newly created role does not appear immediately.
  – Click on the role name.
- Add permissions:
  – Under Permissions policies, click Add permissions.
  – Choose Attach policies from the drop-down menu.
  – In the search bar, type and select AmazonS3FullAccess to allow the role to interact fully with Amazon S3 resources, which is crucial for storing and retrieving data and model artifacts. Note: be sure to review and adjust these permissions according to your organizational security policies, as granting full S3 access can expose sensitive data if not properly managed.
  – Click Add permissions.
Modify the Amazon MWAA execution role
Navigate to the MWAA environment
- Access MWAA:
  – In the AWS Console, navigate to Amazon MWAA.
- View environment details:
  – Click Environments in the left-hand menu.
  – Select the name of your Airflow environment, typically “MyAirflowEnvironment” if you have followed the setup instructions.
  – Scroll to the bottom of the environment details page to the Permissions section.
  – Note the Execution role listed under “Permissions,” typically in the format `AmazonMWAA-MyAirflowEnvironment-<xxxxxx>`.
Modify the IAM role
- Access IAM:
  – Open a new tab in your browser and navigate to Identity and Access Management (IAM) within the AWS Console.
- Find and modify the role:
  – Click Roles under “Access management” on the left-hand menu.
  – Use the search function to find the execution role you noted earlier.
  – Click on the role name.
- Edit role permissions:
  – Under Permissions policies, find the only policy listed and click its name.
  – On the policy summary page, click Edit.
  – Switch to the JSON tab in the “Policy editor”.
- Update policy JSON:
  – Open the file `AmazonMWAA-MyAirflowEnvironment-access-policy.json` from the `airflow/iam/` directory in your local repo.
  – Replace the placeholders in the policy JSON with actual values.
  – Copy the modified policy JSON into the AWS policy editor.
- Save changes:
  – After ensuring all placeholders are correctly replaced, click Review policy.
  – Verify the changes and then click Save changes to update the policy.
Notes on configuration
- S3 bucket names: The `<AIRFLOW_S3_BUCKET_NAME>` and `<DATASET_S3_BUCKET_NAME>` placeholders are your primary and dataset-specific S3 buckets, respectively, used for storing everything from logs to datasets.
- Role names: The `<EMR_ROLE_NAME>` and `<SAGEMAKER_ROLE_NAME>` placeholders should reflect the roles created to give EMR and SageMaker the necessary permissions for operation.
- Region specific: Ensure the `<REGION>` placeholder matches the AWS region in which your services are hosted to avoid regional discrepancies in resource access.
Pipeline configuration
Edit the `config.py` file
- Retrieve the configuration file:
  – The `config.py` file, which was previously uploaded to your `airflow-<username>` S3 bucket, needs to be updated with the actual values you’ve collected throughout the setup process.
  – Download the file from the S3 bucket at `dags/utils/config.py`, or access it directly in your cloned repository at `airflow/dags/utils/config.py`.
- Edit configuration values:
  – Open the `config.py` file in a text editor of your choice.
  – Replace the placeholders with your actual configuration values.
  – Ensure that the ARN formats in the file reflect the exact ARN formats used in AWS IAM for the EMR and SageMaker roles.
- Example of an edited `config.py`:
```python
# config.py
config = {
    "account_id": "123456789012",
    "airflow_bucket": "airflow-johndoe",
    "dataset_bucket": "airflow-johndoe-data",
    "emr_role_arn": "arn:aws:iam::123456789012:role/EMRServerlessS3RuntimeRole",
    "default_monitoring_config": {
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {"logUri": "s3://airflow-johndoe/logs/"}
        },
    },
    "sagemaker_role_arn": "arn:aws:iam::123456789012:role/service-role/AmazonSageMaker-ExecutionRole-20240224T215408",
}
```
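Before uploading, it’s worth sanity-checking the edited values. The validator below is a hypothetical helper, not part of the repo; it catches the two most common mistakes — leftover `<...>` placeholders and role values that aren’t IAM ARNs.

```python
def validate_config(config: dict) -> list:
    """Return a list of problems found in config.py values; an empty
    list means the config looks sane."""
    problems = []
    # Any remaining angle brackets suggest a placeholder was never replaced.
    for key, value in config.items():
        if isinstance(value, str) and ("<" in value or ">" in value):
            problems.append(f"{key}: placeholder not replaced")
    # Role values must be IAM role ARNs, not bare role names.
    for key in ("emr_role_arn", "sagemaker_role_arn"):
        arn = config.get(key, "")
        if not arn.startswith("arn:aws:iam::"):
            problems.append(f"{key}: expected an IAM role ARN")
    return problems
```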
Update the config file in S3
- Upload the updated config file:
  – After making the necessary changes to `config.py`, save the file.
  – Upload the updated file back to your `airflow-<username>` S3 bucket, specifically to the path `dags/utils/config.py`.
  – You can use the AWS Management Console, the AWS CLI, or any S3 client tool to upload the file.
- Verify the upload:
  – Ensure that the updated `config.py` is correctly placed in the S3 bucket.
  – You can check this by navigating to the `airflow-<username>` bucket in the AWS S3 console and locating the file under `dags/utils/`.
Create virtual environment for EMR Serverless
For more details, refer to the AWS documentation on “Using Python libraries with EMR Serverless”.
Building the Docker image
- Navigate to the directory:
  – Open your terminal and navigate to the `airflow/` directory where the `Dockerfile` is located. This `Dockerfile` should be configured to create a PySpark environment, installing PySpark and packaging it into an archive.
- Build the image:
  – Run the following Docker command to build the image and output the necessary environment files:

```shell
docker build --output . .
```
Handling the output
- Locate the archive:
  – After the build process completes, look for the `pyspark_venv.tar.gz` file in your current directory. This archive contains the virtual environment set up by the Docker build process.
- Upload the archive to S3:
  – Make sure your AWS CLI is configured with sufficient permissions to perform the upload.
  – Use the AWS CLI to upload the virtual environment archive to your Airflow S3 bucket:

```shell
aws s3 cp pyspark_venv.tar.gz s3://airflow-<username>/venv/
```
Optional cleanup
- Remove the virtual environment archive:
  – If you wish to clean up your local directory, you can safely delete the archive created during the Docker build process:

```shell
rm pyspark_venv.tar.gz
```
Accessing the Airflow UI
- Once the environment is ready, open the Airflow UI from the MWAA console.
- Trigger the `sagemaker-ml-pipeline` DAG by clicking the “Trigger DAG” button.
- The DAG should take under 2 hours to complete.
Pro Tip
Be sure to go into Amazon SageMaker and delete your endpoint deployments so that you don’t get a scary bill from AWS. If you run the DAG multiple times, you will have multiple endpoints (plus the charges that are associated with each)!
Wrapping up
As we conclude this third part of our journey, we’ve mastered the art of crafting a seamless pipeline. We’ve leveraged both the prowess of Amazon EMR Serverless and Amazon Managed Workflows for Apache Airflow (MWAA). You now understand the integration and operation of these components, empowering you to streamline your data processing and machine learning workflows.
I hope you’ll take a look at the final post in this series. In this final part, we’ll wrap up our adventure. We’ll explore better tools that can boost our machine learning process. And we’ll think about how to use new technologies to keep our systems at the top of their game. We’ll also marvel at the distance we’ve traveled and the exciting possibilities that lie ahead.
Keep an eye out for upcoming posts, and be sure to follow me on X (aka Twitter). Your questions and comments are always welcome — they help shape the conversation and keep this learning journey rolling. Looking forward to hearing from you!
[1]: Stratosphere Laboratory. A labeled dataset with malicious and benign IoT network traffic. January 22nd. Agustin Parmisano, Sebastian Garcia, Maria Jose Erquiaga. https://www.stratosphereips.org/datasets-iot23.