Amazon DynamoDB Value Sensor for Apache Airflow
In Apache Airflow, a sensor is a type of operator that waits for a certain condition to be true before allowing execution of a DAG (Directed Acyclic Graph) to continue. Sensors make DAGs more flexible and reactive to changes in the environment they operate in.
Many customers integrate Airflow with external systems that have no native sensor support. By using this sensor to monitor an Amazon DynamoDB table for status updates from a third-party system, you can synchronize and control the execution of your workflow based on the availability, readiness, or completion of external processes or data sources.
This technique can be helpful in various real-world use cases. Here are a few examples:
- Equipment or Machinery Monitoring: Suppose you have a workflow that depends on the operational status of equipment or machinery. The equipment may write its status to a DynamoDB table, indicating whether it is online, offline, in use, or idle. By using this sensor to monitor a specific table item attribute value, you can ensure that the workflow only proceeds when the equipment is in the desired state. This can be useful in manufacturing, industrial automation, or facility management scenarios.
- Job or Task Completion Tracking: In scenarios where long-running tasks or jobs are executed, you can record their status in DynamoDB (see the sketch after this list). Use this sensor to monitor the completion of these tasks. For instance, if you have a workflow that relies on the successful execution of a batch job or data processing task that is not native to AWS, the custom sensor can check the job’s status in the DynamoDB table and proceed accordingly, either triggering downstream tasks or handling potential errors.
- Inventory Level Monitoring: Consider a company that uses an external inventory management system to keep track of its products. The inventory management system has no built-in Airflow sensor support, and the company needs to monitor inventory levels to trigger certain workflows. In this case, the DynamoDB value sensor can track the inventory levels recorded by the external system: the sensor checks the levels at regular intervals and allows the workflow to proceed if the inventory hits zero. This lets the company integrate the external system with Airflow and leverage the platform’s benefits, such as automated workflows, scheduling, and monitoring, leading to increased efficiency, reduced errors, and better visibility into product inventory levels.
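To make the job-tracking pattern concrete, here is a minimal sketch of how an external job runner might record its status with boto3. The table, key, and attribute names (sensor-table, pk, status) are assumptions chosen to match the example later in this post; adapt them to your system.

import boto3

# Assumes the sensor-table created later in this post
table = boto3.resource("dynamodb").Table("sensor-table")

def report_status(job_id: str, status: str) -> None:
    # Upsert the job item with its current status attribute
    table.put_item(Item={"pk": job_id, "status": status})

# The external system calls this as the job progresses
report_status("job1234", "started")
# ... run the job ...
report_status("job1234", "done")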
How it works
This sensor allows you to wait for an attribute value to be present on an item in a DynamoDB table. To use it, have your external system write a value (e.g., done) to a DynamoDB item. The next time the sensor polls the item and finds the expected value, the sensor returns true, and your DAG continues with its execution.
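Conceptually, each poll (a poke, in Airflow terms) reads the item and compares a single attribute, roughly like the sketch below. This illustrates the behavior only; it is not the provider’s actual implementation.

import boto3

def poke(table_name: str, pk_name: str, pk_value: str,
         attribute_name: str, attribute_value: str) -> bool:
    # Read the item and return True only when the watched attribute
    # has reached the expected value
    table = boto3.resource("dynamodb").Table(table_name)
    item = table.get_item(Key={pk_name: pk_value}).get("Item", {})
    return item.get(attribute_name) == attribute_value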
This sensor is part of the Amazon provider package for Airflow; installation instructions can be found in the provider’s documentation. You must use apache-airflow-providers-amazon version 8.0.0 or higher.
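On a self-managed Airflow installation, you can install or upgrade the provider with pip; on Amazon MWAA, add the pinned package to your requirements.txt instead:

pip install "apache-airflow-providers-amazon>=8.0.0"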
Example DAG
In this example, we’ll simulate an external system that runs jobs; our DAG needs to wait for one of those jobs to complete.
To use this example, you’ll need to create a DynamoDB table for the sensor to monitor:
aws dynamodb create-table \
--table-name sensor-table \
--attribute-definitions AttributeName=pk,AttributeType=S \
--key-schema AttributeName=pk,KeyType=HASH \
--billing-mode PAY_PER_REQUEST
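Table creation is asynchronous. If you are scripting these steps, you can block until the table is ACTIVE using the AWS CLI waiter:

aws dynamodb wait table-exists --table-name sensor-table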
Once the table is done provisioning, set the status of a job to started:
aws dynamodb put-item --table-name sensor-table \
--item '{"pk": {"S": "job1234"}, "status": {"S": "started"}}'
Here is the complete example DAG demonstrating the usage of DynamoDBValueSensor:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
with DAG(
    dag_id="dynamodb_value_sensor_blog",
    schedule="@once",
    start_date=datetime(2023, 4, 1),
    catchup=False,
    tags=["example"],
) as dag:
    t1 = BashOperator(task_id="task1", bash_command="echo task 1")

    dynamodb_sensor = DynamoDBValueSensor(
        task_id="waiting_for_dynamodb_item_value",
        poke_interval=30,  # check the item every 30 seconds
        timeout=120,  # fail the sensor task after 120 seconds without a match
        retries=10,  # retry the failed sensor task up to 10 times
        table_name="sensor-table",
        partition_key_name="pk",
        partition_key_value="job1234",
        attribute_name="status",
        attribute_value="done",
    )

    t2 = BashOperator(task_id="task2", bash_command="echo task 2")

    t1 >> dynamodb_sensor >> t2
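Copy the DAG file into your dags folder. You can then start the DAG from the Airflow UI or, on a typical Airflow 2.x installation, from the CLI:

airflow dags unpause dynamodb_value_sensor_blog
airflow dags trigger dynamodb_value_sensor_blog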
Once the DAG is running, observe that processing stops after task1. The DAG will remain in this state, pending the update of the DynamoDB item’s status attribute to done.
Set the status of the job to done:
aws dynamodb put-item --table-name sensor-table \
--item '{"pk": {"S": "job1234"}, "status": {"S": "done"}}'
The DAG will then run to completion after the sensor polls DynamoDB again.