Airflow — Writing your own Operators and Hooks

Andres Tavio
B6 Engineering

--

At B6 we use Apache Airflow to manage the scheduled jobs that move data into and out of our various systems. We have built Airflow pipelines for jobs such as moving data out of our CRM (Salesforce) into our data warehouse and ingesting real estate sale and mortgage data from external sources.

One of the reasons we were attracted to Airflow was its concept of Operators and Hooks. From the Airflow documentation, an Operator:

“… describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don’t need to share resources with any other operators.”

and Hooks are:

“… interfaces to external platforms and databases like Hive, S3, MySQL, Postgres, HDFS, and Pig. Hooks implement a common interface when possible, and act as a building block for operators.”

So Operators are the workhorses that execute the tasks that make up a DAG, and they use Hooks to communicate with external databases and systems. These are the basic tools in the Airflow toolkit.

When first working with Airflow, you might be impressed by the number of built-in Operators and Hooks available to you, such as RedshiftToS3Transfer, PostgresOperator, MySqlHook, and SlackHook. But like any good framework, the real power comes from the ability to customize and extend. Airflow allows this by letting developers create their own Operators and Hooks, built according to their specific needs and use cases.
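
For instance, a one-task DAG built entirely from those built-in pieces might look something like the sketch below (Airflow 1.x import paths; the connection ID and SQL file are just placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

with DAG(dag_id='example_builtin_operators',
         start_date=datetime(2020, 1, 1),
         schedule_interval='@daily') as dag:

    # PostgresOperator uses a PostgresHook under the hood to reach the
    # database registered under the 'warehouse' connection ID.
    load_staging = PostgresOperator(
        task_id='load_staging',
        postgres_conn_id='warehouse',
        sql='sql/load_staging.sql',
    )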

Here’s an example from our own pipelines. After writing a few DAGs, we noticed a recurring pattern: download a file from our data lake (S3), process it, then upload it to Salesforce or back to S3. While it is certainly possible to use a simple PythonOperator and boto3 to download a file from S3, why not encapsulate this behavior in a new Operator that we could reuse in other DAGs? We made use of the built-in S3Hook to keep things simple.

# Import paths shown here follow Airflow 1.x; they differ slightly in 2.x.
from airflow.models import BaseOperator
from airflow.hooks.S3_hook import S3Hook

class S3ToLocalOperator(BaseOperator):
    def __init__(self, s3_conn_id, s3_bucket, s3_key, local_file_path, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.s3_conn_id = s3_conn_id
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.local_file_path = local_file_path

    def execute(self, context):
        print('Downloading {}/{} to {}'.format(self.s3_bucket, self.s3_key, self.local_file_path))
        hook = S3Hook(self.s3_conn_id)
        # Check if key exists, download if so
        if hook.check_for_key(self.s3_key, self.s3_bucket):
            s3_file_object = hook.get_key(self.s3_key, self.s3_bucket)
            s3_file_object.download_file(self.local_file_path)
        else:
            raise ValueError('File not found in S3')
        print('Download successful')
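
In a DAG file, the new Operator then reads like any built-in one. A hypothetical snippet, with placeholder values for the connection ID, bucket, key, and local path:

# Download a file from the data lake onto the worker's local disk.
download_report = S3ToLocalOperator(
    task_id='download_latest_report',
    s3_conn_id='s3_default',
    s3_bucket='b6-data-lake',
    s3_key='reports/latest.csv',
    local_file_path='/tmp/latest.csv',
    dag=dag,
)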

Speaking of Hooks, we also decided to create a Hook for Salesforce’s Bulk API. This allowed us to make use of Airflow’s Connections and keep our credentials out of code and config files.

from airflow.hooks.base_hook import BaseHook

# SalesforceBulkApi and MAX_BATCH_SIZE come from a custom wrapper module
# around Salesforce's Bulk API.
class SalesforceBulkHook(BaseHook):
    """
    Hook to manage connection to Salesforce API
    """
    def __init__(self, conn_id, source=None):
        super().__init__(source)
        self.conn = self.get_connection(conn_id)
        extras = self.conn.extra_dejson
        self.username = extras['username']
        self.password = extras['password']
        self.security_token = extras['security_token']
        self.domain = extras['domain']
        self.api = SalesforceBulkApi(self.username, self.password, self.security_token, self.domain)

    def insert_property(self, data, batch_size=MAX_BATCH_SIZE, use_serial=False):
        return self.api.insert_property(data, batch_size, use_serial)

    def insert_mortgage(self, data, batch_size=MAX_BATCH_SIZE, use_serial=False):
        return self.api.insert_mortgage(data, batch_size, use_serial)
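
The credentials themselves live in an Airflow Connection rather than in the repo. A hypothetical sketch of how this fits together: create a Connection (via the UI or CLI) whose Extra field holds the JSON the Hook expects, then reference it by ID from a task. The connection ID, Salesforce field names, and records below are placeholders:

# The Connection's Extra field holds JSON like:
#   {"username": "...", "password": "...", "security_token": "...", "domain": "login"}

def upload_properties(**context):
    # The Hook looks up the 'salesforce_bulk' Connection and builds the API client.
    hook = SalesforceBulkHook(conn_id='salesforce_bulk')
    records = [{'Address__c': '123 Main St', 'City__c': 'Brooklyn'}]
    return hook.insert_property(records)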

Here are some questions to ask yourself when deciding whether writing your own Operators and Hooks is a good idea.

  • Are there a bunch of PythonOperators all doing the same thing? Identify repeated patterns across your DAGs, build an Operator and keep your code DRY!
  • Are you extracting data from a lesser known data source using a custom API wrapper? Build a Hook and let Airflow handle the credentials!
  • Are there a series of Tasks in a DAG that could be combined into one? Do these Tasks appear in other DAGs? Make an Operator and increase the readability of your DAG definition files!
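
Once you have a handful of custom Operators and Hooks, you also need a way for your DAG files to import them. In Airflow 1.x, one option is the plugins mechanism; in Airflow 2.x you can simply keep them in an ordinary Python package on the scheduler's path. A sketch, with hypothetical module paths:

from airflow.plugins_manager import AirflowPlugin

# Hypothetical module paths for the classes shown above.
from b6_airflow.operators.s3_to_local_operator import S3ToLocalOperator
from b6_airflow.hooks.salesforce_bulk_hook import SalesforceBulkHook


class B6Plugin(AirflowPlugin):
    name = 'b6_plugin'
    operators = [S3ToLocalOperator]
    hooks = [SalesforceBulkHook]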

Writing custom Hooks and Operators is a great way to level up your organization’s Airflow skills while simultaneously making your pipelines easier to maintain, easier to debug, and easier to create.
