Chronicle Ingestion Stats & Metrics

Chris Martin (@thatsiemguy)
17 min read · Nov 13, 2022


In this post we’re going to cover how you can utilize the managed instance of Google Cloud BigQuery available with Chronicle SIEM, aka the Chronicle Data Lake, to get insights into ingestion, parsing, and the Chronicle Forwarder itself (there will be SQL). Specifically, we’ll focus on the SQL tables available in the Data Lake, what they audit, and which use cases each table is best suited for:

  1. Ingestion Stats
  2. Ingestion Metrics

🆕 Dec 23 update: the Ingestion Metrics table has superseded the Ingestion Stats table as the recommended source of truth for anything ingestion related in Chronicle SIEM.

TL;DR:

|-----------------------|-----------------|-------------------|
| Item | Ingestion Stats | Ingestion Metrics |
|-----------------------|-----------------|-------------------|
| Latency? | Hours | Minutes |
| Partitioned? | Yes | No |
| Namespaces? | No | Yes |
| Log & Event Counts? | Yes | Yes |
| Log & Event Ratios? | Yes | No |
| Parsing Errors? | Yes | No |
| Parsing Drop Function?| No | Yes |
| Forwarder Statistics? | No | Yes |
| Telemetry Stages | Single | Multiple |
|-----------------------|-----------------|-------------------|

In this post we’ll explore the Ingestion Stats and Metrics tables via SQL and BigQuery, but you don’t have to use SQL. You can use Chronicle’s embedded Enterprise Looker dashboards, or Enterprise Looker, for many of the use cases discussed in this post; however, for the purposes of really understanding the metrics tables, and the use cases available, SQL is a great learning aid.

A custom Chronicle SIEM dashboard using embedded Looker to show Ingestion Stats and Metrics.

Ingestion Stats

The older of the two telemetry-related tables in the Chronicle Data Lake, used for reporting on log and event volume. For non real-time requirements this is the table to use but, to better understand why, let’s first look at the Ingestion Stats table and the key things you should know:

  • provides counts and ratios (calculated percentages) for:
    - normalization
    - parsing
    - validation
    - enrichment
    - total
  • uses batch inserts, with a minimum latency of two hours
    - you can test the latency of written data with a SQL query like the one below:
SELECT
  time_bucket,
  timestamp_sec,
  TIMESTAMP_SECONDS(timestamp_sec) AS latest_timestamp,
  TIMESTAMP_DIFF(TIMESTAMP_SECONDS(timestamp_sec),CURRENT_TIMESTAMP(),HOUR) AS latency
FROM
  `datalake.ingestion_stats`
WHERE
  DATE(_PARTITIONTIME) = CURRENT_DATE()
GROUP BY
  1,
  2
ORDER BY
  timestamp_sec DESC
LIMIT
  1

Which returns results as follows:

|-------------|---------------|-------------------------|---------|
| time_bucket | timestamp_sec | latest_timestamp | latency |
|-------------|---------------|-------------------------|---------|
| 2022111214 | 1668261600 | 2022-11-12 14:00:00 UTC | -2 |
|-------------|---------------|-------------------------|---------|
  • telemetry is logged per the time_bucket field
    - e.g., 2022111114 is 2022 Nov 11th 14:00 hours UTC (see the conversion sketch after this list)
  • is a partitioned table
    - by _PARTITIONTIME
  • writes a record even when 0 entries are observed for a log source
    - useful to know, as Ingestion Metrics does not write 0-based entries, i.e., if a log source stops logging you have to infer that from the lack of an entry, but more on that later.
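
If you do want to work with the time_bucket value directly, rather than timestamp_sec, a minimal sketch for converting it back into a timestamp could look like the below (this assumes time_bucket is stored as an integer in YYYYMMDDHH form, as per the example above):

SELECT DISTINCT
  time_bucket,
  -- time_bucket is YYYYMMDDHH, so cast it to a STRING and parse it back into a TIMESTAMP
  PARSE_TIMESTAMP("%Y%m%d%H", CAST(time_bucket AS STRING)) AS bucket_start
FROM
  `datalake.ingestion_stats`
WHERE
  DATE(_PARTITIONTIME) = CURRENT_DATE()
ORDER BY
  time_bucket DESC
LIMIT
  5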

Validating Log Ingestion, Parsing, and Error Counts

As a schema-on-write platform, having the capability to inspect normalization and potential parsing issues is key to ensuring reliable detection and response.

The below SQL uses the Ingestion Stats table’s count fields to provide an aggregate summary of all ingested events with normalization and error counts:

SELECT
  FORMAT("%'d",SUM(entry_number)) AS entry_number,
  FORMAT("%'d",SUM(normalized_event_count)) AS normalized_event_count,
  FORMAT("%'d",SUM(parsing_error_count)) AS parsing_error_count,
  FORMAT("%'d",SUM(validation_error_count)) AS validation_error_count,
  FORMAT("%'d",SUM(enrichment_error_count)) AS enrichment_error_count,
  FORMAT("%'d",SUM(total_error_count)) AS total_error_count
FROM
  `datalake.ingestion_stats`
WHERE
  DATE(_PARTITIONTIME) = CURRENT_DATE()

And the results, where:

  • entry_number
    - is the number of raw logs
  • normalized_event_count
    - the number of successfully parsed raw logs
  • parsing_error_count
    - is where a CBN had an unexpected (unhandled) error
  • validation_error_count
    - is where the outputted UDM event from a CBN failed validation, e.g., a PROCESS_LAUNCH without a Process
| entry_number | normalized_event_count | parsing_error_count | validation_error_count | enrichment_error_count | total_error_count |
|--------------|------------------------|---------------------|------------------------|------------------------|-------------------|
| 2,061,748 | 2,056,349 | 9 | 0 | 0 | 9 |

You may have noticed there are a few errors observed.

To expand upon this SQL and find the log sources with some form of error, add i) the log_type field, and ii) an additional WHERE clause to find log sources with a non-zero total_error_count (which includes parsing, validation, and enrichment errors), as follows:

SELECT
  log_type,
  FORMAT("%'d",SUM(entry_number)) AS entry_number,
  FORMAT("%'d",SUM(normalized_event_count)) AS normalized_event_count,
  FORMAT("%'d",SUM(parsing_error_count)) AS parsing_error_count,
  FORMAT("%'d",SUM(validation_error_count)) AS validation_error_count,
  FORMAT("%'d",SUM(enrichment_error_count)) AS enrichment_error_count,
  FORMAT("%'d",SUM(total_error_count)) AS total_error_count
FROM
  `datalake.ingestion_stats`
WHERE
  DATE(_PARTITIONTIME) = CURRENT_DATE()
  AND total_error_count > 0
GROUP BY 1

And the results as follows. Note, the FORMAT function is used for display purposes to add a thousands separator (customize as needed, or leave it out altogether).

|------------|--------------|------------------------|---------------------|------------------------|------------------------|-------------------|
| log_type | entry_number | normalized_event_count | parsing_error_count | validation_error_count | enrichment_error_count | total_error_count |
|------------|--------------|------------------------|---------------------|------------------------|------------------------|-------------------|
| ACME | 9 | 0 | 9 | 0 | 0 | 9 |
|------------|--------------|------------------------|---------------------|------------------------|------------------------|-------------------|

Important to note: while you can run and get these insights via SQL, Chronicle SIEM does include pre-built Dashboards with the embedded Looker offering that provide all the insights above, as well as Dashboards in the Enterprise Looker Chronicle Marketplace offering. A post for another day!

Validating Log Ingestion, Parsing, and Error Counts as Ratios

The Ingestion Stats table also includes pre-calculated ratios, or percentages. Building upon the prior SQL we can use the ratio columns to get telemetry results in a slightly easier to consume format:

WITH
  ratios AS (
  SELECT
    log_type,
    DATE_TRUNC(DATE(TIMESTAMP_SECONDS(timestamp_sec)), DAY) AS day,
    ROUND(AVG(normalization_ratio),2) AS normalization_ratio,
    ROUND(AVG(parsing_error_ratio),2) AS parsing_error_ratio,
    ROUND(AVG(validation_error_ratio),2) AS validation_error_ratio,
    ROUND(AVG(enrichment_error_ratio),2) AS enrichment_error_ratio,
    ROUND(AVG(total_error_ratio),2) AS total_error_ratio
  FROM
    `datalake.ingestion_stats`
  WHERE
    DATE(_PARTITIONTIME) > DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
  GROUP BY
    1,
    2)
SELECT
  log_type,
  day,
  FORMAT("%d%%",CAST(SUM(100 * normalization_ratio) AS INT64)) AS normalization_ratio,
  FORMAT("%d%%",CAST(SUM(100 * parsing_error_ratio) AS INT64)) AS parsing_error_ratio,
  FORMAT("%d%%",CAST(SUM(100 * validation_error_ratio) AS INT64)) AS validation_error_ratio,
  FORMAT("%d%%",CAST(SUM(100 * enrichment_error_ratio) AS INT64)) AS enrichment_error_ratio,
  FORMAT("%d%%",CAST(SUM(100 * total_error_ratio) AS INT64)) AS total_error_ratio
FROM
  ratios
GROUP BY
  1,
  2
ORDER BY
  1,
  2 ASC

And example results. A quick review shows the AUDITD log source has a normalization ratio ranging from 54% through to 99% over the last several days, but no error percentage. From this we can infer the CBN is dropping logs (intentionally); however, this is an area where you would need to use the Ingestion Metrics table to get the specific drop values and parser outputs.

|-----|----------|------------|---------------------|---------------------|------------------------|------------------------|-------------------|
| Row | log_type | day | normalization_ratio | parsing_error_ratio | validation_error_ratio | enrichment_error_ratio | total_error_ratio |
|-----|----------|------------|---------------------|---------------------|------------------------|------------------------|-------------------|
| 1 | AUDITD | 2022-11-07 | 81% | 0% | 0% | 0% | 0% |
| 2 | AUDITD | 2022-11-08 | 54% | 0% | 0% | 0% | 0% |
| 3 | AUDITD | 2022-11-09 | 99% | 0% | 0% | 0% | 0% |
| 4 | AUDITD | 2022-11-10 | 99% | 0% | 0% | 0% | 0% |
| 5 | AUDITD | 2022-11-11 | 99% | 0% | 0% | 0% | 0% |
| 6 | AUDITD | 2022-11-12 | 99% | 0% | 0% | 0% | 0% |
| 7 | AUDITD | 2022-11-13 | 99% | 0% | 0% | 0% | 0% |

Note, there probably is a better way to write the above query (but you get the idea, hopefully) — it generates the AVERAGE ratio across a day (as a SUM operation would give percentages in the thousands), and then uses a DATE_TRUNC function to effectively group logs into the same day (by default in UTC).

A final observation on the ratio values in Ingestion Stats: you may see instances that are in the hundreds to thousands 🤔 How does a log source normalize at over 100%? Some CBNs output multiple events from a single log, e.g., WORKSPACE_ACTIVITY for Google Drive logs can include dozens of events in a single log. It’s not common, but worth being aware of.
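
If you want to see which log sources exhibit this behaviour, a minimal sketch along the same lines as the earlier ratio query could be (following that query’s convention, the stored ratio is a fraction that is multiplied by 100 for display, so anything above 1 averages more than one UDM event per raw log):

SELECT
  log_type,
  -- multiply the fractional ratio by 100 to display it as a percentage
  ROUND(AVG(normalization_ratio) * 100, 0) AS avg_normalization_pct
FROM
  `datalake.ingestion_stats`
WHERE
  DATE(_PARTITIONTIME) > DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
GROUP BY
  1
HAVING
  AVG(normalization_ratio) > 1
ORDER BY
  2 DESC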

Average Log Size, by Log Source

We can use the Ingestion Stats table to calculate the SUM of logs ingested, the SUM of all bytes ingested, and calculate the AVERAGE log size per log source, over the last 7 days:

WITH
  base_stats AS (
  SELECT
    log_type,
    SUM(entry_number) AS sum_entries,
    SUM(size_bytes) AS sum_bytes
  FROM
    `datalake.ingestion_stats`
  WHERE
    DATE(_PARTITIONTIME) > DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
  GROUP BY
    1 )
SELECT
  log_type,
  sum_entries,
  sum_bytes,
  IFNULL(TRUNC(CAST(SAFE_DIVIDE(sum_bytes, sum_entries) AS NUMERIC),0),0) AS average_byte_size
FROM
  base_stats
GROUP BY
  1,
  2,
  3
ORDER BY 1 ASC

And example results:

|-----|---------------------|-------------|------------|-------------------|
| Row | log_type | sum_entries | sum_bytes | average_byte_size |
|-----|---------------------|-------------|------------|-------------------|
| 1 | AUDITD | 2509967 | 653130788 | 260 |
| 2 | BRO_JSON | 7779569 | 7955965760 | 1022 |
| 3 | CATCH_ALL | 8 | 1713 | 214 |
| 4 | FORWARDER_HEARTBEAT | 18841 | 24489675 | 1299 |
|-----|---------------------|-------------|------------|-------------------|

A neat feature of Chronicle SIEM is not having to expend significant resources on sizing and capacity planning to the extent you commonly would with an on-prem or cloud-based SaaS SIEM solution; however, capacity insight and planning are still important considerations, and if you are on a storage-based model with Chronicle, the insights from Ingestion Stats can help you optimize your license utilization.
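
As an illustration of that kind of capacity insight, a minimal sketch that sums daily ingested volume per log source over the last 30 days (re-using the size_bytes field from the previous query) might look like:

SELECT
  DATE(TIMESTAMP_SECONDS(timestamp_sec)) AS day,
  log_type,
  -- convert bytes to GiB for easier reading
  ROUND(SUM(size_bytes) / POW(1024, 3), 2) AS ingested_gib
FROM
  `datalake.ingestion_stats`
WHERE
  DATE(_PARTITIONTIME) > DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
GROUP BY
  1,
  2
ORDER BY
  1,
  3 DESC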

Silent Log Sources

Just as important as understanding which log sources are being ingested, and the data volumes they’re producing, is understanding whether those same sources are not working, or have stopped logging altogether.

The below SQL could be used to identify log sources that have not reported in the last 24 hours:

DECLARE __REALTIME_LOG_SOURCES__ ARRAY <STRING>;
SET __REALTIME_LOG_SOURCES__ = ['AUDITD',
  'GCP_CLOUDAUDIT',
  'GCP_CLOUD_NAT',
  'GCP_IDS',
  'WINDOWS_SYSMON',
  'WINEVTLOG',
  'BRO_JSON',
  'GCP_DNS',
  'GCP_FIREWALL',
  'SQUID_WEBPROXY'];

SELECT
  log_type,
  DATE(TIMESTAMP_SECONDS(timestamp_sec)) AS date,
  MAX(TIMESTAMP_SECONDS(timestamp_sec)) AS last_seen,
  TIMESTAMP_DIFF(MAX(TIMESTAMP_SECONDS(timestamp_sec)),CURRENT_TIMESTAMP,hour) AS hours_silent
FROM
  `datalake.ingestion_stats`
WHERE
  DATE(_PARTITIONTIME) > DATE_SUB(CURRENT_DATE, INTERVAL 1 DAY)
  AND log_type IN UNNEST(__REALTIME_LOG_SOURCES__)
GROUP BY
  date,
  log_type
HAVING
  SUM(entry_number) = 0
ORDER BY
  1,
  2 ASC

The SQL uses Google Standard SQL procedural language to declare a variable (an ARRAY) of log sources that are expected to have continuous ingestion, as not all log sources are continuous, and many are interval based, e.g., your daily contextual CMDB or IOC feed should not be alerted upon for not being ingested in the last X hours if it only brings in data once a day.

Tip: you can use the Feed Management API documentation to look up the interval for API or Object Storage based log sources (search for “Ingest schedule”).

The SQL then looks for any log type with a count of 0 raw logs in the last 24 hour period, and calculates the last seen and hours silent values (with the caveat that there will be at least two hours of latency on these stats).

Example results, if a log source has not reported in, would be as follows:

|----------|------------|--------------------------------|--------------|
| log type | date | last seen | hours silent |
|----------|------------|--------------------------------|--------------|
| AUDITD | 2022-11-11 | 2022-11-11 15:00:00.000000 UTC | -2 |

For non-critical log sources such an approach may be sufficient, but given the latency of the batch exports it won’t do if you require notification in under two hours; that’s where Ingestion Metrics comes in.

Ingestion Metrics

Ingestion Metrics is the newer of the telemetry tables in Chronicle Data Lake, and addresses the issue of the higher latency batch export of Ingestion Stats, as well as providing Chronicle Forwarder level telemetry.

Let’s run a version of the SQL we ran earlier to validate the lower latency of the table:

SELECT
  end_time,
  TIMESTAMP_DIFF(MAX(end_time),CURRENT_TIMESTAMP(),MINUTE) AS latency
FROM
  `datalake.ingestion_metrics`
GROUP BY
  1
ORDER BY
  end_time DESC
LIMIT
  1

And we can see it’s in the area of minutes, versus hours for Ingestion Stats (the previous SQL for Ingestion Stats used a TIMESTAMP_DIFF in HOURS).

|-----|--------------------------------|---------|
| Row | end_time | latency |
|-----|--------------------------------|---------|
| 1 | 2022-11-12 16:42:07.919602 UTC | -7 |

Ingestion Metrics includes a subset of the log volume counts that Ingestion Stats provides, specifically log and event counts, but does not include parsing error metrics or ratios. Ingestion Metrics also includes:

  • Chronicle Forwarder telemetry
  • parser Drop function details
  • the associated (tagged) Namespace

Let’s look into those.

Querying Chronicle Forwarder Telemetry

The Chronicle Forwarder is Chronicle SIEM’s collector (or agent), a container image you can run on Docker (officially supported), GCP GKE (works, but not officially supported), or RHEL Podman (also works, but not officially supported).

Note, the support status of the various container platforms is as at the time of writing; that could and may change.

Each Chronicle Forwarder has a unique ID which helps to tell them apart. You can inspect your Chronicle Forwarders with SQL as follows, also including the Namespace and an array of the associated log sources being collected:

SELECT
  namespace,
  collector_id,
  ARRAY_AGG(DISTINCT log_type ORDER BY log_type ASC) AS log_type
FROM
  `datalake.ingestion_metrics`
WHERE
  NOT REGEXP_CONTAINS(collector_id, r'^[a-e]{8}')
  AND log_type IS NOT NULL
GROUP BY 1,2

What are the exception filters in the WHERE clause? 🧐 These are internal component IDs used in Chronicle SIEM which, for the purpose of analyzing anything related to a Chronicle Forwarder, we can safely filter out (they are GUIDs made up of the same repeated letter or number).

| namespace | collector_id | log_type |
|-----------|--------------|----------|
| null | 5fdc1432-9920-4a07-92b1-b2bbbcf4865e | AUDITD, AZURE_AD_CONTEXT, POWERSHELL, SURICATA_EVE, SURICATA_IDS, WINDOWS_AD, WINDOWS_DEFENDER_AV, WINDOWS_SYSMON, WINEVTLOG |
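
If you want to sanity check what that exception filter is excluding in your own environment, a quick sketch that inverts the condition will list just the internal IDs being filtered out:

SELECT
  DISTINCT collector_id
FROM
  `datalake.ingestion_metrics`
WHERE
  -- the same pattern as above, without the NOT, to show only the internal component IDs
  REGEXP_CONTAINS(collector_id, r'^[a-e]{8}')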

For example, running the below SQL, now including the last heartbeat time, shows interesting results:

SELECT
  namespace,
  collector_id,
  MAX(last_heartbeat_time) AS last_heartbeat_time
FROM
  `datalake.ingestion_metrics`
WHERE
  NOT REGEXP_CONTAINS(collector_id, r'^[a-e]{8}')
GROUP BY 1,2

And the results. Note, there are two rows with the same collector_id GUID. Someone has re-used a single Chronicle Forwarder ID configuration (credentials) for two different collectors! While this is valid, and log ingestion will work, it does present a potential challenge for observability, as you in effect have two disparate collectors hidden behind one ID. Don’t re-use collector IDs.

|-----|------------|--------------------------------------|-------------------------|
| Row | namespace | collector_id | last_heartbeat_time |
|-----|------------|--------------------------------------|-------------------------|
| 1 | null | e069cbec-2ad1-49a3-b85f-b885bdef9310 | 2022-11-13 15:00:10 UTC |
| 2 | production | e069cbec-2ad1-49a3-b85f-b885bdef9310 | 2022-11-13 15:00:05 UTC |
| 3 | null | d20c6ec6-e915-4dac-8a92-a126c935f085 | 2022-11-13 14:45:52 UTC |
| 4 | null | 1154f4f2-699f-4b75-bd77-22b27c4d8a80 | 2022-11-13 14:07:10 UTC |

The other item of note is the Namespace value. This is included in Ingestion Metrics (but not in Ingestion Stats), and is useful for ensuring consistent Namespace application (if used), which is a dependency for accurate context enrichment (a post for another time).

Validating Log Ingestion, Parsing, and Error Counts

When starting to write SQL equivalent to the earlier Ingestion, Parsing, and Error counts queries, you’ll encounter a couple of required changes:

  • there is no _PARTITIONTIME (or _PARTITIONDATE)
    - Ingestion Metrics is not a partitioned table, so to restrict a query to specific time ranges we need to use the timestamp fields.
  • multiple stages of the ingestion pipeline are logged
    - these need to be accounted for, or else you’ll end up double counting log volumes. The distinct stages (components) can be listed with:
SELECT
  DISTINCT component
FROM
  `datalake.ingestion_metrics`

And the results:

| Row | component             |
|-----|-----------------------|
| 1 | Out-of-Band Processor |
| 2 | Normalizer |
| 3 | Forwarder |
| 4 | Ingestion API |

At this point we can cross reference the Ingestion Metrics table to see which fields are used by which components (this really is easier to read on the official docs, go click the link and come back):

| *Fields*            | *Type*    | *Description*                                                                                                                                                                                                                                                                          |
|---------------------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| component | STRING | The service or pipeline to which the metric applies, which includes the Forwarder, Ingestion API, Out-of-Band Processor (Chronicle API feed), and Normalizer. |
| collector_id | STRING | The unique identifier of the collection mechanism. For push sources, the forwarder ID or generated ID is used. For Chronicle API or Chronicle API feed, the ID has the following format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx. |
| feed_id | STRING | The ID for the specific feed which the log belongs to. |
| log_type | STRING | The source of the log which identifies the log entries in the batch. For example, WINDOWS_DNS. |
| start_time | TIMESTAMP | The start time associated with the metric in microseconds. |
| end_time | TIMESTAMP | The end time associated with the metric in microseconds. |
| input_type | STRING | This field is populated if the ingestion source is the Chronicle forwarder. Based on the data that the forwarder sends, this field contains pcap, syslog, or splunk. |
| namespace | STRING | Namespace that the log belongs to. |
| event_type | STRING | The event type determines which fields are included with the event. The event type includes values such as PROCESS_OPEN, FILE_CREATION, USER_CREATION, and NETWORK_DNS. |
| drop_reason_code | STRING | This field is populated if the ingestion source is the Chronicle forwarder, and indicates the reason why a log was dropped during normalization. |
| last_heartbeat_time | TIMESTAMP | This field is populated if the ingestion source is the Chronicle forwarder or Chronicle API feed. The last timestamp when the forwarder or API feed was active in microseconds. |
| log_volume | FLOAT64 | The volume of logs during the interval in bytes. |
| drop_count | FLOAT64 | This field is populated if the ingestion source is the Chronicle forwarder, and indicates the number of logs dropped during the interval. |
| log_count | FLOAT64 | The number of logs ingested during the interval. |
| event_count | FLOAT64 | The number of events generated during the interval. |
| state | STRING | The final status of the event or log. The status is one of the following: - parsed. The log is successfully parsed. - validated. The log is successfully validated. - failed_parsing. The log has parsing errors. - failed_validation. The log has validation errors. |

And breaking that down by component:

| Metrics                 | component             |
|-------------------------|-----------------------|
| Heartbeat | Forwarder |
| Log Bytes Count | Forwarder |
| Log Record Count | Forwarder |
| Drop Count | Forwarder |
| Log Record Count | Ingestion API |
| Log Bytes Count | Ingestion API |
| Log Record Count | Out-of-Band Processor |
| Log Bytes Count | Out-of-Band Processor |
| Last Ingested Timestamp | Out-of-Band Processor |
| Log Count | Normalizer |
| Log Size | Normalizer |
| Event Count | Normalizer |

As the focus of the original SQL was on Normalization, let’s adapt the SQL to use the Normalizer component fields:

SELECT
  DATE_TRUNC(DATE(end_time), DAY) AS day,
  log_type,
  CAST(SUM(log_volume) AS INT64) AS log_volume,
  CAST(SUM(log_count) AS INT64) AS log_count,
  CAST(SUM(event_count) AS INT64) AS event_count
FROM
  `datalake.ingestion_metrics`
WHERE
  component = "Normalizer"
  AND log_type IS NOT NULL
  AND DATE(end_time) > DATE_SUB(CURRENT_DATE, INTERVAL 1 DAY)
GROUP BY
  1,
  2
ORDER BY
  1,
  2 DESC

And example results:

| Row | day        | log_type           | log_volume | log_count | event_count |
|-----|------------|--------------------|------------|-----------|-------------|
| 1 | 2022-11-13 | WORKSPACE_ACTIVITY | 19156367 | 22007 | 22027 |
| 2 | 2022-11-13 | WINEVTLOG | 269111885 | 143958 | 137659 |
| 3 | 2022-11-13 | WINDOWS_SYSMON | 54456472 | 29107 | 29075 |

Where log_volume is the size of the raw logs in bytes, log_count is the count of raw logs, and event_count is the number of UDM events generated from the raw logs that were successfully normalized.
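
Since Ingestion Metrics has no pre-calculated ratio fields, you can approximate one yourself. A minimal sketch dividing events by logs per source (bearing in mind multi-event parsers can push this above 100%, as discussed earlier) could be:

SELECT
  log_type,
  CAST(SUM(log_count) AS INT64) AS log_count,
  CAST(SUM(event_count) AS INT64) AS event_count,
  -- SAFE_DIVIDE avoids a divide-by-zero error if no logs were seen in the window
  ROUND(SAFE_DIVIDE(SUM(event_count), SUM(log_count)) * 100, 1) AS approx_normalization_pct
FROM
  `datalake.ingestion_metrics`
WHERE
  component = "Normalizer"
  AND log_type IS NOT NULL
  AND DATE(end_time) > DATE_SUB(CURRENT_DATE, INTERVAL 1 DAY)
GROUP BY
  1
ORDER BY
  4 ASC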

Silent Log Sources

As briefly mentioned earlier, Ingestion Metrics does not write a 0-based entry if no log is observed, so detecting a silent log source requires a slightly different approach; but with the added bonus that, as Ingestion Metrics is near real-time, it provides a powerful mechanism for detecting silent or failing log sources quickly.

As before, the SQL includes procedural language to declare variables for tuning the query, including an array of real-time log sources and the duration a log source must be silent before returning a match, but with a variation from Ingestion Stats in that a baseline is built over the last X days to compare against:

DECLARE __SILENT_LOG_SOURCE_INTERVAL__ INT64;
DECLARE __BASELINE_LOOKBACK__ INT64;
DECLARE __REALTIME_LOG_SOURCES__ ARRAY <STRING>;

SET __SILENT_LOG_SOURCE_INTERVAL__ = 1; --HOURS
SET __BASELINE_LOOKBACK__ = 7; --DAYS
SET __REALTIME_LOG_SOURCES__ = ['AUDITD',
  'GCP_CLOUDAUDIT',
  'GCP_CLOUD_NAT',
  'GCP_IDS',
  'WINDOWS_SYSMON',
  'WINEVTLOG',
  'BRO_JSON',
  'GCP_DNS',
  'GCP_FIREWALL',
  'SQUID_WEBPROXY'];

WITH current_state AS (
  SELECT
    DISTINCT(log_type) AS log_type
  FROM
    `datalake.ingestion_metrics`
  WHERE
    DATETIME(start_time) BETWEEN DATETIME_SUB(CURRENT_DATETIME, INTERVAL __SILENT_LOG_SOURCE_INTERVAL__ HOUR) AND CURRENT_DATETIME
    AND component IN ("Normalizer")
),
baseline AS (
  SELECT
    DISTINCT(log_type) AS log_type,
    MAX(start_time) AS last_seen
  FROM
    `datalake.ingestion_metrics`
  WHERE
    DATETIME(start_time) BETWEEN DATETIME_SUB(CURRENT_DATETIME, INTERVAL __BASELINE_LOOKBACK__ DAY) AND CURRENT_DATETIME
    AND log_volume IS NOT NULL
    AND component IN ("Normalizer")
  GROUP BY 1
)
SELECT
  DISTINCT b.log_type,
  b.last_seen,
  TIMESTAMP_DIFF(b.last_seen,CURRENT_TIMESTAMP,hour) AS hours_silent
FROM baseline b
LEFT OUTER JOIN current_state c
  ON b.log_type = c.log_type
WHERE c.log_type IS NULL
  AND b.log_type IN UNNEST(__REALTIME_LOG_SOURCES__)

And example results:

"There is no data to display."

In this case, there are no silent log sources (go go great platform engineering team); however, if you want to test this then you can invert the WHERE clause logic to be:

NOT IN UNNEST(__REALTIME_LOG_SOURCES__)

Which shows the format of the SQL results:

|-----|------------------|--------------------------------|--------------|
| Row | log_type | last_seen | hours_silent |
|-----|------------------|--------------------------------|--------------|
| 1 | UDM | 2022-11-11 07:17:07.919602 UTC | -55 |
| 2 | WORKSPACE_USERS | 2022-11-12 16:37:07.919602 UTC | -22 |
| 4 | GCP_DLP_CONTEXT | 2022-11-13 10:12:07.919602 UTC | -4 |
| 5 | CATCH_ALL | 2022-11-11 08:12:07.919602 UTC | -54 |
| 6 | GCP_IAM_ANALYSIS | 2022-11-13 00:07:07.919602 UTC | -14 |
| 7 | OSINT_IOC | 2022-11-11 01:12:07.919602 UTC | -61 |

These are not real-time log sources and will run on an interval to bring in context, so all is working as expected. You could however run a variation of the above SQL to detect daily log sources and alert if they don’t arrive as expected, simply by changing the ‘__SILENT_LOG_SOURCE_INTERVAL__’ parameter.
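
As a minimal sketch of that variation, you might set the parameters along these lines (the log types here are just the interval-based contextual sources from the example results above, so swap in your own):

SET __SILENT_LOG_SOURCE_INTERVAL__ = 25; --HOURS, a small buffer beyond a 24 hour feed schedule
SET __BASELINE_LOOKBACK__ = 14; --DAYS
SET __REALTIME_LOG_SOURCES__ = ['GCP_IAM_ANALYSIS',
  'WORKSPACE_USERS',
  'OSINT_IOC'];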

Parser Drop Function Logging

Ingestion Metrics includes Parser (CBN) level logging, specifically when the Drop function is used. Drop function usage is not necessarily a bad thing, e.g., it’s used to filter out incorrect or bad data, but it can be useful to keep an eye on it to catch unexpected situations that may impact normalization. A quick SQL query as below shows Drop function usage per CBN:

SELECT
  log_type,
  ARRAY_AGG(DISTINCT drop_reason_code) AS drop_reason_code
FROM
  `datalake.ingestion_metrics`
WHERE
  drop_reason_code IS NOT NULL
  AND component = "Normalizer"
  AND DATE(end_time) > DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
GROUP BY
  1

And example results below, which show that there could be malformed data being sent in from the Windows environment.

|-----|--------------------|-------------------------------------------------------|
| Row | log_type | drop_reason_code |
|-----|--------------------|-------------------------------------------------------|
| 1 | WINEVTLOG | LOG_PARSING_DROPPED_BY_FILTER: TAG_UNSUPPORTED |
| | | LOG_PARSING_DROPPED_BY_FILTER: TAG_MALFORMED_ENCODING |
|-----|--------------------|-------------------------------------------------------|
| 2 | AUDITD | LOG_PARSING_DROPPED_BY_FILTER |

Further SQL can identify the collector_id (Forwarder) this is occurring on, after which you can perform a Raw Log Search to dig into what the root cause could be.
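
A minimal sketch of that follow-up, simply extending the query above with collector_id, could be:

SELECT
  collector_id,
  log_type,
  ARRAY_AGG(DISTINCT drop_reason_code) AS drop_reason_code
FROM
  `datalake.ingestion_metrics`
WHERE
  drop_reason_code IS NOT NULL
  AND component = "Normalizer"
  AND DATE(end_time) > DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
GROUP BY
  1,
  2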

Tip: apply the collector_id as a metadata label to your Chronicle Forwarder and you can then perform UDM searches scoped specifically to an individual forwarder (not of help here, as the log will have been dropped, i.e., it’s a raw log only).

Namespaces

As briefly mentioned earlier, Namespace tagging is recorded in Ingestion Metrics, which is a useful way to validate that log source ingestion is using the correct Namespace, be that via Feed Management or a Chronicle Forwarder.

SELECT
  namespace,
  ARRAY_AGG(DISTINCT log_type) AS log_type
FROM
  `datalake.ingestion_metrics`
WHERE
  DATE(end_time) > DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
  AND log_type IS NOT NULL
GROUP BY 1

And example results:

|-----|------------|---------------------|
| Row | namespace | log_type |
|-----|------------|---------------------|
| 1 | dev | GCP_IAM_ANALYSIS |
| | | WORKSPACE_USERS |
| | | OSINT_IOC |
| | | GCP_IAM_CONTEXT |
| | | NIX_SYSTEM |
| | | POWERSHELL |
|-----|------------|---------------------|
| 2 | production | NIX_SYSTEM |
| | | WINEVTLOG |
| | | WINDOWS_DEFENDER_AV |
| | | WINDOWS_SYSMON |
|-----|------------|---------------------|

Finally, we’ve covered a lot of detail on Ingestion Metrics, but you do not need to rely on SQL for this level of analysis. Chronicle’s embedded Looker dashboards, or Enterprise Looker via the Chronicle Marketplace, can make use of Ingestion Metrics, as documented here.

Summary

Hopefully this post has helped to show the options available for ingestion telemetry metrics within Chronicle SIEM, how you can use them as explained with the SQL examples, and how you can build upon them; real world examples include using similar approaches via Enterprise Looker (the SQL Runner capability), using your SOAR’s scheduler to run queries and generate Cases, or utilizing Google Cloud services to schedule BigQuery SQL, or Cloud Scheduler with Cloud Functions.

🐉 While all possible efforts have been made to write accurate SQL, these queries are not intended as production SQL and could likely be improved; rather, they are meant to be informative and educational, to help you understand the underlying Chronicle Data Lake tables.
