Handling Airflow logs with Kubernetes Executor

Zeev Shteiman
Localize.city
Feb 10, 2021

In this article we outline how to set up remote S3 logging when using the KubernetesExecutor, without building complex infrastructure. This solution has already been implemented by several members of the Airflow community, and the purpose of this post is to share it with anyone who is looking for one.

At Localize.city we have been using Airflow for almost 4 years now. Until recently, our data team used the Local Executor. Lately, our Airflow instance has become a data automation hub for our Data Science, R&D, and Business Analysis teams, and the Local Executor was no longer enough: we needed a way to scale dynamically and improve our deployment flow.

Airflow has several ways to support scaling. One of them is the Kubernetes Executor, introduced in Apache Airflow 1.10.0. With the Kubernetes Executor, Airflow runs in a Kubernetes cluster and each Airflow task starts a new pod, so all the processes (Scheduler/Webserver/Workers) can be completely independent. You can read more about it here.

Where are the logs?

After we set everything up and migrated our Airflow to Kubernetes, we noticed that logging does not work as one would expect. Because each task runs in a separate pod, the Webserver has no built-in way to access the logs.

Airflow supports 2 modes of remote logging, Elasticsearch and S3, but it is challenging to make them work without tweaking the code. Fortunately, Airflow is written in Python and you can pass it any standard Python logging configuration: you can implement your own handlers, loggers, formatters, and so on. We wanted a simple solution with minimum overhead. In this post I will describe how we implemented logging on k8s with S3 remote logging.

The solution includes 4 files and some config in airflow.cfg (each file will be explained later):

|-- airflow_common
| |-- custom_logging
| | |-- file_task_handler.py
| | |-- k8s_task_handler.py
| | |-- s3_task_handler.py
| | `-- log_config.py

We didn’t write it all from scratch; as a starting point, we used the default file_task_handler.py, s3_task_handler.py, and log_config.py that come with Airflow.

Part 1 — Reading logs while the task is running

As mentioned before, in order to read the logs while the task is running and display them in the Webserver, the task needs to write the logs to a place where the Webserver can read them. We didn’t want to define shared volumes where logs would be written, so instead we used the k8s API to access the logs.

Using the Kubernetes API, we can read logs from the standard output of a pod, so we needed to make sure the logs are written to stdout. We created a new handler, KubernetesTaskHandler, to achieve this (find the code here).
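To illustrate the idea, here is a minimal sketch of what such a handler can look like. This is an illustration, not our exact code (which is linked above); it assumes the FileTaskHandler constructor signature from Airflow 1.10 and relies on the separate task handler to keep writing the file copy:

import logging
import sys

from airflow.utils.log.file_task_handler import FileTaskHandler


class KubernetesTaskHandler(FileTaskHandler):
    """Mirrors task log records to stdout so the k8s API can read them."""

    def __init__(self, base_log_folder, filename_template):
        super().__init__(base_log_folder, filename_template)
        # Write to the pod's stdout instead of the local file that
        # FileTaskHandler would normally create.
        self.handler = logging.StreamHandler(stream=sys.stdout)

    def set_context(self, ti):
        # FileTaskHandler.set_context would replace self.handler with a
        # FileHandler; we keep the stdout stream and just apply formatting.
        self.handler.setFormatter(self.formatter)
        self.handler.setLevel(self.level)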

Now, you need to tell Airflow when to use this handler. This is where log_config.py comes in. In the DEFAULT_LOGGING_CONFIG dictionary, we need to add a new handler:

'k8stask': {
    'class': 'your_path.k8s_task_handler.KubernetesTaskHandler',
    'formatter': 'airflow',
    'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
    'filename_template': FILENAME_TEMPLATE
}

After we have done that, in the same dictionary, we need to update the airflow.task logger:

'airflow.task': {
    'handlers': ['task', 'k8stask'],
    'level': LOG_LEVEL,
    'propagate': False
}

You can see that we have added the new handler here as well. This means the task will continue writing the log to a file as before, but it will also write it to stdout.

Finally, we need to make sure that the Webserver can read the log from the pod’s stdout. How does the Airflow Webserver know what to use when it tries to read the log? In airflow.cfg, the task_log_reader setting names the handler to use:

# Name of handler to read task instance logs.
# Default to use task handler.
task_log_reader = task

We implemented the functionality in the handler called task, which uses the FileTaskHandler class as defined in log_config.py. The relevant logic lives in the _read method of the FileTaskHandler (find the code here).
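For reference, the heart of that k8s branch boils down to one call to the Kubernetes API. Here is a hedged sketch, not our full _read implementation: it assumes the kubernetes Python client, simplifies the pod lookup to a known pod name, and uses 'airflow' as a placeholder namespace:

from kubernetes import client, config


def read_pod_log(pod_name, namespace='airflow'):
    """Fetch a running task's log from the pod's stdout via the k8s API."""
    # The Webserver runs inside the cluster, so in-cluster config is enough.
    config.load_incluster_config()
    core_v1 = client.CoreV1Api()
    # The KubernetesExecutor runs the task in a container named 'base'.
    return core_v1.read_namespaced_pod_log(
        name=pod_name,
        namespace=namespace,
        container='base',
    )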

Now you should be able to read logs while the task is executing.

Part 2 — Reading logs after the task has finished

When the task is finished, the pod that was running it is no longer available, and we didn’t define any shared volume to save the logs to. So, what do we do? We use S3 remote logging.

Because Airflow supports S3 remote logging (we assume the reader knows how to set up S3 remote logging for the LocalExecutor), all we have to do is create a new s3_task_handler.py. The only reason to do this is that S3TaskHandler inherits from FileTaskHandler, which we have just upgraded, so the new S3TaskHandler needs to inherit from the new FileTaskHandler. Once we have done that, we need to update the task handler definition in log_config.py, in the section that handles remote logging for S3:

S3_REMOTE_HANDLERS = {
    'task': {
        'class': 'your_path.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': REMOTE_BASE_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE
    },
}
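The new s3_task_handler.py itself can stay almost identical to Airflow's. Conceptually, it is just a copy with the base-class import swapped; here is a sketch, with your_path standing in for your actual module path:

# your_path/s3_task_handler.py: a copy of Airflow's s3_task_handler.py,
# unchanged except that FileTaskHandler now comes from our custom module,
# so the k8s-aware _read method is inherited.
from your_path.file_task_handler import FileTaskHandler


class S3TaskHandler(FileTaskHandler):
    # ...body copied verbatim from
    # airflow.utils.log.s3_task_handler.S3TaskHandler...
    pass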

Now, make sure that your airflow.cfg has the logging_config_class setup correctly:

logging_config_class = your_path_to_custom_log_config.DEFAULT_LOGGING_CONFIG

Find the code for our log_config.py here.
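Since we assumed S3 remote logging is already familiar, here, as a reminder, is roughly what the relevant [core] settings look like in an Airflow 1.10 airflow.cfg (the bucket path and connection id are placeholders):

[core]
remote_logging = True
remote_base_log_folder = s3://your-bucket/airflow/logs
remote_log_conn_id = aws_default
logging_config_class = your_path_to_custom_log_config.DEFAULT_LOGGING_CONFIG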

Our code works with Airflow 1.10.14, but it can be adapted to other versions (any version from 1.10.0 on). What is important is to understand how everything is connected.

Drawbacks

Our solution works, but it is not without drawbacks. Here are a few that I can think of:

  1. If your pod is evicted without being able to upload the log to S3, you will lose the logs and will not be able to understand what happened. The only way to handle this, as I see it, is to stream your logs to something like Elasticsearch. I don’t believe this is a major issue in most cases, as normal failures and crashes are logged and handled correctly.
  2. The current logic in the _read method that tries to find the correct pod to read the logs from is a bit complex; there might be a cleaner solution if the pod name could be saved in the task context.

Summary

Now you know how to set up logging with the Kubernetes Executor without building complex infrastructure. Please leave your comments or share your thoughts, and if you found this useful, share it with anyone who may need this guide.
