Data Analystに3PAS RAW Dataを焚べる

Data Analystに「3PASのRAW DataをTableauでいろいろ可視化したい」との要望をもらい、作業したときのメモ。

Requirement

「3PASのRAW Dataをユーザ、セッション、コンバージョン毎に分解し、貢献度を計算してTableauで可視化したい。集計は週次でやるけど、気になるときは毎日見たい。」との要望。

3PASのRAW Dataは、日別のCSVで提供されるので、月に一度の作業ならダウンロードしてPythonで処理するのもありだけど、週次、日次とかだとさすがに無理。なので、RAW Dataを全部Bigqueryにロードして処理することにします。

もちろん、Treasure Dataを使う手もあるんだけど、Treasure Dataは、いろいろUDFが用意されていて、なんでも簡単にできちゃうから汎用的なメモにならため(…)、Bigquery+標準SQL(#standardSQL)で話を進めます。

3PAS RAW Data

RAW Dataは 日別のcsv/zip,gz形式でftpやGCSに保存されます。Impression, Click, Conversionのデータはロードの失敗時のリカバリがやりやすいよう、日別のテーブルに格納します。また、Match File(マスタ)は、最新のものがあればいいのでReplaceでテーブルに格納します。
以下は、Impression/ClickのデータがStandardYYYYMMDDテーブル、ConversionのデータがConversionYYYYMMDDテーブルに格納される想定です。

Bulk Load

データのロードはembulk、データの待受やロード後のレポートを含めたコントロールはdigdagでやります。embulkのyamlを書く際には、カラム名に空白がある場合はembulk-filter-columnでrenameしたり、timestapmのparseエラーでデータがskipされたりしないようembulk-filter-timestamp_formatを使ったりするのがポイントです。

Union

StandardとConversionを一緒に処理したいので、unionします。EventDateとConversionDateのようにTable毎に違う名前が付いていることがあるのでaliasで項目名を合わせせておきます。また、Impression/Click/Conversionの順番でレコードが綺麗に並ぶよう(Impression/ClickとConversionが同じ時間になることがある)、ConversionのEventTypeIDの桁をずらす加工をしておきます。

忘れては行けないのがTimezone。データはUTCなので、前日のテーブルにJSTの早朝のデータが入っています。前日のテーブルも対象とするよう_TABLE_SUFFIXの範囲に注意しつつ、日付をJSTで比較します。

#standard SQL
WITH
StandardView AS (
SELECT
*
FROM (
SELECT
UserID,
EventDate,
EventID,
CAST(EventTypeID AS INT64) EventTypeID
From
`project.dataset.Standard*`
WHERE
_TABLE_SUFFIX BETWEEN '20170130' AND '20170201'
AND EventDate >= TIMESTAMP('2017-02-01 00:00:00 Asia/Tokyo')
AND EventDate < TIMESTAMP('2017-02-02 00:00:00 Asia/Tokyo') )
WHERE
UserID IN (
SELECT
UserID
FROM
`project.dataset.Conversion*`
WHERE
_TABLE_SUFFIX BETWEEN '20170130' AND '20170201'
AND ConversionDate >= TIMESTAMP('2017-02-01 00:00:00 Asia/Tokyo')
AND ConversionDate <= TIMESTAMP('2017-02-02 00:00:00 Asia/Tokyo') )
),
ConversionView AS (
SELECT
UserID,
ConversionDate EventDate,
ConversionID EventID,
CAST(EventTypeID AS INT64) + 10 EventTypeID
FROM
`project.dataset.Conversion*`
WHERE
_TABLE_SUFFIX BETWEEN '20170130' AND '20170201'
AND ConversionDate >= TIMESTAMP('2017-02-01 00:00:00 Asia/Tokyo')
AND ConversionDate <= TIMESTAMP('2017-02-02 00:00:00 Asia/Tokyo')
)
SELECT
*
FROM
(SELECT * FROM StandardView) UNION ALL
(SELECT * FROM ConversionView)

Serialize

UserID毎に時間で並べて行番号を付けておきます。Unionの時に加工しておいたEventTypeIDをここで使います。

SELECT
*,
ROW_NUMBER() OVER (PARTITION BY UserID ORDER By EventDate, EventID) SerialNo
FROM (
-- 以下略 --
)

Boundary

セッション(30分以上アクセスが途絶えたら別セッション)、イベント(Impression,ClickからConversionまで)を区別できるよう、境界のレコードでフラグを立てます。
LAG()で前の行のEventDateとEventTypeIDを取っておいて、その値を使って境界のレコードにフラグを立てます。

SELECT
*,
CASE
WHEN UNIX_SECONDS(EventDate) - UNIX_SECONDS(PrevEventDate) >= 60*30 THEN 1
WHEN PrevEventDate IS NULL THEN 1
ELSE 0
END DateBoundary,
CASE
WHEN PrevEventTypeID IS NULL THEN 1
WHEN PrevEventTypeID = 11 THEN 1
ELSE 0
END EventBoudary
FROM (
SELECT
*,
LAG(EventDate,1) OVER (PARTITION BY UserID ORDER BY SerialNo) PrevEventDate,
LAG(EventTypeID,1) OVER (PARTITION BY UserID ORDER BY SerialNo) PrevEventTypeID
FROM (
-- 以下略 --
)
)

Sessionize

セッションの切れ目のフラグ(DateBoundary)を累積するとSession NOが抽出でき、UserID + SessionNoをSessionIDとすれば、以降、セッションごとの集計ができます。

SELECT
*,
CONCAT(UserID,'-',CAST(SessionNo AS STRING)) SessionID
FROM (
SELECT
*,
SUM(DateBoundary) OVER (PARTITION BY UserID ORDER BY SerialNo) SessionNo
FROM (
-- 以下略 --
)
)

EventNo

SessionIDごとにイベントの切れ目のフラグ(EventBoudary)を累積していくとConversionを区切りとしたEventNoが抽出できます。

SELECT
*,
SUM(EventBoudary) OVER (PARTITION BY SessionID ORDER BY SerialNo) EventNo
FROM (
-- 以下略 --
)

Contribution

Conversionに対するImpression/Clickの貢献度は、EventTypeID別にフラグを付けて累積し、逆数をとってスコアとします。累積をUserID, SessionID, EventNoのどのレベルで行うかは、用途にあわせて調整します。

SELECT
*,
CASE WHEN ImpressionCount > 0 THEN 1/ImpressionCount ELSE 0 END ImpressionScore,
CASE WHEN ClickCount > 0 THEN 1/ClickCount ELSE 0 END ClickScore,
CASE WHEN (ImpressionCount+ClickCount) > 0 THEN 1/(ImpressionCount+ClickCount) ELSE 0 END TotalScore
FROM (
SELECT
*,
SUM(ImpressionContribution) OVER (PARTITION BY SessionID ORDER BY SerialNo) ImpressionCount,
SUM(ClickContribution) OVER (PARTITION BY SessionID ORDER BY SerialNo) ClickCount
FROM (
SELECT
*,
CASE WHEN EventTypeID = 1 THEN 1 ELSE 0 END ImpressionContribution,
CASE WHEN EventTypeID = 2 THEN 1 ELSE 0 END ClickContribution FROM (
-- 以下略 --
)
)
)

TIMESTAMP format

TIMESTAMP型のデータは、ExcelやTableauで処理するときに一手間必要となるので文字列にフォーマットしておきます。

SELECT
FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', EventDate, 'Asia/Tokyo') EventDate,
-- 以下略 --

Finally

ここまでやってViewにしておくか、日次、週次で集計テーブルに書き出すようdigdagで設定しておけば、あとはTableauでよろしく..とはいかず、「実はこの場合は..、これはこう置き換えて..」など、様々な追加要望が来るわけですが、その時は、あちこち修正をかけていく事になります。

Like what you read? Give Yoshihiko Miyaichi a round of applause.

From a quick cheer to a standing ovation, clap to show how much you enjoyed this story.