Get more value out of your application logs in Stackdriver

Log aggregation and management is a fundamental part of complex software platforms. It allows you to identify potential security flaws, debug issues and produce useful visualisations.

At dunnhumby we maintain a number of Google Cloud Platform (GCP) projects, each running a large number of applications including Spark jobs running on Google Dataproc clusters. We use log aggregation to monitor the health of all of these systems and to monitor activity across all of our clients.

We have extended the out-of-the-box system-level monitoring provided by Stackdriver to include custom logging and metrics from our Spark jobs. This has allowed us to include more business-focused information, including:

  • Data metrics
  • Error rates
  • Application usage
  • Frequency metrics

This post takes an in-depth look at log capture and management in GCP. It will show you how to get your individual application logs into Stackdriver Logging, each tagged into its own logger, using Google Dataproc 1.2 and google-fluentd.

After reading this post you should understand:

  • How to enable logging capture on Dataproc clusters
  • How to capture your application logs from Spark
  • How to organise them through the use of fluentd and logging formats
  • How to access your custom logs through Stackdriver

This post assumes you have some background knowledge of GCP, a basic understanding of what fluentd can do, and a little bit of experience navigating Stackdriver.


Enabling log aggregation on Dataproc

By default Dataproc clusters do not log to Stackdriver. To enable the fluentd logging agent you will have to set the property dataproc:dataproc.monitoring.stackdriver.enable=true at cluster creation. A simple cluster creation command will look like the following:

gcloud dataproc clusters create steven-test-cluster \
--properties dataproc:dataproc.monitoring.stackdriver.enable=true

Once this has been done your Dataproc clusters will start pushing logs to Stackdriver.

Fluentd configuration for Spark job logs

Dataproc has default fluentd configurations for aggregating logs from across the entire cluster, including the Dataproc agent, HDFS nodes, hive-metastore, Spark, the YARN resource manager and YARN user logs. This setup is great for aggregating system logs, but we need to make some changes in order to capture Spark job logs.

The default Dataproc fluentd configuration for Spark aggregates logs from /var/log/spark/*.log. However, the default log4j properties on the Dataproc cluster only define a ConsoleAppender, with no FileAppender writing to that location, which means that none of the Spark output gets picked up by fluentd. Then there is the matter of the Spark application logs from the job you are running. By default the application logs are stored and aggregated in the Dataproc config Google Cloud Storage (GCS) bucket, again meaning that fluentd will not pick them up and send them to Stackdriver Logging.

Getting Spark logs into Stackdriver

In order to make sure that all Spark application output is picked up from one central source, you can aggregate your logs through YARN user logs. YARN user logs are stored locally on the application master and will pick up anything generated by Spark or the application you are running. The logs can be aggregated by turning on cluster mode on the Dataproc cluster using the property spark:spark.submit.deployMode=cluster.
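Putting this together with the earlier Stackdriver property, a cluster creation command covering both might look something like this (cluster name is a placeholder; gcloud takes multiple properties as a comma-separated list):

```shell
gcloud dataproc clusters create my-test-cluster \
--properties 'dataproc:dataproc.monitoring.stackdriver.enable=true,spark:spark.submit.deployMode=cluster'
```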

Note: Turning on cluster mode will mean that you will no longer see your job output in the Dataproc config GCS bucket or the Dataproc Jobs Console.

After making this change, the default fluentd config for YARN user logs will pick up logs and you will be able to query them through Stackdriver.

Adding custom metadata and tags

Fluentd uses regular expressions to figure out which actions to perform on a log message. Actions include:

  • Extracting information from the message as metadata
  • Modifying / rewriting the message
  • Tagging the message

The default fluentd configuration for YARN user logs contains regular expressions to extract metadata from systems across the Dataproc cluster such as Hive, HDFS and YARN itself.

By default all the logs will be tagged under a yarn-userlogs logger in Stackdriver. This means that all the logs are grouped together, making it hard to search or create metrics for individual applications.

Log messages from your applications might also not conform to any of the default fluentd regular expressions, in which case all Stackdriver receives is the raw log message, without any useful metadata extracted from it.

Fig 1. default google-fluentd config flow for Dataproc yarn-userlogs

Choosing your logging formats

To identify and categorise log messages from your applications you need to be able to uniquely identify them. To do that it helps to create a unique log format, ideally including the application name.

For example, if you have a Python package called test_app, then you might create a log format that looks like this:

import logging
logging.basicConfig(format="%(asctime)s [dunnhumby.test_app] %(levelname)s: %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

This is a simple example, but you can make it as complex as you like. For example, you might want to include:

  • Version numbers
  • Package / module name
  • Line numbers

Your log formats should be unique enough so that they can be easily matched. You want the fluentd regular expressions to be able to match your messages, and no more. If you inadvertently create an expression that matches more than it is supposed to then messages can get tagged incorrectly. In the example above the dunnhumby.test_app string helps uniquely identify these messages.
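A sketch of a richer format along those lines. The version number ("1.2.0") and the module/line attributes are illustrative additions, not something the rest of this post depends on; the record is built by hand purely so the example is self-contained:

```python
import logging

# Richer format: app name plus a hypothetical version, module and line number
fmt = "%(asctime)s [dunnhumby.test_app/1.2.0] %(module)s:%(lineno)d %(levelname)s: %(message)s"
formatter = logging.Formatter(fmt)

# Build a record by hand so the example is self-contained
record = logging.LogRecord(
    name="test_app.loader", level=logging.INFO, pathname="loader.py",
    lineno=42, msg="loaded %d rows", args=(100,), exc_info=None,
)
print(formatter.format(record))
# e.g. 2019-01-01 10:00:00,123 [dunnhumby.test_app/1.2.0] loader:42 INFO: loaded 100 rows
```

Note that the default %(asctime)s format ("2019-01-01 10:00:00,123") already matches the time_format used by the fluentd parser later in this post.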

Applying your changes

When you have your log formats sorted, you will have to add them to the fluentd config by overwriting the default configurations. If you spin up a Dataproc cluster with Stackdriver logging enabled and ssh onto one of the worker nodes you will be able to retrieve a copy of the default fluentd config from /etc/google-fluentd/conf.d/dataproc-yarn-userlogs.conf.

We will be adding a rewrite step to the existing config so that the fluentd tags can be rewritten to include the source of the application. This will allow you to handle your custom application logs separately from everything else.

Updating existing fluentd steps

Once you have the default fluentd config, you have to overwrite the <source> section to add a rewrite phase for the fluentd tags of your applications and packages.

In the <source> section you will want to add rewrite after the concat part of the tag. This is because the rewrite needs to occur after any exception logs have been concatenated (more on this further down).

<source>
@type tail
format none
path /var/log/hadoop-yarn/userlogs/application_*/container_*/*
pos_file /var/tmp/fluentd.dataproc.yarn-userlogs.pos
refresh_interval 2s
read_from_head true
tag concat.rewrite.container.tail.*
</source>

You will then have to update the exception handling section in the <match> that follows, to account for the rewrite tag part. This section of the config is pretty cool: it identifies any log messages that look like exceptions and concatenates all of the traceback lines into one single log message. If you don't do this then each line of the traceback will appear as a separate log message within Stackdriver. This behaviour comes from the fluent-plugin-detect-exceptions plugin.

<match concat.rewrite.container.tail.**>
@type detect_exceptions
remove_tag_prefix concat
languages python,java
multiline_flush_interval 0.1
</match>

Adding in fluentd rewrite step

The new rewrite section comes after the concatenation of traceback messages. The configuration below will rewrite the fluentd log tags giving you the ability to handle them individually.

<match rewrite.container.tail.**>
@type rewrite_tag_filter
remove_tag_prefix rewrite
# Match dunnhumby applications
rewriterule1 message ^(?<time>[^ ]* [^ ]*) \[dunnhumby\.(?<application_type>[^ ]*)\].*$ dunnhumby.$2.${tag_parts[6]}.${tag_parts[7]}.${tag_parts[8]}
# Handle things that don't match, output them with the same incoming tag
rewriterule2 message ^.*$ ${tag}
</match>

Let’s go through the above. The <match> section will match any log that is tagged with anything that begins with rewrite.container.tail. All records will begin with a tag that looks something like:

rewrite.container.tail.var.log.hadoop-yarn.userlogs.application_xxxxxxxxxxxxx_xxxx.container_xxxxxxxxxxxxx_xxxx_xx_xxxxxx.stdout

Using the fluentd rewrite tag filter plugin the message part of each log is matched against the regular expression in rewriterule1 and application_type is extracted and used in the new name of the tag. Using the previous example of a python package called test_app, the new tag will look like:

dunnhumby.test_app.application_xxxxxxxxxxxxx_xxxx.container_xxxxxxxxxxxxx_xxxx_xx_xxxxxx.stdout

Prefixing the new tag with something unique will mean that we can process all the matching logs together. If the log message did not match rewriterule1 then it will go to rewriterule2 where it will continue with the tag container.tail.*.
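To make the rewrite concrete, here is a small Python sketch that mimics the two rules. It is an illustration only: fluentd uses Ruby-style (?<name>…) named groups, translated here to Python's (?P<name>…):

```python
import re

# Mimic rewriterule1: extract application_type from the message and build
# the new tag from parts of the old tag (after remove_tag_prefix strips "rewrite")
RULE1 = re.compile(r"^(?P<time>[^ ]* [^ ]*) \[dunnhumby\.(?P<application_type>[^ ]*)\].*$")

def rewrite_tag(tag, message):
    parts = tag.split(".")[1:]  # remove_tag_prefix rewrite
    m = RULE1.match(message)
    if m:
        # rewriterule1: dunnhumby.<app>.<application_id>.<container_id>.<logname>
        return ".".join(["dunnhumby", m.group("application_type")] + parts[6:9])
    return ".".join(parts)      # rewriterule2: pass through with the incoming tag

tag = ("rewrite.container.tail.var.log.hadoop-yarn.userlogs."
       "application_1486766745509_0003.container_1486766745509_0003_01_000078.stdout")
print(rewrite_tag(tag, "2019-01-01 10:00:00,000 [dunnhumby.test_app] INFO: starting"))
```

Running this prints the rewritten tag dunnhumby.test_app.application_1486766745509_0003.container_1486766745509_0003_01_000078.stdout, which is exactly the shape described above.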

You can make this as complex as you need, but it's good to cover all bases: a thorough regular expression, or multiple rewrite rules, will drastically reduce the number of times you have to come back and edit this configuration.

Note: Dataproc 1.2 is running version 1.5.5 of the rewrite tag filter plugin. In later versions of the plugin the interface changes from the one above.

Extracting metadata from messages

Your application logs will now be separated from the yarn-userlogs and you will be able to use filters to extract any metadata from the log message to send through to Stackdriver.

<filter dunnhumby.**>
@type parser
key_name message
<parse>
@type multi_format
<pattern>
format /^(?<time>[^ ]* [^ ]*) \[dunnhumby\.(?<application_type>[^ ]*)\] *(?<severity>[^ ]*): (?<message>.*)/
time_format %Y-%m-%d %H:%M:%S,%L
</pattern>
</parse>
</filter>

This <filter> matches against the logging format described above and splits your matched log messages into their relevant pieces of information. In Stackdriver Logging, time, severity and message are reserved keys that are processed and stored separately from any other metadata retrieved from the regular expression groupings.
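As a sanity check you can exercise the same pattern in Python before deploying it (again translating the Ruby named groups to Python syntax; the sample line is hypothetical):

```python
import re

# The <filter> pattern, translated to Python named-group syntax
PATTERN = re.compile(
    r"^(?P<time>[^ ]* [^ ]*) \[dunnhumby\.(?P<application_type>[^ ]*)\]"
    r" *(?P<severity>[^ ]*): (?P<message>.*)"
)

line = "2019-01-01 10:00:00,123 [dunnhumby.test_app] INFO: 100 rows written"
fields = PATTERN.match(line).groupdict()
print(fields)
# {'time': '2019-01-01 10:00:00,123', 'application_type': 'test_app',
#  'severity': 'INFO', 'message': '100 rows written'}
```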

Sending your logs to Stackdriver

The final part of the additional configuration is the <match> on the logs processed in the <filter>. This is similar to the default configuration in the dataproc-yarn-userlogs.conf file, except that the final name of the log tag is the application_type, and all the other tag_parts array indexes have been updated to their new correct values.

<match dunnhumby.**>
# Incoming tag is of the form dunnhumby.<application_type>.application_1486766745509_0003.container_1486766745509_0003_01_000078.stdout
@type record_reformer
renew_record false
enable_ruby false
auto_typecast true
# Set the tag to the name of the application
tag ${tag_parts[1]}
# Extract the application id and container id from the tag
application ${tag_parts[2]}
container ${tag_parts[3]}
# stdout
container_logname ${tag_suffix[4]}
# application_1486766745509_0003.container_1486766745509_0003_01_000078.stdout
filename ${tag_suffix[2]}
</match>
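record_reformer's ${tag_parts[N]} and ${tag_suffix[N]} placeholders index into the dot-separated tag; a quick Python sketch of what each field ends up holding (the application and container ids are the example ones from the comment above):

```python
tag = ("dunnhumby.test_app.application_1486766745509_0003."
       "container_1486766745509_0003_01_000078.stdout")
parts = tag.split(".")

# ${tag_parts[N]} is the Nth dot-separated part of the tag;
# ${tag_suffix[N]} is everything from part N onwards
record = {
    "tag": parts[1],                           # test_app -> new logger name
    "application": parts[2],                   # application_1486766745509_0003
    "container": parts[3],                     # container_1486766745509_0003_01_000078
    "container_logname": ".".join(parts[4:]),  # stdout
    "filename": ".".join(parts[2:]),           # application_...container_....stdout
}
print(record)
```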

By going through yarn-userlogs you can extract the application_id and the container_id for your Spark job, because they are part of the log tag. This means that if you want to look at all of the logs from your Spark job across all your applications, you can search by its identifier. The final fluentd pipeline will look something like Fig 2.

Fig 2. new google-fluentd config flow for Dataproc yarn-userlogs

You will now have your individual applications stored in separate loggers within Stackdriver Logging, making search and metric creation for them easier.
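For example, in the Stackdriver Logs Viewer you should now be able to filter down to a single application's logger with something along these lines (the project id is a placeholder, and the log name follows the fluentd tag set above):

```
resource.type="cloud_dataproc_cluster"
logName="projects/my-project/logs/test_app"
```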

You can use this configuration with all your applications, as long as their log messages conform to the fluentd regular expressions. You can even split out the Spark logs from yarn-userlogs by altering the log format in the log4j properties on the Dataproc cluster.


Summary

We have just walked through enabling Stackdriver logging in Dataproc, adding custom logging to a Spark application and configuring fluentd to recognise the new log format.

Implementing this in our solutions at dunnhumby has given us the ability to identify API usage, performance metrics and error rates for our applications across all of our clients, down to the version and component of the application each log came from.

Hopefully this provides some helpful insights as to what can be achieved using fluentd and Stackdriver, and how to organise your logging messages to maximise their usefulness.

What next?

The official fluentd documentation is a really good reference, and worth reading for more in-depth information.