Published in Censius

101 Guide on Apache Airflow Operators

Apache Airflow is a tool for automating workflows and for orchestrating tasks and other programs, including on clusters of computers. Because workflows are defined in Python code, even complex data processing can be expressed quickly. In this post, we'll learn about Airflow operators, which you can use to create your own pipelines.

Operators carry out the instructions in your workflow definition, which in Airflow is a Python (.py) file that describes a DAG. Many Airflow operators can help you achieve your goals. However, their behavior can be hard to understand without a good conceptual grasp of Airflow itself.

What are Apache Airflow Operators?

Apache Airflow is an open-source MLOps and data tool for modeling and running data pipelines. Airflow operators are the commands your DAG executes each time an operator task is triggered during a DAG run. Every task adds scheduling and execution overhead, so use tasks sparingly and avoid tasks that complete without producing any useful result.

Recommended Reading: How to Automate Data Pipelines with Airflow?

Once your DAG runs reliably, tasks are an easy way to break a problem into steps. However, you need to know how operators interact and where to use them for the best results. In simple terms, creating an operator object generates a task.

Recommended Reading: What are DAGs?

Tasks are a good fit when data needs to be processed as part of a larger analysis or workflow: each task's output feeds a later step instead of being needed on its own right away.

The image above shows the general architecture of Apache Airflow.

Properties of Airflow Operators

  • It defines the nature of the task and how it should be executed.
  • When an operator is instantiated, the resulting task becomes a node in the DAG.
  • It can retry automatically after a failure if you set its retries parameter.
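To make the retry behavior concrete, here is a minimal pure-Python sketch of what the retries and retry_delay task arguments mean: the work is attempted up to retries + 1 times, with a pause between attempts. This is a toy model for illustration, not Airflow's implementation; run_with_retries and flaky_task are hypothetical names.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(task: Callable[[], T], retries: int = 0, retry_delay: float = 0.0) -> T:
    """Hypothetical helper: call `task`, retrying up to `retries` extra times on failure."""
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            if attempt >= retries:
                raise  # out of retries: the task fails
            attempt += 1
            time.sleep(retry_delay)  # wait before the next attempt


# A hypothetical task that fails twice before succeeding
calls = {"n": 0}


def flaky_task() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient error")
    return "done"


print(run_with_retries(flaky_task, retries=2, retry_delay=0.01))  # done (on the 3rd attempt)
```

In a real DAG you would set these per operator or through default_args, for example retries=2 and retry_delay=timedelta(minutes=5).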

Types of Airflow Operators

Action Operator

  • It performs an action, such as running a command or sending an email.
  • Examples: EmailOperator and BashOperator.

Transfer Operator

  • It is responsible for moving data from one system to another.
  • Many transfer operators move data through the Airflow worker, so be cautious when using them with very large datasets.

Sensor Operator

  • A sensor waits for a condition to be met, such as data arriving at a defined location.
  • Sensors can be long-running tasks.
  • They are useful for keeping track of external processes such as file uploads.

Operators play a crucial role in Airflow. We’ll go through a few of the most popular operators later, but first, let’s look at the relationship between a task and an operator.

The differences between a task and an operator might be confusing at first. The figure below might help you understand the relation between DAG, Task, and Operator.

The image depicts the relationships between the DAG, tasks, and operators.

Ideally, tasks are self-contained and do not rely on information from other tasks. Instantiating an operator class creates a task: each operator object you define in a DAG file becomes one task in that DAG.

Defining the DAG

```python
dag = DAG(
    dag_id="t0",
    schedule="@daily",  # placeholder: any schedule preset or cron expression
    # ...other DAG arguments
)
```

Defining Tasks

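```python
# op01 and op02 stand for any operator classes, e.g. BashOperator or PythonOperator
t01 = op01(
    task_id="name_task_1",
    dag=dag,
    # ...operator-specific parameters
)
t02 = op02(
    task_id="name_task_2",
    dag=dag,
    # ...operator-specific parameters
)
```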

Defining Relations between Tasks

```python
t01 >> t02
t02 >> t03  # t03 is defined the same way as t01 and t02
# ...
# Task 1 -> Task 2 -> Task 3
```
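The >> syntax can look magical at first. The pure-Python toy below (not Airflow's code; the Task class is hypothetical) shows the idea: overloading __rshift__ records an upstream/downstream edge and returns the right-hand task, so chains like t01 >> t02 >> t03 work, and a topological sort of those edges gives a valid execution order. Airflow's operators overload >> in a similar way, calling set_downstream and returning the right-hand task.

```python
from graphlib import TopologicalSorter


class Task:
    """Hypothetical stand-in for an operator instance: a named node in a DAG."""

    def __init__(self, task_id: str, dag: dict):
        self.task_id = task_id
        self.dag = dag
        dag.setdefault(task_id, set())  # register the node with no upstream tasks yet

    def __rshift__(self, other: "Task") -> "Task":
        # self >> other: `other` runs after `self`, so `self` is upstream of `other`
        self.dag[other.task_id].add(self.task_id)
        return other  # returning `other` lets us chain a >> b >> c


dag: dict = {}  # maps each task_id to the set of its upstream task_ids
t01 = Task("name_task_1", dag)
t02 = Task("name_task_2", dag)
t03 = Task("name_task_3", dag)
t01 >> t02 >> t03

order = list(TopologicalSorter(dag).static_order())
print(order)  # ['name_task_1', 'name_task_2', 'name_task_3']
```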

Let’s have a look at some of the most popular operators:

Apache Airflow Bash Operator — Executes a bash command

BashOperator in Apache Airflow provides a simple method to run bash commands in your workflow. This is the operator you’ll want to use to specify the job if your DAG performs a bash command or script.

```python
from airflow.operators.bash import BashOperator

t1 = BashOperator(
    task_id="t1",  # task_id must be a string
    bash_command='echo "Text"',
    dag=dag,
)
```

BashOperator Code — Github
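Roughly speaking, BashOperator runs bash_command in a bash subprocess, fails the task if the command exits with a non-zero code, and by default pushes the last line of stdout to XCom. The sketch below approximates that behavior with Python's subprocess module for illustration only; run_bash_command is a hypothetical helper, not part of Airflow, and it assumes bash is installed.

```python
import subprocess


def run_bash_command(bash_command: str) -> str:
    """Hypothetical helper approximating BashOperator: run the command in bash,
    fail on a non-zero exit code, and return the last line of stdout."""
    result = subprocess.run(
        ["bash", "-c", bash_command],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        # BashOperator would mark the task as failed here
        raise RuntimeError(f"Bash command failed with exit code {result.returncode}")
    lines = result.stdout.splitlines()
    return lines[-1] if lines else ""  # the value BashOperator would push to XCom


print(run_bash_command('echo "Text"'))  # Text
```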

Apache Airflow Python Operator — Calls an arbitrary python function

The Airflow PythonOperator provides a basic yet effective operator that lets you run a Python callable function from your DAG.

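```python
from airflow.operators.python import PythonOperator


def print_string():
    print("Test String")


t2 = PythonOperator(
    task_id="t2",
    python_callable=print_string,  # the function Airflow calls when the task runs
    dag=dag,
)
```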

Python Operator Code — Github
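PythonOperator calls python_callable with the positional and keyword arguments you pass through op_args and op_kwargs, and by default pushes the callable's return value to XCom so downstream tasks can read it. The toy below models that flow with a plain dictionary standing in for XCom; run_python_task, xcom, and add are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Dict, Optional, Sequence

# A plain dict standing in for Airflow's XCom storage (toy model only)
xcom: Dict[str, Any] = {}


def run_python_task(
    task_id: str,
    python_callable: Callable[..., Any],
    op_args: Optional[Sequence[Any]] = None,
    op_kwargs: Optional[Dict[str, Any]] = None,
) -> Any:
    """Hypothetical helper: call the function and store its return value under task_id."""
    result = python_callable(*(op_args or []), **(op_kwargs or {}))
    xcom[task_id] = result  # mirrors PythonOperator pushing the return value to XCom
    return result


def add(a: int, b: int) -> int:
    return a + b


run_python_task("t2", add, op_args=[2], op_kwargs={"b": 3})
print(xcom["t2"])  # 5
```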

Apache Airflow Email Operator — Sends an email

EmailOperator is the most straightforward way to send emails from Airflow. With it, you can send task-related emails or build a simple alerting system, using the SMTP settings from your Airflow configuration. Its biggest drawback is that it isn’t very customizable.

```python
from airflow.operators.email import EmailOperator

t3 = EmailOperator(
    task_id="t3",
    to="test@mail.com",
    subject="Alert Mail",
    html_content="<p>Mail Test</p>",  # the body is sent as HTML
    dag=dag,
)
```

EmailOperator Code — Github
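To see what the main EmailOperator arguments amount to, the sketch below builds an equivalent HTML message with Python's standard email package, without sending it. build_alert_email and the sender address are hypothetical, and the exact message Airflow constructs may differ (for example, it may use a multipart message).

```python
from email.message import EmailMessage


def build_alert_email(to: str, subject: str, html_content: str) -> EmailMessage:
    """Hypothetical helper: build (but do not send) an HTML alert email."""
    msg = EmailMessage()
    msg["To"] = to
    msg["Subject"] = subject
    msg["From"] = "airflow@example.com"  # assumed sender address for illustration
    msg.set_content(html_content, subtype="html")  # Content-Type: text/html
    return msg


msg = build_alert_email("test@mail.com", "Alert Mail", "<p>Mail Test</p>")
print(msg["Subject"], msg.get_content_type())  # Alert Mail text/html
```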

Apache Airflow PostgresOperator

PostgresOperator defines tasks that run SQL against a PostgreSQL database, for example to create tables or to insert and delete records.

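```python
import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="postgres_operator_dag",
    start_date=datetime.datetime(2021, 10, 11),
    schedule_interval="@once",
    catchup=False,
) as dag:
    t4 = PostgresOperator(
        task_id="t4",
        postgres_conn_id="postgres_default",  # Airflow connection to your database
        sql="""
            CREATE TABLE IF NOT EXISTS pet (
                table_id SERIAL PRIMARY KEY,
                name VARCHAR NOT NULL,
                table_type VARCHAR NOT NULL,
                birth_date DATE NOT NULL,
                OWNER VARCHAR NOT NULL);
        """,
    )
```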

PostgresOperator Code
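Because the SQL in the PostgresOperator example uses CREATE TABLE IF NOT EXISTS, the task can be retried or rerun without failing on an existing table. You can check that property locally without a PostgreSQL server: this sketch runs the same statement twice against an in-memory SQLite database. SQLite accepts these column type names but treats SERIAL differently from PostgreSQL, so this only illustrates idempotency, not Postgres behavior.

```python
import sqlite3

# The same DDL as the PostgresOperator example
CREATE_PET_TABLE = """
CREATE TABLE IF NOT EXISTS pet (
    table_id SERIAL PRIMARY KEY,
    name VARCHAR NOT NULL,
    table_type VARCHAR NOT NULL,
    birth_date DATE NOT NULL,
    OWNER VARCHAR NOT NULL);
"""

conn = sqlite3.connect(":memory:")  # stand-in database for a local check only
for _ in range(2):
    # Running the statement a second time (as a retry would) is a no-op, not an error
    conn.execute(CREATE_PET_TABLE)

tables = conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'pet'"
).fetchall()
print(tables)  # [('pet',)]
```

Without IF NOT EXISTS, the second run of the task would fail because the table already exists.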

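Apache Airflow SSH Operator

SSHOperator runs a command on a remote host over SSH, using the Airflow connection given by ssh_conn_id.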

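```python
from airflow.providers.ssh.operators.ssh import SSHOperator

t5 = SSHOperator(
    task_id="SSHOperator",
    ssh_conn_id="ssh_connectionid",  # Airflow connection holding host and credentials
    command='echo "Text from SSH Operator"',
    dag=dag,
)
```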

SSH Operator Code

Apache Airflow Docker Operator

DockerOperator executes a command inside a Docker container. Docker is a tool for creating and managing containers, lightweight isolated environments in which you can run your code. DockerOperator can also create a temporary directory on the host and mount it into the container, which you can use to store files while the task runs.

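```python
from airflow.providers.docker.operators.docker import DockerOperator

t6 = DockerOperator(
    task_id="docker_command",
    image="centos:latest",
    api_version="auto",
    auto_remove=True,  # remove the container once it exits
    command="/bin/sleep 30",
    docker_url="unix://var/run/docker.sock",  # local Docker daemon socket
    network_mode="bridge",
    dag=dag,
)
```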

Docker Operator Code

Apache Airflow HTTP Operator

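The HTTP operator calls an endpoint on an HTTP system to perform an action. This is useful if an API returns a big JSON payload and you’re only interested in part of it. The example below uses HttpSensor, its sensor counterpart, which calls the endpoint every poke_interval seconds until response_check returns True.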

```python
from airflow.providers.http.sensors.http import HttpSensor

t7 = HttpSensor(
    task_id="t7",
    http_conn_id="http_default",
    endpoint="",
    request_params={},
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=4,  # seconds between checks
    dag=dag,
)
```

HTTP Operator Code
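The response_check callable receives the HTTP response and returns True when the sensor should succeed. For the large-JSON case, the HTTP operator similarly accepts a response_filter callable that extracts only the part you need. The sketch below exercises two such callables locally against a stub response, so no network call is made; FakeResponse and the payload fields are hypothetical.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeResponse:
    """Hypothetical stand-in for an HTTP response; only .text and .json() are modeled."""

    text: str

    def json(self):
        return json.loads(self.text)


def response_check(response) -> bool:
    # Same check as the sensor: succeed once the body mentions "httpbin"
    return "httpbin" in response.text


def response_filter(response) -> list:
    # Keep only the ids of active items from a (potentially large) payload
    return [item["id"] for item in response.json()["items"] if item["active"]]


payload = {
    "source": "httpbin",
    "items": [
        {"id": 1, "active": True},
        {"id": 2, "active": False},
        {"id": 3, "active": True},
    ],
}
resp = FakeResponse(text=json.dumps(payload))
print(response_check(resp), response_filter(resp))  # True [1, 3]
```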

Apache Airflow Snowflake Operator

SnowflakeOperator executes SQL commands in a Snowflake database, such as CREATE, INSERT, MERGE, UPDATE, DELETE, COPY INTO, and TRUNCATE statements.

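```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# SNOWFLAKE_CONN_ID, CREATE_TABLE_SQL_STRING and the other SNOWFLAKE_* names
# are constants you define for your own Snowflake account.
dag = DAG(
    "example_snowflake",
    start_date=datetime(2021, 11, 11),
    default_args={"snowflake_conn_id": SNOWFLAKE_CONN_ID},
    tags=["example"],
    catchup=False,
)
t8 = SnowflakeOperator(
    task_id="t8",
    dag=dag,
    sql=CREATE_TABLE_SQL_STRING,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)
```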

SnowflakeOperator Code

Apache Airflow Spark Operators

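Apache Spark is a fast, scalable, general-purpose cluster computing engine. It provides Spark SQL for SQL and structured data processing, MLlib for machine learning, and a lot more. Airflow's Spark provider includes operators such as SparkSubmitOperator, SparkSqlOperator, and SparkJDBCOperator; SparkSqlOperator takes all of its configuration from the operator parameters. The example below uses SparkJDBCOperator with cmd_type='spark_to_jdbc' to write a Spark metastore table into a table in a JDBC database.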

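```python
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

t9 = SparkJDBCOperator(
    task_id="t9",
    cmd_type="spark_to_jdbc",  # copy the metastore table "bar" into the JDBC table "foo"
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="append",
    dag=dag,
)
```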

Spark Operator Code

Apache Airflow Sensor Operators

Sensors check a condition at a set interval, succeeding when it is satisfied and failing if they time out. Use them when you need to wait for a file, check whether a row exists in a SQL table, or delay the execution of your DAG until some external condition holds.

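```python
from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowSensorTimeout
from airflow.sensors.filesystem import FileSensor


def _failure_callback(context):
    # Airflow passes the task context; report sensor timeouts specifically
    if isinstance(context["exception"], AirflowSensorTimeout):
        print("timed out message")


with DAG(dag_id="sensor_dag", start_date=datetime(2021, 11, 11), schedule_interval=None) as dag:
    t10 = FileSensor(
        task_id="t10",
        filepath="data.csv",  # hypothetical file to wait for
        poke_interval=100,  # check every 100 seconds
        timeout=20 * 60,  # fail after 20 minutes of waiting
        mode="reschedule",
        on_failure_callback=_failure_callback,
    )
```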

Sensor Operator Code
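Conceptually, a sensor in poke mode runs a check every poke_interval seconds until the check returns True or the timeout elapses, at which point the task fails. The sketch below models that loop for a file check using only the standard library; wait_for_file is a hypothetical helper, not Airflow's FileSensor, and reschedule mode (which frees the worker slot between checks) is not modeled.

```python
import os
import tempfile
import time


def wait_for_file(filepath: str, poke_interval: float, timeout: float) -> bool:
    """Hypothetical helper: poll for a file like a sensor in poke mode."""
    started = time.monotonic()
    while True:
        if os.path.exists(filepath):  # the "poke": is the condition satisfied?
            return True
        if time.monotonic() - started >= timeout:
            # A real sensor raises AirflowSensorTimeout at this point
            raise TimeoutError(f"{filepath} did not appear within {timeout}s")
        time.sleep(poke_interval)


with tempfile.TemporaryDirectory() as tmp:
    present = os.path.join(tmp, "data.csv")
    open(present, "w").close()  # create the file the sensor waits for
    print(wait_for_file(present, poke_interval=0.01, timeout=1))  # True
```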

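Apache Airflow BigQuery Operators

The Google provider ships many BigQuery operators. For example, BigQueryCheckOperator runs checks against BigQuery, and BigQueryCreateEmptyTableOperator, used below, creates a new empty table with a given schema in a dataset.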

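```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator

create_table = BigQueryCreateEmptyTableOperator(
    task_id="t11",
    dataset_id=DATASET_NAME,  # name of an existing BigQuery dataset
    table_id="test_table11",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
    dag=dag,
)
```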

BigqueryOperator Code

There are many more operators; you can view the complete list of Airflow operators.

Check the Apache Airflow documentation to learn more.

Apache Airflow Operators Best Practices

To get the most out of these operators, you must know what they do and when it’s appropriate to apply them in your particular use case. Broadly, operators fall into two categories: those that deal with timing and those that manipulate data. A timing-oriented operator, such as a sensor, waits until a time or condition is reached, for example a given amount of time has passed. A data manipulation operator performs a specific processing task on incoming datasets, such as breaking out tables for better queryability.

  • Keep your DAG code separate from the codebase that defines your database models. If someone else modifies the models your DAGs depend on, tasks can fail at run time in ways that are confusing and slow to debug. Also, comment your code to explain what each task does, so that people unfamiliar with Airflow can figure out how things work.
  • Use operators sparingly. They save time, but every task adds scheduling overhead, so splitting a pipeline into many tiny tasks can make it slower and more bloated.
  • The Airflow scheduler triggers a DAG based on the start date and schedule interval parameters supplied in the DAG file. Instead of starting a DAG run at the beginning of its schedule period, the scheduler starts it at the end of that period. Consider the following DAG, which runs every day at 9 a.m.:

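```python
dag = DAG("dagname", default_args=default_args, schedule_interval="0 9 * * *")
```

With this schedule, the run that covers Monday 9 a.m. to Tuesday 9 a.m. is triggered on Tuesday at 9 a.m.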

  • Prefer operators that do not change existing data, such as stream_tasks_by_updated_time and update_datasets_by_. Tasks that avoid overwriting data are safer to retry and rerun.
  • A DAG ID must be specified when creating a DAG object, and it must be unique across all of your DAGs. If two DAGs share the same DAG ID, Airflow may show only one of them or report an import error, and you may see unexpected behavior. Define a suitable DAG ID and a DAG description. If you reference a DAG through an alias instead of its actual ID, you will get an error saying the ID does not match when you run it.
  • Always set a timeout on your sensors (the default is seven days). Base it on your application and on how long you expect the sensor to wait.
  • Use poke mode if your poke interval is relatively short, because reschedule mode with a short interval can overload the scheduler. Otherwise, prefer reschedule mode, especially for long-running sensors, so the sensor does not occupy a worker slot the whole time. This avoids deadlocks in which sensors take up every available worker slot.
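To make the "runs at the end of its interval" point concrete, here is a small standard-library sketch for the daily 9 a.m. schedule: it computes the data interval containing a given moment, and a scheduled run for that interval is triggered at the interval's end. It hard-codes the "0 9 * * *" cron for simplicity; daily_9am_interval is a hypothetical helper, and real schedules are evaluated by Airflow's own timetable logic.

```python
from datetime import datetime, timedelta
from typing import Tuple


def daily_9am_interval(moment: datetime) -> Tuple[datetime, datetime]:
    """Hypothetical helper for schedule_interval="0 9 * * *" (every day at 09:00).

    Returns the (start, end) data interval containing `moment`. A scheduled run
    for that interval is triggered at `end`, once the interval has finished.
    """
    start = moment.replace(hour=9, minute=0, second=0, microsecond=0)
    if moment < start:
        start -= timedelta(days=1)  # before 09:00 still belongs to yesterday's interval
    return start, start + timedelta(days=1)


start, end = daily_9am_interval(datetime(2022, 1, 3, 9, 0))  # Monday 09:00
print(start, "->", end)  # 2022-01-03 09:00:00 -> 2022-01-04 09:00:00
print("run triggered at", end)  # Tuesday 09:00
```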


Conclusion

Several MLOps tools are available, but Apache Airflow offers unique advantages, and more businesses are using it to manage their data pipelines. Operators are the building blocks of an Airflow pipeline: each operator defines what a single task does, while the DAG defines the order and schedule in which those tasks run. In this article, we looked at what operators are and walked through several common types. I hope you enjoyed the article and stay safe!

Originally Published on: https://censius.ai/blogs/apache-airflow-operators-guide


Censius is an AI observability platform that continuously monitors models, analyzes their performance, and provides explainability so that businesses derive better AI outcomes.

Harshil Patel


Software Developer and Technical Writer.
