How to Write an Airflow Operator

Mehmet Vergili
Sep 5, 2018

An operator in Airflow represents a single, dedicated task. Operators generally perform one unit of work and do not need to share resources with any other operators. We call them in the correct order on the DAG, and they generally run independently of each other.

In general, if two operators need to share information, we should combine them into a single operator. If there is no way to combine them, we can use cross-communication (XCom) to share information between them.
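For reference, XCom sharing between two task callables looks roughly like this (a minimal sketch; the task id producer, the key row_count, and the value are hypothetical, not from the original pipeline):

    def producer(**kwargs):
        # push a value so that downstream tasks can read it via XCom
        kwargs['ti'].xcom_push(key='row_count', value=42)

    def consumer(**kwargs):
        # pull the value that the task with task_id='producer' pushed
        row_count = kwargs['ti'].xcom_pull(task_ids='producer', key='row_count')
        print('rows to process:', row_count)

Each callable would be wired into the DAG as a task (for example with the PythonOperator described below) with provide_context=True, so that the task instance (ti) is available inside kwargs.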

An Airflow DAG consists of operators that implement tasks. If no existing operator fits a task, we can use the PythonOperator to implement the task in a Python function.
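For example, a small cleanup step can be wrapped in a PythonOperator like this (a minimal sketch; the function name, task id, and dag object are illustrative, not from the original pipeline):

    from airflow.operators.python_operator import PythonOperator

    def clean_data(**kwargs):
        # placeholder for the task logic we want to run
        print('cleaning data for', kwargs['ds'])

    clean_data_task = PythonOperator(
        task_id='clean_data',
        python_callable=clean_data,
        provide_context=True,  # pass the execution context (ds, ti, ...) into kwargs
        dag=dag,               # assumes a DAG object named `dag` is defined elsewhere
    )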

If we have a PythonOperator task that we use in more than one project, or more than once in the same project, we should turn it into a custom operator.

In this article we will create an operator for the move_data_mssql task that we used in the article “Building Data Pipeline with Airflow”.

The move_data_mssql function has two parts: the first converts the Hive table into text format, and the second runs Apache Sqoop to move the data into an MS SQL table.
This is a very common requirement for a pipeline, and creating an operator for it will reduce development time.

move_data_mssql function

All operators derive from BaseOperator, except sensor operators, which derive from BaseSensorOperator, which in turn derives from BaseOperator. The most important part of an operator is the execute function. Every operator has an execute function and some helper functions related to its task. A general format of an operator is shown below.

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class AnAirflowOperator(BaseOperator):

    template_fields = ('param1', 'param2')
    ui_color = '#A6E6A6'

    @apply_defaults
    def __init__(self,
                 param1,
                 param2,
                 *args, **kwargs):
        super(AnAirflowOperator, self).__init__(*args, **kwargs)
        self.param1 = param1
        self.param2 = param2

    def execute(self, context):
        # the task we want to execute goes here
        pass

template_fields: the parameters that are rendered as Jinja templates when we call the operator with templated values.
ui_color: the color of the operator node in the DAG graph view.
execute function: the task logic is implemented in this function.

To create an operator from the move_data_mssql function, we move its body into the execute function and expose its variables as operator parameters, as in the sketch after the notes below.

Since hive_table and mssql_table are created from templates, we add them to template_fields.

We need to add *args, **kwargs to the operator's parameter list and pass them on to the parent constructor. This is mandatory, since BaseOperator needs information about the DAG.
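The original post embeds the resulting operator as a code listing; since it does not appear here, the following is a minimal sketch of what such a MoveHiveToSQLOperator could look like. It assumes the Hive table is dumped to a staging HDFS directory with the hive CLI and then exported with sqoop export; the staging directory, JDBC connection string, and default values are illustrative assumptions, not the author's exact code.

    import subprocess

    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults


    class MoveHiveToSQLOperator(BaseOperator):

        template_fields = ('hive_table', 'mssql_table')
        ui_color = '#A6E6A6'

        @apply_defaults
        def __init__(self,
                     hive_table,
                     mssql_table,
                     column_list='*',
                     num_mappers='4',
                     # the staging directory and connection string below are
                     # illustrative assumptions, not the author's actual values
                     export_dir='/tmp/hive_export',
                     mssql_conn='jdbc:sqlserver://dbhost:1433;databaseName=mydb',
                     *args, **kwargs):
            super(MoveHiveToSQLOperator, self).__init__(*args, **kwargs)
            self.hive_table = hive_table
            self.mssql_table = mssql_table
            self.column_list = column_list
            self.num_mappers = num_mappers
            self.export_dir = export_dir
            self.mssql_conn = mssql_conn

        def execute(self, context):
            # part 1: dump the Hive table as delimited text into a staging directory
            hive_query = (
                "INSERT OVERWRITE DIRECTORY '{d}' "
                "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' "
                "SELECT {c} FROM {t}"
            ).format(d=self.export_dir, c=self.column_list, t=self.hive_table)
            subprocess.check_call(['hive', '-e', hive_query])

            # part 2: run Sqoop to export the text files into the MS SQL table
            subprocess.check_call([
                'sqoop', 'export',
                '--connect', self.mssql_conn,
                '--table', self.mssql_table,
                '--export-dir', self.export_dir,
                '--input-fields-terminated-by', ',',
                '--num-mappers', str(self.num_mappers),
            ])

The hive_table and mssql_table parameters are the templated ones, which is why they are the two names listed in template_fields.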

Calling the New Operator

Calling the operator you wrote is the same as calling any other Airflow operator.

move_hive_to_sql = MoveHiveToSQLOperator(
    task_id='move_hive_to_sql',
    hive_table='mydata_clean_{{ ds }}',
    mssql_table='mydata_{{ ds }}',
    column_list='*',
    num_mappers='4',
    dag=dag,
)

As you can see, we use the {{ ds }} template for hive_table and mssql_table. This is why we added these parameters to template_fields.

