Dynamic log processing with Fluentd, Konfigurator and Slack

Logging is one of the pillars of observability. In the simplest sense, logging is about recording discrete events. Monitoring these logs is important because they provide detailed information about events and errors, which helps you understand the flow of events in the application, and where and why the normal flow is disrupted when an error occurs. Application logs are usually generated with different levels of severity, such as ERROR, WARN and INFO, which indicate the impact of the logged event and the level of attention it should be given. Apart from application logs, it is also prudent to monitor logs from the rest of the application environment, such as middleware, application servers and the system itself. This gives a complete picture that is not confined to the application, so a system-level issue that impacts the application can also be captured and observed.

At Stakater we use open source tools both internally and in our customer implementations, and for log monitoring our stack of choice is EFK (short for Elasticsearch, Fluentd and Kibana). While Elasticsearch and Kibana are used to index, search and visualize the logs, Fluentd is the log aggregator, essentially going door to door and collecting all the logs that are configured to be collected.

Fluentd

Fluentd is an open source data collector for unified logging. It has a flexible plugin architecture, which makes its functionality easy to extend. Each application or service logs events as they occur, be it to standard output, syslog or a file. Fluentd then collects all these logs, filters them and forwards them to the configured locations.

On Kubernetes, we deploy Fluentd as a DaemonSet to ensure that every node runs a copy of the Fluentd pod. Any new node added to the cluster automatically gets a Fluentd pod as well. This lets Fluentd easily collect the logs from the node it runs on. Let’s take a look at how the logging flow works on Kubernetes with Fluentd.

Logging flow

The containerized applications write to stdout and stderr. These streams are handled by and redirected to a logging driver, which is configured in Kubernetes to write to a file in JSON format. Symlinks to these log files are created at /var/log/containers/*.log.

The Fluentd input plugin is responsible for reading data from these log sources and generating a Fluentd event for each entry. We use the in_tail input plugin, which allows Fluentd to read events from the tail of text files. The position up to which Fluentd has read a particular log file is recorded in a position file, and the next time Fluentd will resume reading from that position. The input is parsed based on the configuration provided to the input plugin.
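The position-file idea described above can be sketched in a few lines of Python. This is only an illustration of the mechanism, not how in_tail is implemented: the function name read_new_events, the file names, and the plain-integer position file are assumptions made for the example (Fluentd's real position file uses its own format).

```python
import json
import os
import tempfile


def read_new_events(log_path, pos_path):
    """Return JSON records appended to log_path since the last call.

    Hypothetical helper: the byte offset already consumed is kept in pos_path.
    """
    offset = 0
    if os.path.exists(pos_path):
        with open(pos_path) as pos_file:
            offset = int(pos_file.read().strip() or 0)

    with open(log_path, "rb") as log_file:
        log_file.seek(offset)
        chunk = log_file.read()

    # Consume only complete lines; a partially written last line is left
    # in place and picked up on a later call.
    consumed = chunk.rfind(b"\n") + 1
    with open(pos_path, "w") as pos_file:
        pos_file.write(str(offset + consumed))

    return [json.loads(line) for line in chunk[:consumed].splitlines() if line.strip()]


workdir = tempfile.mkdtemp()
log_path = os.path.join(workdir, "app.log")
pos_path = os.path.join(workdir, "app.log.pos")

with open(log_path, "w") as f:
    f.write('{"log": "started", "stream": "stdout"}\n')
print(read_new_events(log_path, pos_path))  # first call reads the existing line

with open(log_path, "a") as f:
    f.write('{"log": "failed", "stream": "stderr"}\n')
print(read_new_events(log_path, pos_path))  # only the newly appended line
print(read_new_events(log_path, pos_path))  # nothing new since the last call
```

Because the offset is stored outside the process, a restarted reader continues where the previous one stopped instead of re-sending old log lines.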

The Fluentd event contains information such as where the event comes from (its tag), the time of the event, and the actual log content (the record). These events are matched to an output plugin type in the Fluentd configuration. The out_elasticsearch output plugin forwards these records to Elasticsearch.

Konfigurator

The parsing configuration for Fluentd includes a regular expression that the input plugin uses to parse the incoming text. This regular expression is specific to the format of the incoming logs, so each log source with a unique log format may need its own regular expression. This can quickly become unwieldy when deploying multiple applications, and especially when updating a log format in an existing deployment. The Stakater team has developed Konfigurator, an open source Kubernetes operator, for just such a use case. It lets you dynamically generate app configuration when Kubernetes resources change, and it can be used for any kind of configuration file format. It follows an approach known as configuration as code, which means you can write code for your configuration and Konfigurator will render it at runtime when needed.

Konfigurator is very generic in the sense that it doesn’t require any specific data format for your app configuration. Using it requires you to do two things:

  1. Create a config template for your infrastructure app. This is a custom resource of kind KonfiguratorTemplate, defined by a CRD (Custom Resource Definition).
  2. Attach app-specific configuration to your app’s pods.

Konfigurator watches Pods, Services and Ingresses for changes, and whenever these Kubernetes resources change it re-renders the app configuration into a ConfigMap or Secret.
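The render-on-change idea can be illustrated with a small Python sketch. It is not Konfigurator's implementation (Konfigurator is a Kubernetes operator driven by KonfiguratorTemplate resources); the pod dictionaries, the APP_BLOCK template, and the render and reconcile functions are hypothetical names chosen for the example.

```python
from string import Template

# Hypothetical per-app snippet; a real template would hold the app's actual config.
APP_BLOCK = Template("<match ${app}.**>\n  @type stdout\n</match>\n")


def render(pods):
    """Render one block per distinct app, in first-seen order."""
    apps = []
    for pod in pods:
        if pod["app"] not in apps:
            apps.append(pod["app"])
    return "".join(APP_BLOCK.substitute(app=app) for app in apps)


def reconcile(pods, current_config):
    """Re-render from the current pods and report whether the config changed."""
    rendered = render(pods)
    return rendered, rendered != current_config


pods = [{"name": "web-1", "app": "web"}, {"name": "web-2", "app": "web"}]
config, changed = reconcile(pods, current_config="")
print(changed)  # the first render differs from the empty config

pods.append({"name": "api-1", "app": "api"})
config, changed = reconcile(pods, config)
print(config)   # now contains one block for "web" and one for "api"
```

The point of the sketch is that the configuration is a pure function of the observed resources: two replicas of the same app produce a single block, and a new app changes the rendered output, which is the signal to update the target ConfigMap or Secret.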

Konfigurator and Fluentd with Slack

Monitoring logs and having them viewable in Kibana is great, but we don’t want to keep the Kibana UI open on our screens and peek at it every so often to check for error logs. We’d like to get notifications in case of errors. Let’s take a look at how we can achieve this. The following example shows how to send Slack notifications to different Slack channels (based on the app) when an error is logged.

Setting up Fluentd

First of all, if you don’t have the Fluentd Slack plugin installed, you can add it to your image by following the plugin’s installation instructions.

Next, remove the existing Fluentd config from your ConfigMap, since Konfigurator will now generate it. You can either edit the ConfigMap and run kubectl apply, or remove the ConfigMap completely by running kubectl delete. Then create a new KonfiguratorTemplate like so:

apiVersion: konfigurator.stakater.com/v1alpha1
kind: KonfiguratorTemplate
metadata:
  labels:
    app: fluentd
  name: fluentd
spec:
  renderTarget: ConfigMap
  app:
    name: fluentd
    kind: DaemonSet
    volumeMounts:
    - mountPath: /fluentd/etc/appconf
      container: fluentd
  templates:
    fluent.conf: |
      # Send parsed logs to both output and notification labels
      <match **>
        @type copy
        deep_copy true
        # If one store raises an error, the remaining stores are skipped. Adding `ignore_error` ensures that the log is sent to all stores regardless of the error
        <store ignore_error>
          @type relabel
          @label @NOTIFICATION
        </store>
        <store ignore_error>
          @type relabel
          @label @OUTPUT
        </store>
      </match>
      <label @OUTPUT>
        # Send logs to stdout
        <match **>
          @type stdout
        </match>
      </label>
      <label @NOTIFICATION>
        # Filter ERROR level logs
        <filter **>
          @type grep
          <regexp>
            key level
            pattern (ERROR|error|Error|^E[0-9]{4})
          </regexp>
        </filter>
        # Get distinct pods per application
        {{- $podsWithAnnotations := whereExist .Pods "ObjectMeta.Annotations.fluentdConfiguration" -}}
        {{- $distinctPods := distinctPodsByOwner $podsWithAnnotations -}}
        # Create slack notification matchers for sending error notifications per app
        {{- range $pod := $distinctPods -}}
        {{- $config := first (parseJson $pod.ObjectMeta.Annotations.fluentdConfiguration) }}
        {{- if $config.notifications }}
        <match kubernetes.var.log.containers.{{ (index $pod.ObjectMeta.OwnerReferences 0).Name }}**_{{ $pod.ObjectMeta.Namespace }}_**.log>
          @type copy
          {{- if $config.notifications.slack }}
          <store ignore_error>
            @type slack
            webhook_url {{ $config.notifications.slack.webhookURL }}
            channel {{ $config.notifications.slack.channelName }}
            username fluentd
            icon_url https://raw.githubusercontent.com/fluent/fluentd-docs/master/public/logo/Fluentd_square.png
            flush_interval 15s
            parse full
            color danger
            link_names false
            title_keys level
            title %s log
            message_keys level,timestamp,kubernetes_pod_name,kubernetes_namespace_name,message
            message *Level* %s *Time* %s *Pod* %s *Namespace* %s *Message* %s
            time_key timestamp
          </store>
          {{- end }}
        </match>
        {{- end }}
        {{- end }}
      </label>

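Make sure to change the app name and the mount path to match your Fluentd setup. This config copies all logs to two targets: output (stdout) and notifications (Slack). In the notification label, only records whose level field matches the error pattern are kept, and a Slack match block is rendered for each app whose pods carry the fluentdConfiguration annotation.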
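To see which records the grep filter in the NOTIFICATION label lets through, the same pattern can be tried out in Python. This is a sketch that assumes Python's re.search behaves like the filter's regular expression for this particular pattern; the helper name passes_error_filter and the sample records are made up for illustration.

```python
import re

# Same pattern as in the <regexp> section of the grep filter.
ERROR_LEVEL = re.compile(r"(ERROR|error|Error|^E[0-9]{4})")


def passes_error_filter(record):
    """Return True if the record's `level` field matches the error pattern."""
    level = record.get("level")
    text = "" if level is None else str(level)
    return ERROR_LEVEL.search(text) is not None


samples = [
    {"level": "ERROR", "message": "database unreachable"},
    {"level": "INFO", "message": "request served"},
    {"level": "E0412", "message": "level written as E plus four digits"},
    {"message": "record without a level field"},
]
for record in samples:
    print(record.get("level"), passes_error_filter(record))
```

Note that only the last alternative is anchored to the start of the value, so a level such as NoError would also match. If your applications use level names like that, a stricter pattern may be worth considering.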

Setting up apps

Now that this config has been added to Fluentd, you can start setting up the apps for which you want to send notifications. In this example, I’ve used the following data structure, which must be provided in the fluentdConfiguration annotation:

fluentdConfiguration: >
  [
    {
      "notifications": {
        "slack": {
          "webhookURL": "{{ .webhookURL }}",
          "channelName": "{{ .channelName }}"
        }
      }
    }
  ]

Add this annotation to the app’s pods, with your webhookURL and channelName filled in, and deploy the app.

Once the app is deployed, Konfigurator will re-render the Fluentd configuration file with the new Slack notification block, and you will start getting notifications.
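What the template does with the annotation (parse the JSON, take the first element, then read notifications.slack) can be mirrored in Python as a rough sketch. The function name slack_settings is hypothetical, and the webhook URL and channel name below are placeholders, not real values.

```python
import json


def slack_settings(annotations):
    """Return (webhook_url, channel) from the fluentdConfiguration annotation, or None."""
    raw = annotations.get("fluentdConfiguration")
    if raw is None:
        return None
    # The annotation holds a JSON array; like the template, use its first entry.
    config = json.loads(raw)[0]
    slack = (config.get("notifications") or {}).get("slack")
    if not slack:
        return None
    return slack["webhookURL"], slack["channelName"]


pod_annotations = {
    "fluentdConfiguration": """
    [
      {
        "notifications": {
          "slack": {
            "webhookURL": "https://hooks.slack.com/services/PLACEHOLDER",
            "channelName": "my-app-alerts"
          }
        }
      }
    ]
    """
}
print(slack_settings(pod_annotations))
```

Pods without the annotation, or with an annotation that has no slack section, yield None here, which corresponds to the template rendering no Slack store for that app.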