SDG AutoVault in Snowflake: Some Best practices for logging your DataObservability
When we talk about logging, we talk about to track what is happening in our systems in order to give insights to the different stakeholders to take decisions. Until now, most of people that usually use OLTP relational databases, they are also use to log that data about processes (or near real metrics) without to take into consideration that some of these actions need to be fast, very fast, or other ones have a high latency, but the computational resources are very high.
Using Snowflake, we are going to try to give a good sense how to implement efficiently the logging information. Trying to des-mitify the old bad practices, into new ones using the correct features that Snowflake provides. These practices will not allow to become successful in our path to get a reliable and good performance observability model in our data platform.
In that scenario in our SDG AutoVault (check on the video and transcript to english), we will want to improve some of the characteristics related about logging with a massive multi-parallel process. This framework allows us to build DataVault Data Models by Metadata Driven Engines in an easy manner from businness to business. This tool will be covered deeply in other article, but in this one, we will only view the improvement of the DataObservability features thanks to Event Tables and Dynamic Tables.
Here we are a simplified overview of the SDG AutoVault Framework.
Our Data Observability Roadmap emprovement
We will form two scenarios, initial one will be our current situation, we are making intensive logging into one only table. The other one will apply some of the best practices to the previous one, using some of the new capabilities for that case. Given the different frameworks we will see here, in the future articles we will dive deeply in the different parts, mentioning some of the best practices to take into consideration on each layer. We will focus only on the Example of Sales logging processes (DataObservability Metadata Model).
Current situation
We are going to execute our processes to load the SALES data (but can exist other ones as FINANCE, ie).
This is the DAG autogenerated by our SDG AutoVault that exists inside our Snowflake for Sales DataWorkload:
As we can see, we have more than 20 tasks in parallel (30 in some steps). This DAG will cause some transactions collisions in our table to try logging in most of cases at same time. And if you take into consideration, that it could exist other similar DAGs running all together, the chance for collision will be greater if each one coincide in the same period.
On each DataWorkload exists a lot of processess with different tasks, that each Task executes a procedure. This procedure executes some SQL statetments inside and for each action, will log every action in a Transactions table.
This is an example, what is the structure of the TASK, and each call of the DS_RUN_LOG procedure:
This is an example of the procedure, mentioning only the data will be logged:
CREATE OR REPLACE PROCEDURE DO_META_RDV.DS_RUN_LOG (P_RUN VARCHAR
, P_DES_LOG VARCHAR
, P_STATUS VARCHAR
, P_THRESHOLD VARCHAR
, P_ENVIRONMENT VARCHAR
, P_ACTION VARCHAR
, P_DATASET VARCHAR
, P_STEP VARCHAR
, P_DATASET_FIELD VARCHAR) RETURNS FLOAT AS ()
This will log for each step, an action on the same dataset loaded. This is simple call that the procedure will use to do logging:
CALL DO_META_RDV.DS_RUN_LOG('TEST_RUN_123456','MESSAGE TEST INFO333','START','INFO','ENV_PRO','ACTION1','DATASET1','1','DATASET_FIELD_1');
This CALL execution, will do the below INSERT into DO_META_RDV.LNK_DATASET_RUN_LOG table (simplified example):
INSERT INTO DO_META_RDV.LNK_DATASET_RUN_LOG
SELECT MD5('TEST_RUN_123456'||'START'||'INFO'||'ENV_PRO'||'ACTION1'||'DATASET1'||'1'||'DATASET_FIELD_1') LINK_ID_DS_RUN_LOG,
md5('TEST_RUN_123456') HUB_ID_RUN, md5('DATASET1') HUB_ID_DATASET, md5('ACTION1') HUB_ID_ACTION, md5('ENV_PRO') HUB_ID_ENVIRONMENT,
md5('START') HUB_ID_STATUS, md5('INFO') HUB_ID_THRESHOLD, md5('-9') HUB_ID_DATASET_FIELD,
'-9' SQ_STEP, 'TEST_RUN_123456' COD_RUN, 'DATASET1' COD_DATASET,'-9' COD_DATASET_FIELD, 'ACTION1' COD_ACTION,'ENV_PRO' COD_ENVIRONMENT, 'START' COD_STATUS, 'INFO' COD_THRESHOLD,
CURRENT_TIMESTAMP() DT_LOAD,'MESSAGE TEST INFO' DESC_LOG,'EDW' RECORD_SOURCE ;
In the DataVault approach, we are generating the different HUB_ID_xx likes MD5(COD_xx) for our Business Key, and the different codes (BK).
Remember that we will need to track in near realtime our processes in order to get insights in our dashboards and monitoring tools Snowsight / Streamlit through our DataObservability Metadata Model. The requeriments of our Enterprise Stakeholders really need to track what is happening in order to take the correct actions, to be sure to give the data refreshed on time to the management enterprise.
Let’s start to analyze the current problem!
We assume that we have deployed our dataplatform. So once we have done some executions (DAG Batches), we will see through our observability DML_AFFECTED_TABLES view that there are too much INSERTs doing the logging in our table.
Please refer to this other mine article, in order to get the details how is working this DML_AFFECTED_TABLES view. This view will show all the tables affected by DML transactions, showing the number of rows affected. This feature is not natively provided by Snowflake.
Our view is showing that has been done many logs. Now we will check that there are some blocking errors trying to log in our table, filtering by the type of locked table raised error, this can be done by checking on ACCOUNT_USAGE.QUERY_HISTORY view:
We can also see it, on the Snowsight Query history. So we will have to filter by Status = ‘Failed’, and the below specifications in the Filter options:
It once the previous filters have been applied query history panel, we will see the below:
In any case, we are capturing all that locks on our DataObservability Model, when we can see the different datasets / actions affected:
These errors are showed like below:
Statement ‘XXXXX–YYY–EEE-YUU–90123456’ has locked table ‘LNK_DATASET_RUN_LOG’ in transaction 1707317974634000000 and this lock has not yet been released.
Your statement ‘XXXXX–YYY–EEE-YUU–12345678’ was aborted because the number of waiters for this lock exceeds the 20 statements limit.
These ones are captured due to in the same process (procedure used to log information, we are handling errors as well), so it when tries to log and detects is blocked, fails and try to log the error raised. In that case, it will log the error of statement has failed. But it can happen that trying to log that error, this new log is blocked again!!
So imagine that on the same moment, in the data process flow that has many steps (logs), there is a moment we don’t know how the process should act to, due to it can fails in any step, and it can be unmanagable or getting confused. In that case, we will have to check what is the level of impact (we don’t have to allow these type of situations!)
Finally, we view what is the activity on the warehouse used in our Sales Data processes:
Please pay attention on blocked timing resources (blue ones), and the peaks of activity.
What are the consequences that these situations happen¿? We will appoint the misbehaviour on the below points:
- We will have to implement some mechanisms in our enginering processes to retry the insert (in case that fails). It will require more complexity, and more compute resources needed, reducing the time to finish.
- The data process can be affected if they can finish through all their steps. Although you implement to prior retry mechanism, if there are a lot of load in your system, it can fail anyway at the end.
- Management of our log table in case there are some incomplete logs (in case that the maximum number of retries have been exceed).
- Limitations to our data platform: Not able to provide an interface of near realt time capabilities.
- Extra overloading of our data object resources. The deadlocks over some object will affect directly blocks in our data processes.
New design — Go to the Future!!
In this scenrio, we will add some changes to our platform. Take into consideration that this type of design is focused on the use for tables that will store METADATA information. In that case, we will use the below objects having in mind some considerations:
- Snowflake have these tables with a full capabilities like clusterization, time travel, fail-safe,…
- We will use only for the cases we want to normalize and phisicallize information in batch mode. Imagine you want to extract some information from other systems, your third party etl tools, and load into your Snowflake Centralized Storage Metadata.
- That type of use is given when you extract from RAW and you want to store the information in historified way.
- In our case, this table will be a Non-Historized Link, in a Raw Data Vault (RDV) Metadata.
- Snowflake introduces that new type of table, that is really designed to logging information in a semistructure way. You can only configure one for your account.
- With this new feature will allow us to insert more than 20 information transactions rows at same time with no chance to timeouts!
- The event table is self-managed by Snowflake, you can only have one by ACCOUNT.
- You insert your customized metadata log in key-pair values, JSON format.
- You can only track events in FUNCTIONs and PROCEDUREs.
- These ones are also designed to store directly information from your real time devices.
- In our case, this table will be a RAW Table located in a Persisting Staging Area (PSA) Metadata.
- Snowflake introduces that new type of table, that is designed to ease us with dataworkloads for your business.
- That tables are refreshed by Snowflake using similar capabilities than tasks, but you don’t need to use streams in order to provide an incremental loads, providing near real time capabilities as you need.
- On one hand, I have considered that it will be a DT, due to it will be used very frequently (intensive) way with a high volume of data, so this approach it will be the best. On the other hand, if your case you don’t need intensive consume this data, or there are no so much data on this, you can try to convert it to a view, and scaling up your warehouse in order to get a good comfortability. In any case, you will have to make some tests, and make a plan, if this feasible for you.
- In our case, this table will be a Business Data Vault (BDV) Metadata Link component, that will will store all together information from the different mentioned DV Metadata Layers. This approach has been analyzed in order to implement a near real time capabilities, in order to improve the performance and centralize the different Observability information.
Summarizing on objective to use these three tables:
- Batch Event table, we will load all the logs that will export from the third party tools outside Snowflake.
- Event Table Transaction, we will make an intensive load of all our logging information, so it will be refreshed in short period of time in minutes in order to retrieve this information.
- Dynamic table will keep all together the information the low latency information and high latency information in a only one table.
It is important to have all this information phisicallyzed in order to provide a better performance when we want to monitoring all our information, as we mentioned previously.
LET’S GOING TO MAKE SOME CHANGES!!
Setup of Event Transactions table — EDW.DO_META_PSA.LOG_EVENTS:
-- We will store it in our PSA metadata schema
CREATE EVENT TABLE EDW.DO_META_PSA.LOG_EVENTS;
--Assign the LOG table to our Account (we can only have one by account)
ALTER ACCOUNT SET EVENT_TABLE = EDW.DO_META_PSA.LOG_EVENTS;
--We will have to assign a LOG_LEVEL (by default is OFF)
--It will be for our account one of the most restrictive
alter ACCOUNT set LOG_LEVEL = ERROR;
--We will have to assign the level of TRACE
--we will assign on EVENT (only want that trace the EVENT we log specifically)
alter ACCOUNT set trace_level = ON_EVENT;
Setup of Event Procedure LOG — new DO_META_RDV.DS_RUN_LOG:
An example how to log on the same way, in our EVENT table in PSA:
CREATE OR REPLACE PROCEDURE DO_META_RDV.DS_RUN_LOG (P_RUN VARCHAR
, P_DES_LOG VARCHAR
, P_STATUS VARCHAR
, P_THRESHOLD VARCHAR
, P_ENVIRONMENT VARCHAR
, P_ACTION VARCHAR
, P_DATASET VARCHAR
, P_STEP VARCHAR
, P_DATASET_FIELD VARCHAR) RETURNS FLOAT LANGUAGE JAVASCRIPT EXECUTE AS OWNER AS '
try {
var cod_run_var=P_RUN;
var cod_status_var=P_STATUS;
var cod_action_var=P_ACTION;
var des_log_var=P_DES_LOG;
var cod_threshold_var=P_THRESHOLD;
var cod_environment_var=P_ENVIRONMENT;
var cod_dataset_var=P_DATASET;
var sq_step_var=P_STEP;
var cod_dataset_field_var=P_DATASET_FIELD;
snowflake.addEvent("FW_DV_LOG", {"COD_RUN": cod_run_var,"COD_ACTION": cod_action_var,"COD_STATUS": cod_status_var,"COD_THRESHOLD": cod_threshold_var,"COD_ENVIRONMENT": cod_environment_var,"DES_LOG": des_log_var,"COD_DATASET": cod_dataset_var,"SQ_STEP": sq_step_var,"COD_DATASET_FIELD": cod_dataset_field_var});
return 1
} catch (err) {
var cod_status_error=''SYSTEM ERROR'';
var desc_error_message= err.message;
snowflake.addEvent("FW_DV_LOG", {"COD_RUN": cod_run_var,"COD_ACTION": cod_action_var,"COD_STATUS": cod_status_error,"COD_THRESHOLD": cod_threshold_var,"COD_ENVIRONMENT": cod_environment_var,"DES_LOG": des_log_var,"COD_DATASET": cod_dataset_var,"SQ_STEP": sq_step_var,"COD_DATASET_FIELD": desc_error_message});
return 0
}
';
We will use the addEvent function in order to store all our information.
- First information: -> “FW_DV_LOG” : It will be our RECORD identifier. This information will be used to identify our Observability information later, from the different type of information you can store in addition to this.
- Second information: -> {JSON key-par value info}: It will be used to store all the detail information we want to store.
This information, it will be logged on our Event Table, and finally we will be able to query it like this:
SELECT *
FROM EDW.DO_META_PSA.LOG_EVENTS
WHERE RECORD_TYPE='SPAN_EVENT'
AND RECORD:name ='FW_DV_LOG';
RECORD_TYPE field is defined by our customized EVENT logged information in our Stored procedures. For more information about it, please check documentation here.
So, in order to query in a structured way, this will be our query:
SELECT
COALESCE(RECORD_ATTRIBUTES:COD_RUN,'-9') COD_RUN,
COALESCE(RECORD_ATTRIBUTES:COD_ENVIRONMENT,'-9') COD_ENVIRONMENT,
COALESCE(RECORD_ATTRIBUTES:COD_ACTION,'-9') COD_ACTION,
COALESCE(RECORD_ATTRIBUTES:COD_DATASET,'-9') COD_DATASET,
COALESCE(RECORD_ATTRIBUTES:COD_DATASET_FIELD,'-9') COD_DATASET_FIELD,
COALESCE(RECORD_ATTRIBUTES:COD_STATUS,'-9') COD_STATUS,
COALESCE(RECORD_ATTRIBUTES:COD_THRESHOLD,'-9') COD_THRESHOLD,
COALESCE(RECORD_ATTRIBUTES:SQ_STEP::INT,-9) SQ_STEP,
COALESCE(RECORD_ATTRIBUTES:DES_LOG,'-9') DES_PROCESS_LOAD,
CONVERT_TIMEZONE('UTC','Europe/Madrid',TIMESTAMP) DT_LOAD,
COALESCE(RESOURCE_ATTRIBUTES:"snow.database.name",'-9') RECORD_SOURCE,
MD5(COD_RUN) HUB_ID_RUN,
MD5(COD_ACTION) HUB_ID_ACTION,
MD5(COD_DATASET) HUB_ID_DATASET,
MD5(COD_DATASET_FIELD) HUB_ID_DATASET_FIELD,
MD5(COD_STATUS) HUB_ID_STATUS,
MD5(COD_THRESHOLD) HUB_ID_THRESHOLD,
MD5(COD_ENVIRONMENT) HUB_ID_ENVIRONMENT,
MD5(HUB_ID_RUN||'-'||HUB_ID_STATUS||'-'||HUB_ID_THRESHOLD||'-'
||HUB_ID_ENVIRONMENT||'-'||HUB_ID_ACTION||'-'
||HUB_ID_DATASET||'-'||SQ_STEP::VARCHAR||'-'
||HUB_ID_DATASET_FIELD) LINK_ID_DS_RUN_LOG
FROM DO_META_PSA.LOG_EVENTS
WHERE RECORD_TYPE='SPAN_EVENT'
AND RECORD:name ='FW_DV_LOG';
Setup of Final logging Dynamic Transactions table — DO_META_RDV.LNK_DATASET_RUN_LOG_DIN:
We will have to UNION ALL (both) tables:
CREATE OR REPLACE DYNAMIC TABLE DO_META_BDV.LNK_DATASET_RUN_LOG_DIN
TARGET_LAG = '1 minute'
WAREHOUSE = EDW_WH
AS
SELECT
LINK_ID_DS_RUN_LOG, HUB_ID_RUN, HUB_ID_DATASET, HUB_ID_DATASET_FIELD,
HUB_ID_ACTION, HUB_ID_ENVIRONMENT, HUB_ID_STATUS, HUB_ID_THRESHOLD,
SQ_STEP, COD_RUN, COD_DATASET,COD_DATASET_FIELD, COD_ACTION,
COD_ENVIRONMENT, COD_STATUS, COD_THRESHOLD, DT_LOAD, DES_PROCESS_LOAD,
RECORD_SOURCE
FROM (
SELECT
COALESCE(RECORD_ATTRIBUTES:COD_RUN,'-9') COD_RUN,
COALESCE(RECORD_ATTRIBUTES:COD_ENVIRONMENT,'-9') COD_ENVIRONMENT,
COALESCE(RECORD_ATTRIBUTES:COD_ACTION,'-9') COD_ACTION,
COALESCE(RECORD_ATTRIBUTES:COD_DATASET,'-9') COD_DATASET,
COALESCE(RECORD_ATTRIBUTES:COD_DATASET_FIELD,'-9') COD_DATASET_FIELD,
COALESCE(RECORD_ATTRIBUTES:COD_STATUS,'-9') COD_STATUS,
COALESCE(RECORD_ATTRIBUTES:COD_THRESHOLD,'-9') COD_THRESHOLD,
COALESCE(RECORD_ATTRIBUTES:SQ_STEP::INT,-9) SQ_STEP,
COALESCE(RECORD_ATTRIBUTES:DES_LOG,'-9') DES_PROCESS_LOAD,
CONVERT_TIMEZONE('UTC','Europe/Madrid',TIMESTAMP) DT_LOAD,
COALESCE(RESOURCE_ATTRIBUTES:"snow.database.name",'-9') RECORD_SOURCE,
MD5(COD_RUN) HUB_ID_RUN,
MD5(COD_ACTION) HUB_ID_ACTION,
MD5(COD_DATASET) HUB_ID_DATASET,
MD5(COD_DATASET_FIELD) HUB_ID_DATASET_FIELD,
MD5(COD_STATUS) HUB_ID_STATUS,
MD5(COD_THRESHOLD) HUB_ID_THRESHOLD,
MD5(COD_ENVIRONMENT) HUB_ID_ENVIRONMENT,
MD5(HUB_ID_RUN||'-'||HUB_ID_STATUS||'-'||HUB_ID_THRESHOLD||'-'||HUB_ID_ENVIRONMENT||'-'||HUB_ID_ACTION||'-'||HUB_ID_DATASET||'-'||SQ_STEP::VARCHAR||'-'||HUB_ID_DATASET_FIELD) LINK_ID_DS_RUN_LOG
FROM DO_META_PSA.LOG_EVENTS
WHERE RECORD_TYPE='SPAN_EVENT'
AND RECORD:name ='FW_DV_LOG'
)
UNION ALL
SELECT
LG.LINK_ID_DS_RUN_LOG, LG.HUB_ID_RUN, LG.HUB_ID_DATASET,
LG.HUB_ID_DATASET_FIELD,
LG.HUB_ID_ACTION, LG.HUB_ID_ENVIRONMENT, LG.HUB_ID_STATUS,
LG.HUB_ID_THRESHOLD,
LG.SQ_STEP, LG.COD_RUN, LG.COD_DATASET,COD_DATASET_FIELD,
LG.COD_ACTION, LG.COD_ENVIRONMENT,
LG.COD_STATUS, LG.COD_THRESHOLD, LG.DT_LOAD,LG.DES_PROCESS_LOAD,
LG.RECORD_SOURCE
FROM DO_META_RDV.LNK_DATASET_RUN_LOG LG;
- We have considered for our use case, a TARGET_LAG of 1 minute, we need a near-real time Observability feature. You have have to considerate that this time is not the schedule you want, it would be the desired lag time you want between refresh your data. For a more specific schedule, you should check other way to refresh your data, like TASK+STREAM+STATEMENT.
- We configure another WH, than existing one for our Data Sales Workload process DAG. For the specified LAG time, we know that it will always be up.
- We check, that our Dynamic tables has been succesfully generated in a INCREMENTAL MODE (not FULL one, that it will require more compute resources with a good efficiency). You can do it by SQL command, or by Snowsight.
SHOW DYNAMIC TABLES IN SCHEMA DO_META_BDV;
- You can also see the latency of the Current lag and the Target Lag. Current Lag it will be that is 20 seconds behing the desired 1 minut Target lag desired.
- You will have to apply the SELECT grant to the different ROLEs you want to use this Dyn Table.
LET’S GOING TO REVIEW THE RESULTS OF OUR CHANGES
Once we have done, we redeploy and run again our SALES DAG. Our objective will be to check the below:
- Reliability Logging processes: There will not be any errors, so it will not be retries, no delays, …
- Performance load: We will have take into consideration that log table is designed for logging rather normal tables. This affect on the response time in our logging processes.
No errors!!
We see that in our Observability view for our table there are no errors. We don’t log any transaction on that table:
Checking on the log will be the same (is the source FACT METRICS table):
We will check that there are no errors in any queries in our Sales Data Workload:
SELECT MAX(START_TIME) LAST_QUERY_TIME, TO_DATE(START_TIME) START_TIME,
coalesce(SUM(CASE WHEN ERROR_MESSAGE LIKE '%has locked table%' THEN 1 ELSE 0 END),0) nbr_errors,
SUM(1) nbr_queries,current_timestamp() DT_CURRENT
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
GROUP BY ALL
ORDER BY 1;
Checking the refresh of our LNK_DATASET_RUN_LOG_DIN Dynamic table
We can check an overview of our LNK_DATASET_RUN_LOG_DIN (Transaction table), on the same structure than the previous one, but it is containing all the information.
So we can see that is updated frequently, such we have specified to our setup:
The important metrics to check here will be the below ones:
- STATUS: This will communicate that the data has been successfully refreshed.
- REFRESH DURATION: This will tell us what time the dynamic table need to refresh. If this is more than 1 minute, our lag will be aggregrated on the time. In that case, we will need optimize our query or scale up our warehouse size.
- ROWS CHANGED: It mentions how many rows are changed or deleted. If you observe that there is no rows changed along the time, or frequently for your target lag specified, you should check to re-adapt your target lag time to the periods you observe significant changes.
Checking: What is the WareHouse Activity on our Sales Data processes:
Now we can see the below:
- There are no peaks of 70 queries running aggregated, the level of the charge is more balanced!
- There are no blocked queries running!! That is very good :)
Conclusions
With this new approach, we will achieve to improve greatly the different features that are needed for our Data Observability capabilities.
- Not extra overhead our data load processes, not mixing logging with data load resources.
- Reliability in the process logging providing all the detail in a efficient way. Not need to implement retries systems, so it will not raise errors for logging!
- Provisioning the near real time refresh updating information, with no restrictions on parallelism.
So you have seen that the changes we have made barely have had an impact to our current system. We have implemente these features in order to provide with high benefits our Observability Snowflake data platform.
In our side, we provide with this and other Snowflake features our SDG AutoVault.
If you want to know more in detail about more aspects checked here, you can follow me on medium || Linked-in here.
I hope this can help you, Now it’s your turn, You can do the same, surfing into Snowflake World and Enjoy!