Orchestration and Automation with Managed Apache Airflow (MWAA)

James Coffey
11 min read · May 1, 2024


Photo by Kazuo ota on Unsplash

Welcome back to the third part of our series on “Building an End-to-End ML Pipeline for Malware Detection.” In the previous two posts, we covered data and model development, and model tracking. In today’s post, we’ll be bringing everything together by connecting the dots, and creating an end-to-end machine learning pipeline — from data to deployment. We’ll be using Amazon EMR Serverless and Amazon Managed Workflows for Apache Airflow (MWAA), and at the end of this post, you will have a working pipeline using AWS technologies.

First, let’s take a look at Amazon EMR Serverless for running our PySpark data processing pipeline code. We built this code to transform the raw data into a feature dataset during the data wrangling phase. Amazon EMR Serverless provides a serverless runtime environment that abstracts infrastructure management. It automatically provisions resources based on application needs, allowing you to run PySpark jobs without creating a cluster or managing individual instances. This makes it easier to manage workloads and reduces the operational overhead of managing resources. The serverless model also enables more dynamic workload management and can help you operate more cost-effectively and at scale compared to traditional EMR clusters.

Before diving into the nitty-gritty, let’s take a moment to appreciate Amazon Managed Workflows for Apache Airflow (MWAA), a crucial piece of the puzzle. MWAA is your Airflow wingman, offering a managed service that makes orchestrating data workflows a breeze. Picture Apache Airflow with a squad of handy features like automatic scaling, built-in authentication, and top-notch security at your service, all without the headache of manual setup and upkeep. MWAA lets you define your workflows as a directed acyclic graph (DAG), the perfect tool for sequencing your ML tasks and building that dream machine learning pipeline. Plus, it’s coder-friendly, supporting both the classic Airflow syntax and the newer TaskFlow API for those who like to keep it fresh.
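If the DAG idea is new to you, here is a minimal sketch of what "sequencing tasks" means, using only Python's standard library rather than Airflow itself. The task names are hypothetical stand-ins and not the task IDs used in the series' DAG. In Airflow you would declare the same dependencies between operators and let the scheduler work out the order.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "preprocess": set(),
    "train": {"preprocess"},
    "deploy": {"train"},
    "predict": {"deploy"},
}


def execution_order(deps):
    """Return one valid run order for the tasks in a DAG."""
    return list(TopologicalSorter(deps).static_order())


print(execution_order(dependencies))
```

Because every task here has exactly one upstream task, there is only one valid order. A DAG with independent branches would allow several, and a graph with a cycle would be rejected.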

Setting up your pipeline

Here, we will go step-by-step in setting up our pipeline. Go to the blog series’ GitHub repository to follow along. You will specifically want to navigate to the `airflow/` directory.

AWS S3 configuration

  1. Log in to the AWS console and navigate to S3.
  2. Create a new bucket for Airflow:
    – Click “Create Bucket”.
    – Name the bucket airflow-<username>.
    – Keep other settings as default and click “Create bucket”.
  3. Upload the requirements.txt:
    – Place the requirements.txt file at the root of your new bucket.
  4. Upload DAGs:
    – Navigate to the airflow directory in your local clone of the GitHub repository.
    – Upload the entire dags directory, including all subdirectories and files, to your new S3 bucket under airflow-<username>/dags/. This includes classic_syntax.py and utils/ (utility scripts and the configuration file).
  5. Upload pipeline scripts:
    – Additionally, upload the pipeline directory containing the preprocess.py script to airflow-<username>/pipeline/. Ensure it mirrors the structure in your repository: pipeline/preprocess.py (script for data preprocessing).
  6. Create a secondary S3 bucket for datasets and outputs:
    – Name the secondary bucket airflow-<username>-data.
    – Create the following directories inside this bucket: benchmark/, groundtruth/, predictions/, raw/, test/, train/, validation/.
    – Upload benchmark.txt from the repository to the benchmark/ directory.
    – Download the Kaggle dataset “Malware Detection in Network Traffic Data” and upload its CSV files to the raw/ directory.¹
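If you would rather script the uploads to the Airflow bucket than click through the console, the sketch below shows one way to do it. It assumes you run it against your local airflow/ directory and pass in a boto3 S3 client. The function names plan_uploads and upload_all are my own and do not come from the repository.

```python
from pathlib import Path


def plan_uploads(airflow_dir):
    """Map local files under airflow/ to the S3 keys described in the steps above."""
    root = Path(airflow_dir)
    plan = []
    requirements = root / "requirements.txt"
    if requirements.is_file():
        plan.append((requirements, "requirements.txt"))
    for folder in ("dags", "pipeline"):
        if not (root / folder).is_dir():
            continue
        for path in sorted((root / folder).rglob("*")):
            if path.is_file():
                # Keep the repository layout, e.g. dags/utils/config.py
                plan.append((path, path.relative_to(root).as_posix()))
    return plan


def upload_all(s3_client, bucket, plan):
    """Upload every planned file; s3_client is expected to be boto3.client("s3")."""
    for local_path, key in plan:
        s3_client.upload_file(str(local_path), bucket, key)
    return len(plan)


# Example (assumes boto3 is installed and credentials are configured):
# import boto3
# upload_all(boto3.client("s3"), "airflow-<username>", plan_uploads("airflow"))
```

Splitting the plan from the upload lets you print and inspect the key layout before anything is sent to S3.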

Notes for customization and understanding dependencies

Training script from GitHub:

  • The train.py script is referenced from a GitHub repository in the prepare_configs function defined in airflow/dags/utils/helper_functions.py. This function configures a SageMaker SKLearn estimator with the script sourced directly from:
    estimator = SKLearn(
        role=role,
        instance_count=1,
        instance_type="ml.m5.xlarge",
        source_dir="airflow/pipeline",
        entry_point="train.py",
        git_config={
            "repo": "https://github.com/JamesFCoffey/malware-detection-ml-pipeline.git"
        },
        hyperparameters=hyperparameters,
        metric_definitions=metric_definitions,
        framework_version="1.2-1",
        py_version="py3",
    )
  • If you need to use a different version of the train.py or a different repository, modify the git_config parameter accordingly.
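As a rough sketch of what modifying git_config can look like: besides "repo", the SageMaker Python SDK also accepts optional "branch" and "commit" keys, which let you pin the training script to a specific revision. The helper name build_git_config and the fork URL and branch in the example are hypothetical.

```python
def build_git_config(repo_url, branch=None, commit=None):
    """Build a git_config dict for a SageMaker estimator.

    Only "repo" is required; "branch" and "commit" are optional pins.
    """
    git_config = {"repo": repo_url}
    if branch:
        git_config["branch"] = branch
    if commit:
        git_config["commit"] = commit
    return git_config


# Hypothetical fork and branch, for illustration only.
print(build_git_config(
    "https://github.com/<your-account>/malware-detection-ml-pipeline.git",
    branch="my-experiment",
))
```

The returned dict can be passed as the git_config argument in the estimator call shown above.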

Requirements file dependency:

  • The requirements.txt file references a constraints.txt located in a GitHub repository. This is specified within the requirements.txt as:
--constraint "https://raw.githubusercontent.com/JamesFCoffey/malware-detection-ml-pipeline/main/airflow/constraints.txt"
  • This ensures that all Python packages are installed with versions compatible with the constraints specified in the constraints.txt. If you wish to customize or update the constraints, you may edit this URL or modify the constraints.txt file directly in the repository.

Setting up managed Apache Airflow (MWAA)

  1. Navigate to managed Apache Airflow in the AWS Console and create a new environment:
    – Set the environment name to “MyAirflowEnvironment”.
    – Select Airflow version “2.8.1”.
    – Schedule a weekly maintenance window at a convenient time.
  2. DAG code in Amazon S3:
    – Set the S3 bucket to airflow-<username>.
    – Set the DAGs folder to airflow-<username>/dags.
    – Set the requirements file to airflow-<username>/requirements.txt.
    – Click “Next” to proceed to “Configure Advanced Settings”.
  3. Networking:
    – In the MWAA setup interface, select “Create MWAA VPC”. This action will open AWS CloudFormation in a new tab.
    – Proceed with the default settings in CloudFormation and click “Create stack”. This process typically takes about 2.5 minutes.
    – Once the stack creation completes, navigate to the “Resources” tab within the CloudFormation stack.
    – Scroll to find the Logical ID labeled “VPC” and note down the value of the Physical ID associated with it.
    – Return to the MWAA environment configuration tab in your browser.
    – Click the refresh button under “Virtual private cloud (VPC)” and then set the VPC to the noted Physical ID. Subnet 1 and Subnet 2 should be set automatically from the CloudFormation stack.
    – Set web server access to “Public network (Internet accessible)”.
    – Select “Create new security group”.
    – Leave “Service managed endpoints” selected.
    Note:
    If you do not already have a dedicated VPC for MWAA, I recommend selecting “Create MWAA VPC”. This choice will ensure the correct configuration of your Airflow resources within AWS. Also, while “Public network (Internet accessible)” is convenient, it may not be suitable for all security requirements. Depending on the sensitivity of your workflows and organizational security policies, consider restricting access to a more controlled network setting.
  4. Defaults:
    – Keep “Environment class” set to mw1.small with the following settings: Maximum worker count = 10, Minimum worker count = 1, Scheduler count = 2.
    – Under “Encryption”, keep “Customize encryption settings (advanced)” unselected.
    – Under “Monitoring”, keep “Airflow task logs” enabled at log level “INFO”.
    – “Airflow configuration options” and “Tags” should be empty.
  5. Permissions:
    – Select “Create a new role”. It should be of the format “AmazonMWAA-MyAirflowEnvironment-<xxxxxx>”.
    – Click “Next” to proceed to “Review and create”.
  6. Review and Create the Environment:
    – Ensure all settings are correct.
    – Click “Create environment”.
    – Continue with the steps below while the environment is being created, which should take about 26 minutes.
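If you want a script to tell you when the environment is ready instead of refreshing the console, a polling loop along these lines could work. It is a sketch that assumes a boto3 MWAA client is passed in. The function name and the polling defaults are my own choices.

```python
import time


def wait_for_environment(mwaa_client, name, poll_seconds=60, max_polls=60, sleep=time.sleep):
    """Poll an MWAA environment until it is AVAILABLE.

    mwaa_client is expected to be boto3.client("mwaa").
    """
    for _ in range(max_polls):
        status = mwaa_client.get_environment(Name=name)["Environment"]["Status"]
        if status == "AVAILABLE":
            return status
        if status == "CREATE_FAILED":
            raise RuntimeError(f"Environment {name} failed to create")
        sleep(poll_seconds)
    raise TimeoutError(f"Environment {name} not available after {max_polls} polls")


# Example (assumes boto3 is installed and credentials are configured):
# import boto3
# wait_for_environment(boto3.client("mwaa"), "MyAirflowEnvironment")
```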

Setting up EMRServerlessS3RuntimeRole in IAM

Create an IAM policy

  1. Navigate to IAM: In the AWS Console, navigate to the Identity and Access Management (IAM) service.
  2. Access management: On the left-hand menu, click on Policies under “Access management”.
  3. Create a new policy:
    – Click on Create policy.
    – Select the JSON tab to start with a blank policy template.
    – Open the emr-access-policy.json file found in your cloned repo under airflow/iam/.
    – Modify the resource ARNs in the JSON file to match the names of your S3 buckets: Replace "arn:aws:s3:::<AIRFLOW_S3_BUCKET_NAME>" and "arn:aws:s3:::<DATASET_S3_BUCKET_NAME>" with your actual bucket names.
    – Copy the modified JSON content and paste it into the policy editor in the AWS console.
    – Click Next.
    – Enter a meaningful name for this policy, e.g., EMRServerlessS3AccessPolicy.
    – Optionally, provide a description for the policy.
    – Click Create policy.

Create an IAM role

  1. Create role:
    – In the IAM service, click on Roles under “Access management” on the left-hand menu.
    – Click Create role.
    – Select Custom trust policy under “Trusted entity type”.
    – Open the emr-serverless-trust-policy.json file from your airflow/iam/ directory.
    – Copy the content of the trust policy and paste it into the policy editor in the AWS console.
    – Click Next.
  2. Attach permissions:
    – Under “Permissions policies”, search for the policy you created earlier (EMRServerlessS3AccessPolicy).
    – Select the checkbox next to the policy to attach it to the role.
    – Click Next.
  3. Finalize role creation:
    – Enter a meaningful name for the role, such as EMRServerlessS3RuntimeRole.
    – Provide a description, e.g., “Role with policies for using EMR Serverless with an Airflow pipeline.”
    – Click Create role.
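The same role can be created programmatically. The sketch below assumes the trust policy is the standard one that lets the EMR Serverless service assume the role. I have not reproduced the repository's emr-serverless-trust-policy.json here, so treat the JSON as an assumption and compare it with that file. The function names are hypothetical.

```python
import json


def emr_serverless_trust_policy():
    """Trust policy allowing the EMR Serverless service to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "emr-serverless.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def create_runtime_role(iam_client, role_name, policy_arn):
    """Create the role and attach the access policy.

    iam_client is expected to be boto3.client("iam").
    """
    iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(emr_serverless_trust_policy()),
        Description="Role with policies for using EMR Serverless with an Airflow pipeline.",
    )
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)


# Example (assumes boto3 is installed; replace the account ID with your own):
# import boto3
# create_runtime_role(
#     boto3.client("iam"),
#     "EMRServerlessS3RuntimeRole",
#     "arn:aws:iam::123456789012:policy/EMRServerlessS3AccessPolicy",
# )
```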

Setting up Amazon SageMaker

Create a SageMaker domain

  1. Navigate to Amazon SageMaker:
    – Log into the AWS Console.
    – Open the Amazon SageMaker service.
  2. Set up domain:
    – Click on Domains under “Admin configurations” on the left-hand menu.
    – Click Create domain.
    – Select Set up for single user (Quick setup).
    – Click Set up.
  3. Domain configuration:
    – Once the domain has been created, click on it to view the Domain details.
    – Navigate to the Domain settings tab.
    – Note the Execution role listed there which follows the format AmazonSageMaker-ExecutionRole-<xxxxxxxxxxxx>. This role will be modified in the next steps to grant additional permissions.

Modify the SageMaker execution role

  1. Access IAM for role configuration:
    – Navigate to Identity and Access Management (IAM) in the AWS Console.
    – Click on Roles under “Access management” in the left-hand menu.
  2. Locate and modify the role:
    – Use the search function to find the SageMaker execution role noted earlier. You might need to click the refresh roles button if the newly created role does not appear immediately.
    – Click on the role name.
  3. Add permissions:
    – Under Permissions policies, click Add permissions.
    – Choose Attach policies from the drop-down menu.
    – In the search bar, type and select AmazonS3FullAccess to allow the role to interact fully with Amazon S3 resources, which is crucial for storing and retrieving data and model artifacts. Note: Be sure to review and adjust the permissions according to your organizational security policies, as granting full S3 access can expose sensitive data if not properly managed.
    – Click Add permissions.

Modify the Amazon MWAA execution role

Navigate to the MWAA environment

  1. Access MWAA:
    – In the AWS Console, navigate to Amazon MWAA.
  2. View environment details:
    – Click on Environments in the left-hand menu.
    – Select the name of your Airflow environment, typically “MyAirflowEnvironment” if you have followed the setup instructions.
    – Scroll to the bottom of the environment details page to the Permissions section.
    – Note the Execution Role listed under “Permissions,” typically in the format AmazonMWAA-MyAirflowEnvironment-<xxxxxx>.

Modify the IAM role

  1. Access IAM:
    – Open a new tab in your browser and navigate to Identity and Access Management (IAM) within the AWS Console.
  2. Find and modify the role:
    – Click on Roles under “Access management” on the left-hand menu.
    – Use the search function to find the execution role you noted earlier.
    – Click on the role name.
  3. Edit role permissions:
    – Under Permissions policies, find the only policy listed and click on the policy name.
    – On the policy summary page, click Edit.
    – Switch to the JSON tab in the “Policy editor”.
  4. Update policy JSON:
    – Open the file AmazonMWAA-MyAirflowEnvironment-access-policy.json from the airflow/iam/ directory in your local repo.
    – Replace the placeholders in the policy JSON with actual values.
    – Copy the modified policy JSON into the AWS policy editor.
  5. Save changes:
    – After ensuring all placeholders are correctly replaced, click Review policy.
    – Verify the changes and then click Save changes to update the policy.

Notes on configuration

  • S3 bucket names: The <AIRFLOW_S3_BUCKET_NAME> and <DATASET_S3_BUCKET_NAME> are your primary and dataset-specific S3 buckets, respectively, used for storing everything from logs to datasets.
  • Role names: The <EMR_ROLE_NAME> and <SAGEMAKER_ROLE_NAME> should reflect the roles created to give EMR and SageMaker the necessary permissions for operation.
  • Region specific: Ensure the <REGION> placeholder matches the AWS region in which your services are hosted to avoid any regional discrepancies in resource access.
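Replacing placeholders by hand is easy to get wrong, so here is a small sketch that fills them in and fails loudly if any are left over. It assumes the placeholders follow the <UPPER_CASE_NAME> pattern listed above. The function name and the shortened template in the example are my own and not the contents of the repository's policy files.

```python
import json
import re

PLACEHOLDER = re.compile(r"<[A-Z0-9_]+>")


def fill_placeholders(template_text, values):
    """Replace <NAME> placeholders and confirm the result is valid JSON."""
    filled = template_text
    for name, value in values.items():
        filled = filled.replace(f"<{name}>", value)
    leftover = sorted(set(PLACEHOLDER.findall(filled)))
    if leftover:
        raise ValueError(f"Unreplaced placeholders: {', '.join(leftover)}")
    json.loads(filled)  # raises if the edited policy is no longer valid JSON
    return filled


# Shortened, hypothetical template for illustration.
template = '{"Resource": ["arn:aws:s3:::<AIRFLOW_S3_BUCKET_NAME>", "arn:aws:s3:::<DATASET_S3_BUCKET_NAME>/*"]}'
print(fill_placeholders(template, {
    "AIRFLOW_S3_BUCKET_NAME": "airflow-johndoe",
    "DATASET_S3_BUCKET_NAME": "airflow-johndoe-data",
}))
```

You can read a policy file from airflow/iam/ into template_text, pass your own values, and paste the returned string into the AWS policy editor.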

Pipeline configuration

Edit the config.py File

  1. Retrieve the configuration file:
    – The config.py file, which was previously uploaded to your airflow-<username> S3 bucket, needs to be updated with actual values you've collected throughout the setup process.
    – Download the file from the S3 bucket located at dags/utils/config.py or access it directly in your cloned repository at airflow/dags/utils/config.py.
  2. Edit configuration values:
    – Open the config.py file in a text editor of your choice.
    – Replace the placeholders with the actual configuration values.
    – Ensure that the ARN formats in the file reflect the exact ARN formats used in AWS IAM for EMR and SageMaker roles.
  3. Example of edited config.py:
    # config.py
    config = {
        "account_id": "123456789012",
        "airflow_bucket": "airflow-johndoe",
        "dataset_bucket": "airflow-johndoe-data",
        "emr_role_arn": "arn:aws:iam::123456789012:role/EMRServerlessS3RuntimeRole",
        "default_monitoring_config": {
            "monitoringConfiguration": {
                "s3MonitoringConfiguration": {"logUri": "s3://airflow-johndoe/logs/"}
            },
        },
        "sagemaker_role_arn": "arn:aws:iam::123456789012:role/service-role/AmazonSageMaker-ExecutionRole-20240224T215408",
    }
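A quick sanity check on the role ARNs can catch copy-and-paste mistakes before the DAG runs. The sketch below assumes the standard "aws" partition and the key names from the example config above. The function check_role_arns is hypothetical and not part of the repository.

```python
import re

# IAM role ARN in the standard "aws" partition: arn:aws:iam::<12 digits>:role/<path and name>
ROLE_ARN = re.compile(r"^arn:aws:iam::(\d{12}):role/[\w+=,.@/-]+$")


def check_role_arns(config):
    """Return a list of problems found in the role ARNs of a config dict."""
    problems = []
    for key in ("emr_role_arn", "sagemaker_role_arn"):
        match = ROLE_ARN.match(config.get(key, ""))
        if match is None:
            problems.append(f"{key}: not an IAM role ARN")
        elif match.group(1) != config.get("account_id"):
            problems.append(f"{key}: account ID differs from account_id")
    return problems


example = {
    "account_id": "123456789012",
    "emr_role_arn": "arn:aws:iam::123456789012:role/EMRServerlessS3RuntimeRole",
    "sagemaker_role_arn": "arn:aws:iam::123456789012:role/service-role/AmazonSageMaker-ExecutionRole-20240224T215408",
}
print(check_role_arns(example))
```

An empty list means both ARNs have the expected shape and share the account ID from the config.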

Update the config file in S3

  1. Upload the updated config file:
    – After making the necessary changes to the config.py, save the file.
    – Upload the updated file back to your airflow-<username> S3 bucket, specifically to the path dags/utils/config.py.
    – You can use the AWS Management Console, AWS CLI, or any S3 client tool to upload the file.
  2. Verify the upload:
    – Ensure that the updated config.py is correctly placed in the S3 bucket.
    – You can check this by navigating to the airflow-<username> bucket in the AWS S3 console and locating the file under dags/utils/.
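For a scripted version of the upload-and-verify steps, something like the following could work. It is a sketch that assumes a boto3 S3 client is passed in. The function name is my own, and the check only compares file sizes rather than contents.

```python
import os


def upload_and_verify(s3_client, local_path, bucket, key):
    """Upload a file, then confirm the object exists with the same size.

    s3_client is expected to be boto3.client("s3"); head_object raises
    an error if the object is missing.
    """
    s3_client.upload_file(local_path, bucket, key)
    remote_size = s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
    return remote_size == os.path.getsize(local_path)


# Example (assumes boto3 is installed and credentials are configured):
# import boto3
# upload_and_verify(
#     boto3.client("s3"),
#     "airflow/dags/utils/config.py",
#     "airflow-<username>",
#     "dags/utils/config.py",
# )
```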

Create virtual environment for EMR Serverless

For more details, refer to the AWS documentation on “Using Python libraries with EMR Serverless”.

Building the Docker image

  1. Navigate to the directory:
    – Open your terminal and navigate to the airflow/ directory where the Dockerfile is located. This Dockerfile should be set up to create a PySpark environment. Make sure your Dockerfile is properly configured to install PySpark and package it.
  2. Build the image:
    – Run the following Docker command to build the image and output the necessary environment files:
docker build --output . .

Handling the output

  1. Locate the archive:
    – After the build process completes, look for the pyspark_venv.tar.gz file in your current directory. This archive contains the virtual environment set up by the Docker build process.
  2. Upload the archive to S3:
    – Make sure your AWS CLI is configured with sufficient permissions to perform the upload.
    – Use the AWS CLI to upload the virtual environment archive to your Airflow S3 bucket:
aws s3 cp pyspark_venv.tar.gz s3://airflow-<username>/venv/
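To make the purpose of the archive concrete: the AWS guide referenced above has the Spark job point at the uploaded archive through spark.archives and a few PYSPARK_PYTHON settings. The sketch below assembles those settings into a spark-submit parameter string. Whether the series' DAG passes exactly these values is an assumption on my part, and the function name is hypothetical.

```python
def venv_spark_params(bucket, key="venv/pyspark_venv.tar.gz", alias="environment"):
    """Build spark-submit parameters that run PySpark from a packaged venv.

    The archive is unpacked under ./<alias> on the driver and executors.
    """
    python = f"./{alias}/bin/python"
    confs = [
        f"spark.archives=s3://{bucket}/{key}#{alias}",
        f"spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON={python}",
        f"spark.emr-serverless.driverEnv.PYSPARK_PYTHON={python}",
        f"spark.executorEnv.PYSPARK_PYTHON={python}",
    ]
    return " ".join(f"--conf {conf}" for conf in confs)


print(venv_spark_params("airflow-johndoe"))
```

The resulting string is the kind of value that goes into the sparkSubmitParameters field of an EMR Serverless Spark job driver.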

Optional cleanup

  1. Remove the local archive:
    – Once the upload has succeeded, you can clean up your local directory by deleting the virtual environment archive that the Docker build produced:
rm pyspark_venv.tar.gz

Accessing the Airflow UI

  • Once the environment is ready, open the Airflow UI from the MWAA console.
  • Trigger the sagemaker-ml-pipeline DAG by clicking on the "Trigger DAG" button.
  • The DAG should take under 2 hours to complete.

Pro Tip

Be sure to go into Amazon SageMaker and delete your endpoint deployments so that you don’t get a scary bill from AWS. If you run the DAG multiple times, you will have multiple endpoints (plus the charges that are associated with each)!
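If you end up with several endpoints, a small cleanup script may be easier than clicking through the console. The sketch below assumes a boto3 SageMaker client is passed in. The function name and the dry-run default are my own choices, so you can review the list before anything is deleted.

```python
def delete_endpoints(sm_client, name_contains=None, dry_run=True):
    """List SageMaker endpoints and, unless dry_run is set, delete them.

    sm_client is expected to be boto3.client("sagemaker").
    """
    kwargs = {}
    if name_contains:
        kwargs["NameContains"] = name_contains
    names = []
    while True:
        page = sm_client.list_endpoints(**kwargs)
        names.extend(endpoint["EndpointName"] for endpoint in page["Endpoints"])
        token = page.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token
    if not dry_run:
        for name in names:
            sm_client.delete_endpoint(EndpointName=name)
    return names


# Example (assumes boto3 is installed and credentials are configured):
# import boto3
# print(delete_endpoints(boto3.client("sagemaker")))             # review first
# delete_endpoints(boto3.client("sagemaker"), dry_run=False)     # then delete
```

Without name_contains this targets every endpoint in the current account and region, so check the dry-run output carefully if you have endpoints from other projects.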

Wrapping up

As we conclude this third part of our journey, we’ve mastered the art of crafting a seamless pipeline. We’ve leveraged both the prowess of Amazon EMR Serverless and Amazon Managed Workflows for Apache Airflow (MWAA). You now understand the integration and operation of these components, empowering you to streamline your data processing and machine learning workflows.

I hope you’ll take a look at the final post in this series. In this final part, we’ll wrap up our adventure. We’ll explore better tools that can boost our machine learning process. And we’ll think about how to use new technologies to keep our systems at the top of their game. We’ll also marvel at the distance we’ve traveled and the exciting possibilities that lie ahead.

Keep an eye out for upcoming posts, and be sure to follow me on X (aka Twitter). Your questions and comments are always welcome — they help shape the conversation and keep this learning journey rolling. Looking forward to hearing from you!

[1]: Stratosphere Laboratory. A labeled dataset with malicious and benign IoT network traffic. January 22th. Agustin Parmisano, Sebastian Garcia, Maria Jose Erquiaga. https://www.stratosphereips.org/datasets-iot23.
