
How to work with Airflow Docker operator in Amazon MWAA

Felipe de Mello Rodrigues


Apache Airflow is an open-source workflow management platform that is usually associated with the orchestration of complex data pipelines. Airflow is written in Python, and its workflows and tasks are configured via Python scripts following the configuration-as-code pattern.

Amazon MWAA (Managed Workflows for Apache Airflow) is a managed service for Apache Airflow released by AWS at the end of 2020. MWAA removes all the complexity related to infrastructure deployment, allowing you to focus on your data transformations and pipelines. If you are interested in this service and want to discover more, have a look at my previous blog post in the MWAA series, How to use Apache Airflow CLI with Amazon MWAA.

To support easy development and integration, Airflow relies on the concept of operators, each of which represents a single, ideally idempotent, task. Operators allow you to trigger specific tasks based on code developed by the community, including integrations with external systems and cloud services.

The full list of official operators is available in Airflow operators.

Docker operator in Apache Airflow

In the official list there is a particular operator that supports the execution of Docker containers as Airflow tasks; the name of the package is airflow.providers.docker.operators.docker.

In an on-premises installation the Docker operator works as expected, but when we switch to Amazon MWAA there is a constraint that impacts its use.

The issue about Docker operator and Amazon MWAA

It is important to understand that, in order to run Docker containers, your Airflow tasks need a Docker host installed on the same server where the Airflow environment is running.

Since Amazon MWAA is a managed service that doesn't allow access to the underlying infrastructure, there is no way to install a Docker host.

If you are moving your on-premises Airflow installation to Amazon MWAA, you may think there is no alternative and that you eventually need to replace this dependency. The good news is that there is a simple and effective workaround that can be applied in your Airflow DAG to bypass this constraint.

In this blog post I introduce the ECS operator and demonstrate how you can replace your Docker operators so you can continue to make use of containers in your MWAA workflows.

Amazon ECS and Airflow ECS operator

Notice the ECS operator is not part of the list shared in the first section of this article; it is available in the package airflow.contrib.operators.ecs_operator, and the official documentation is available at ECS operator.
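For reference, the contrib path mentioned above is the legacy Airflow 1.x location; on Airflow 2.x the operator ships with the Amazon provider package (apache-airflow-providers-amazon, which is included in MWAA). A minimal sketch of the two import styles:

```python
# Legacy Airflow 1.x location referenced above:
# from airflow.contrib.operators.ecs_operator import ECSOperator

# Airflow 2.x location, provided by the apache-airflow-providers-amazon package:
from airflow.providers.amazon.aws.operators.ecs import ECSOperator
```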

In essence, both operators need a valid Docker image to run the task, but the ECS operator demands additional configuration related to AWS infrastructure: for example, an IAM role with permissions to run an ECS task, plus VPC subnets and Security Group rules in case your Docker container needs to access external services.

In the next sections we will configure an MWAA environment, create a sample DAG with the ECS operator and perform a quick demo to visualise how this operator integrates with the Airflow tasks while running from an MWAA environment.

Amazon MWAA provisioning and setup

For this demo, I deploy a basic MWAA environment. To make the process easier, I requested the automatic creation of the Security Group and IAM role via the MWAA setup in the AWS console, but I am using a custom VPC with private subnets and internet access via a NAT Gateway. This point is very important since I am planning to pull the container image from a public registry.

It is important to mention that my code is based on Airflow version 2.0.2; if you are running Airflow 1.x you may find some errors, since the package structure was reorganised between the major versions.

Let's have a look at the additional infrastructure that needs to be provisioned.

ECS cluster and ECS task definition

First, create a new ECS cluster in your AWS account. For this example I decided to use FARGATE as the compute option for my services, but feel free to adapt the example to your own requirements. The name selected for my ECS cluster is airflow-ecs-cluster.

The ECS task definition is configured based on the following attributes:

  • Task Definition Name: airflow-ecs-operator
  • Type: FARGATE
  • Network Mode: awsvpc
  • Task execution role: ecsTaskExecutionRole (default)

Container Definitions

  • Container name: hello-world
  • Image: hello-world:latest

Task Size

  • Task memory (GB): 0.5GB
  • Task CPU (vCPU): 0.25 vCPU

To avoid increasing the complexity of this example, I use the image hello-world published on Docker Hub (a public registry). You can adapt this example to pull images from ECR by giving the right permissions to your IAM execution role.

Important note: keep in mind we need internet access via the NAT Gateway to pull the images from Docker Hub, since our deployment is inside a private subnet.

The ECS task definition is now created and available for deployment.
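If you prefer to script this step instead of clicking through the console, a minimal boto3 sketch based on the attributes above could look like the snippet below (the region, account ID and log configuration are placeholders mirroring the values used in this post):

```python
import boto3

ecs = boto3.client("ecs", region_name="eu-west-1")  # placeholder region

ecs.register_task_definition(
    family="airflow-ecs-operator",
    requiresCompatibilities=["FARGATE"],
    networkMode="awsvpc",
    cpu="256",     # 0.25 vCPU
    memory="512",  # 0.5 GB
    executionRoleArn="arn:aws:iam::123456789012:role/ecsTaskExecutionRole",  # placeholder account ID
    containerDefinitions=[
        {
            "name": "hello-world",
            "image": "hello-world:latest",
            "essential": True,
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": "/ecs/airflow-ecs-operator",
                    "awslogs-region": "eu-west-1",
                    "awslogs-stream-prefix": "ecs",
                },
            },
        }
    ],
)
```

Unlike the console wizard, this call does not create the CloudWatch log group for you, which brings us to the next point.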

CloudWatch log group

When you create an ECS task definition via the AWS console, a CloudWatch log group is generated automatically based on the name of the task. If you are deploying the resources via the AWS CLI or infrastructure-as-code, pay attention to the CloudWatch group name and log stream prefix assigned to the ECS task definition.

If you create the resources via the AWS console like in my case, have a look at the ECS task definition to confirm the name of the group and the log stream prefix; this information is relevant later during the configuration of our DAG. In my example, these are the values generated by the task definition:

  • CW group: /ecs/airflow-ecs-operator
  • CW log stream prefix: ecs/hello-world

The CW group is important because the ECS operator collects the logs of the Docker container and returns them to the Airflow task automatically. Without the right values for the CW group we cannot configure the ECS operator properly and the logs won't be collected from the ECS task.

IAM role

As discussed before, the MWAA IAM role was created automatically during the MWAA setup via the AWS console, and it includes several policies for specific services such as MWAA, S3, CloudWatch, SQS and KMS. All of these services are used by the MWAA architecture and its underlying infrastructure.

In order to grant access to run ECS tasks, we need to attach a new inline policy to the same IAM role created by MWAA. Edit your IAM role and add content along the lines of the policy below.
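The policy below is a minimal sketch: it allows running and describing ECS tasks, passing the ECS task execution role, reading the container logs from CloudWatch, and reading the SSM parameters used later by the DAG. The SSM parameter path (/airflow/ecs_operator_demo/) is an assumption matching the naming convention used in the rest of this post, so adjust it to your own convention.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "RunEcsTasks",
      "Effect": "Allow",
      "Action": ["ecs:RunTask", "ecs:DescribeTasks", "ecs:StopTask"],
      "Resource": "*"
    },
    {
      "Sid": "PassEcsTaskExecutionRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::${aws_account_id}:role/ecsTaskExecutionRole"
    },
    {
      "Sid": "ReadEcsTaskLogs",
      "Effect": "Allow",
      "Action": [
        "logs:GetLogEvents",
        "logs:GetLogRecord",
        "logs:DescribeLogStreams"
      ],
      "Resource": "arn:aws:logs:${aws_region}:${aws_account_id}:log-group:/ecs/airflow-ecs-operator:*"
    },
    {
      "Sid": "ReadSsmParameters",
      "Effect": "Allow",
      "Action": ["ssm:GetParameter", "ssm:GetParameters"],
      "Resource": "arn:aws:ssm:${aws_region}:${aws_account_id}:parameter/airflow/ecs_operator_demo/*"
    }
  ]
}
```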

Important note: replace the values ${aws_region} and ${aws_account_id} with your respective region and account ID.

Notice the name of the IAM role for ECS task execution is ecsTaskExecutionRole and the CloudWatch group name is /ecs/airflow-ecs-operator. If you decide to use different resources remember to update these values in the IAM policy.

Finally, notice in the last section we grant access to read a few SSM parameters. They will be used later to collect relevant data for the operator configuration from inside the DAG code.

VPC and Security Group

Since the ECS task definition is based on the network mode awsvpc, we need a valid set of subnets and a security group to run the ECS task.

Feel free to select your own parameters, but keep in mind you may need to adapt this configuration depending on the requirements of your Docker container, for example:

  • Does the container need access to the public internet? In this case your subnet needs to have access to an Internet Gateway or NAT Gateway.
  • Does the container need access to specific services such as RDS? In this case, the RDS security group needs an inbound rule allowing traffic from the security group assigned to your ECS task.

Create an Airflow DAG with the ECS operator

With all the prerequisites fulfilled, it is time to create the Airflow DAG and verify the results.

Access your Airflow UI via the link provided in the MWAA environment and verify the list of DAGs. At the beginning, no DAGs are available, so it is time to upload our first DAG to the S3 bucket.

The code below contains a sample DAG using the Airflow ECS operator; it relies on a set of SSM parameters to collect the right configuration.
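This is a minimal sketch: the SSM parameter names (under the /airflow/ecs_operator_demo/ prefix), the DAG schedule and the start date are assumptions of this post, so adapt them to your own conventions.

```python
from datetime import datetime

import boto3
from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import ECSOperator

# Retrieve the operator configuration from SSM Parameter Store.
# The parameter names are an assumption of this post; adjust them to your own convention.
ssm = boto3.client("ssm")


def get_parameter(name: str) -> str:
    return ssm.get_parameter(Name=name)["Parameter"]["Value"]


ecs_cluster = get_parameter("/airflow/ecs_operator_demo/cluster")
ecs_task_definition = get_parameter("/airflow/ecs_operator_demo/task_definition")
ecs_subnets = get_parameter("/airflow/ecs_operator_demo/subnets")  # comma-separated list
ecs_security_group = get_parameter("/airflow/ecs_operator_demo/security_group")
ecs_awslogs_group = get_parameter("/airflow/ecs_operator_demo/awslogs_group")
ecs_awslogs_stream_prefix = get_parameter("/airflow/ecs_operator_demo/awslogs_stream_prefix")

with DAG(
    dag_id="airflow_dag_ecs_operator",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    # Run the hello-world container as a Fargate task and stream its logs back into Airflow.
    hello_world = ECSOperator(
        task_id="hello_world",
        cluster=ecs_cluster,
        task_definition=ecs_task_definition,
        launch_type="FARGATE",
        overrides={"containerOverrides": []},
        network_configuration={
            "awsvpcConfiguration": {
                "subnets": ecs_subnets.split(","),
                "securityGroups": [ecs_security_group],
                "assignPublicIp": "DISABLED",
            }
        },
        awslogs_group=ecs_awslogs_group,
        awslogs_stream_prefix=ecs_awslogs_stream_prefix,
    )
```

Notice the operator sets launch_type to FARGATE and receives the awslogs_group and awslogs_stream_prefix values collected from SSM; these are what allow the container logs to flow back into the Airflow task log.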

Save the code above in a file named airflow_dag_ecs_operator.py and upload the file to the DAGs folder inside your MWAA S3 bucket. If everything works fine, you should see the new DAG available in your Airflow UI a few seconds after the upload.

Configuration with the SSM parameters

Notice there are six variables being retrieved automatically from SSM Parameter Store inside the Python script associated with the DAG.

These parameters contain all the relevant information requested by the ECS operator, attributes such as ECS cluster name, ECS task definition, VPC subnets, Security Group, CW group name and log prefix.

Create all the parameters in SSM Parameter Store prior to the first run of the Airflow DAG.

The names and values are available in the table below. Remember to replace the sample subnets and security group with your own values.
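Assuming the naming convention from the DAG sketch above (the parameter names are an assumption of this post, not a requirement of the operator), the parameters look like this:

  • /airflow/ecs_operator_demo/cluster: airflow-ecs-cluster
  • /airflow/ecs_operator_demo/task_definition: airflow-ecs-operator
  • /airflow/ecs_operator_demo/subnets: subnet-11111111,subnet-22222222 (sample)
  • /airflow/ecs_operator_demo/security_group: sg-33333333 (sample)
  • /airflow/ecs_operator_demo/awslogs_group: /ecs/airflow-ecs-operator
  • /airflow/ecs_operator_demo/awslogs_stream_prefix: ecs/hello-world

You can create them via the AWS console, or with a short boto3 script like the sketch below:

```python
import boto3

ssm = boto3.client("ssm")

# Sample values; replace the subnets and security group with your own resources.
parameters = {
    "/airflow/ecs_operator_demo/cluster": "airflow-ecs-cluster",
    "/airflow/ecs_operator_demo/task_definition": "airflow-ecs-operator",
    "/airflow/ecs_operator_demo/subnets": "subnet-11111111,subnet-22222222",
    "/airflow/ecs_operator_demo/security_group": "sg-33333333",
    "/airflow/ecs_operator_demo/awslogs_group": "/ecs/airflow-ecs-operator",
    "/airflow/ecs_operator_demo/awslogs_stream_prefix": "ecs/hello-world",
}

for name, value in parameters.items():
    ssm.put_parameter(Name=name, Value=value, Type="String", Overwrite=True)
```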

Execute the DAG and test the ECS operator

Let’s activate the DAG and trigger a manual run via Airflow UI.

During the process have a look at the ECS cluster and notice the ECS task being provisioned while your Airflow task is running in parallel.

After the task has completed, check the stopped tasks and open the logs via the ECS console.

Notice the messages printed by the hello-world container were transferred to the output of the ECS task (CloudWatch). Open the CW group and check the content; a new log stream is available.

After checking the ECS logs, go back to the Airflow UI. The task is complete and marked as successful.

Open the Airflow logs and notice the same data from CloudWatch was transferred automatically from the log stream to the Airflow task.

This data is very important, and the automatic collection of logs is quite handy: we don't need to look for logs across different AWS services, and Airflow now holds all the relevant information related to the workflow. All thanks to the ECS operator.

Conclusion

As you can see, the ECS operator provides a powerful integration component and an effective workaround for using Docker containers in Airflow environments deployed through Amazon MWAA.

Hope this article helped you in your Airflow and Amazon MWAA journey.

If you are interested in different topics related to AWS and Data Engineering, stay tuned and see you in the next post. :)


Written by Felipe de Mello Rodrigues

Lead Data Architect & Engineer at Thoughtworks. Experience in multiple cloud providers such as AWS, Databricks, Confluent Cloud, Fivetran, DBT and Snowflake.