Pull ADF Pipeline Metadata from Azure Log Analytics into a Snowflake Table Using a Function App

Raghul Ravikumar
Published in BI3 Technologies
Aug 3, 2022

DATA FLOW

This blog demonstrates how to automate moving ADF pipeline metadata to a Snowflake table using an Azure Function App.

Follow the steps below:

· Move Azure pipeline data to Azure Log Analytics.

· Connect to Log Analytics.

· Extract data from Log Analytics.

· Connect to Snowflake and insert the data.

Step 1: Move ADF pipeline data to Azure Log Analytics

Step 1.1: Create a Log Analytics workspace in Azure

Step 1.2: Log in to Azure → Log Analytics workspace → Create

Using the Monitor service, data from ADF needs to be moved to the Log Analytics workspace.

Then, go to Monitor in Azure to add a diagnostic setting.

Step 1.3: Azure →Monitor →Diagnostic setting →Add diagnostic setting

Step 1.4: Choose which data in the diagnostic setting should be transferred to Log Analytics.

The metadata for Activity Runs, Pipeline Runs, and Trigger Runs will be transferred to Log Analytics.

Step 1.5: Check Log Analytics for the transferred Activity Run, Pipeline Run, and Trigger Run metadata by executing a query in the worksheet.
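A verification query for this step can look like the following (a minimal sketch; `ADFPipelineRun` is one of the standard ADF diagnostic tables, and the same pattern works for `ADFActivityRun` and `ADFTriggerRun`):

```kusto
// Confirm ADF metadata is flowing into the workspace
ADFPipelineRun
| where TimeGenerated > ago(24h)
| take 10
```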

Now that the Activity Run, Pipeline Run, and Trigger Run data are in the Log Analytics workspace, use C# code to pull the data out of Log Analytics and insert it into Snowflake.

Inserting Log Analytics data into a Snowflake table using C#

The code was developed in Visual Studio Code using C#. VS Code includes an interactive debugger, so the user can step through the source code, inspect variables, view call stacks, and execute commands in the console.

Install the following packages with these versions:

· Azure.Monitor.Query (version 1.1.0)

· Microsoft.Azure.OperationalInsights (version 1.1.0)

· Microsoft.Azure.WebJobs.Extensions.DurableTask (version 2.5.1)

· Microsoft.Exchange.WebServices (version 2.2.0)

· Microsoft.NET.Sdk.Functions (version 4.0.1)

· Microsoft.Rest.ClientRuntime.Azure.Authentication (version 2.4.1)

· Snowflake.Data (version 2.0.3)
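In a Function App project, these can be declared in the .csproj file instead of installing them one by one. A sketch of the package references, using the versions listed above:

```xml
<ItemGroup>
  <PackageReference Include="Azure.Monitor.Query" Version="1.1.0" />
  <PackageReference Include="Microsoft.Azure.OperationalInsights" Version="1.1.0" />
  <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.5.1" />
  <PackageReference Include="Microsoft.Exchange.WebServices" Version="2.2.0" />
  <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.0.1" />
  <PackageReference Include="Microsoft.Rest.ClientRuntime.Azure.Authentication" Version="2.4.1" />
  <PackageReference Include="Snowflake.Data" Version="2.0.3" />
</ItemGroup>
```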

Step 2: Connect to Log Analytics

Step 2.1: First, connect to Log Analytics from C# using the client ID, client secret, and Log Analytics workspace ID.

Get the workspace ID from Log Analytics.

Step 2.2: Azure → Log Analytics Workspace → Overview → Workspace ID

Application registration in Azure Active Directory is required.

Step 2.3: Refer to the link to create App Registration: https://docs.microsoft.com/en-us/azure/digital-twins/how-to-create-app-registration?tabs=portal

After app registration, get the client ID and client secret, then use those credentials in the C# code.

Then, in Log Analytics, add the app registration that was created in Azure Active Directory, and give it the Reader role.

Check that the app registration has been added.

Step 2.4: The connection also needs a domain, authEndpoint, and tokenAudience to log in to Log Analytics.

Code:

var workspaceId = "************************************";
var clientId = "************************************";
var clientSecret = "************************************";
var domain = "***************************";
var authEndpoint = "***************************";
var tokenAudience = "***************************";

var adSettings = new ActiveDirectoryServiceSettings
{
    AuthenticationEndpoint = new Uri(authEndpoint),
    TokenAudience = new Uri(tokenAudience),
    ValidateAuthority = true
};

var creds = ApplicationTokenProvider
    .LoginSilentAsync(domain, clientId, clientSecret, adSettings)
    .GetAwaiter().GetResult();

var client = new OperationalInsightsDataClient(creds);
client.WorkspaceId = workspaceId;

Step 3: Extract data from Log Analytics.

The following code iterates over each query used in Log Analytics to extract the data.

Code:

string[] query = new string[]
{
    "ADFActivityRun | where TimeGenerated > ago(24h) | where Status !in ('InProgress', 'Queued')",
    "ADFPipelineRun | where TimeGenerated > ago(24h) | where Status !in ('InProgress', 'Queued')",
    "ADFTriggerRun | where TimeGenerated > ago(24h) | where Status !in ('InProgress', 'Queued')"
};

foreach (var qry in query)
{
    var results = client.Query(qry);
    dynamic jsonObj = JsonConvert.SerializeObject(results.Results);

    // ADFActivityRun inserted into the Snowflake table
    if (qry.Contains("ADFActivityRun"))
    {
        var myDeserializedClass = JsonConvert.DeserializeObject<List<ADFActitivtyRun>>(jsonObj);
        log.LogInformation("\nADF_ACTIVITY_RUN Table insert scripts are initiated.....");
        inserADFActitivtyRunData(myDeserializedClass, log);
        log.LogInformation("ADF_ACTIVITY_RUN Table insert scripts are completed.....");
    }
    // ADFPipelineRun inserted into the Snowflake table
    else if (qry.Contains("ADFPipelineRun"))
    {
        var myDeserializedClass = JsonConvert.DeserializeObject<List<ADFPipelineRun>>(jsonObj);
        log.LogInformation("\nADF_PIPELINE_RUN Table insert scripts are initiated.....");
        inserADFPipelineRunData(myDeserializedClass, log);
        log.LogInformation("ADF_PIPELINE_RUN Table insert scripts are completed.....");
    }
    // ADFTriggerRun inserted into the Snowflake table
    else
    {
        var myDeserializedClass = JsonConvert.DeserializeObject<List<ADFTriggerRun>>(jsonObj);
        log.LogInformation("\nADF_TRIGGER_RUN Table insert scripts are initiated.....");
        insertTriggerRunData(myDeserializedClass, log);
        log.LogInformation("ADF_TRIGGER_RUN Table insert scripts are completed.....");
    }
}

log.LogInformation("\nTables are created successfully...");
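The DeserializeObject calls above assume model classes whose properties match the Log Analytics column names. A minimal sketch of such a model (the property names are illustrative and should be aligned with the columns your query actually returns); it is shown here with System.Text.Json so the example is self-contained, while the post's code uses Newtonsoft.Json:

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Illustrative model: align property names with the columns
// returned by the ADFPipelineRun table.
public class ADFPipelineRun
{
    public string RunId { get; set; }
    public string PipelineName { get; set; }
    public string Status { get; set; }
    public DateTime TimeGenerated { get; set; }
}

public static class LogAnalyticsJson
{
    // Deserialize the serialized query results into typed rows.
    public static List<ADFPipelineRun> Parse(string json) =>
        JsonSerializer.Deserialize<List<ADFPipelineRun>>(json);
}
```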

Step 4: Connect and insert data into Snowflake.

Connect to Snowflake using a connection string, then insert the data extracted from Log Analytics into the Snowflake table.

Prepare an insert query to load the metadata from Log Analytics into Snowflake.

Code:

using (IDbConnection conn = new SnowflakeDbConnection())
{
    try
    {
        conn.ConnectionString = "********************************";
        conn.Open();

        // Optional: truncate the target table before loading, e.g.
        // if (query.ToUpper().Equals("TRUNCATE"))
        // {
        //     Console.WriteLine("Table Truncation Initiated...");
        //     string truncQry = "TRUNCATE TABLE " + Database + ".TEMP.Snowflake_CostAndUsage_Report;";
        //     CreateTable(conn, truncQry);
        //     Console.WriteLine("Table Truncation completed...");
        // }

        Console.WriteLine("Insert values...");
        CreateTable(conn, query);
        Console.WriteLine("Insertion completed...");
    }
    catch (Exception err)
    {
        Console.WriteLine(err.ToString());
    }
    finally
    {
        conn.Close();
        Console.WriteLine("Connection closed...");
    }
}
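The "prepare insert query" step can be sketched as a small helper that builds a parameterized INSERT statement from a table name and column list (both illustrative here, not from the post). The Snowflake .NET connector supports positional `?` placeholders, and binding values as parameters avoids quoting and escaping issues:

```csharp
using System;
using System.Linq;

public static class SqlBuilder
{
    // Build a parameterized INSERT for the Snowflake .NET connector,
    // which accepts positional '?' placeholders.
    public static string BuildInsert(string table, string[] columns)
    {
        var placeholders = string.Join(",", Enumerable.Repeat("?", columns.Length));
        return $"INSERT INTO {table} ({string.Join(",", columns)}) VALUES ({placeholders})";
    }
}
```

Each row deserialized from Log Analytics would then be bound to the placeholders via `IDbCommand.CreateParameter()` before executing.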

For Reference: https://github.com/Raghul-1999/Project.git

Conclusion

Finally, view the extracted pipeline metadata in the Snowflake table.

About us:

Bi3 has been recognized for being one of the fastest-growing companies in Australia. Our team has delivered substantial and complex projects for some of the largest organizations around the globe and we’re quickly building a brand that is well known for superior delivery.

Website: https://bi3technologies.com/

Follow us on,
LinkedIn: https://www.linkedin.com/company/bi3technologies
Instagram: https://www.instagram.com/bi3technologies/
Twitter: https://twitter.com/Bi3Technologies
