Improving your observability: Creating Metrics from your Logs

Carlos Gimeno · Published in adidoescode · Jul 8, 2021 · 8 min read
Photo by Austin Distel on Unsplash

First of all, I would like to give kudos to my colleague Alejandro Cespedes, who gave me the original idea while working on a different project to extract metrics from logs.

As a Site Reliability Engineer, one of the key aspects of my role is to improve the observability of our applications. Ideally, you want to have both logs and metrics so you can create alerts based on either of them. But sometimes that’s not the case: you only have logs, or only metrics. Let’s see how we can fix that and create metrics based on our logs.

Our use case: AKAMAI Datastream

AKAMAI works great and offers a lot of capabilities in areas like offloading or security. However, there are some gaps in terms of observability, as some of its reports are not available in real time. Also, if you want to debug something you need to access the Luna UI, and the problem is that not everyone has access to it, or enough knowledge to work with it. It’s also not a good idea to grant permissions on a tool if the grantee isn’t skilled enough to operate it, especially when the tool has a steep learning curve like AKAMAI.

To overcome this, I decided to use AKAMAI Datastream, a product that ships raw logs from AKAMAI’s edge servers to a destination of your choice. It supports different destinations, like S3, Datadog, Azure, Google Cloud, etc.

I’m not going to go into the details of how to configure AKAMAI Datastream, but if you want to know more about it you can take a look at the official documentation here. The idea of this post is to consume the logs coming from Datastream and make them available to the SREs in our organization. We’re also going to create some metrics based on these logs and expose them as Prometheus metrics. Let’s take a look at what a Datastream log looks like:

{
"version": 1,
"cp": "123456",
"reqId": "1239f220",
"reqTimeSec": "1573840000",
"bytes": "4995",
"cliIP": "128.147.28.68",
"statusCode": "206",
"proto": "HTTPS",
"reqHost": "test.hostname.net",
"reqMethod": "GET",
"reqPath": "/path1/path2/file.ext",
"reqPort": "443",
"rspContentLen": "5000",
"rspContentType": "text/html",
"UA": "Mozilla%2F5.0+%28Macintosh%3B+Intel+Mac+OS+X+10_14_3%29",
"tlsOverheadTimeMSec": "0",
"tlsVersion": "TLSv1",
"objSize": "484",
"uncompressedSize": "484",
"overheadBytes": "232",
"totalBytes": "0",
"queryStr": "param=value",
"accLang": "en-US",
"cookie": "cookie-content",
"range": "37334-42356",
"referer": "https%3A%2F%2Ftest.referrer.net%2Fen-US%2Fdocs%2FWeb%2Ftest",
"xForwardedFor": "8.47.28.38",
"maxAgeSec": "3600",
"reqEndTimeMSec": "3",
"errorCode": "ERR_ACCESS_DENIED|fwd_acl",
"turnAroundTimeMSec": "11",
"transferTimeMSec": "125",
"dnsLookupTimeMSec": "50",
"customField": "any-custom-value",
"cacheStatus": "1",
"country": "US",
"city": "HERNDON"
}

As you can see, the logs provide a lot of useful info: the path being hit, whether the request was served from the cache, whether the request was blocked, and so on. Having them indexed in Elasticsearch is a huge step forward, but let’s see how we can extract some metrics from these logs.

Meet vector.dev

If you haven’t heard about Vector before, we can define it as a high-performance observability data router. It can read data from multiple sources, apply all the required transformations, and then ship the result to multiple destinations, all of this with a very small footprint, as it’s written in Rust, which makes it very efficient.
Using Vector we’re going to implement the following ETL:

AKAMAI will push the logs into S3; Vector will then read these logs, creating some metrics that will be exposed to our Prometheus instance and also shipping the logs to Kafka.
Let’s go step by step and see how we can parse the logs coming from S3:

[sources.s3]
type = "aws_s3"
region = "your_region"
auth.access_key_id = "access_key_id"
auth.secret_access_key = "secret_access_key"
compression = "gzip"
sqs.delete_message = true # optional, default
sqs.poll_secs = 15 # optional, default, seconds
sqs.queue_url = "SQS_URL" # required

This is the basic config for reading the logs coming from S3. Notice how Vector requires an SQS queue: that’s what allows Vector to scale horizontally. Each time AKAMAI writes into the S3 bucket, an event is fired into SQS; the Vector instances are then notified that a new log object has been written and proceed to read it from the bucket.
The next step is to parse the JSON message stored in these logs. We can achieve that very easily with the following snippet:

[transforms.parsing]
type = "remap" # required
inputs = ["s3"] # required
source = '''
. = parse_json!(string!(.message))
'''

Not much to see here: we’re just parsing the JSON message and splitting it into the different fields it contains. Notice that we’re using the previous stage, s3, as input.

Using this transformation you can also add (or delete) fields in your message. The following snippet shows how to add more fields to your message (a simple string, and a timestamp using one of the built-in functions) and how to delete fields you’re not interested in:

[transforms.parsing]
type = "remap" # required
inputs = ["s3"] # required
source = '''
. = parse_json!(string!(.message))
.service.id = "123456789"
.timestamp = now()
del(.myunusedfield)
'''

As you can see, there’s a lot of room for your own use case. All these transformations are written in the Vector Remap Language (VRL). It uses a very simple syntax and ships with a lot of built-in functions, so make sure you keep its documentation close when creating a pipeline with Vector.
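
For instance, since Datastream delivers every value as a string, you may want to coerce the numeric fields before doing anything else with them. A minimal sketch (the chosen fields are just an example taken from the log sample above):

[transforms.parsing]
type = "remap" # required
inputs = ["s3"] # required
source = '''
. = parse_json!(string!(.message))
# Datastream ships numbers as strings, so coerce the ones we care about
.statusCode = to_int!(.statusCode)
.bytes = to_int!(.bytes)
.turnAroundTimeMSec = to_int!(.turnAroundTimeMSec)
'''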

Coming back to our example, let’s write the logs into Kafka:

[sinks.kafka]
type = "kafka" # required
inputs = ["parsing"] # required
bootstrap_servers = "Kafka_servers"
compression = "lz4" # optional, default
topic = "destination_topic" # required
# Encoding
encoding.codec = "json" # required
# Healthcheck
healthcheck.enabled = true

I’m writing to Kafka because it’s a common pattern: the Kafka topic acts as a buffer in front of Elasticsearch, so even if we write logs very quickly, Elasticsearch won’t be overloaded and we can consume them at a slower pace.
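
The Elasticsearch side is out of scope for this post, but as a rough idea, the consumer could be another Vector instance reading from the topic. A minimal sketch (option names such as endpoint and index may vary slightly between Vector versions, so check the docs for the release you’re running):

[sources.kafka_logs]
type = "kafka"
bootstrap_servers = "Kafka_servers"
group_id = "vector-es-consumer"
topics = ["destination_topic"]

[transforms.kafka_parse]
# The Kafka sink encoded the events as JSON, so parse them back into fields
type = "remap"
inputs = ["kafka_logs"]
source = '''
. = parse_json!(string!(.message))
'''

[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["kafka_parse"]
endpoint = "http://elasticsearch:9200"
index = "akamai-datastream"
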
So, the full example for this first version of the pipeline will be something like this:

[sources.s3]
type = "aws_s3"
region = "your_region"
auth.access_key_id = "access_key_id"
auth.secret_access_key = "secret_access_key"
compression = "gzip"
sqs.delete_message = true # optional, default
sqs.poll_secs = 15 # optional, default, seconds
sqs.queue_url = "SQS_URL" # required
[transforms.parsing]
type = "remap" # required
inputs = ["s3"] # required
source = '''
. = parse_json!(string!(.message))
'''
[sinks.kafka]
type = "kafka" # required
inputs = ["parsing"] # required
bootstrap_servers = "Kafka_servers"
compression = "lz4" # optional, default
topic = "destination_topic" # required
# Encoding
encoding.codec = "json" # required
# Healthcheck
healthcheck.enabled = true

This is the basic skeleton of a Vector pipeline. As you can see it’s quite simple: a source stage that defines where our data comes from, a simple transformation that splits the JSON into its different fields, and a sink stage that tells Vector where to store the result. You can use it as a starting point for more elaborate setups, but…

Wait…What about metrics?

So, we managed to read the logs from Datastream and ship them to Kafka, which is already quite good, as all the SREs will be able to use them as they wish. Let’s now see how we can improve this approach by creating some metrics based on these logs. As a proof of concept, we’re going to create two different metrics: cache hits and cache misses.
To achieve that, we need to add a few transformation stages. The first one filters the logs we’re interested in:

[transforms.cache_misses]
type = "filter" # required
inputs = ["parsing"] # required
condition = '.cacheStatus == "0"'

In this step we’re filtering the logs that contain cacheStatus = "0", again using the parsing stage as input. Let’s create the metric from the filtered logs:

[transforms.logs2metrics-cache_misses]
# General
type = "log_to_metric" # required
inputs = ["cache_misses"] # required
# Metrics
[[transforms.logs2metrics-cache_misses.metrics]]
# General
field = "cacheStatus" # required
name = "cache_misses" # optional, no default
namespace = "website" # optional, no default
type = "counter" # required

As you can see, we’re creating a log_to_metric transformation that takes the logs filtered in the previous stage as input. In it, we instruct Vector to create the new metric based on the cacheStatus field, with the name cache_misses, in the website namespace, and of type counter, as we want to count the number of cache misses. With this, Vector will produce a metric like this:

website_cache_misses 50

You can also add labels to the new metric. Imagine that you have several markets and you want the previous metric broken down per market. If each market is identified by the Accept-Language header (the accLang field in our logs), you can add it easily to the metric using the following snippet:

[transforms.logs2metrics-cache_misses]
# General
type = "log_to_metric" # required
inputs = ["cache_misses"] # required
# Metrics
[[transforms.logs2metrics-cache_misses.metrics]]
# General
field = "cacheStatus" # required
name = "cache_misses" # optional, no default
namespace = "website" # optional, no default
type = "counter" # required
tags.country = "{{accLang}}" # label value taken from the parsed accLang field

And the result of the previous snippet:

website_cache_misses{country="es-ES"} 50
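
You can replicate the same process for the cache hits, filtering on cacheStatus = "1" instead. A minimal sketch, mirroring what you’ll see in the full config below:

[transforms.cache_hits]
type = "filter" # required
inputs = ["parsing"] # required
condition = '.cacheStatus == "1"'

[transforms.logs2metrics-cache_hits]
type = "log_to_metric" # required
inputs = ["cache_hits"] # required

[[transforms.logs2metrics-cache_hits.metrics]]
field = "cacheStatus" # required
name = "cache_hits" # optional, no default
namespace = "website" # optional, no default
type = "counter" # required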

With both metrics in place, we finally need to expose them by adding a new sink:

[sinks.prometheus_exporter]
type = "prometheus_exporter" # required
inputs = ["logs2metrics*"] # required
address = "0.0.0.0:9080" # required

Using this new sink, Vector will expose the new metrics on port 9080 in the Prometheus exposition format, so they can later be scraped by Prometheus.
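Once everything is running, the exposed endpoint will return something roughly like this (values are illustrative):

# TYPE website_cache_hits counter
website_cache_hits 1200
# TYPE website_cache_misses counter
website_cache_misses 50
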
So, the full Vector config will be similar to this one:

[sources.s3]
type = "aws_s3"
region = "your_region"
auth.access_key_id = "access_key_id"
auth.secret_access_key = "secret_access_key"
compression = "gzip"
sqs.delete_message = true # optional, default
sqs.poll_secs = 15 # optional, default, seconds
sqs.queue_url = "SQS_URL" # required
[transforms.parsing]
type = "remap" # required
inputs = ["s3"] # required
source = '''
. = parse_json!(string!(.message))
'''
[transforms.cache_misses]
type = "filter" # required
inputs = ["parsing"] # required
condition = '.cacheStatus == "0"'
[transforms.logs2metrics-cache_misses]
# General
type = "log_to_metric" # required
inputs = ["cache_misses"] # required
# Metrics
[[transforms.logs2metrics-cache_misses.metrics]]
# General
field = "cacheStatus" # required
name = "cache_misses" # optional, no default
namespace = "website" # optional, no default
type = "counter" # required
[transforms.cache_hits]
type = "filter" # required
inputs = ["parsing"] # required
condition = '.cacheStatus == "1"'
[transforms.logs2metrics-cache_hits]
# General
type = "log_to_metric" # required
inputs = ["cache_hits"] # required
# Metrics
[[transforms.logs2metrics-cache_hits.metrics]]
# General
field = "cacheStatus" # required
name = "cache_hits" # optional, no default
namespace = "website" # optional, no default
type = "counter" # required
[sinks.prometheus_exporter]
type = "prometheus_exporter" # required
inputs = ["logs2metrics*"] # required
address = "0.0.0.0:9080" # required
[sinks.kafka]
type = "kafka" # required
inputs = ["parsing"] # required
bootstrap_servers = "Kafka_servers"
compression = "lz4" # optional, default
topic = "destination_topic" # required
# Encoding
encoding.codec = "json" # required
# Healthcheck
healthcheck.enabled = true
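
If you want to try this out locally, the usual workflow (assuming a reasonably recent Vector release) is to check the file with vector validate and then start the pipeline with vector --config pointing at it; once Vector is running, Prometheus can be configured to scrape the address defined in the prometheus_exporter sink.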

Summary

As you can see, Vector is a powerful tool that allows you not only to ingest your logs, but also to create metrics on the fly from those logs. Let me know if you find it useful and if you have any other cool use cases.
