データの操作

Okazawa Ryusuke
22 min readOct 21, 2017

--

データの操作は科学でもあり、職人芸でもあります。ここでは科学よりテクニックに着目します。

データの調査

解決すべき問題が特定され、そのためのデータが得られたなら、早速モデルを構築し、回を得るための作業に取り掛かりたいという誘惑にかられることでしょうが、それよりもまず最初にやるべきことはデータを調べることです。

1次元データの調査

1次元のデータセットをもっているとして、数値の集合である場合が最も単純です。底データは各ユーザがサイト上に止まっている時間で合ったり、データサイエンスの書籍のページ数であったりします。

最初に行うべきはその統計量を計算することです。データ数、最小値、最大値、変チンチ、標準偏差などを算出します。

それらの値がデータに対する良い理解に寄与しないこともあります。そんな場合に取るべき次の手は、ヒストグラムの作成です。つまり値をいくつかの範囲(バケツ)に分割し、それぞれのバケツにいくつのデータが入るかを教えます。

def bucketize(point, bucket_size):
return bucket_size * math.floor(point / bucket_size)
def make_histogram(points, bucket_size):
return Counter(bucketize(point, bucket_size) for point in points)
def plot_histgram(points, bucket_size, title=""):
histgram = make_histgram(points, bucket_size)
plt.bar(histgram.keys(), histgram.values(), width=bucket_size)
plt.title(title)
plt.show()

例えば、次の2つのデータセットについて考えます。

random.seed(0)uniform = [200 * random.random() - 100 for _ in range(10000)]normal = [57 * inverse_normal_cdf(random.random())
for _ in range(10000)]

どちらも平均は0,標準偏差はおおよそ58ですが、分布は全く異なっています

2つの分布データは最大値と最小値が異なっています。しかし、それだけでは両者どのように異なっているかを示すには不十分です。

2次元データ

2次元のデータ・セットを持っているとしましょう。1日あたりサイト上で費やす時間とデータサイエンティストとしての勤務年数の組み合わせなどが考えられます。それぞれの次元について理解を深めるのは当然として、データの散らばりにも興味があるはずです。例えば、次の適当に作ったデータで考えてみましょう。

def random_normal():
return inverse_normal_cdf(random.random())
xs = [x + random_normal() / 2 for x in xs]
ys1 = [ x+random_normal() / 2 for x in xs]
ys2 = [ -x + random_normal()/ 2 for x in xs]

多次元データ

多次元のデータに対しては、それぞれの次元が他の次元とどのように関連しているかを知ることが重要です。簡単な手法として、相関行列を調べる方法があります。これは次元とj次元の相関を行列のi行j列の値としたものです。

def correlation_matrix(data):
_, num_colums = shape(data)
def matrix_entry(i, j):
return correlation(get_column(data, i), get_column(data, j))
return make_matrix(num_columns, num_columns, matrix_entry)

(次元数がそんなにおおくないのであれば)より可視的な手法として、各次元データの組み合わせで作る散布図行れるを使う方法があります。この分圧されたクラフはplt.subplots()を使って書きます。作成するグラフの行数と列数を与えてfigureオブジェクトと軸オブジェクトの配列を取得します。

import matplotlib.pyplot as plt_, num_columns = shape(data)
fig, ax = plt.subplots(num_columns, num_columns)
for i in range(num_columns):
for j in range(num_columns):
if i != j: ax[i][j].scatter(get_column(data, j), get_column(data, i))
else: ax[i][j].annotate("series" + str(i) , (0.5, 0.5), xycoords='axes fraction', ha="center", va="center")if i < num_columns - 1:ax[i][j].xaxis.set_visible(False)
if j > 0: ax[i][j].yaxis.set_visible(False)
ax[-1][-1].set_xlim(ax[0][-1].get_xlim())
ax[0][0].set_ylim(ax[0][1].get_ylim())

plt.show()

データの整理と変換

実世界のデータは整然と整っているわけではありません。大抵の場合、使う前には何らかの処理を施す必要が生じます。前章では文字列を浮動小数点または整数に変換しなければなりませんでした。そこでデータを使う直前に変換を行ってました。

closing_price = float(row[2])

csv.readerをラップする関数を通して、データの入力中に解釈する方法を使えば、よりエラーの発生を低減できるはずです。この手法では列ごとに解釈を行うパーサをリストして与えます。Noneは「列に対して何も行わない」ものとします

def parse_row(input_row, parsers):
return [parser(value) if parser is not None else value
for value, parser in zip(input_row, parsers)]
def parser_row_with(reader, parsers):
for row in reader:
yield parse_row(row, parsers)

誤ったデータがあるばあいにはどうするのでしょうか。「浮動小数点」のあるべき場所に実際には数値意外の値があったとします。大抵はプログラムがエラーを吐くよりは、Noneが返る方が望ましい動作です。これはヘルパー関数を使えば可能となります。

def try_or_none(f):
def f_or_none(x):
try: return f(x)
except: return None
return f_or_none

このヘルパー関数を使うようにparse_rowを変更します。

def parse_row(input_row, parsers):
return [try_or_none(parser)(value) if parser is not None else value
for value, parser in zip(input_row, parsers)]

例として、誤りを含むカンマ区切りの株価データを使います。このデータを解釈しながら一度に読み込みます。

import dateutil.parser
data = []
with open("comma_delimited_stock_prices.csv", "rb") as f:
reader = csv.reader(f)
for line in parse_rows_with(reader, [dateutil.parser.parse, None, float]}:
data.append(line)

読み込み後に、Noneを含む行がないかを確認します。

for row in data:
if any(x is None for x in row):
print(row)

Noneが合った場合にはどう扱うかも決めておきましょう。(一般的には、3つの選択肢があります。Noneを取り除く、元のデータに戻りかけているデータの補足や誤ったデータの修正を試みる、何もしないかです)csv.DictReader向けのヘルパー関数を作ってもいいです。その場合はフィールド名に対してパーサを指定します。例として、

def try_parse_field(field_name, value, parser_dict):
parser = parser_dict.get(field_name)
if parser is not None:
return try_or_none(parser)(value)
else:
return value
def parse_dict(input_dict, parser_dict):
return {field_name : try_parse_field(field_name, value, parser_dict)
for field_name, value in input_dict.iteritems()}

次に行うべきは、簡易な手段を使うか、または先程やったデータの調査で使ったような手法用いて外れ値がないかを調べることです。実際のデータでは小数点がかけていたり、0が余計だったり、様々な誤りがあります。そして、そしてそれを正しく扱うのはデータを分析する側つまり自分の責任で行わければなりません。

データの操作

データサイエンティストにとって最も重要なスキルの1つがデータの操作です。それは特定のテクニックというよりは、一般的な手順と言えます。そこで、少し株価を例にして、その感覚を伝えようと思います。

data = [
{'closing_price':102.06,
'data':datetime.datetime(2014, 8, 29, 0,0),
'symbol': 'AAPL'}, #...]

概念的に言うと、このデータは行の集まり(例えば表計算)としてみることが出来ます。

このデータに対する問題をいくつかといてみましょう。その過程で手順とパターン化し、抽象化したツールを作成して操作を容易にします。
例えば、AAPLの終値はいくらが最高であったかを調べます。まずは具体的な手順に分解します。

  1. AAPLの行だけに着目する
  2. 各業からclosinf_price(終値)の値を取り出す
  3. それらの値にmaxを適用する
  4. リスト内包を使って3つの手順を一度に実行します。
max_aapl_price = max(row["closing_price"])
for row in data
if row["symbol"] == "AAPL")

データ中の各銘柄ごとに最高の終値を見つける事ができれば、より一般的な手段として使えます。1つの方法として次の手順を考えましょう。

  1. 同じ銘柄(symbol)ごとにグループ化する
  2. 各グループに対して上の手順を行う
#銘柄ごとにグループ化
by_symbol = defaultdict(list)
for row in data:
by_symbol[row["symbol"]]append(row)
#各銘柄ごとに内包表記で最大値を求める
max_price_by_symbol = {symbol : max(row["closing_price"]
for row in grouped_rows)
for symbol, grouped_rows in by_symbol.iteritems()}

幾つかの処理パターンがでてきました。どちらの例でも辞書からclosing_priceの値を取り出しています。そこで関数を新たに2つ追加します。1つは辞書から列の値を取り出す関数を返すもの、もう一つは辞書のリストから同じレルの値を選び出すものです。

#辞書から列を取り出す関数を返す
def picker(field_name):
return lambda row: row[field_name]
#辞書のリストから、指定した列のリストに変換する
def plunck(field_name, row):
return map(picker(field_name), rows)

グループ化関数の返した行をグループ化すると共に、各グループのに似の変換を適用する関数を作ります。

de group_by(grouper, rows, value_transform=None):
#キーはグループ化関数の出力、値は行のリスト
grouped = defaultdict(list)
for row in rows:
grouped[grouper(row)].append(row)
if value_transform is None:
return grouped
else:
return {key :value_transform(rows)
for key, rows in grouped.iteritems()}

これを使えば以前のコードをさらに簡単にかけます。例えば、

max_price_by_symbol = group_by(picker("symbol"),
data,
lambda rows:max(pluck("closing_price", rows)))

続いて、もっと複雑な問題に取り組みましょう。例えば、1日で最も大きく値が動いたのはどれくらいであったか。また、小さい変化はどうであったかについて考えます。前日比は今日の終値/昨日の終値 -1で計算できます。これはつまり、今日の値と機能の値を関係づける方法が必要になることを意味しています。1つの方法として、銘柄ごとに値をグループ化して次の手順を適用します

  1. 値を日付順に並べる
  2. zipをつかって(昨日の終値、今日の終値)の組を作る
  3. その組みを使って新しい列「change(前日比)」を追加する

各グループごとに適用できる形の関数を作ります

def precent_price_change(yesterday, today):
return today["closing_price"] / yesterday["closing_price"] - 1
def day_over_day_changes(ground_rows):
ordered = sorted(ground_rows, key=picker("date"))
return [{"symbol":today["symbol"],
"date":today["date"]
"change":percent_price_change(yesterday, today)}
for yesterday, today in zip(ordered, ordered[1:])]

この関数をgroup_byのvalue_trainsform引数に指定します。

#キーが銘柄、値は前日比の辞書
change_by _symbol = group_by(pycker("symbol"), data, day_over_day_changes)
#すべての前日比の辞書を大きなリストに格納する
all_changes = [change
for changes in changes_by_symbol.values()
for change in changes]

ここまでできれば、最大値と最小値を求めるのは簡単です。

max(all_changes, key=picker("change"))
min(all_changes, key=picker("change"))

all_changeを使えば投資も有利に進めれます。まず月ごとにグループ化し、各グループの総合的な変化を計算します。

繰り返しになりますが、適切なvalue_transform関数を作りgroup_byに適用します

#前日比を比較するにはそれぞれ1をくわえたものを乗じてから1を減ずる
def combine_pct_changes(pct_change1, pct_change2):
return (1 + pct_change1) * (1 + pct_change2) - 1
def overall_change(changes):
return reduce(combine_pct_chages, pluck("change", changes))
overall_change_by_month = group_by(lambda row: row['date'].month,
all_changes,
overall_change)

このような操作をよくデータサイエンスでは行います。

スケールの変更

多くも手法がデータのスケールに影響を受け舞うs。例えば、さいーたサイエンティスト百人分の身長と体重のデータから、体格をクラスタ分類することにしましょう。直感的には知覚の値を持つ者同士を同じクラスタに分類することになります。つまりデータの近さを定義する必要があります。すでにユークリッド距離を計算する関数を作っているので、(身長体重)2つの値を2次元空間上で扱うのが自然です。しかし実際は身長をインチで表したり、cmで表したりします。

このように単位の変更で結果が代わってしまうのは明らかに紛らわしい問題です。このため、各次元のあたいが直接比較できない場合には、それぞれの次元の値が平均0かつ標準偏差1となるようにデータのスケールを変更します。各次元を「平均値からの標準偏差の割合」で表すことに変更して単位を効果的に取り除きます。このために、まず各列の平均と標準偏差を計算し、

def scale(data_matrix):
num_rows, num_cols = shape(data_matrix)
means = [mean(get_column(data_matrix, j))
for j in range(num_cols)]
stdevs = [standard_deviation(get_column(data_matrix, j))
for j in range(num_cols)]
return means, stdevs

この値を使って新しいデータを作ります

means, stdevs = scale(data_matrix)def rescale(i, j):
if stdevs[j] > 0:
return (data_matrix[i][j]-means[j]) / stdevs[j]
else:
return data_matrix[i][j]
num_row, num_cols = shape(data_matrix)
return make_matrix(num_rows, num_cols, rescaled)

いつものように、データをどのように扱うかは自分で判断しなければなりません。膨大な量の身長と体重のデータを持っていたとして、その中で身長が69~70インチの間のデータのみを中執して使うとしましょう。値の違いは誤差であり、1つの列の標準偏差を他の標準偏差と同等に扱いたくなという状況は少なくありません。

次元削減

多次元データの中ですべての次元が実行的な次元とは限りません。
そんなときは、主成分分析と呼ばれる手法によりデータの変化を表すために必要最小限の次元を抽出します。

各次元の平均が0となるように変化をします

def de_mean_matrix(A):#Aのすべての値と各列の平均との差を返す 結果の行列は各列の平均が0となる
nr, nc = shape(A)
column_means, _ scale = scale(A)
return make_matrix(nr, nc, lambda i, j :A[i][j] - column_means[j])

これを行わないとデータの変化ではなく平均自体を特定することになってしまします。変換後の行列Xがあれば、データの分散が最も大きくなる方法を求められます。具体的には(大きさ1のベクトルとして)方向dが与えられたとき、行列の各列xはdの方向にdot(x, y)の大きさを持ちます。そして、あらゆるゼロではないベクトルwは大きさを1にすることで方向として扱えます。

def direction(w):
mag = magnitude(w)
return [w_i / mag for w_i in w]

そのため、ゼロでないベクトルwが与えられたときwで示された方法に対するデータセットの分散が計算できます。

def directional_variance(x_i, w):
return dot(x_i, direction(w)) ** 2

wで示される方法に対するx_iの分散を求める

def directional_varience(X, w):
return sum(directional_variance_i(x_i, w)
for x_i in X)

そして、勾配関数がわかれば、勾配降下法を容易手分散を最大化する方向が第一成分になります。

def directional_gradient_i(s_i, w):
#x_i列の値がw方向に持つ分散に対する勾配
projection_length = dot(x_i, direction(w))
return [2 * projection_length * x_ij for x_ij in x x_i]
def directional_variance_gradient(X, w):
return vector_sum(dictional_varience_gradient_i(x_i, w)
for x_i in X)

directional_variance関数を最大化する方向が第一主成分となります

def first_principle_component(X):
guess = [1for _ in X[0]]
unscaled_maximizer = maxmize_batch(
partial(directional_variance, X),
partial(directional_gradient, X),
guess)
return direction(unscaled_maximizer)

確率的勾配降下法をつかうなら次のようになります。

def first_princepal_componet_stg(X):
guess = [1 for _ in X[0]]
unscaled_maximizer_stochastic(
lambda x, _, w:directional_variance_i(x, w),
lambda x, _, w:directional_gradient_i(x, w),
W,
[None for _ in X],
guess)
return direction(unscaled_maximizer)

平均が0となるよう変換を行ったデータセットを使うと、データが最も大きく変化する方向である第1主成分が得られます。

第1主成分が見つかれば、それぞれのデータをその宝庫に射影した値を求めることができます。

def project(v, w):
projection_length = dot(v, w)
return scalar_multiply(projection_length, w)

その他の主成分を求める場合、まず最初にデータから第1主成分への射影を取り除きます。

def remove_projection_from_vector(v, w):
return vector_subtract(v, project(v, w))
def remove_projection(X, w):
return [remove_projection_from_vector(x_i,w) for x_i in X]

ここで使用したのは2次元データなので、第1主成分を取り除くと結果的に残るのは1次元のデータとなります。

ここまでできれば、主成分の射影を取り除く処理を繰り返すことでそのあtの主成分が見つけられます。より高次元のデータセットに対しても、必要な主成分を繰り返し見つけることが出来ます。

def principal_componet_analysis(X, num_conpornents):
compornets = []
for _ in range(num_components):
component = first_principal_component(X)
components.append(component)
X = remove_projection(X, component)
return components

その後、各成分で構成される低次元数の空間にデータを変換できます。

def transform_vector(v, components):
return [dot(v, w) for w in components]
def transform(X, conponents):
return [transform_vector(x_i, components) for x_i in X]

行くるかの理由により、この手法は有用です。まず、データから不要な次元をとり、高い相関を持つ次元に集約して、データの整理ができます。

つぎに、高次元のデータにはうまく適用できな様々な手法がありますが、低次元表現に変換すれば適用が可能です。

一方、より良いモデルが作れるかもしれませんが、解釈が難しくなるのも事実です。「第3主成分が』0.1増加すると年収が10,000ドル高くなる」という結論よりも、「勤続年数1年にあたり、年収が10,000ドル高くなる」と言えたほうが理解は簡単です。

--

--