Parsing 101: Best Practices & Tips
In this post I cover some of the best practices I use and advocate when writing a Chronicle SIEM Parser. The fundamentals here will help whether writing a custom Parser from scratch, forking a default Parser, or writing a Chronicle Parser Extension.
Concepts
Let’s recap some concepts and fundamentals first.
Chronicle SIEM normalizes raw log data into a structured schema, the Universal Data Model (UDM), by running a Parser against a tagged feed of data, an Ingestion Label.
💡 If this is all new to you, I recommend taking the (free) Chronicle SIEM fundamentals course, which includes Parser Basics.
Parser (CBN)
A Parser, sometimes referred to as a CBN (Configuration Based Normalization), is a Chronicle SIEM configuration file that maps a raw log into Chronicle SIEM’s schema, UDM (the Universal Data Model).
UDM
Chronicle SIEM’s schema. Want to Search, build a Dashboard, craft a Detection Engine YARA-L rule? You’ll need data in UDM format.
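For example, once data is normalized you can find it with a UDM Search query such as this minimal sketch (the field values are illustrative):
metadata.event_type = "USER_LOGIN" AND target.user.userid = "mary@acme.com"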
Ingestion Label
A tag applied to a log source that maps the incoming raw data feed to a Chronicle Parser.
Parsing Errors
A Parser Error is generated when the Parser doesn’t work as expected, e.g.:
- A Grok regex fails
- An input filter fails
- A rename or replace operation fails
- and many more things that can go wrong…
Parsing Validation
A Parsing Validation is generated when the Parser worked, but the output UDM Event or UDM Entity does not meet the requirements, e.g.:
- A UDM `metadata.event_type` of `USER_LOGIN` doesn’t include a `target.user` value
- A UDM `metadata.event_type` of `NETWORK_CONNECTION` doesn’t include a `target.hostname`
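For example, a minimal sketch of satisfying the `USER_LOGIN` requirement by populating the target user (the extracted `user` field here is a hypothetical example):
mutate {
  replace => {
    "event1.idm.read_only_udm.metadata.event_type" => "USER_LOGIN"
    # target.user is required for USER_LOGIN; "user" is a hypothetical extracted field
    "event1.idm.read_only_udm.target.user.userid" => "%{user}"
  }
  on_error => "_user_not_set"
}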
Best Practices
And now for some of the best practices to consider when working on your custom Parsers, forked Parsers, or Parser Extensions.
❗ The features discussed below may not be enabled by default. If you attempt any operations, e.g., using the Parser API, and receive an error, contact your Partner or Chronicle Support for assistance.
Use VSCode
As mentioned, while you can (in preview) manage Parsers in the Chronicle UI, if you do a lot of Parser work you may want to look into using Microsoft VSCode as your IDE.
It’s open source, free, includes a Logstash syntax highlighter (which makes writing a Parser far easier on the eyes), and has great SSH integration for remote development.
If you format and indent your Parser code neatly you can use VSCode’s collapse and expand keybindings too.
When used in combination with the Chronicle CLI or CBN CLI utility it’ll make for faster development and debugging.
Use the Chronicle CLI or CBN CLI utility
Use either the Chronicle CLI, or the Chronicle CBN CLI tool.
If you have this installed on your VSCode host, you have a powerful development environment to run and test your updates.
💡 I covered the Chronicle CLI in a recent post.
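As an illustration, a local iteration loop with the CBN CLI might look like the below; the subcommand and flags shown are from memory and may differ by version, so check the tool’s --help:
# run a local Parser config against a sample log and print the normalized output
python3 cbn_cli.py run --conf=acme_lb.conf --log=sample_acme_lb.log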
Use Comments in your Parser
Chronicle SIEM’s Parsers support comments, so use them to your advantage. While a Parser makes perfect sense the day you write it, it’s another thing when you, or someone else, has to come back and re-visit it several months later.
If you inspect Chronicle SIEM’s default Parsers you’ll note the following header is always (usually) applied:
# Product: ACME Labs
# Category: Load Balancer
# Supported Format: JSON
# Reference: https://acme.internal/acme_load_balancer
# Last Updated: 2022-01-11
# Copyright 2022 ACME Inc
It’s not just headers though; you can (and should) apply comments throughout a Parser to explain why the Parser is doing something, rather than what it’s doing. Take this example:
...
# only assign a Namespace if the source address is RFC1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
  mutate {
    replace => {
      "event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
    }
  }
}
...
The comment aims to explain both why and what it’s doing, and means that you, or the next person who has to maintain the Parser, don’t have to spend several minutes trying to work out just what is going on.
If using VSCode you can use the keyboard shortcuts of Ctrl + K then Ctrl + C to block comment, and Ctrl + K then Ctrl + U to block un-comment.
Initialize Conditional Variables First
Before extracting fields from the original raw log, initialize any variables you will use for conditional checks; otherwise, if the field does not exist, the Parser will generate an error (failed_parsing).
Take the below Parser syntax that maps the original field `does_not_exist` to `metadata.product_name`:
mutate {
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{does_not_exist}"
  }
}
If `does_not_exist` does not exist (the clue was in the field name) then you get the following error:
LOG_PARSING_CBN_ERROR:
"generic::invalid_argument: pipeline failed: filter mutate (4) failed:
replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"does_not_exist\": field not set"
You could use the `on_error` statement to catch such an error:
mutate {
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{does_not_exist}"
  }
  on_error => "_error_does_not_exist"
}
While the above successfully catches the parsing error into a boolean called `_error_does_not_exist`, this approach does not work for a conditional check. Take the below example:
if [does_not_exist] != "" {
  mutate {
    replace => {
      "event1.idm.read_only_udm.metadata.product_name" => "%{does_not_exist}"
    }
    on_error => "_error_does_not_exist"
  }
}
This will generate an error, as below, because the `if` conditional does not support `on_error` statements:
LOG_PARSING_CBN_ERROR:
"generic::invalid_argument: pipeline failed: filter conditional (4) failed:
failed to evaluate expression: generic::invalid_argument: \"does_not_exist\" not found in state data"
How do you initialize variables? Simple: put them ahead of any extraction filter plugins, e.g., JSON, CSV, XML, KV, or Grok, as below:
filter {
  # 0. create variables for any field you will use for a conditional check
  mutate {
    replace => {
      "timestamp" => ""
      "does_not_exist" => ""
    }
  }

  # 1. load the log's fields from the default message field
  json {
    source => "message"
    array_function => "split_columns"
    on_error => "_not_json"
  }
...
And with that, your Parser will gracefully handle the scenario where the field may or may not exist in a conditional. Combined with `on_error` statements, you can start to create a resilient Parser.
⚔️ Assume log data is hostile, and will do unexpected things!
Use on_error statements
Building on the last point, that log data is hostile, you should build a Parser to handle unexpected log formats and badly formatted data.
During on-boarding or maintenance it is not uncommon for the wrong log source to be ingested, and often you’ll only find out when you try to apply a Parser update which fails 😡
Building on the prior Parser example, a best practice is to add `on_error` handlers to your filter plugins, and test these before proceeding to the main body of your Parser:
- The Parser uses the JSON filter plugin with an `on_error` boolean called `_not_json`
- If `_not_json` is true, i.e., the input log wasn’t valid JSON, then the first branch of the conditional is used; if `_not_json` is false, i.e., the input log was valid JSON, then the else branch is used:
...
# 1. load the log's fields from the default message field
json {
  source => "message"
  array_function => "split_columns"
  on_error => "_not_json"
}

# 2. test the received log matched the expected format
if [_not_json] {
  drop { tag => "TAG_MALFORMED_MESSAGE" }
} else {
...
Additional checks can be applied to test that a known, expected field is present and in the correct format:
# 2. test the received log matched the expected format
if [_not_json] {
  drop { tag => "TAG_MALFORMED_MESSAGE" }
} else {
  # timestamp is always expected
  if [timestamp] != "" {
    # main parser logic goes here
  } else {
    # if we don't see the timestamp field, it's not our log source
    drop { tag => "TAG_UNSUPPORTED" }
  }
}
...
This is very defensive Parser writing, but it will ensure you are not blocked on Parser submission in the event that incorrect data sources are attached to the same Ingestion Label.
Use the Drop filter, including Tags
As covered in the previous examples, implement Drop filter tags in your Parser for improved metric generation:
- TAG_UNSUPPORTED
- TAG_MALFORMED_ENCODING
- TAG_MALFORMED_MESSAGE
- TAG_NO_SECURITY_VALUE
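For example, a hypothetical check that drops heartbeat events which carry no security value (the `event_type` field is an assumed extraction, not from a real log source):
# "event_type" is a hypothetical field extracted earlier in the Parser
if [event_type] == "heartbeat" {
  drop { tag => "TAG_NO_SECURITY_VALUE" }
}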
These metrics are made available in the Ingestion Metrics table in Chronicle’s Data Lake (I wrote about this previously here):
SELECT
log_type,
drop_reason_code,
COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC
GROK
Running a quick analysis (and by analysis I mean a grep) of the most commonly used Chronicle SIEM input filters shows that it’s… Grok:
| --- | ---- | ---- | -- | --- |
| csv | grok | json | kv | xml |
| --- | ---- | ---- | -- | --- |
| 1% | 79% | 11% | 8% | 1% |
Note, this isn’t showing the format of the original log; rather, a common approach is to use an initial Grok statement to extract a differently formatted payload from a Syslog message. It does show, however, that even in an era of semi-structured log formats you will still be writing a lot of Regex.
Golang (RE2) vs PCRE Regex
If you’ve written Regex for SIEMs before you’ll most likely have used PCRE. Chronicle, being a Google Cloud product, uses RE2, the regex engine from Golang.
RE2 supports most PCRE syntax, but the differences relevant for Chronicle CBN are listed below:
|-------------------------|----------------------------|
| PCRE Regex | CBN Golang Regex |
|-------------------------|----------------------------|
| (?<_custom_field>\w+)\s | (?P<_custom_field>\\w+)\\s |
In terms of features, RE2 doesn’t support PCRE regex features such as backreferences, lookaheads, etc.
For more info on RE2 see WhyRE2.
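To make that concrete with hypothetical patterns, these PCRE constructs will fail to compile under RE2:
PCRE backreference (unsupported in RE2)
^(\w+)\s\1$
PCRE lookahead (unsupported in RE2)
foo(?=bar)
One RE2-safe rework of the lookahead is to consume and capture rather than assert:
^foo(bar)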
Use Named Capture Groups over GROK Patterns
This is a more subjective best practice.
Chronicle SIEM Parsers support GROK Patterns (pre-built named regex groups), but while GROK Patterns make a Parser easier to write, they can add complexity to troubleshooting (you can’t see the Regex itself) and make for less performant Parsers overall (see the below example; often they’re not using the most optimal regex).
GROK Regex Pattern (78 steps)
^User \\\"%{DATA:user}\\\” (?:(logged on|logged off)) to workstation \\\"%{DATA:device}\\\"$
RE2 Regex (34 steps)
^[^\\]+\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s[^\\]+\\\"(?P<_device>[^\\]+)\\\"$
Escaping escape characters
An important point when implementing RE2 regex in Chronicle SIEM’s Parser configurations is as follows:
- Regex shorthand characters must be escaped in a Grok filter, e.g., `\s` becomes `\\s`
- Regex special characters do not need to be double escaped in a Grok filter, e.g., `\\` stays as `\\`
Here’s an example of this in action. Take the following log:
This is a sample log.
and an associated Parser configuration:
filter {
  mutate {
    replace => {
      "event1.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
      "event1.idm.read_only_udm.metadata.vendor_name" => "ACME Labs"
    }
  }

  grok {
    match => {
      "message" => ["^(?P<_firstWord>[^\s]+)\s.*$"]
    }
    on_error => "_grok_message_failed"
  }

  if ![_grok_message_failed] {
    mutate {
      replace => {
        "event1.idm.read_only_udm.metadata.description" => "%{_firstWord}"
      }
    }
  }

  mutate {
    merge => {
      "@output" => "event1"
    }
  }
}
What will the output of the `_firstWord` named regex capture group be in `metadata.description`?
events: <
  timestamp: <
    seconds: 1672137285
    nanos: 322259051
  >
  idm: <
    read_only_udm: <
      metadata: <
        event_timestamp: <
          seconds: 1672137285
          nanos: 322259051
        >
        event_type: GENERIC_EVENT
        vendor_name: "ACME Labs"
        description: "Thi"
      >
    >
  >
>
`Thi`. Hmmm, that’s missing an “s”?
The fix? You just need to add an additional escape character:
"message" => ["^(?P<_firstWord>[^\\s]+)\\s.*$"]
And your Grok will work as expected. The below table summarizes the additional escape characters you’ll need to add to your Parser.
|-----------|-----------|---------------------------------------------------------------------------------------------|
| RE2 Regex | CBN Regex | Explanation |
|-----------|-----------|---------------------------------------------------------------------------------------------|
| \s | \\s | Regex shorthand chars require escaping |
| \. | \\. | Regex reserved chars require escaping |
| \\" | \\\" | As above |
| \] | \\] | As above |
| \| | \\| | As above |
| [^\\]+ | [^\\\\]+ | Regex special chars within a character class group require escaping |
| \\\\ | \\\\ | Special characters outside character class group or shorthand chars do not require escaping |
Troubleshooting
There will be errors, there always are.
Use Statedump, with Labels
The Statedump function is used to print (dump) the state of the Parser at that point of execution. You can also add a custom label to each instance to uniquely identify it, e.g.:
statedump { label => "pre-json filter" }
Which will output as follows when you run your Parser:
Internal State (label=pre-json filter):
{
"@collectionTimestamp": {
"nanos": 994411420,
"seconds": 1672135087
},
"@createTimestamp": {
"nanos": 994411420,
"seconds": 1672135087
},
"@enableCbnForLoop": true,
"@onErrorCount": 0,
"@output": [],
"@timezone": "",
"foo": "",
"message": "{ \"timestamp\": \"2022-12-26T16:39:57-08:00\", \"event\": \"login\", \"details\": { \"user\": \"thatsiemguy\", \"ip\": [\"1.2.3.4\", \"5.6.7.8\"] } }",
"timestamp": ""
}
You can add multiple Statedump functions in your Parser, which can be useful for troubleshooting complex Parser issues.
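A sketch of that in practice, placing one Statedump before and one after an extraction filter so you can diff exactly what the filter changed:
filter {
  statedump { label => "0_pre_json" }    # state before extraction
  json {
    source => "message"
    on_error => "_not_json"
  }
  statedump { label => "1_post_json" }   # state after extraction
  # ... remainder of the Parser ...
}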
💡 Remember! You can’t submit a Parser to production with Statedump functions; the submission will be rejected. Make sure to remove them entirely before submission; even commented-out Statedumps will fail submission.
Event timestamp before minTimestamp
Is the extracted timestamp older than six months?
Chronicle SIEM has a default configuration that prevents ingestion of logs older than six months. If you encounter this during Parser development you can i) disable the date plugin during investigation (see the sketch below), ii) test with newer logs, or iii) manually change the timestamps to avoid the error.
generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: failed to validate event timestamp: event timestamp 1996-12-20 00:39:57 +0000 UTC is before minTimestamp: 1999-12-31 16:00:00 -0800 PST"
Similarly, an error will be generated for logs with a timestamp too far in the future, relative to UTC:
generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: failed to validate event timestamp: event timestamp 2023-01-20 00:39:57 +0000 UTC is beyond maxTimestampFutureDuration: 168h0m0s"
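For option i), a sketch of temporarily disabling date extraction while testing with old sample logs; the field name and format string below are assumptions, and your Parser’s own date filter will differ:
# temporarily disabled while testing with >6 month old sample logs;
# with no date {} function the event timestamp defaults to processing time
# date {
#   match => ["timestamp", "yyyy-MM-dd'T'HH:mm:ssZZ"]
#   on_error => "_no_timestamp"
# }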
Variable name collisions
Chronicle SIEM’s Parsers don’t have any reserved field names (afaict); however, you can end up with field name conflicts depending on your output configuration.
The most common issue is a log message including a field called `event` and a Parser configuration using the output field `event` as well. This won’t end well!
Rather, rename the original event field to something else, e.g., rename `event` to `_event`, or if using UDM ensure you use `event1` as your output variable.
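A minimal sketch of the rename approach:
# move the raw "event" field out of the way before building the "event1" output
mutate {
  rename => {
    "event" => "_event"
  }
  on_error => "_rename_event_failed"
}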
Use GROK named placeholders and build iteratively
When creating (or troubleshooting) complex Regex, use a generic match-all regex named placeholder, and then start adding small pieces of regex one at a time.
Take the following example: it starts off with a Regex pattern that matches everything in the Grok statement, and then each successive step adds a small change, one at a time (see the sketch after the table).
|---------------------------------------------------|-------------------------------------------------------|
| Initial CBN RE2 Regex | Named Catchall Capture Group Output |
|---------------------------------------------------|-------------------------------------------------------|
| "^(?P<_catchall>.*$)" | User \"BOB\" logged on to workstation \"DESKTOP-01\". |
| ^User\s\\\"(?P<_catchall>.*$) | BOB\" logged on to workstation \"DESKTOP-01\". |
| ^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$) | logged on to workstation \"DESKTOP-01\". |
| Continue on till entire message is matched | |
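As a sketch, here is step one from the table inside a Grok filter; extend the pattern one small piece at a time, remembering CBN’s additional escaping for any shorthand characters you add:
grok {
  match => {
    "message" => ["^(?P<_catchall>.*$)"]
  }
  on_error => "_grok_catchall_failed"
}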
Troubleshooting Parser Validation
When building a Chronicle Parser you may encounter errors with UDM Validation, e.g., a required field not being set to output a given UDM event type.
One option for learning or troubleshooting that I’ve found useful myself, and when training folks, is abstracting the log away from the Parser and manually setting variable values in your Parser directly.
Note, this isn’t something you would ever do in Production (as you’d be sending the same static values over each time!), but it is useful for learning, testing, or troubleshooting UDM Validation, e.g., you’re trying to output a USER_LOGIN event but keep getting an error about auth not being set.
You can manually set the variables for the auth UDM object and output static values to get the Parser working, then come back and examine the log to see which fields make the most sense to use. In some cases there will not be values you can use, so you will have to set some manual defaults.
# meta:
# author = "thatsiemguy@"
# description = "CBN Template for UDM USER_LOGIN events"
# version = "1.1"
# created = "2020-04-01"
# updated = "2020-07-07"
filter {
  mutate {
    replace => {
      # UDM > Metadata
      "metadata_event_timestamp" => ""
      "metadata_vendor_name" => "Acme"
      "metadata_product_name" => "Acme SSO"
      "metadata_product_version" => "1.0"
      "metadata_product_event_type" => "login"
      "metadata_product_log_id" => "12345678"
      "metadata_description" => "A user logged in."
      "metadata_event_type" => "USER_LOGIN"
      # UDM > Principal
      "principal_ip" => "192.0.2.10"
      # UDM > Target
      "target_application" => "Acme Connect"
      "target_user_user_display_name" => "Mary Smith"
      "target_user_userid" => "mary@acme.com"
      # UDM > Extensions
      "auth_type" => "SSO"
      "auth_mechanism" => "USERNAME_PASSWORD"
      # UDM > Security Results
      "securityResult_action" => "ALLOW"
      "security_result.severity" => "LOW"
    }
  }

  # ------------ Input Configuration --------------
  #
  # Extract fields from message, e.g., json filter, kv, grok

  # ------------ Date Extract --------------
  #
  # If no date {} function is used, defaults to process time

  # ------------ Field Assignment --------------

  # UDM Metadata
  mutate {
    replace => {
      "event1.idm.read_only_udm.metadata.vendor_name" => "%{metadata_vendor_name}"
      "event1.idm.read_only_udm.metadata.product_name" => "%{metadata_product_name}"
      "event1.idm.read_only_udm.metadata.product_version" => "%{metadata_product_version}"
      "event1.idm.read_only_udm.metadata.product_event_type" => "%{metadata_product_event_type}"
      "event1.idm.read_only_udm.metadata.product_log_id" => "%{metadata_product_log_id}"
      "event1.idm.read_only_udm.metadata.description" => "%{metadata_description}"
      "event1.idm.read_only_udm.metadata.event_type" => "%{metadata_event_type}"
    }
  }

  # UDM Auth
  mutate {
    replace => {
      "event1.idm.read_only_udm.extensions.auth.type" => "%{auth_type}"
    }
    merge => {
      "event1.idm.read_only_udm.extensions.auth.mechanism" => "auth_mechanism"
    }
  }

  # UDM Principal
  mutate {
    merge => {
      "event1.idm.read_only_udm.principal.ip" => "principal_ip"
    }
  }

  # UDM Target
  mutate {
    replace => {
      "event1.idm.read_only_udm.target.user.userid" => "%{target_user_userid}"
      "event1.idm.read_only_udm.target.user.user_display_name" => "%{target_user_user_display_name}"
      "event1.idm.read_only_udm.target.application" => "%{target_application}"
    }
  }

  # UDM > Security Results
  mutate {
    merge => {
      "security_result.action" => "securityResult_action"
    }
  }
  mutate {
    merge => {
      "event1.idm.read_only_udm.security_result" => "security_result"
    }
  }

  # ------------ Output Event(s) --------------
  statedump {
    label => "pre_event"
  }
  mutate {
    merge => {
      "@output" => "event1"
    }
  }
}
#events_for_log_entry: <
# events: <
# timestamp: <
# seconds: 1585875373
# nanos: 487737000
# >
# idm: <
# read_only_udm: <
# metadata: <
# product_log_id: "12345678"
#        event_timestamp: <
# seconds: 1585875373
# nanos: 487737000
# >
# event_type: USER_LOGIN
# vendor_name: "Acme"
# product_name: "Acme SSO"
# product_version: "1.0"
# product_event_type: "login"
# description: "A user logged in."
# >
# principal: <
# ip: "192.0.2.10"
# >
# target: <
# user: <
# userid: "mary@acme.com"
# user_display_name: "Mary Smith"
# >
# application: "Acme Connect"
# >
# extensions: <
# auth: <
# type: SSO
# mechanism: USERNAME_PASSWORD
# >
# >
# >
# >
# >
#>
Summary
Hopefully some of these tips and best practices will help you in your Parser adventures and, used in combination with Chronicle SIEM’s free online training materials and pretty detailed parsing documentation, you should be able to customize your log data in no time.