Making Petabyte Scale Data Actionable with ADX and AutoML Part 1

Aaron (Ari) Bornstein
Published in Microsoft Azure
6 min read · Nov 28, 2018

TLDR; In this series we will discuss how to make petabyte-scale data actionable through ingestion, processing, visualization, and machine learning. This walkthrough showcases the new Azure Data Explorer service and its unique capabilities.

Post #1: Creating an Azure Data Explorer Cluster Through Code

In this first post of the series we will focus on how to get started with Azure Data Explorer in Python and load batch data for analysis.

A self-contained Jupyter notebook with all the steps outlined in this post can be found below:

What is Azure Data Explorer?

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. It helps you handle the many data streams emitted by modern software, so you can collect, store, and analyze data. Azure Data Explorer is ideal for analyzing large volumes of diverse data from any data source, such as websites, applications, IoT devices, and more.

Tutorial

A completed end-to-end notebook for getting started with Azure Data Explorer can be found here.

Prerequisites

  • An Azure account (you can get a free account here)

Step 1: Load Azure Notebook

Step 2: Authenticate with Azure

!az login
!az ad sp create-for-rbac --sdk-auth > mycredentials.json

import os, json

# Load the service principal credentials generated by the CLI above
with open('mycredentials.json') as data_file:
    azure_session = json.load(data_file)

# Delete the credentials file so secrets are not left on disk
os.remove("mycredentials.json")
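Before moving on, a quick sanity check (my own addition, not part of the original notebook) can confirm that the credentials file contained everything the management SDK will need in the next step:

# Hedged sanity check: these are the fields used later in this walkthrough
REQUIRED_KEYS = {"clientId", "clientSecret", "tenantId", "subscriptionId"}
missing = REQUIRED_KEYS - set(azure_session)
if missing:
    raise ValueError("Credentials file was missing: {}".format(missing))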

For more information on this step, check out the following post:

Step 3: Create Azure Data Explorer Cluster

Now that we have authenticated our session with our Azure account, let's use an Azure Resource Manager template and the Azure Python deployment SDK to deploy an Azure Data Explorer cluster.

from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.resource.resources.models import DeploymentMode

# Build service principal credentials from the session created above
credentials = ServicePrincipalCredentials(
    client_id=azure_session["clientId"],
    secret=azure_session["clientSecret"],
    tenant=azure_session["tenantId"]
)
client = ResourceManagementClient(credentials, azure_session["subscriptionId"])

# Resource Group Parameters
RESOURCE_GROUP_NAME = 'kustodeploymenttest'
AZURE_REGION = 'East US'  # Set region here
location = ''.join(AZURE_REGION.split()).lower()

# Set Azure Data Explorer Cluster Parameters
CLUSTER_NAME = 'cdacluster'
CLUSTER_NODE_SIZE = 'D13_v2'
CLUSTER_NODE_TIER = 'Standard'
CLUSTER_NODE_CAPACITY = 2

# Set Azure Data Explorer DB Deployment Parameters
DB_NAME = 'cdadb'
DB_SOFT_DELETION_PERIOD = 3650
DB_HOT_CACHE_PERIOD = 31

# Cluster query and ingestion endpoints
URI = "https://{}.{}.kusto.windows.net:443".format(CLUSTER_NAME, location)
DATA_INGESTION_URI = "https://ingest-{}.{}.kusto.windows.net:443/".format(CLUSTER_NAME, location)

# Define Azure Data Explorer Deployment Template
template = {
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
    "contentVersion": "1.0.0.0",
    "resources": [
        {
            "type": "Microsoft.Kusto/Clusters",
            "sku": {
                "name": CLUSTER_NODE_SIZE,
                "tier": CLUSTER_NODE_TIER,
                "capacity": CLUSTER_NODE_CAPACITY
            },
            "name": CLUSTER_NAME,
            "apiVersion": "2017-09-07-privatepreview",
            "location": AZURE_REGION,
            "properties": {
                "trustedExternalTenants": [
                    {
                        "value": azure_session["tenantId"]
                    }
                ],
            }
        },
        {
            "type": "Microsoft.Kusto/Clusters/Databases",
            "name": "{}/{}".format(CLUSTER_NAME, DB_NAME),
            "apiVersion": "2017-09-07-privatepreview",
            "location": AZURE_REGION,
            "properties": {
                "softDeletePeriodInDays": DB_SOFT_DELETION_PERIOD,
                "hotCachePeriodInDays": DB_HOT_CACHE_PERIOD,
            },
            "dependsOn": [
                "[resourceId('Microsoft.Kusto/Clusters', '{}')]".format(CLUSTER_NAME)
            ]
        }
    ]
}

deployment_properties = {
    'mode': DeploymentMode.incremental,
    'template': template,
}

# Create Resource Group and Deploy (Note this could take 10-15 min)
resource_group_params = {'location': location}
client.resource_groups.create_or_update(RESOURCE_GROUP_NAME, resource_group_params)
deployment_async_operation = client.deployments.create_or_update(
    RESOURCE_GROUP_NAME,
    'azure-sample',
    deployment_properties
)
deployment_async_operation.wait()
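Once the wait returns, it can be worth confirming that the deployment actually succeeded before creating clients against the new cluster. A minimal sketch, assuming the poller returned by create_or_update exposes the deployment result as in the azure-mgmt-resource SDK used above:

# Hedged check: inspect the finished deployment and the endpoints derived earlier
deployment_result = deployment_async_operation.result()
print("Provisioning state:", deployment_result.properties.provisioning_state)
print("Query endpoint:", URI)
print("Ingestion endpoint:", DATA_INGESTION_URI)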

Step 4: Create Azure Data Explorer Client

After about 5–10 minutes we should have a working Azure Data Explorer cluster. Now let's create a client so that we can create tables and ingest our data.

from azure.kusto.data.request import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
import pandas as pd
import datetime
# Construct the connection strings. In this example we are using AAD application key authentication;
# alternate options are AAD application certificate and AAD user password.
# More samples available @ https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-data/tests/sample.py
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    DATA_INGESTION_URI, azure_session["clientId"], azure_session["clientSecret"], azure_session["tenantId"])
KCSB_ENGINE = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    URI, azure_session["clientId"], azure_session["clientSecret"], azure_session["tenantId"])
KUSTO_CLIENT = KustoClient(KCSB_ENGINE)
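Before ingesting anything, a quick connectivity check (a sketch I've added, not part of the original walkthrough) confirms the client can reach the cluster by running a standard management command:

# Hedged sanity check: list the databases visible to this client
RESPONSE = KUSTO_CLIENT.execute_mgmt(DB_NAME, ".show databases")
dataframe_from_result_table(RESPONSE.primary_results[0])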

Step 5: Set Source File Information

This step is based on the getting started tutorial; as such, we will be using the StormEvents toy dataset. In order to ingest it, we need to configure the following information about our data source file.

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
from azure.storage.blob import BlockBlobService
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + CONTAINER + "/" + FILE_PATH + SAS_TOKEN
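The hard-coded FILE_SIZE matches this particular sample file. If you point the walkthrough at your own blob, one option (a sketch using the BlockBlobService import above and the legacy azure-storage-blob API it comes from) is to read the size from the blob's properties instead:

# Hedged sketch: look up the blob size rather than hard-coding it
blob_service = BlockBlobService(account_name=ACCOUNT_NAME, sas_token=SAS_TOKEN)
blob_properties = blob_service.get_blob_properties(CONTAINER, FILE_PATH)
FILE_SIZE = blob_properties.properties.content_length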

Step 6: Create Table in Azure Data Explorer

Now that we have configured our data source file for ingestion, we need to create an Azure Data Explorer table to contain our data.

CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DB_NAME, CREATE_TABLE_COMMAND)

Check here for more table commands.
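For example, two commonly used management commands (run through the same execute_mgmt call; a hedged sketch rather than part of the original post) list the tables in the database and show details for the one we just created:

# Hedged examples of additional table management commands
RESPONSE = KUSTO_CLIENT.execute_mgmt(DB_NAME, ".show tables")
print(dataframe_from_result_table(RESPONSE.primary_results[0]))

RESPONSE = KUSTO_CLIENT.execute_mgmt(DB_NAME, ".show table StormEvents details")
print(dataframe_from_result_table(RESPONSE.primary_results[0]))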

Step 7: Create Ingestion Mapping

Now that we have an Azure Data Explorer table, we need to define an ingestion mapping from the columns of our StormEvents dataset to the table we created in the last step.

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DB_NAME, CREATE_MAPPING_COMMAND)
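To confirm the mapping registered as expected, you can list the CSV ingestion mappings on the table (again a hedged sketch using the same management client):

# Hedged check: list the CSV ingestion mappings registered on the table
RESPONSE = KUSTO_CLIENT.execute_mgmt(DB_NAME, ".show table StormEvents ingestion csv mappings")
dataframe_from_result_table(RESPONSE.primary_results[0])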

Step 8: Ingest Data

Once we have our mapping in place, we can ingest our data.

INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)
INGESTION_PROPERTIES = IngestionProperties(
    database=DB_NAME, table=DESTINATION_TABLE, dataFormat=DataFormat.csv,
    mappingReference=DESTINATION_TABLE_COLUMN_MAPPING, additionalProperties={'ignoreFirstRecord': 'true'})
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)  # FILE_SIZE is the raw size of the data in bytes
INGESTION_CLIENT.ingest_from_blob(BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

Step 9: Validate That Data Was Ingested into the Table

Let’s validate that our data was ingested.

QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(DB_NAME, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
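Queued ingestion is asynchronous, so the count may come back as 0 for the first few minutes. A small polling sketch (my addition, not part of the original quickstart) simply re-runs the count until rows appear:

import time

# Hedged sketch: poll until the ingested rows become visible, or give up after ~15 minutes
for attempt in range(30):
    RESPONSE = KUSTO_CLIENT.execute_query(DB_NAME, "StormEvents | count")
    row_count = dataframe_from_result_table(RESPONSE.primary_results[0]).iloc[0, 0]
    if row_count > 0:
        print("Ingestion complete: {} rows".format(row_count))
        break
    time.sleep(30)  # wait 30 seconds between checks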

Step 10: Make Your First Queries

Let's run a sample query that returns the 1,000 most recent events in our dataset.

KUSTO_QUERY = "StormEvents | sort by StartTime desc | take 1000"
RESPONSE = KUSTO_CLIENT.execute(DB_NAME, KUSTO_QUERY)

After you've run this query, check out the query documentation to try out some more complex queries.
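As one possible taste of what the query language can do (a hedged example of my own; the column names come from the table we created above), the following counts events by state and keeps the ten most active:

# Hedged example: the ten states with the most recorded storm events
KUSTO_QUERY = "StormEvents | summarize EventCount = count() by State | top 10 by EventCount desc"
RESPONSE = KUSTO_CLIENT.execute_query(DB_NAME, KUSTO_QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])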

Call to Action

  • Look at the query documentation and try out some queries of your own
  • Check out the Notebook and deploy your own petabyte-scale data
  • Keep an eye out for the next post.

Next Post

The data in this post was toy data. In reality, most petabyte-scale data is not batched but streamed. The next post in this series will review how to ingest streaming data from GitHub using Event Hubs in Python, visualize the data, and work through more complex queries. The last post in the series will focus on how to operationalize your data with Azure AutoML.

If you have any questions, comments, or topics you would like me to discuss, feel free to follow me on Twitter.

About the Author
Aaron (Ari) Bornstein is an avid AI enthusiast with a passion for history, engaging with new technologies, and computational medicine. As an Open Source Engineer on Microsoft's Cloud Developer Advocacy team, he collaborates with the Israeli hi-tech community to solve real-world problems with game-changing technologies that are then documented, open-sourced, and shared with the rest of the world.
