Using Snowflake on Azure for Querying Azure Event Hubs Capture Avro Files

Arsen Vladimirskiy
Dec 27, 2018 · 4 min read

Video Walkthrough

Table of Contents

Create Storage Account For Files and Queue

# Resource group that holds every resource in this walkthrough
az group create --name avehc1 --location eastus2

# StorageV2 account: its blob container receives the uploaded files and
# its queue receives the Event Grid notifications
az storage account create \
    --resource-group avehc1 \
    --name avmyfiles1 \
    --location eastus2 \
    --sku Standard_LRS \
    --kind StorageV2
# Blob container for the uploaded files
az storage container create --name myfiles --account-name avmyfiles1

# Storage queue used as the Event Grid subscription endpoint
az storage queue create --name queue1 --account-name avmyfiles1

Create Azure Event Hub

# Create event hub namespace (Standard tier)
az eventhubs namespace create -g avehc1 -n avehc1ns -l eastus2 --sku Standard
# Create event hub inside that namespace
az eventhubs eventhub create -g avehc1 -n avehc1 --namespace-name avehc1ns
# Create storage account and container for event hub capture files.
# These must be two separate commands: written on one line, the second
# command's words would be passed as extra arguments to the first.
az storage account create -g avehc1 -n avmycapture1 --sku Standard_LRS -l eastus2 --kind StorageV2
az storage container create -n mycapture --account-name avmycapture1
# NOTE(review): none of the commands above turns on Event Hubs Capture for
# avehc1. Presumably Capture to avmycapture1/mycapture is enabled elsewhere
# (e.g. the portal) so the Avro files queried below get written — confirm.

Create Event Grid Subscription

# Subscribe storage queue queue1 to the blob events of account avmyfiles1.
# The subscription id is looked up once, so the source and endpoint paths
# cannot disagree (a hand-typed placeholder in each path is easy to misspell).
SUBSCRIPTION_ID=$(az account show --query id --output tsv)
# NOTE(review): older az CLI releases (contemporary with this 2018 walkthrough)
# named the source flag --resource-id; use that spelling on those versions.
az eventgrid event-subscription create \
    --name blob2queue \
    --source-resource-id "/subscriptions/${SUBSCRIPTION_ID}/resourceGroups/avehc1/providers/Microsoft.Storage/storageAccounts/avmyfiles1" \
    --endpoint-type storagequeue \
    --endpoint "/subscriptions/${SUBSCRIPTION_ID}/resourceGroups/avehc1/providers/Microsoft.Storage/storageAccounts/avmyfiles1/queueservices/default/queues/queue1"
# NOTE(review): this subscription delivers only to queue1. The events queried
# from the Event Hub capture files presumably arrive through another
# subscription (Event Hub endpoint) that is not shown here — confirm.

Upload Batch of Files to Generate Events

# Bulk-upload every file in a local folder into the myfiles container;
# each upload produces a blob event for the subscription configured above
az storage blob upload-batch \
    --source /mnt/c/Python36 \
    --destination myfiles \
    --pattern "*.*" \
    --account-name avmyfiles1

Snowflake Queries

use database TEST_DB;

-- Named Avro file format for the capture files; no file-level compression
create or replace file format av_avro_format
    type        = 'AVRO'
    compression = 'NONE';

-- Verify that the file format was created
show file formats;
-- External stage over the Azure container that holds the captured Avro files.
-- The SAS token below is a placeholder; substitute a real, read/list-capable token.
create or replace stage aveventgrid_capture
    url         = 'azure://avmycapture1.blob.core.windows.net/mycapture'
    credentials = (azure_sas_token = '?st=xxxxxxxxxxxxxxxxxxxxxxx')
    file_format = av_avro_format;
-- List all Avro files in the stage
list @aveventgrid_capture;

-- Count records across all Avro files
select count(*) from @aveventgrid_capture;

-- Look at the raw records of one capture file
select * from @aveventgrid_capture/avehc1ns/avehc1/0/2018/12/27/01/38/26.avro;

-- Decode the body (it is surfaced hex-encoded, hence HEX_DECODE_STRING)
select HEX_DECODE_STRING($1:Body)
from @aveventgrid_capture/avehc1ns/avehc1/0/2018/12/27/01/38/26.avro;

-- Parse the remaining fields of the capture record.
-- EnqueuedTimeUtc is unwrapped from VARIANT with an explicit ::string before
-- parsing. (Stripping '""' with REPLACE would be a no-op: a doubled double-quote
-- never occurs in the value.) Aliases match the target table's column names.
select
    HEX_DECODE_STRING($1:Body)                                                 as jsontext,
    TO_TIMESTAMP($1:EnqueuedTimeUtc::string, 'MM/DD/YYYY HH:MI:SS AM')         as eh_enqueued_time_utc,
    TO_NUMBER($1:Offset)                                                       as eh_offset,
    $1:Properties                                                              as eh_properties,
    TO_NUMBER($1:SequenceNumber)                                               as eh_sequence_number,
    $1:SystemProperties                                                        as eh_system_properties
from @aveventgrid_capture/avehc1ns/avehc1/0/2018/12/27/01/38/26.avro;
-- Table holding one row per capture record: the decoded body plus Event Hubs metadata
create or replace table aveventgrid_capture (
    jsontext             variant,        -- decoded event body
    eh_enqueued_time_utc timestamp_ntz,  -- enqueue time, UTC per the source field name
    eh_offset            int,
    eh_properties        variant,
    eh_sequence_number   int,
    eh_system_properties variant
);

-- Freshly (re)created, so this returns no rows yet
select * from aveventgrid_capture;
-- Load every Avro file in the stage into the table, decoding and typing each field.
-- PARSE_JSON stores the decoded body as structured JSON rather than as a JSON
-- string, so the LATERAL FLATTEN queries can traverse it. A body that is not
-- valid JSON raises an error instead of being stored silently.
-- EnqueuedTimeUtc is unwrapped with ::string before TO_TIMESTAMP parses it.
copy into aveventgrid_capture (
    jsontext,
    eh_enqueued_time_utc,
    eh_offset,
    eh_properties,
    eh_sequence_number,
    eh_system_properties
)
from (
    select
        PARSE_JSON(HEX_DECODE_STRING($1:Body)),
        TO_TIMESTAMP($1:EnqueuedTimeUtc::string, 'MM/DD/YYYY HH:MI:SS AM'),
        TO_NUMBER($1:Offset),
        $1:Properties,
        TO_NUMBER($1:SequenceNumber),
        $1:SystemProperties
    from @aveventgrid_capture
);
-- Show how LATERAL FLATTEN splits the JSON array in jsontext into one row per element
select f.value
from aveventgrid_capture,
     lateral flatten(input => jsontext) as f;
-- Query Event Grid blob storage events by extracting typed fields from each
-- flattened JSON element with Snowflake's path (:) and cast (::) syntax
select
    f.value:eventType::string    as eventType,
    f.value:eventTime::timestamp as eventTime,
    f.value:subject::string      as subject,
    f.value:id::string           as id
from aveventgrid_capture,
     lateral flatten(input => jsontext) as f
-- keep only elements that carry an eventType
where f.value:eventType::string is not null;
-- Fetch a single event by its Event Grid id (the literal is a sample id)
select
    f.value:eventType::string    as eventType,
    f.value:eventTime::timestamp as eventTime,
    f.value:subject::string      as subject,
    f.value:id::string           as id
from aveventgrid_capture,
     lateral flatten(input => jsontext) as f
where f.value:id::string = '50eac4d4-e01e-00b5-5584-9da1d9063d9d';
-- Count events per one-minute window and event type, newest window first.
-- eventType breaks ties within a window so the output order is deterministic.
select
    date_trunc('MINUTE', f.value:eventTime::timestamp) as eventTimeWindow,
    f.value:eventType::string                           as eventType,
    count(*)                                            as eventCount
from aveventgrid_capture,
     lateral flatten(input => jsontext) as f
where f.value:eventType::string is not null
group by eventTimeWindow, eventType
order by eventTimeWindow desc, eventType;

Arsen Vladimirskiy

Written by

Azure Cloud Architect & Software Engineer at Microsoft, Commercial Software Engineering (CSE) Team

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade