如何使用 Grafana Loki 分析 SQL Server Audit 日誌

Ivan Cheng
23 min readFeb 16, 2023

--

上一篇文章已經教大家如何針對特定事件分析 Windows 登入日誌,該技巧幾乎可以運用在任何類型的日誌。今天就是要來教大家高階一點的應用,如何針對 SQL Server Audit 日誌進行分析與儀表板的製作。

首先我們要使用 SQL Server Audit 功能,沒用過的朋友請參考此篇文章。

將使用資料庫或者資料表執行 CREATE、ALTER 或 DROP 作為演示範例

建立稽核

從物件總管中點選執行個體 > 安全性 > 稽核,新增稽核。

這個稽核是用於定義蒐集到的稽核紀錄如何儲存,選擇 Application Log。

剛被建立的稽核會處於停用狀態,必須要啟用紀錄才能夠被保存。

新增伺服器稽核規格

從物件總管中點選執行個體 > 安全性 > 伺服器稽核規格,新增伺服器稽核規格。

輸入伺服器稽核規格的名稱並選擇上面所建立的稽核

我們想要稽核資料庫或者資料表執行 CREATE、ALTER 或 DROP 陳述式時,選擇以下稽核動作群組名稱。

  • DATABASE_OBJECT_CHANGE_GROUP
  • SCHEMA_OBJECT_CHANGE_GROUP

關於稽核動作群組名稱的詳細描述,請參考此篇文章。

完成伺服器稽核規格設定後,一樣需要啟用才會開始進行稽核。
需要注意的是伺服器稽核規格與稽核都必須要同時啟動,才能夠正確紀錄。

檢視稽核紀錄

確認建立資料庫與資料表的動作是否會被稽核記錄

利用下列的 T-SQL 指令碼建立資料庫

USE [master]
GO

CREATE DATABASE [Database_1]
CONTAINMENT = NONE
ON PRIMARY
( NAME = N'Database_1', FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\Database_1.mdf' , SIZE = 8192KB , MAXSIZE = UNLIMITED, FILEGROWTH = 65536KB )
LOG ON
( NAME = N'Database_1_log', FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\Database_1_log.ldf' , SIZE = 8192KB , MAXSIZE = 2048GB , FILEGROWTH = 65536KB )
GO

利用下列的 T-SQL 指令碼建立資料表

USE [Database_1]
GO

CREATE TABLE [dbo].[Table_1](
[Column_1] [nchar](10) NULL
) ON [PRIMARY]
GO

確認建立資料庫與資料表的動作都被稽核記錄下來了

資料庫的稽核紀錄
資料表的稽核紀錄

利用下列的 T-SQL 指令碼新增一些測試資料,等一下建立儀表板會使用到。

USE [Database_1]
GO

CREATE TABLE [dbo].[Table_2](
[Column_1] [nchar](10) NULL
) ON [PRIMARY]
GO

CREATE VIEW [dbo].[View_1]
AS
SELECT *
FROM [dbo].[Table_1]
GO

CREATE VIEW [dbo].[View_2]
AS
SELECT *
FROM [dbo].[Table_2]
GO

ALTER VIEW [dbo].[View_2]
AS
SELECT *
FROM [dbo].[Table_1]
UNION ALL
SELECT *
FROM [dbo].[Table_2]
GO

DROP VIEW [dbo].[View_2]
GO

Promtail

使用到的 promtail-local-config.yaml 範例如下,主要是將 Application 日誌推送到 Loki 並將 source、event_id 與 leveltext 欄位貼上標籤。

server:
http_listen_port: 9080
grpc_listen_port: 0

positions:
filename: "./positions.yaml"
clients:
- url: http://your_loki_ip:3100/loki/api/v1/push

scrape_configs:
- job_name: windows
windows_events:
eventlog_name: "Application"
use_incoming_timestamp: true
xpath_query: '*'
bookmark_path: "./bookmark-application.xml"
exclude_event_data: true
exclude_user_data: true
labels:
logsource: windows-eventlog
pipeline_stages:
- json:
expressions:
source: source
eventID: event_id
level: levelText
- labels:
source:
eventID:
level:

一定有朋友好奇,那我能不能把稽核紀錄如何儲存到 File。

再透過 static_configs 讀取 AuditLog 底下的日誌呢?

server:
http_listen_port: 9080
grpc_listen_port: 0

positions:
filename: "./positions.yaml"
clients:
- url: http://your_loki_ip:3100/loki/api/v1/push

scrape_configs:
- job_name: mssql
static_configs:
- targets:
- localhost
labels:
host: your_hostname
job: auditlog
__path__: D:\AuditLog\*.sqlaudit

答案是不行的,該日誌檔已經被 SQL Server 加密過了。

建立一個 SQL Server Audit 事件日誌儀表板,點選 Setting 中的 Variables。

我們先建立兩個變數 computer 與 search 之後會過濾會使用到。

變數 computer

General

  • Name 輸入 computer 當作變數名稱使用
  • Label 輸入電腦做為顯示名稱

Query options

  • Data Source 選擇 Loki
  • Query Type 選擇 Label vaules,Label 選擇 computer

Regex

  • 就依自己的命名規則,看如何篩選出 SQL Server 囉

Selection options

  • 勾選 Multi-value
  • 勾選 Include All option

若有配置正確,Preview of vaules 會出現您想要的變數數值,按下 Apply。

變數 search

回到 Setting 中的 Variables,這次我們建立一個 Text box 類型的變數。

General

  • Name 輸入 search 當作變數名稱使用
  • Label 輸入搜尋做為顯示名稱

事件紀錄 Panel

新增一個 Panel,選擇 Table 可視化。資料來源選擇 Loki,順便填入 Title 為事件紀錄。

Label browser 輸入 LogQL 語法如下

  • 日誌流選擇器帶入變數 computer 過濾日誌源
  • 日誌管道帶入變數 search 過濾日誌內容
  • 透過 pattern 將解析到的欄位存成標籤
  • 透過 label_format 將 action_id 與 class_type 轉成名稱
  • 過濾 action_id 為 CREATE、ALTER、DROP

可以用 (?i) 做為正規表達式的前缀,切換為不區分大小寫。

{computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search"
| pattern `<_>action_id:<action_id>\n<_>`
| label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
| action_id =~ "(CREATE|ALTER|DROP)"
| pattern `<_>class_type:<class_type>\n<_>`
| pattern `<_>database_name:<database_name>\n<_>`
| pattern `<_>object_name:<object_name>\n<_>`
| pattern `<_>schema_name:<schema_name>\n<_>`
| pattern `<_>server_instance_name:<server_instance_name>\n<_>`
| pattern `<_>server_principal_name:<server_principal_name>\n<_>`
| pattern `<_>object_name:<object_name>\n<_>`
| pattern `<_>statement:<statement>\nadditional_information<_>`
| pattern `<_>succeeded:<succeeded>\n<_>`
| label_format class_type=`{{.class_type | trim | replace "DB" "DATABASE" | replace "U" "TABLE" | replace "V" "VIEW" | replace "P" "STORED PROCEDURE"}}`
| label_format statement=`{{.statement | replace "\\r\\n" " " | replace "\\r" " " | replace "\\n" " " | replace "\u005c\u005c" "\u005c"}}`

可以透過下列的 T-SQL 指令來查找 action_id 與 class_type 對應的名稱

select * from sys.dm_audit_actions
select * from sys.dm_audit_class_type_map

Table 可視化允許 Pagination 與 Column filter

先點選 labels 欄位看一下,確認想要分析的欄位都有正確解析到。

編輯 Panel 切到 Transform,加入 Extract fields,Source 選擇 labels。

如此一來,就可以把解析到的標籤們轉換成多個欄位進行使用。

但是這樣欄位太多了,可以透過 Transform 加入 Organize fields 隱藏或排序欄位,顯示有意義的欄位即可。

畫面就乾淨許多了,也可以針對欄位進行過濾。

事件紀錄 Panel 基本上就搞定了

時間軸 Panel

新增一個 Panel,選擇 Time Series 可視化。資料來源選擇 Loki,順便填入 Title 為時間軸。

  • Graph Style 選擇 Bar。
  • Query Legend 填入 {{action_id}}

Label browser 輸入 LogQL 語法如下,日誌流選擇器帶入我們設定的變數。可根據下拉選項的過濾,依照 action_id 的數量進行統計。

sum by (action_id) (
count_over_time({computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search"
| pattern `<_>action_id:<action_id>\n<_>`
| label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
| action_id =~ "(CREATE|ALTER|DROP)"
[$__interval])
)

動作統計 Panel

新增一個 Panel,選擇 Pie Chart 可視化。資料來源選擇 Loki,順便填入 Title 為動作統計。

  • Query Legend 填入 {{action_id}}
  • Value Options Calculation 選擇 Total
  • Pie Chart Type 選擇 Donut
  • Pie Chart Labels 選擇 Percent
  • Legend Placement 選擇 Right

Label browser 輸入 LogQL 語法如下,日誌流選擇器帶入我們設定的變數。可根據下拉選項的過濾,依照 action_id 的數量進行統計。

sum by (action_id) (
count_over_time({computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search"
| pattern `<_>action_id:<action_id>\n<_>`
| label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
| action_id =~ "(CREATE|ALTER|DROP)"
[$__interval])
)

類別類型統計 Panel

新增一個 Panel,選擇 Pie Chart 可視化。資料來源選擇 Loki,順便填入 Title 為類別類型統計。

  • Query Legend 填入 {{class_type}}
  • 其他選項同上

Label browser 輸入 LogQL 語法如下,日誌流選擇器帶入我們設定的變數。可根據下拉選項的過濾,依照 class_type 的數量進行統計。

sum by (class_type) (
count_over_time({computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search"
| pattern `<_>action_id:<action_id>\n<_>`
| pattern `<_>class_type:<class_type>\n<_>`
| label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
| label_format class_type=`{{ .class_type | trim | replace "DB" "DATABASE" | replace "U" "TABLE" | replace "V" "VIEW" | replace "P" "STORED PROCEDURE"}}`
| action_id =~ "(CREATE|ALTER|DROP)"
[$__interval])
)

資料庫名稱統計 Panel

新增一個 Panel,選擇 Pie Chart 可視化。資料來源選擇 Loki,順便填入 Title 為資料庫名稱統計。

  • Query Legend 填入 {{database_name}}
  • 其他選項同上

Label browser 輸入 LogQL 語法如下,日誌流選擇器帶入我們設定的變數。可根據下拉選項的過濾,依照 database_name 的數量進行統計。

sum by (database_name) (
count_over_time({computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search"
| pattern `<_>action_id:<action_id>\n<_>`
| pattern `<_>database_name:<database_name>\n<_>`
| label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
| action_id =~ "(CREATE|ALTER|DROP)"
[$__interval])
)

物件名稱統計 Panel

新增一個 Panel,選擇 Pie Chart 可視化。資料來源選擇 Loki,順便填入 Title 為物件名稱統計。

  • Query Legend 填入 {{object_name}}
  • 其他選項同上

Label browser 輸入 LogQL 語法如下,日誌流選擇器帶入我們設定的變數。可根據下拉選項的過濾,依照 object_name 的數量進行統計。

sum by (object_name) (
count_over_time({computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search"
| pattern `<_>action_id:<action_id>\n<_>`
| pattern `<_>object_name:<object_name>\n<_>`
| label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
| action_id =~ "(CREATE|ALTER|DROP)"
[$__interval])
)

最後完成的儀表板如下,我們也可以使用關鍵字來進行過濾。

若您是想要分析資料操作語言(DML),例如 SELECT、INSERT、UPDAT 與 DELETE,只要仿照上面的步驟執行應該不難實作出來的。

例如我們想要稽核資料表執行 SELECT、INSERT、UPDATE 或 DELETE 陳述式時,選擇以下稽核動作群組名稱。

  • SCHEMA_OBJECT_ACCESS_GROUP

提供的參考的 LogQL 語法如下

{computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search"
| pattern `<_>action_id:<action_id>\n<_>`
| label_format action_id=`{{.action_id | trim | replace "SL" "SELECT" | replace "IN" "INSERT" | replace "UP" "UPDATE" | replace "DL" "DELETE"}}`
| action_id =~ "(SELECT|INSERT|UPDATE|DELETE)"
| pattern `<_>class_type:<class_type>\n<_>`
| pattern `<_>database_name:<database_name>\n<_>`
| pattern `<_>object_name:<object_name>\n<_>`
| pattern `<_>schema_name:<schema_name>\n<_>`
| pattern `<_>server_instance_name:<server_instance_name>\n<_>`
| pattern `<_>server_principal_name:<server_principal_name>\n<_>`
| pattern `<_>object_name:<object_name>\n<_>`
| pattern `<_>statement:<statement>\nadditional_information<_>`
| pattern `<_>succeeded:<succeeded>\n<_>`
| label_format class_type=`{{.class_type | trim | replace "DB" "DATABASE" | replace "U" "TABLE" | replace "V" "VIEW" | replace "P" "STORED PROCEDURE"}}`
| label_format statement=`{{.statement | replace "\\r\\n" " " | replace "\\r" " " | replace "\\n" " "}}`

成功登入與失敗的稽核紀錄當然也沒有問題

例如我們想要稽核已成功登入 SQL Server 或者失敗的主體,選擇以下稽核動作群組名稱。

  • SUCCESSFUL_LOGIN_GROUP
  • FAILED_LOGIN_GROUP

成功登入的 LogQL 語法如下

{computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search"
| pattern `<_>action_id:<action_id>\n<_>`
| label_format action_id=`{{.action_id | trim | replace "LGIS" "LOGIN SUCCEEDED"}}`
| action_id ="LOGIN SUCCEEDED"
| pattern `<_>class_type:<class_type>\n<_>`
| label_format class_type=`{{.class_type | trim | replace "LX" "LOGIN"}}`
| pattern `<_>server_instance_name:<server_instance_name>\n<_>`
| pattern `<_>server_principal_name:<server_principal_name>\n<_>`
| pattern `<_>network protocol:<network_protocol>\r\n<_>`
| label_format network_protocol=`{{.network_protocol | trim}}`
| label_format statement =""
| pattern `<_>\u003caddress\u003e<ip_address>\u003c/address\u003e<_>`
| server_principal_name !=""
| network_protocol =~"(TCP/IP|LPC)"
| ip_address !="local machine"

登入失敗的 LogQL 語法如下

{computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search"
| pattern `<_>action_id:<action_id>\n<_>`
| label_format action_id=`{{.action_id | trim | replace "LGIF" "LOGIN FAILED"}}`
| action_id ="LOGIN FAILED"
| pattern `<_>class_type:<class_type>\n<_>`
| label_format class_type =`{{.class_type | trim | replace "LX" "LOGIN"}}`
| pattern `<_>\nserver_principal_name:<server_principal_name>\n<_>`
| pattern `<_>statement:<statement>\n<_>`
| pattern `<_>\u003caddress\u003e<ip_address>\u003c/address\u003e<_>`

再透過 Transform 加入 Merge,將這兩段的結果整併在同個 Panel 即可。

今天的分享就到這邊,希望有幫助到大家。

參考文件

--

--

Ivan Cheng

動若不止,止水皆化波濤;靜而不擾,波濤悉為止水。水相如此,心境亦然。