Data Science Research Infrastructure with Airflow, Containers, and CICD at C2i Genomics

Feb 18, 2024


A blog post by Zohar Donenhirsh and Alina Aven

At C2i Genomics, we focus on enhancing cancer detection through our advanced cloud diagnostic platform. This platform employs a straightforward blood test, artificial intelligence, and whole-genome pattern recognition to identify cancer at the genomic level, enabling the detection of cancer recurrence earlier than other existing technologies.

Our analytical process involves extracting DNA from a blood sample and applying Next-Generation Sequencing technologies to read DNA sequences. Each blood test generates about 200 GB of data per sample, so we need a platform that can provision the computing and storage resources for the analysis. For instance, running an algorithm that uses 4 CPU cores per sample on 1,000 samples requires about 200 TB of storage and 4,000 CPU cores. Our platform, with Apache Airflow as its beating heart, is designed to handle these data and compute requirements, allowing data scientists to work on thousands of samples without worrying about resource limitations.

Today’s post focuses on crafting a data science platform that aligns with our team’s needs. Let’s start by specifying the essential requirements and presenting our solution. Afterwards, we will provide an implementation guide.

Identifying Our Key Needs:

Our data scientists frequently develop and iterate algorithmic code, necessitating a system that supports:

  • The ability for data scientists to develop and deploy code independently
  • The ability to validate algorithms on datasets consisting of thousands of samples

We aim to provide a streamlined experience for the data scientists to develop, improve, validate, share, and compare results without worrying about resource allocation.

The main KPI is to shorten the development cycle.

Solution:

To meet these requirements, we developed an architecture that integrates Apache Airflow, Azure DevOps for version control and CI/CD pipelines, and AWS ECS.

The architecture is built from the following components:

1. Apache Airflow: Airflow is a highly regarded platform among data engineers for orchestrating complex workflows. Its user interface visualizes workflows as Directed Acyclic Graphs (DAGs). Airflow is our natural choice for its ability to automate processes, monitor workflows, and integrate with a wide range of services such as AWS ECS, EKS, and Lambda.

2. AWS ECS (Elastic Container Service): AWS ECS is a container management service that simplifies running, stopping, and managing Docker containers on a cluster. ECS is designed for high scalability and performance, providing a robust environment for running and scaling containers while handling underlying infrastructure tasks such as container orchestration and compute resource allocation.

3. ECR (Elastic Container Registry): ECR is AWS’s Docker container registry service, offering secure and scalable storage and management of container images. Integrated with ECS, it simplifies the container deployment process in the AWS environment.

4. Azure Repositories: Azure Repositories is part of Azure DevOps; it provides Git source control and integrates with CI/CD pipelines for efficient code management and deployment.

Each component plays a key role in our architecture, providing an efficient and scalable environment for data scientists to develop, test, and deploy their algorithms. The integration of these services ensures that our workflow is not only powerful and reliable but also adaptable to the changing needs of data science projects.

Next, we will present the technical guide in detail. We assume that the reader already has an Airflow environment set up and is familiar with its basic functionalities, as our focus will be on how Airflow integrates with other components.

Cluster deployment

AWS ECS offers a robust container orchestration service. It provides two distinct launch types for managing computing resources:

  • EC2 Launch Type: The cluster can launch any EC2 instance type available on AWS. Scaling is done on demand using Auto Scaling groups, capacity providers, and launch templates.
  • Fargate Launch Type: Fargate removes the need to provision, configure, and manage EC2 instances yourself, but it has limits on storage and compute. For simplicity, we will use the Fargate configuration in this guide.

To deploy the cluster, we use the AWS CDK. To run the code, you will need to set up a CDK environment or create the resources manually. This example deploys the cluster, VPC, and IAM roles, enabling us to create and run tasks on the cluster.

pip install aws-cdk-lib constructs

# Import necessary CDK modules
from aws_cdk import Stack, App
from aws_cdk import aws_ecs as ecs
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_iam as iam
from constructs import Construct


class MyEcsClusterStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Create a new VPC
        vpc = ec2.Vpc(self, "MyVpc", max_azs=3)

        # Create the ECS Cluster
        cluster = ecs.Cluster(self, "MyCluster", vpc=vpc)

        # Create an ECS Task Role
        task_role = iam.Role(
            self, "ecsTaskRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
            description="Role that the ECS task can assume")
        task_role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name(
                "service-role/AmazonECSTaskExecutionRolePolicy"))

        # Create an ECS Task Execution Role
        execution_role = iam.Role(
            self, "ecsExecutionRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
            description="Task Execution Role that ECS can assume")
        execution_role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name(
                "AmazonEC2ContainerRegistryReadOnly"))
        execution_role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name(
                "CloudWatchLogsFullAccess"))
        execution_role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name(
                "service-role/AmazonECSTaskExecutionRolePolicy"))


# Initialize the CDK App
app = App()
MyEcsClusterStack(app, "MyEcsClusterStack")
app.synth()

We also need to create a task definition specifying our tasks’ CPU and RAM requirements. This can be done using the AWS CLI command:

aws ecs register-task-definition \
  --region us-east-1 \
  --family my-task-definition \
  --network-mode awsvpc \
  --execution-role-arn arn:aws:iam::YOUR_AWS_ACCOUNT_NUMBER:role/ecsExecutionRole \
  --container-definitions '[{
    "name": "my-ecr",
    "image": "my-ecr-image",
    "essential": true
  }]' \
  --requires-compatibilities FARGATE \
  --cpu "256" \
  --memory "512"

The Development Phase:

The algorithm team is continuously developing new algorithms or improving existing ones. To facilitate a smooth transition from development to deployment, we’ve implemented a robust CI/CD pipeline. It automatically builds a Docker image of the updated code and pushes it to an Elastic Container Registry (ECR) in AWS across all relevant regions. We use Azure Repositories: the developer pushes changes to the repository, which triggers the CI/CD pipeline below.

trigger:
  branches:
    include:
      - main

pool:
  vmImage: 'ubuntu-latest'

variables:
  awsRegion: 'your-aws-region' # Example: us-east-1
  awsCredentials: 'your-aws-service-connection-name' # Set this in Azure DevOps Service Connections
  repositoryName: 'your-ecr-repository-name'
  imageName: 'your-image-name'
  tag: '$(Build.BuildId)'

stages:
  - stage: BuildAndPush
    displayName: 'Build and Push Docker Image'
    jobs:
      - job: Build
        displayName: 'Build'
        steps:
          - checkout: self

          - task: Docker@2
            displayName: 'Build Docker Image'
            inputs:
              command: build
              Dockerfile: '**/Dockerfile' # Update this if your Dockerfile is in a specific directory
              tags: |
                $(imageName):$(tag)

          - task: Docker@2
            displayName: 'Login to Amazon ECR'
            inputs:
              command: login
              containerRegistry: $(awsCredentials)
            # The AWS service connection configured in Azure DevOps must have permissions to push images to ECR.

          - task: Docker@2
            displayName: 'Push Docker Image to ECR'
            inputs:
              command: push
              repository: $(repositoryName)
              tags: |
                $(imageName):$(tag)
            # Ensure that your repositoryName corresponds to the ECR repository's name.

          - script: echo "Image pushed to ECR successfully"
            displayName: 'Confirm Image Push'

Airflow DAG

Once the cluster is set up and an image is available in the Elastic Container Registry (ECR), we create the Directed Acyclic Graph (DAG). In this DAG, we employ the ECS operator to dispatch task execution requests to the cluster. These requests specify the ECR image to use, the required environment variables, and the compute resources, as defined in the task definition. The DAG parses a JSON file containing the list of inputs the user provides for the dataset they want to run. This file is used to dynamically determine the number of tasks in the DAG: we iterate over each input in the list and generate a separate cluster request for each one. The ECS operator also pulls logs from AWS CloudWatch, which are then available in the Airflow logs.

Create environment variables:

Airflow UI → Admin → Variables

Add your specific environment variables; they will be used in the DAG.
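
For reference, the example DAG below reads a single JSON Airflow Variable named environment_variables_demo. A minimal value might look like the following (the cluster name, security group IDs, and subnet IDs are placeholders to replace with your own):

{
  "aws_cluster": "my-ecs-cluster",
  "aws_security_groups": ["sg-0123456789abcdef0"],
  "aws_subnets": ["subnet-0123456789abcdef0", "subnet-0fedcba9876543210"]
}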

An Airflow DAG example that utilizes the EcsRunTaskOperator:

import datetime, os, json

from airflow.models import Variable
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator


DAG_NAME = "ecs_demo_dag"
dag_path = os.path.dirname(os.path.abspath(__file__))
dag_vars = Variable.get(key='environment_variables_demo', default_var=dict(), deserialize_json=True)

default_args = {
    'depends_on_past': False,
    'start_date': datetime.datetime(2023, 1, 1)
}


@dag(dag_id=DAG_NAME, default_args=default_args, schedule_interval=None, max_active_runs=1, catchup=False)
def run_demo_ecs():

    @task
    def get_input_data():
        # Read the user-provided JSON file that sits next to the DAG file
        dag_path = os.path.dirname(os.path.abspath(__file__))
        input_file_path = os.path.join(dag_path, 'inputs.json')
        with open(input_file_path, 'r') as file:
            data = json.load(file)
        return data['inputs']

    @task
    def run_ecs_task(arg):
        aws_task_definition = 'MY_AWS_TASK_DEFINITION'
        aws_ecr_image_name = 'MY_AWS_ECR_IMAGE'
        ecs_task = EcsRunTaskOperator(
            task_id=f'run_docker_{list(arg.keys())[0]}',
            task_definition=aws_task_definition,
            cluster=dag_vars.get('aws_cluster'),
            launch_type='FARGATE',
            overrides={
                'containerOverrides': [
                    {
                        # Must match the container name in the registered task definition
                        'name': aws_ecr_image_name,
                        'environment': [
                            {'name': 'SUBJECT_ID', 'value': str(arg['subject_id'])},
                            {'name': 'SAMPLE_ID', 'value': str(arg['sample_id'])}
                        ],
                    },
                ],
            },
            network_configuration={
                "awsvpcConfiguration": {
                    "securityGroups": dag_vars.get('aws_security_groups'),
                    "subnets": dag_vars.get('aws_subnets'),
                },
            },
            awslogs_group=f'/ecs/{aws_task_definition}',
            awslogs_stream_prefix=f'ecs/{aws_ecr_image_name}'
        )
        # Execute the operator inline so CloudWatch logs surface in this task's Airflow logs
        ecs_task.execute(context=get_current_context())

    input_data_extractions_instance = get_input_data()
    # Dynamic task mapping: one ECS task per entry in inputs.json
    run_tasks = run_ecs_task.expand(arg=input_data_extractions_instance)

    input_data_extractions_instance >> run_tasks


etl_bip_checkmate_dl_dag_instance = run_demo_ecs()

Example input file:

{
  "inputs": [
    {
      "subject_id": "SUBJECT_1",
      "sample_id": "SAMPLE_1"
    },
    {
      "subject_id": "SUBJECT_2",
      "sample_id": "SAMPLE_2"
    }
  ]
}

Running

We have successfully established all necessary resources and implemented a method for data science users to update their code. Executing the DAG is straightforward: users simply supply a JSON file and initiate the run via the Airflow UI or CLI. Throughout this process, users can conveniently monitor their DAG’s progress in Airflow and review detailed logs for tasks that are currently running, have failed, or have completed successfully.
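
For example, assuming the DAG id from the example above (ecs_demo_dag) and an inputs.json already placed next to the DAG file, a run can be started from the command line:

airflow dags trigger ecs_demo_dag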

Cluster Monitoring and Limitations

We can also monitor tasks directly within the cluster, which proves invaluable for testing and troubleshooting our infrastructure, since some failures are cluster-related rather than code-related. For instance, we have run into throttling by the AWS API and limits on the availability of EC2 capacity.
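
One mitigation worth sketching here (it is not part of the configuration shown above, and the values are illustrative assumptions) is to let Airflow retry each mapped ECS task automatically, so a transient throttling or capacity error does not fail the whole run:

import datetime

# Illustrative: extend the DAG's default_args so each mapped ECS task is retried
# before being marked failed; the retry count and delay are assumptions to tune.
default_args = {
    'depends_on_past': False,
    'start_date': datetime.datetime(2023, 1, 1),
    'retries': 2,                                  # retry transient AWS API throttling or capacity errors
    'retry_delay': datetime.timedelta(minutes=5),  # wait between attempts
}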

Output file governance and traceability

The output files produced by tasks running on the cluster are systematically stored in a centralized S3 location, with the corresponding JSON inputs archived alongside the results for easy reference. This arrangement gives users the flexibility to save various types of output files. Reproducibility of results is ensured through the combination of the input JSON and the versioned image in ECR. Results are organized dynamically yet in an orderly way within a shared space. Furthermore, access to these outputs is simplified by integrating data lake tools, streamlining data retrieval and analysis.
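
As a rough sketch of what this can look like inside a container task (the bucket name, prefix layout, and helper function below are illustrative assumptions rather than our exact implementation), each run uploads its result files together with the input JSON it received:

import json
import os

import boto3

# Illustrative sketch: bucket and prefix layout are assumptions, not production values.
OUTPUT_BUCKET = "my-central-results-bucket"

def persist_results(subject_id: str, sample_id: str, inputs: dict, result_files: list) -> None:
    """Upload result files plus the input JSON under one per-sample prefix for traceability."""
    s3 = boto3.client("s3")
    prefix = f"results/{subject_id}/{sample_id}"

    # Archive the exact inputs used for this run alongside the outputs
    s3.put_object(
        Bucket=OUTPUT_BUCKET,
        Key=f"{prefix}/inputs.json",
        Body=json.dumps(inputs).encode("utf-8"),
    )

    # Upload whatever output files the algorithm produced
    for path in result_files:
        s3.upload_file(path, OUTPUT_BUCKET, f"{prefix}/{os.path.basename(path)}")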

Our platform’s capabilities: how this solution meets our needs

The platform and processes outlined in this guide are designed to tackle the challenges of orchestrating genomics processing pipelines and providing users with tools for rapidly iterating over new ideas.

The key features of our platform are as follows:

1. Streamlined Code Development: Users can deploy their published code directly to the platform.

2. Code Testing: Workflows can run dynamically on datasets of various sizes, according to user input.

3. Flexible Resource Allocation: The platform efficiently utilizes CPU, GPU, and RAM resources for high-performance computing.

4. Traceability: The platform tracks all inputs, parameters, outputs, and code versions for full reproducibility.

5. Clear Process Management and Visualization: Effective management and visualization of complex data processing workflows.

6. Monitoring and User Experience: Users can monitor progress and analyze logs for troubleshooting their code on all inputs.

7. Security and Compliance: Data protection and regulatory compliance are ensured through role-based access and data processing within its origin region.

We hope you enjoyed reading about our journey with Airflow as the core of our solution and the unique approach we implemented.

We presented the platform at Airflow Summit 2023; you are welcome to hear more about our solution there, including additional functionality, private networking, problems we encountered with ECS, and more!

Link to the code in GitHub.

Journey partners:

Thank you to AWS Principal Solutions Architect Dima Breydo for helping craft the solution.
