以零售為例,動手實作以資料驅動的商業決策系統

將零售資料轉化為以資料驅動的商業決策系統參考架構

這個系統是以零售 (retail) 場景為範例設計,瞭解如何動手實作以資料驅動的商業決策(比方說根據銷售資料簡單計算出商品間的關聯性,發掘出客戶的喜好),這篇文章將會介紹架構設計概念,更重要的是,會以 Azure IoT/Data Services 示範如何實作這樣的系統。

Lambda Architecture

這個系統主要分為三塊:資料提取 (data ingestion)資料轉置 (data transformation)、以及資料分析 (data analytics),在這個案例中我們將使用 Lambda Architecture 的概念來做設計。

在說明為什麼要使用 Lambda Architecture 之前,先讓我們思考一般在設計一個交易型應用程式的資料流程可能會像這樣:

一個資料 ETL (Extract-Transform-Load) 的流程

這樣的架構已經經過無數個專案驗證、沒有什麼大問題,它尤其適合像是 ERP、LoB (Line-of-Business) 應用系統 -- 系統需求、應用程式功能與範圍大致不太會變動,所以可以在設計初期就把資料結構完美地規化好 schema 也做正規化,當資料產生後,也可以在預期下做好轉置 (transform) 以及預載 (load) 放在如資料倉儲 (data warehouse) 中來提供報表或分析使用。

但每個程式開發人員一定都面臨過一個討厭的場景 — 「老闆/客戶又改規格了」,這種問題有時候是開發前沒有想清楚規格及需求造成,但也有可能是因為期待從大量能蒐集的資料中淘金,想說先把資料存下來再看看以後可以怎樣運用 (需求尚不明確);或是蒐集的資料來自四面八方、格式都不一樣、甚至資料量非常龐大 (從感測器蒐集、從社交網路爬取等等),預先處理可能不切實際等等可能。

所以 Lambda Architecture 的概念就被提出來了,它的一個簡單的想法是有沒有可能我先把資料用簡單的方式儲存好,真的需要它而且也知道要做什麼用途時才進行 ETL 的操作,對應上圖的流程可能就像是這樣:

資料先存下來,有明確需求時再進行轉置或載入

所以 Lambda Architecture 完整的細節就可能是這樣:

Lambda Architecture 的概念

因為我們要適用的產業是零售業,不管是線上的電子商務網站,還是實體通路的零售店面,都可以蒐集到很多客戶的行為資料(如:瀏覽商品、結帳細節等等),然後再依據不同的商業需求,隨時都能輕易再將這些原始資料進行轉置與分析,而不必擔心當需求變動時得大幅度改變系統設計。

接下來我們會介紹實作的概念與細節,所有的程式碼都可以在這個 GitHub repo 上找到。

資料提取 (Data Ingestion)

首先要思考的,若我們就是零售業的經營者,當然希望業績是不斷成長,線上商店有更多的客戶、更多的商品,實體通路就是能展更多的店面,不論是哪一種面向,伴隨而來的就是會產生愈來愈大量的資料,所以勢必要解決兩件事:

  • 能不能用最經濟的方式來儲存這些資料?
  • 如何有一個能應付高頻 (high bandwidth) 大量的資料蒐集接口?

經濟的資料儲存

我遇過很多開發人員,在設計系統時一想到要 (永久地) 儲存資料就會想存到資料庫中,而且還是像 MS SQL 或 MySQL 等這類關聯式資料庫。但別忘了這類型的資料庫設計來做交易 (transaction) 的,在我們要設計的系統中,這些蒐集來的客戶行為資料比較像是 events 或 logs,是儲存後就 (正常狀況下) 不會去修改它的,放在處理 OLTP 的資料庫中只能說是「有錢就是任性」,因為你得為了這些不常被 “transaction” 的資料煩惱關聯式資料庫的延展問題,不是花人力就是花金錢解決。

所以這類型的資料一般都會建議使用檔案或是 blob 儲存體的形式來儲存,以撰文時的牌價來比較,儲存 1TB 的資料在 Azure SQL 資料庫(假設單顆 DB 搭配 S3 方案,還不看資料存取效能需求)每個月約新台幣 4,654 元 (參考來源),而放在 Azure Blob 儲存體中,每個月僅需新台幣 1,000 元左右 (看選擇的備援等級而定,如果移至冷區或封存會更便宜,參考來源),當資料量更大後差距更大。

放在 Azure Blob 儲存體的資料,未來要透過 Hadoop 或 Spark 等工具來處理也是完全沒問題的。

高頻的資料接口

面對大量、快速而且持續成長的資料,你應該不會想說還寫個 Web API 來接收資料,一方面延展 (當資料來源或資料量增長時) 是個問題之外,還有身份驗證、多種通訊協定的支援等等的工作要做。

在這樣的情形之下,選擇 Azure IoT Hub 做為提取資料的入口是個好主意,它本身就為了高頻資料的佇列設計,而且支援多種通訊協定 (HTTP, MQTT, AMQP 等),我們在實作時會再介紹它的特色。

即時處理以及資料封存

解決了接資料與儲存資料的問題後,系統前半段的架構幾乎就要完成了,Azure IoT Hub 將資料接起來後佇列在那裡,還需要一個動作把這些資料拿出來存在 Blob 儲存體中(佇列內的資料可以保存一段時間,但不是永久),這部份我們可以很簡單地用 Azure Stream Analytics (ASA) 這個服務來做,因為它可以即時監控 Azure IoT Hub (當然 ASA 的資料來源有很多種),一旦有資料進入 Azure IoT Hub 就立刻進行處理,我們就先把資料存進 Blob 儲存體中。

這樣一來服務架構就會像是這樣:

資料來源到儲存的流向

接下來讓我們看一下各自的實作細節。

IoT Hub 的設計

IoT Hub 在這裡的角色就相當於一個高頻資料輸入的佇列 (queue),所以我們的資料來源 (可能是 web 的 payment gateway,也可能是實體通路的 POS 機) 就可以直接往 IoT Hub 送,不過資料要送到 IoT Hub 之前必須要到 IoT Hub 上註冊取得一組專屬金鑰,這樣才能通過驗證開始送資料,所以 event source 一定要保存好這組金鑰才能正確將資料送出。

你可以參考這份程式碼來參考如何將資料送到 IoT Hub 中。

這個 「event source 模擬器」是以 Node.js 實作的,為了可攜性故將金鑰存在 Azure Cosmos DB (Document DB) 中。

Stream Analytics 的設計

在現有的需求架構下,Stream Analytics 的部份相對簡單一點,只要設定好 Input 有一個來自 IoT Hub 的資料流 (data stream),然後 Output 設定 Blob 儲存體,並且用日期格式來區分資料路徑,細節可以參考這份程式碼的設定範例

不過要注意的是,送到 IoT Hub 中的資料為了精簡一點,所以銷售的商品資訊我只傳送 productId 資料,而銷售商品的型錄另外存成一個 catalog.json 檔案,並且把它先放在 Blob 儲存體中做為參考資料,這樣後續的資料操作就能用 productId 的資料再關聯起來。

Stream Analytics 處理資料的程式是以一個類似 SQL 查詢的語法來操作,它像是這樣:

SELECT eventDate, userId, productId, quantity, price
INTO Archive
FROM POSInput TIMESTAMP BY eventDate

其中 POSTInput 就是設定了 IoT Hub 的 alias,而 Archive 就是 Blob 儲存體的 alias。

當資料開始往 IoT Hub 傳送時,別忘了要啟動 Stream Analytics 它才會開始待命做事,這也代表你可以控制 Stream Analytics 的工作時間,若資料量更大的時候,也可以再延展 Stream Analytics 的處理單位,透過這些操作來掌握費用細節。

如果一切順利,你很快就能在 Blob 儲存體中找到像是這樣的 JSON 格式檔案:

經由 Azure Stream Analytics 處理後儲存到 Blob 裡的檔案內容

如果想看即時資訊

到這裡我們已完成了整個系統在資料提取這端的基礎架構,資料也順利儲存下來了,雖然之後再用其它工具來處理這些資料也很方便,但若是在資料產生的過程中就想看到即時資訊要怎麼做呢?比如說老闆想看一下商品銷售的即時資訊,這個時候我們可以善用 Stream Analytics 能即時將資料傳送到 Power BI 的特色,將它們結合起來。

上面我們提到原始資料必須搭配 catalog.json 檔案的內容關聯起來才能看到完整的資料,所以我們只要把它放在 blob 儲存體內,並且在 Stream Analytics 中設定參考資料 Input (Stream Analytics 可以設多個 Inputs 及多個 Outputs),然後再加入一個 Power BI 的資料集做為 Output,就可以將整個 Stream Analytics 的查詢語法修改成這樣:

WITH AllEvents AS (
SELECT productId, Count(productId) as [totalSold]
FROM POSInput
TIMESTAMP BY eventDate
GROUP BY productId, TumblingWindow(minute, 5)
)
SELECT a.productId, b.name, a.totalSold, b.category
INTO SellingBI
FROM AllEvents a
JOIN ProductCatalog as b on a.productId = b.productId
SELECT eventDate, userId, productId, quantity, price
INTO Archive
FROM POSInput TIMESTAMP BY eventDate

這裡我們所定義的「即時」是設定 5 分鐘送一次資料到 Power BI 的資料集,所以設了一個名為 AllEvents 的 window,然後把 POSInputProductCatalog 的資料 JOIN 起來送到 SellingBI 這個 Power BI 資料集的 alias 中。

順利執行的效果就會像是這樣,Power BI 上呈現的資訊每 5 分鐘就會自動更新資料,同時更新圖形。

送到 Azure IoT Hub 的資料,透過 Azure Stream Analytics 即時傳送到 Power BI 中呈現

資料轉置 (Data Transformation)

當系統上線後,blob 裡開始陸陸續續會存放這些行為資料,但如果要把這些非結構化的資料轉置成有意義的表格式資料 (適用於報表或 BI 工具),由於資料量 (理論上) 已經很龐大了,這時候就可以藉助像 Hadoop 或 Spark 這類專門處理大量資料的工具來幫忙了。

使用 Hive 來處理資料

這裡使用 Hive 來做為實作案例(當然你也可以使用自己熟悉的工具如 SparkQL),而資料就直接從 Blob 儲存體中讀出來處理。在 Azure 上用 HDInsight 的 Hadoop cluster 最大的好處就是需要使用時才建立,沒用時刪掉,省下很多安裝以及維護的成本。

建好 Azure HDInsight 的 Hadoop (Linux-based) cluster 後,它預設會裝好 Ambari web 管理介面,我們可以從這裡來下 Hive query。

Ambari web 管理介面,可以透過它來操作 Hadoop cluter

在這裡輸入 Hive query 後就可以直接按 Execute 按鈕把工作送到 cluster 中去做運算。首先我們建立一個 Hive table 把放在 Blob 儲存體下的資料讀進來,後續的運算就可以直接從這個 table 來拿資料,不必再去 Blob 儲存體內撈。

CREATE EXTERNAL TABLE IF NOT EXISTS LogsRaw (jsonentry string) 
PARTITIONED BY (year INT, month INT, day INT)
STORED AS TEXTFILE LOCATION "wasbs://logs@example.blob.core.windows.net/";
ALTER TABLE LogsRaw ADD IF NOT EXISTS PARTITION (year=2017, month=10, day=31) LOCATION "wasbs://logs@example.blob.core.windows.net/2017/10/31";
ALTER TABLE LogsRaw ADD IF NOT EXISTS PARTITION (year=2017, month=11, day=01) LOCATION "wasbs://logs@example.blob.core.windows.net/2017/11/01";
SELECT * from LogsRaw limit 1;

這段 Hive query 會建立一個 LogsRaw 的 Hive table,也指定把這個 table 儲存在 blob 儲存體中。這個 table 裡面只有一個字串資料欄位 jsonentry 用來存原始資料,然後會根據日期來做 partition。

Blob 的資料是假設存在 https://example.blob.core.windows.net/logs/2017/10/31/xxxx.json 中,只是 Hive query 支援 Azure blob 的格式 (wasbs://……)必須把容器放在最前面。

如果執行完沒有錯誤,最後一行的 SELECT 應該就可以撈出一筆資料來確認是否成功。

運算完畢後應該可以順利撈出一筆資料確認 Hive table 的建置與資料讀取無誤

接著先在 Blob 儲存體中建立一個 processeddata 的容器,用來存放處理過的資料,然後再執行這個 Hive query 把原始資料 (raw data) 轉成表格式的資料結構,這裡我們也是以建立一個 Hive table 來完成:

CREATE TABLE IF NOT EXISTS storeActivity (
eventdate timestamp,
userid string,
productid string,
quantity int,
price int
) PARTITIONED BY (year int, month int, day int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE LOCATION 'wasbs://processeddata@example.blob.core.windows.net/structuredlogs';
ALTER TABLE storeActivity ADD IF NOT EXISTS PARTITION (year=2017, month=10, day=31) LOCATION 'wasbs://processeddata@example.blob.core.windows.net/structuredlogs/2017/10/31';
ALTER TABLE storeActivity ADD IF NOT EXISTS PARTITION (year=2017, month=11, day=01) LOCATION 'wasbs://processeddata@example.blob.core.windows.net/structuredlogs/2017/11/01';

完成後就會建立一個 storeActivity 的 Hive table,然後資料會儲存在 https://example.blob.core.windows.net/processeddata/structuredlogs 中,跟 LogsRaw 一樣以日期來做 partition。

再來就是把 LogsRaw 裡的資料拆解 (Extract) 後寫進 storeActivity 表格中:

INSERT OVERWRITE TABLE storeActivity Partition (year, month, day)
SELECT CAST(CONCAT(split(get_json_object(jsonentry, "$.eventdate"),'T')[0], ' ', SUBSTRING(split(get_json_object(jsonentry, "$.eventdate"),'T')[1],0,LENGTH(split(get_json_object(jsonentry, "$.eventdate"),'T')[1])-1)) as TIMESTAMP) as eventdate,
get_json_object(jsonentry, "$.userid") as userid,
get_json_object(jsonentry, "$.productid") as productid,
CAST(get_json_object(jsonentry, "$.quantity") as int) as quantity,
CAST(get_json_object(jsonentry, "$.price") as int) as price,
year, month, day
FROM LogsRaw;

完成之後你可以試著執行像是 SELECT * from storeActivity LIMIT 20; 這樣的查詢語句看看資料是否正確地寫入 storeActivity 的表格中。

正確寫入 storeActivity 後查詢應該就有資料了

透過這些步驟,相信你已經可以感覺得到如何操作儲存在 blob 儲存體內的資料了,當有必要時可以再使用像是 Hive 這類工具把它轉成表格式的資料。例如我想產生一個表格是產品的銷售狀況,我可以再把 catalog.json 的內容載入做關聯。

首先,建一個 Hive table 把 catalog.json 的資料載入,最後用 ProductCatalog 這個 table 來存參考的型錄資料:

DROP TABLE IF EXISTS RawProductCatalog;
CREATE EXTERNAL TABLE RawProductCatalog (
jsonentry string
) STORED AS TEXTFILE LOCATION "wasbs://ref@example.blob.core.windows.net/catalog/";
DROP TABLE IF EXISTS ProductCatalog;
CREATE TABLE ProductCatalog ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'wasbs://processeddata@example.blob.core.windows.net/product_catalog/'
AS SELECT get_json_object(jsonentry, "$.productId") as productId,
get_json_object(jsonentry, "$.name") as name,
get_json_object(jsonentry, "$.category") as category,
get_json_object(jsonentry, "$.price") as price
FROM RawProductCatalog;

若要觀看 2017/10/31 的銷售狀況,也只要下這樣的查詢就可以了:

SELECT a.productid, b.name, b.category, SUM(a.quantity) as totalSold
FROM storeActivity a LEFT OUTER JOIN ProductCatalog b
ON a.productid = b.productid
WHERE year=2017 and month=10 and day=31
GROUP BY a.productid, b.name, b.category
ORDER BY totalSold DESC;

結果就像是這樣:

從 storeActivity 與 ProductCatalog 關聯後的資料

有了像 Hive 這樣的工具,操作非結構化的資料就像用 SQL 查詢資料庫一樣方便。

舉例來說,下面這段查詢就可以根據銷售資料簡單計算出商品間的關聯性,發掘出客戶的喜好:

DROP VIEW IF EXISTS unique_purchases;
CREATE VIEW unique_purchases AS
SELECT distinct userid, productid
FROM storeActivity where eventdate > date_sub(from_unixtime(unix_timestamp()),30);
DROP VIEW IF EXISTS all_purchased_products;
CREATE VIEW all_purchased_products AS
SELECT a.userid, COLLECT_LIST(CONCAT(a.productid,',',a.qty)) as product_list from (
SELECT userid, productid, sum(quantity) as qty FROM websiteActivity
WHERE eventdate > date_sub(from_unixtime(unix_timestamp()),30)
GROUP BY userid, productid
ORDER BY userid ASC, qty DESC) a
GROUP BY a.userid;
DROP VIEW IF EXISTS related_purchase_list;
CREATE VIEW related_purchase_list AS
SELECT a.userid, a.productid, b.product_list
FROM unique_purchases a LEFT OUTER JOIN all_purchased_products b ON (a.userid = b.userid);
DROP TABLE IF EXISTS related_products;
CREATE TABLE related_products ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' LOCATION 'wasbs://processeddata@example.blob.core.windows.net/related_products/' AS
SELECT c.productid, c.related_product, c.qty, rank() OVER (PARTITION BY c.productid ORDER BY c.qty DESC) as rank FROM
(SELECT a.productid, a.related_product, SUM(a.quantity) as qty FROM
(SELECT b.productid, SPLIT(prod_list, ',')[0] as related_product, CAST(SPLIT(prod_list, ',')[1] as INT) as quantity
FROM related_purchase_list b LATERAL VIEW EXPLODE(b.product_list) prodList as prod_list) a
WHERE a.productid <> a.related_product
GROUP BY a.productid, a.related_product
ORDER BY a.productid ASC, qty DESC) c;
SELECT * from related_products;

最後會建立一個 related_products 的 Hive table 然後儲存在Blob 儲存體 https://example.blob.core.windows.net/processeddata/related_products/ 下。

從銷售行為中計算出客戶喜好商品間的關聯性
需要的資料都計算完畢時,因為都已經儲存在 Blob 儲存體中了,就可以把 HDInsight 的 cluster 刪掉省錢了。

資料分析 (Data Analytics)

雖然我們用 Hive 可以建立許多 Hive table 把非結構化的資料轉換成表格式的資料,然後依然使用 Hive query 來查詢需要的資料,但很多商業軟體或是既有的資料分析工具不一定直接支援 Hive,這個時候我們可以運用 Azure SQL Data Warehouse 來兼顧 SQL Server 相容性以及資料倉儲的功能。

我們可以用 Azure SQL Data Warehouse 把 Hive table 轉換成 SQL Server 相容的 table 資料,這樣不只可以使用 SQL Server Management Studio 這類原本為 SQL Server 設計的管理或分析工具來繼續延用。

在 Visual Studio 中開啟 SQL Data Warehouse 的操作

由於我們已經將 Hive table 都儲存在 Blob 儲存體中,所以只要讓 SQL Data Warehouse 設定好 Blob 的帳號與金鑰後就能存取資料了。首先在 SQL Data Warehouse 中設定金鑰資訊:

IF NOT EXISTS (SELECT * FROM sys.symmetric_keys where name = '##MS_DatabaseMasterKey##')
CREATE MASTER KEY;
CREATE DATABASE SCOPED CREDENTIAL AzureStorageCredential
WITH
IDENTITY = '<Azure storage account name>',
SECRET = '<Azure storage key>';

再來設定 Hive table 的存放位置:

CREATE EXTERNAL DATA SOURCE AzureStorage
WITH (
TYPE = HADOOP,
LOCATION = 'wasbs://processeddata@example.blob.core.windows.net',
CREDENTIAL = AzureStorageCredential
);

接著設定資料格式:

CREATE EXTERNAL FILE FORMAT TextFile
WITH (
FORMAT_TYPE = DelimitedText,
FORMAT_OPTIONS (FIELD_TERMINATOR = ',')
);

現在可以開始建立 SQL Data Warehouse 的 external table 關聯到 Hive table 了,比方說關聯之前建立的 storeActivity table,先建立 schema asb 代表 Azure Storage Blob:

CREATE SCHEMA [asb]
GO

接著,從 blob 中讀出 Hive table,然後建立一個 external table 名為 asb.StoreActivityExternal

CREATE EXTERNAL TABLE asb.StoreActivityExternal
(
EventDate datetime2,
UserId nvarchar(20),
ProductId nvarchar(20),
Quantity int,
Price int
)
WITH (
LOCATION='/structuredlogs/',
DATA_SOURCE=AzureStorage,
FILE_FORMAT=TextFile
);

這樣如法砲製其它的 table 也是一樣的作法。

不過上述的作法是建立 External Table,資料實際上還是儲存在 Azure Blob 儲存體中,SQL Data Warehouse 只是轉換查詢的指令到 Blob 儲存體裡撈資料,如果要把資料讀取進來增加後續的查詢效能(以及使用 SQL Data Warehouse 中的索引功能),就應該像是這樣:

CREATE SCHEMA [adw]
GO

在 SQL Data Warehouse 中建立 table 、從 external table 讀入資料並且將 ProductId 加入索引:

CREATE TABLE adw.FactStoreActivity
WITH (
CLUSTERED COLUMNSTORE INDEX,
DISTRIBUTION = HASH(ProductId)
)
AS
SELECT
EventDate,
UserId,
ProductId,
Quantity,
Price
FROM asb.StoreActivityExternal
GO

這樣讀取 FactStoreActivity 這個 table 時就不會再去查詢 blob,除了速度更快之外,也可以利用到 ProductId 的索引來加速查詢。

流程自動化 (Automation)

到此為止我們已經完成了整個系統的架構:從資料提取、將資料儲存至 Blob 儲存體保存、再由 Hadoop (Hive) 來將資料轉換成表格化 (Hive table) 的資料,不過中間還有許多步驟 (例如 Hive query) 還是手動進行,但我們可以再做一些設定讓整個流程自動化,這裡就要加入 Azure Data Factory 來把這套流程建立成資料管線 (data pipeline) 並且定時讓它自動處理。

資料管線的基礎設定

在 Data Factory 的編寫頁面中,設定帳號服務的關聯,首先新增一個 Azure 儲存體的資料存放區 (data store),填入適當的帳號名稱及金鑰後,設定完按下部署

{
"name": "AzureStorageLinkedService",
"properties": {
"description": "",
"hubName": "skretaildf_hub",
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=example;AccountKey=**********"
}
}
}

再設定 SQL Data Warehouse 的帳號關聯,新增一個 Azure SQL Data Warehouse 的資料存放區 (data store),填入適當的連線字串資訊後按下部署

{
"name": "AzureSqlDWLinkedService",
"properties": {
"description": "",
"hubName": "skretaildf_hub",
"type": "AzureSqlDW",
"typeProperties": {
"connectionString": "Data Source=tcp:skretail.database.windows.net,1433;Initial Catalog=skretaildw;Integrated Security=False;User ID=skretail;Password=**********;Connect Timeout=30;Encrypt=True"
}
}
}

然後再新增一個依需求的 HDInsight cluter 的計算資源,這樣我們不必事先把 HDInsight cluster 建好,只要資料管線在運作時才建立,對於成本掌控更佳。

{
"name": "HDInsightOnDemandLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"description": "",
"typeProperties": {
"clusterSize": 4,
"timeToLive": "00:05:00",
"osType": "Linux",
"version": "3.6",
"linkedServiceName": "AzureStorageLinkedService"
}
}
}

接著就是設定資料管線的輸入 (Input),我們新增一個 Azure Blob 儲存體的資料集 (dataset),然後指定我們從 IoT Hub -> Stream Analytics 儲存到 Blob 的原始檔案的路徑:

{
"name": "RawJsonData",
"properties": {
"type": "AzureBlob",
"linkedServiceName": "AzureStorageLinkedService",
"typeProperties": {
"folderPath": "logs/{Year}/{Month}/{Day}",
"partitionedBy":
[
{ "name": "Year", "value": { "type": "DateTime", "date": "SliceStart", "format": "yyyy" } },
{ "name": "Month", "value": { "type": "DateTime", "date": "SliceStart", "format": "MM" } },
{ "name": "Day", "value": { "type": "DateTime", "date": "SliceStart", "format": "dd" } }
],
"format": {
"type": "JsonFormat"
}
},
"availability": {
"frequency": "Day",
"interval": 1
},
"external": true
}
}

接下來,我們以產生 storeActivity 資料為預期的輸出 (Output),而且從上述的流程說明,最後會把這個 Hive table 儲存在 Blob 儲存體中,所以再新增一筆 Azure Blob 儲存體的資料集作為輸出結果:

{
"name": "StoreActivityBlob",
"properties": {
"type": "AzureBlob",
"linkedServiceName": "AzureStorageLinkedService",
"structure": [
{
"name": "eventdate",
"type": "Datetime"
},
{
"name": "userid",
"type": "String"
},
{
"name": "productid",
"type": "String"
},
{
"name": "quantity",
"type": "Int32"
},
{
"name": "price",
"type": "Int32"
}
],
"typeProperties": {
"folderPath": "processeddata/structuredlogs/{Year}/{Month}/{Day}",
"partitionedBy":
[
{ "name": "Year", "value": { "type": "DateTime", "date": "SliceStart", "format": "yyyy" } },
{ "name": "Month", "value": { "type": "DateTime", "date": "SliceStart", "format": "MM" } },
{ "name": "Day", "value": { "type": "DateTime", "date": "SliceStart", "format": "dd" } }
],
"format": {
"type": "TextFormat",
"columnDelimiter": ","
}
},
"availability": {
"frequency": "Day",
"interval": 1
}
}
}

最後,因為我們希望資料管線最後的結果是 Azure SQL Data Warehouse 的 table,所以還要再新增一筆 Azure SQL Data Warehouse 的資料集 (dataset)

{
"name": "StoreActivitySQL",
"properties": {
"type": "AzureSqlDWTable",
"linkedServiceName": "AzureSqlDWLinkedService",
"structure": [
{
"name": "EventDate",
"type": "Datetime"
},
{
"name": "UserId",
"type": "String"
},
{
"name": "ProductId",
"type": "String"
},
{
"name": "Quantity",
"type": "Int32"
},
{
"name": "Price",
"type": "Int32"
}
],
"typeProperties": {
"tableName": "adw.FactStoreActivity"
},
"availability": {
"frequency": "Day",
"interval": 1
}
}
}

如果都順利部署,現在完成的設定應該會像是這樣:

在 Data Factory 上設定好資料存放區以及資料集,以便後續運作

協調資料管線的運作流程

設定好資料的來源、目的以及格式之後,接著就是要設定資料管線的運作了,因為我們是用 Hive query 來進行資料操作,所以要給它 Hive script 的設定,我將 Hive script 事先儲存在 blob 儲存體下,然後再設定資料管線運作時的參考。

存放好 Hive query 的檔案 (*.hql) 之後,新增一個管線 (pipeline),然後填入對應的資料:(這裡我新增了另外一個資料集 DummyDataset 作為中繼資料)

{
"name": "HadoopPipeline",
"properties": {
"description": "資料轉換的管線",
"activities": [
{
"type": "HDInsightHive",
"typeProperties": {
"scriptPath": "scripts\\addpartitions.hql",
"scriptLinkedService": "AzureStorageLinkedService",
"defines": {
"StorageAccountName": "skretailstore",
"Year": "$$Text.Format('{0:yyyy}',SliceStart)",
"Month": "$$Text.Format('{0:MM}',SliceStart)",
"Day": "$$Text.Format('{0:dd}',SliceStart)"
}
},
"inputs": [
{
"name": "RawJsonData"
}
],
"outputs": [
{
"name": "DummyDataset"
}
],
"policy": {
"timeout": "01:00:00",
"concurrency": 1,
"retry": 3
},
"scheduler": {
"frequency": "Day",
"interval": 1
},
"name": "CreatePartitionHiveActivity",
"linkedServiceName": "HDInsightOnDemandLinkedService"
},
{
"type": "HDInsightHive",
"typeProperties": {
"scriptPath": "scripts\\structuredlogs.hql",
"scriptLinkedService": "AzureStorageLinkedService",
"defines": {
"Year": "$$Text.Format('{0:yyyy}',SliceStart)",
"Month": "$$Text.Format('{0:MM}',SliceStart)",
"Day": "$$Text.Format('{0:dd}',SliceStart)"
}
},
"inputs": [
{
"name": "DummyDataset"
}
],
"outputs": [
{
"name": "StoreActivityBlob"
}
],
"policy": {
"timeout": "01:00:00",
"concurrency": 1,
"retry": 3
},
"scheduler": {
"frequency": "Day",
"interval": 1
},
"name": "ProcessDataHiveActivity",
"linkedServiceName": "HDInsightOnDemandLinkedService"
}
],
"start": "2017-11-01T00:00:00Z",
"end": "2017-11-03T00:00:00Z"
}
}

這條資料管線就會在指定的時間運作,然後把資料從 RawJsonData (從 IoT Hub -> Stream Analytics 來的原始資料) 經過 HDInsight cluster 的轉換最後存到 StoreActivityBlob 中。

我們再新增一條管線 (pipeline) 把資料從 Blob (Hive table) 轉送到 SQL Data Warehouse 中:

{
"name": "BlobDWPipeline",
"properties": {
"description": "將資料從 Blob 轉送到 SQL Data Warehouse 的管線",
"activities": [
{
"type": "Copy",
"name": "HiveToDWActivity",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "SqlDWSink",
"sqlWriterCleanupScript": "$$Text.Format('DELETE FROM adw.FactWebsiteActivity WHERE EventDate >= \\'{0:yyyy-MM-dd HH:mm}\\' AND EventDate < \\'{1:yyyy-MM-dd HH:mm}\\'', WindowStart, WindowEnd)",
"writeBatchSize": 0,
"writeBatchTimeout": "00:00:00"
}
},
"inputs": [
{
"name": "StoreActivityBlob"
}
],
"outputs": [
{
"name": "StoreActivitySQL"
}
],
"policy": {
"timeout": "1.00:00:00",
"concurrency": 1,
"retry": 3
},
"scheduler": {
"frequency": "Day",
"interval": 1
}
}
],
"start": "2017-11-01T00:00:00Z",
"end": "2017-11-03T00:00:00Z"
}
}

設定完成後,應該會像是這樣:

加入資料管線

我們也可以在 Data Factory 的圖表 (diagram) 資訊中看到視覺化的資料管線:

視覺化設定好的資料管線

監視及管理的畫面也可以監看資料管線的運作狀態:

監控管理畫面

這樣我們就設定好了自動的資料運作。

結論

從這個例子裡我們看到了從資料蒐集到資料根據需求進行轉換的管線,然後再從資料中找出有價值的資訊,全部的實作程式碼都已放在 GitHub repo 上,歡迎大家在上面交流或是提出 feedbacks。

參考資料