How to Send Airflow Logs to Elasticsearch using Filebeat and Logstash

Semih Sezer
7 min read · May 18, 2020


A step-by-step guide to setting up Elasticsearch as a remote logging destination for Airflow.

Overview

Airflow supports Elasticsearch as a remote logging destination but this feature is slightly different compared to other remote logging options such as S3 or GCS. Airflow expects you to have your own setup to send logs to Elasticsearch in a specific format. In this article, I will share my learnings and setup for sending Airflow logs to Elasticsearch.

Clarifications on Airflow’s Elasticsearch Remote Logging

As of the current Airflow version (v1.10.10, April 2020), this is what Airflow expects in Elasticsearch logs:

  • Airflow doesn’t send logs to Elasticsearch out of the box, so you need to have your own setup to ship logs. I will share my own setup below.
  • You can configure Airflow to read logs from Elasticsearch. When you do this, Airflow expects that a task run’s logs will be identified by a log_id field. It will then search across all indices in Elasticsearch for that task run’s log_id. Currently there is no way to tell Airflow to look at a specific index for your logs.
  • By default, log_id is {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}. This can be customized by changing the log_id_template in Airflow config. Airflow doesn’t include log_id in its logs by default. This means that you need to construct log_id yourself in your logs.
  • The log message needs to be in the message field of the document.
  • There needs to be an offset field, which Airflow uses to sort the log documents. Log shippers such as Filebeat typically insert such offsets automatically.

With all this in mind, you need to have a document in Elasticsearch in this format:

{
  "log_id": "{dag_id}-{task_id}-{execution_date}-{try_number}",
  "message": "mymessage",
  "offset": 5
}
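
The log_id format above comes from Airflow’s log_id_template setting in the [elasticsearch] section of airflow.cfg. If the default works for you there is nothing to change; this snippet just shows where the template lives (the value shown is the 1.10.x default, so adjust it to your version if needed):

[elasticsearch]
# Default template; each placeholder is filled in from the task instance.
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}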

Part 1: Reading Logs from Elasticsearch

The first part is getting Airflow to read logs from Elasticsearch. Remote log configuration for Elasticsearch is slightly different from other remote log destinations such as S3 or GCS.

  1. Install Airflow’s elasticsearch module.

pip install apache-airflow[elasticsearch]

2. Enable remote logging in the Airflow config file. In Airflow 1.10.x these settings live in the [core] section (newer Airflow versions moved them to a [logging] section). Also make sure that remote_base_log_folder is set to an empty string.

[core]
remote_logging = True
remote_base_log_folder =

3. Next, you need to set the host config in the [elasticsearch] section. If authentication is required, you need to provide the username and password in the host field. This was unintuitive for me, but unlike other remote logging scenarios, the remote_log_conn_id config is not used for Elasticsearch.

[elasticsearch]
host = <user>:<password>@yourelastic.com:9200

4. (Optional) If you need to use SSL/HTTPS to access Elasticsearch, you can use these settings as a starting point. Settings in this section are converted to a dictionary and passed to the elasticsearch-py Python client as extra arguments, so you can add any extra fields supported by the Python Elasticsearch client, such as ca_certs. You can read Airflow’s es_task_handler.py module to understand the process better (which is what I did to figure all of this out).

[elasticsearch_configs]
use_ssl = True
verify_certs = False

5. With all this in mind, restart your Airflow cluster with the new configs and go to your Elasticsearch dev console to insert a sample log document. Fill in the log_id placeholders with a valid task run’s values; you can find these in the Airflow logs UI for a previously run task. As I mentioned above, which index you put this document in doesn’t matter because Airflow performs a search across all indices.

POST /some_index/_doc
{
  "log_id": "{dag_id}-{task_id}-{execution_date}-{try_number}",
  "message": "mymessage",
  "offset": 1
}

Now, hopefully you can view the message above in the Airflow logs UI for that particular task run. If this is not working as expected, you may need to look at Airflow webserver logs to understand what is going on.
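
To see roughly what Airflow looks for, you can also run a quick search from the dev console. This is only a sanity check, not the exact query Airflow builds internally, so treat it as a sketch (substitute your real log_id values, and note that sorting requires the offset field to be mapped as a numeric type):

GET _all/_search
{
  "query": {
    "match_phrase": { "log_id": "{dag_id}-{task_id}-{execution_date}-{try_number}" }
  },
  "sort": [ { "offset": { "order": "asc" } } ]
}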

Part 2: Writing Logs to Elasticsearch

As I mentioned above, Airflow doesn’t ship logs to Elasticsearch out of the box, so we need our own setup to forward the logs. Keep in mind that Airflow doesn’t include log_id in its logs by default, even though it is required for reading the logs. This means that we need to construct log_id separately in our logs.

There are different ways we can go about this.

Option 1: Forwarding From Logs Folder with Logstash

If you are able to access Airflow’s local logs folder, you can forward the logs directly from there. We can use Logstash for this purpose and configure it to construct log_id from the directory structure {dag_id}/{task_id}/{execution_date}/{try_number}.log. We can then directly forward these logs to Elasticsearch.

# logstash.conf
input {
  file {
    # {dag_id}/{task_id}/{execution_date}/{try_number}.log
    path => ["/usr/local/airflow/logs/*/*/*/*.log"]
    codec => "line"
    #codec => "json"  # use this if the Airflow log format is JSON
  }
}
filter {
  grok {
    match => { "path" => "/usr/local/airflow/logs/(?<dag_id>.*?)/(?<task_id>.*?)/(?<execution_date>.*?)/(?<try_number>.*?).log$" }
  }
  mutate {
    add_field => {
      "log_id" => "%{[dag_id]}-%{[task_id]}-%{[execution_date]}-%{[try_number]}"
    }
  }
}
output {
  elasticsearch {
    hosts => ["myelastic.com:9200"]
    index => "filebeat-%{+YYYY.MM.dd}"
    user => "<myuser>"
    password => "<mypassword>"
    ssl => true
    ssl_certificate_verification => false
  }
}

Here, we extract dag_id, task_id, execution_date and try_number from the log file path and add the new log_id field based on these values. If you change the Airflow log format to JSON (as described in Option 2 below), you should change the codec to json as well.

Disclaimer: I didn’t really test this config.
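
If you go this route, you can at least validate the pipeline syntax before deploying it. Assuming Logstash is installed locally, the following checks the config file and exits without starting the pipeline:

# validate logstash.conf syntax without starting the pipeline
bin/logstash -f logstash.conf --config.test_and_exit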

Option 2: Forwarding Logs From Stdout with Filebeat + Logstash

In my case, I am running Airflow on Kubernetes and I don’t want to create shared volumes across all pods. On Kubernetes, it is more natural to ship container logs from stdout. For this purpose, I use Filebeat to collect container logs and Logstash to dynamically add the log_id field. The same setup applies to other Docker-based scenarios.

  1. Configure Airflow to write logs to stdout

First, configure Airflow to write logs to stdout. This will make Airflow worker processes write their logs to the stdout of the container/parent process. For KubernetesExecutor, this may already be the case, but for most other scenarios the logs would end up in the logs folder. The logs will have dag_id, task_id, execution_date and try_number fields.

[elasticsearch]
write_stdout = True
write_json = True

Once this is set, Airflow worker process logs will be written to stdout and you can view them in container logs. We also change the format to JSON so that we can parse them more easily. This is what a sample log looks like:

{
  ...
  "message": "actual log message",
  "dag_id": "mydag",
  "task_id": "mytask",
  "execution_date": "2020_05_15T06_00_00_000000",
  "try_number": "1"
}

2. Deploy Filebeat to forward Airflow container logs to Logstash

Filebeat is a lightweight log shipper that moves logs from one place to another. To deploy Filebeat, you can follow the official docs or use one of the Filebeat Helm charts. For Kubernetes, the recommendation is to deploy Filebeat as a DaemonSet. This means that Filebeat will be present on every node and be able to access the logs of all containers.

Here is the configuration I used for elastic/filebeat:7.6.1 docker image, following the official Elastic docs for Kubernetes deployment.

# filebeat.yml
filebeat.inputs:
- type: container
  paths:
    - /var/log/containers/*.log
  processors:
    - decode_json_fields:
        fields: ["message"]
        target: ""
        overwrite_keys: true
    - add_kubernetes_metadata:
        host: ${NODE_NAME}
        matchers:
        - logs_path:
            logs_path: "/var/log/containers/"
    - rename:
        fields:
          - from: "log.offset"
            to: "offset"

processors:
  - add_cloud_metadata:

output.logstash:
  hosts: ["logstash:5044"]
  # if logstash is in a different k8s namespace
  #hosts: ["logstash.<namespace>.svc.cluster.local:5044"]

A few things to note here:

  • The input path tells Filebeat where to find container logs on the node. In this case, I am actually sending all container logs to Logstash. You can add filtering to this or use Filebeat Autodiscover features.
  • When Filebeat parses the log message above, it will put the entire log into the message field by default. We use decode_json_fields to make Filebeat parse and expand the message field as JSON. These fields are added to the root of the log document.
  • The overwrite_keys setting tells Filebeat that it is OK to overwrite the message field, which originally contained the entire JSON log. We want the actual log message (message.message) to end up in the message field so that Airflow can find it. Without this setting, Filebeat leaves the message field untouched and it still contains the entire JSON log.
  • In the rename section, we rename Filebeat’s default log.offset field to offset, which is what Airflow expects.
  • The add_kubernetes_metadata and add_cloud_metadata processors attach pod and cloud metadata to each event, and the output section sends everything to Logstash, which we will deploy in the next step. A sketch of what an event looks like after these processors is shown right after this list.
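
Putting these processors together, an event leaving Filebeat should look roughly like this. This is a hand-written sketch based on the processors above rather than captured output, and most of the Filebeat/Kubernetes metadata fields are omitted (the pod name is made up):

{
  ...
  "message": "actual log message",
  "dag_id": "mydag",
  "task_id": "mytask",
  "execution_date": "2020_05_15T06_00_00_000000",
  "try_number": "1",
  "offset": 1234,
  "kubernetes": { "pod": { "name": "airflow-worker-0" } }
}

Note that there is still no log_id field at this point; adding it is the job of Logstash in the next step.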

3. Adding log_id field with Logstash

Logstash is used to manage data pipelines and transformations.

For our use case, we use Logstash to dynamically add the log_id field to our logs. It would be nice if Filebeat could add log_id for us, but Filebeat is meant to be a lightweight shipper and doesn’t have this functionality right now. It would also be nice if Airflow logs included log_id by default. In either of those cases, we wouldn’t need Logstash at all.

To deploy Logstash on Kubernetes, you can use Elastic’s Logstash Helm chart or the community-maintained Logstash Helm chart. I used the latter with the open-source Logstash image docker.elastic.co/logstash/logstash-oss:7.1.1.

# .../pipeline/logstash.conf
input {
  beats {
    port => 5044
  }
}
filter {
  if [dag_id] and [task_id] and [execution_date] and [try_number] {
    mutate {
      add_field => {
        "log_id" => "%{[dag_id]}-%{[task_id]}-%{[execution_date]}-%{[try_number]}"
      }
    }
  }
}
output {
  elasticsearch {
    hosts => ["myelastic.com:9200"]
    index => "filebeat-%{+YYYY.MM.dd}"
    user => "<myuser>"
    password => "<mypassword>"
    ssl => true
    ssl_certificate_verification => false
  }
}

The main thing to note here is the mutate section, where we dynamically add the log_id field based on the task run fields extracted by Filebeat. Once this pipeline is running, hopefully your Airflow logs will show up in Elasticsearch and you can view them in the Airflow UI.
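
If everything is wired up correctly, the documents that land in Elasticsearch should contain the three fields Airflow needs from Part 1. Roughly (again a sketch; real documents will also carry the Filebeat and Kubernetes metadata fields):

{
  ...
  "log_id": "mydag-mytask-2020_05_15T06_00_00_000000-1",
  "message": "actual log message",
  "offset": 1234
}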

Conclusion

I hope you found this article helpful and it clarified some confusion around Airflow’s Elasticsearch remote logging feature. Please feel free to reach out if you have suggestions or comments.

Note: There is a known bug where the spinner icon on the Airflow logs UI keeps spinning even after the logs are found in Elasticsearch; that’s expected and there is a pending fix for it.
