Elasticsearch on Dockerで仮想通貨の情報を取得する

Yuya Sugano
29 min readNov 8, 2019

仮想通貨の非裁量取引を自動化し、機械学習を用いてモデル学習することを行ってきましたが、 高頻度取引(High Frequency Trading)においてはこれまで使用してきた分足ではなく秒足を使用することが多いようです(いまさらですね)。通常の仮想通貨取引所のAPIでは分足までしかサポートしていないため、ここではdocker-composeでElasticsearchのノードを構築し、btc/jpyの秒足データを保管することを目標とします。

close/volume/timestamp in a second

高頻度取引(High Frequency Trading)として実績のあるロジックが先日公開され、 bitbank.cc APIを利用したpythonのコードへ書き換える予定でしたが、秒足から計算していると思われる指標が出てきたため、コード移植ができませんでした。例えば秒足から計算していると思われるself.executions.get1SecSma()self.executions.get1SecSmaDisparitySmaOfPeriod()というメソッドが登場します。[1]

bitbank APIもですが、通常の仮想通貨取引所のAPIでは分足までの提供で、秒足は基本的にAPIから取得できません(の認識です)。秒足をベースとしたテクニカル指標を計算するには秒足の情報を保存し、取得できるようにしておく必要があります。そこでティッカー情報を毎秒APIでコールしてOHLCV情報をElasticsearchへ保存することを考えました。Elasticsearchクラスタのノードは1つで分散構成は取らないこととします。またKibanaを導入しデータを簡単に可視化できるようにします。

このようなことを行いたいです。

  • ElasticsearchとKibanaをDocker環境で構築する
  • データはコンテナ側へボリュームマウントする
  • pythonクライアントを使用して秒足の情報を保存する

Elasticsearchとは

Elasticsearchについて、は今更ですが個人的な理解のために記載します。ElasticsearchはJavaの検索ライブラリであるLuceneをベースとした、索引型の全文検索エンジンでREST APIをサポートする検索サーバです。トランザクションをサポートするデータベースとは異なり、SQLでなく転置インデックス、スコアによる全文検索をサポートします。特に検索処理のユースケースに応じたクエリ性能と大規模用途向けのスケーラビリティを相対的に重視しているようです。機能面の特徴としては以下の4点が中心となります。

  • 分散配置による高速化高可用性の実現
  • シンプルな REST API によるアクセス
  • JSONフォーマットに対応した柔軟性の高いドキュメント指向
  • ログ収集可視化などの多彩な関連ソフトウェアとの連携

全文検索エンジンとして文書のナレッジベースとしてだけでなく、横断的なデバイスのログ、アプリケーション分析、IoTのセンサ情報の保管と分析など様々な用途に利用されるようになっています。現時点での最新バージョンは 7.4.2 のようですので本稿ではこのバージョンを採用します。Elasticsearchは様々なプラットフォームで動作します。クラウドやオンプレなどインストールできる環境も多様です。いずれの方法も導入は非常に簡単ですが、Elastic CloudやAWS Elasticsearch serviceはコストが安くないため、ここではdocker-composeを使用して、Elasticsearch 7.4.2Kibana 7.4.2をローカルで立ち上げます。

  1. オンプレ(バイナリインストール、Dockerなどの仮想化)
  2. クラウド(Elastic Cloud、AWS Elasticsearch Serviceなど)

Elastic CloudではAWSやAzureなどクラウド環境へ数分でクラスタをデプロイできます。[2]

Elastic Cloud on AWS and Elastic Stack

上記のクラスタおよびElastic Stackサービス使用時におけるElastic Cloudの予測コストです。Elastic Cloudの無料トライアルは14日間まで課金なしで使用できます。通常であれば月 $150 程度のようですね。

Elastic Cloud Billing information

Elastic Stackとして様々な関連ソフトウェアが提供されていますが、ここでは可視化ツールであるKibanaを一緒に導入することにしました。[3]

Elastic Stack (Kibana, Logstash, Beats, X-Pack)

ElasticsearchとKibanaをDockerで起動する

ElasticsearchとKibanaのDockerコンテナはそれぞれ以下を使用します。Elasticsearchのインストールガイドです。[4]

Elasticsearch 7.4.2
$ docker pull docker.elastic.co/elasticsearch/elasticsearch:7.4.2

KibanaのDockerでのセットアップガイドです。[5]

Kibana 7.4.2
$ docker pull docker.elastic.co/kibana/kibana:7.4.2

docker-composeで動かす上で環境まわりの設定が分かりにくかったのでまとめました。作業途中で色々と怒られています。作成したdocker-compose.ymlは少し下にgistで貼ってあります。Configuration Fileのパーミッションについて言及していますが、Configuration Fileは今回作成したdocker-compose.ymlではコンテナ側へマウントしていません。

  • Configuration Fileのパーミッション

Elasticsearchの設定ファイルをホスト側からマウントできますが、Elasticsearchのコンテナ内では elasticsearch ユーザからファイルを読み取る必要があります。コンテナ内のマウント先は /usr/share/elasticsearch/config/elasticsearch.yml です。デフォルトではuid:gid 1000:1000 なのでバインドするフォルダにgid 10000 のグループアクセス権を付与することが公式サイトではお勧めされています。

By default, Elasticsearch runs inside the container as user elasticsearch using uid:gid 1000:1000.

Grant Group access to gid 1000 or 0
  • nofileおよびnoprocに対するulimitsの設定

docker runを使用する場合は以下のオプションを渡せます。

--ulimit nofile=65535:65535

docker-compose.ymlへ記載する場合はこうなります。

ulimits:
nofile:
soft: 65536
hard: 65536
  • スワッピングの無効化

パフォーマンスへの影響およびノードの安定性のためにスワッピングを無効化します。docker runの場合は以下です。

-e "bootstrap.memory_lock=true" --ulimit memlock=-1:-1

docker-compose.ymlへ記載する場合はこちらです。

environment:
- bootstrap.memory_lock=true
ulimits:
memlock:
soft: -1
hard: -1
  • ヒープサイズの設定

メモリのアロケーションでしょうか。16GBを指定する場合はdocker runに以下の環境変数を渡します。

-e ES_JAVA_OPTS="-Xms16g -Xmx16g"

今回はdocker-compose.ymlに256mを指定しています。

environment:
- "ES_JAVA_OPTS=-Xms256m -Xmx256m"
  • vm.max_map_count サイズの設定

Dockerを動かすホスト側での設定です。デフォルトが65530となっているので sysctl コマンドで65530に設定しておきます。設定していない状態では、 vm.max_map_count の値が低すぎるというエラーが出力されていました。

$ docker logs elasticsearch
[1]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
sysctl -w vm.max_map_count=262144

docker-compose.ymlを以下のように作成しました。 discovery.type=single-node で単体のノードを起動します。公式サイトのdocker-compose.ymlには複数ノードでクラスタを立ち上げる例が記載されています。今回のdocker-compose.ymlのリポジトリはこちらです。

https://github.com/yuyasugano/elasticsearch-docker

docker-compose.yml

docker-compose builddocker-compose up -d を行ってElasticsearch 7.4.2Kibana 7.4.2が立ち上がることを確認しました。 docker ps -a で各コンテナが稼働していることを確認できます。

$ docker-compose build
Building elasticsearch
Step 1/3 : FROM docker.elastic.co/elasticsearch/elasticsearch:7.4.2
7.4.2: Pulling from elasticsearch/elasticsearch
Status: Downloaded newer image for docker.elastic.co/elasticsearch/elasticsearch:7.4.2
---> b1179d41a7b4
Step 2/2 : RUN elasticsearch-plugin install analysis-kuromoji
---> Running in 69a5e3d500c7
-> Downloading analysis-kuromoji from elastic
[=================================================] 100%??
-> Installed analysis-kuromoji
Successfully built 77adce2cf740
Successfully tagged elasticsearch-crypto_elasticsearch:latest
Building kibana
Step 1/1 : FROM kibana:7.4.2
7.4.2: Pulling from library/kibana
Status: Downloaded newer image for kibana:7.4.2
---> 230d3ded1abc
Successfully built 230d3ded1abc
Successfully tagged elasticsearch-crypto_kibana:latest
$ docker-compose up -d
$ docker ps -a

クラスタやノード(ここではシングル)のステータスが正常であることを確認してください。Kibanaはhttp://localhost:5601のようにURIに5601のポートでアクセスするとUIから確認できます。Kibanaの立ち上がりには少し時間がかかりました(マシンやインスタンスの性能にもよると思います)。

$ curl -XGET "localhost:9200/_cat/health?v&pretty"
cluster status node.total node.data shards active_shards
docker-cluster green 1 1 0 100.0%
$ curl -XGET "localhost:9200/_cat/nodes?v&pretty"
ip heap.percent ram.percent cpu load_1m load_5m load_15m
172.20.0.3 47 92 2 0.06 0.14 0.38

公式サイトに沿ってインデックスの作成やドキュメントの投入ができることを確認しました。また accounts.jsonサンプルをバッチ投入してみました。[6]

customerインデックスの作成、とインデックスへのドキュメントの投入の例です。ステータスはレプリカシャード作成に失敗しているため yellow となっていました。

$ curl -XPUT "localhost:9200/customer?pretty&pretty"$ curl -XPUT "localhost:9200/customer/_doc/1?pretty&pretty" -H 'Content-Type: application/json' -d'
> {
> "name": "John Doe"
> }
> '
# indexのステータス確認 index status check
$ curl -XGET "localhost:9200/_cat/indices?v"
# indexの設定確認 index configuration check
$ curl -XGET "localhost:9200/customer?pretty&pretty"

バッチ投入は --data-binaryオプションに jsonファイルを指定するだけで非常に簡単です。@を付けてファイル名を指定します。 accounts.json の中には1000件の銀行口座情報が入っており、インデックスのステータスを確認すると docs.count が1000になっていることが確認できます。

$ curl -XPUT "localhost:9200/bank?pretty&pretty"
$ curl -H "Content-Type: application/json" -XPOST "localhost:9200/bank/_bulk?pretty&refresh" --data-binary "@accounts.json"
$ curl -XGET "localhost:9200/_cat/indices?v"

また基本的なElasticsearch Queryの書き方は以下を参考にしました。 mustshouldrelevance scoreの算出があることに対し、 filtermust_not は純粋にフィルタリングのような動作をします。[7]

以降はREST APIではなく elasticsearch-py やKibanaのDev Toolsでインデックスやドキュメントを操作していきます。

Elasticsearch pythonクライアントの導入

Elasticsearch 7.4.2Kibana 7.4.2の環境をdocker-composeで立ち上げることを行って、Elasticsearchの基本的な動作を確認できました。本稿は仮想通貨取引所のAPIから秒足のデータを取得してElasticsearchへドキュメントとして格納することが目的でした。これまでbitbank.cc APIではpythonクライアントを使用してきたので、Elasticsearchも elasticsearch-py というpythonクライアントを導入してみることにします。pipenv環境を構築しているので、 pipenv install でインストールします。

$ mkdir elasticsearch-py
$ pipenv --python 3.7.3
$ pipenv install --dev requests numpy pandas
$ pipenv install --dev elasticsearch

基本的なクライアントの使い方はgithubに掲載されています。Example useを参考にインデックスを作成するコードを書いてみます。[8]

- index.py#!/usr/bin/python
from datetime import datetime
from elasticsearch import Elasticsearch
# by default we connect to localhost:9200
es = Elasticsearch()
# create an index in elasticsearch, ignore status code 400 (index already exists)
es.indices.create(index='btcjpy', ignore=400)

上記のコードを実行します。 btcjpy というインデックスを作成できました。実際に秒足のデータを格納するコードを書く前にElasticsearchのマッピングを決めておきます。スキーマオンリードで簡単に始められるElasticsearchですが、 index を使用しない要素や日本語での全文検索が必要なケースなど、マッピングを手動設定した方がよいケースがあるとのことです。[9]

Elasticsearch 7.0.0以降はタイプレスマッピングです。可能であればOHLCVデータを保管したかったのですが、ティッカー情報からすぐに取り出せるものは last = close の終値と voltimestamp です。とりあえずこの3つのフィールドを対象としてマッピングを定義します。幸い今回移植しようとしているコードでは指標としてSMA(単純移動平均値)しか出てこないため秒足の終値さえ分かれば算出はできそうです。pipenvプロジェクト内でbitbank.cc APIのライブラリをインストールします。

$ pipenv install --dev git+https://github.com/bitbankinc/python-bitbankcc.git#egg=python_bitbankcc

ティッカー情報を確認してみました。続けてKibanaのDev Toolsよりマッピング情報を設定します。 put btcjpy/_mapping で設定できます。

>> import json
>> import python_bitbankcc
>> pub = python_bitbankcc.public()
>> print(json.dumps(pub.get_ticker('btc_jpy'), indent=4, separators=(',', ': ')))
{
"sell": "1015922",
"buy": "1015921",
"high": "1028000",
"low": "1009480",
"last": "1015921",
"vol": "541.9970",
"timestamp": 1573080789838
}
Mapping Configuration in Dev Tools

btcjpy インデックスの確認をします。正常にマッピングが作成されていることが確認できました。 timestamp には date 型でUNIX時間である epoch_millis のフォーマットを指定しています。

$ curl -XGET "localhost:9200/btcjpy?pretty&pretty"
Mappding Configuration in btcjpy

Elasticsearch pythonクライアントのテストと仮想通貨のデータを格納するためのElasticsearchのインデックス・マッピングの準備が完了しました。

仮想通貨の秒足データを保存する

前述のとおり秒足のOHLCVデータでなく、終値、ボリュームとepoch時間をdocker-composeで立ち上げたElasticsearchの基盤へ保存していく戦略を取ります。bitbank.cc APIで毎秒のティッカー情報を取得し、Elasticsearch pythonクライアントでドキュメントを挿入するpythonスクリプトを記述します。以下のElasticsearch pythonクライアントの公式ドキュメントを参考にしています。[10]

まずは試しに id=1 として1件だけ、秒足データを保存してみます。APIで取得したデータをJSONの形にして es.index で保存するだけです。データの取得は es.get で行えます、 es = Elasticsearch() としてElasticsearchのインスタンスを作成しています。

- document.py#!/usr/bin/python
import json
import python_bitbankcc
from datetime import datetime
from elasticsearch import Elasticsearch
# by default we connect to localhost:9200
es = Elasticsearch()
pub = python_bitbankcc.public()
json = pub.get_ticker('btc_jpy')
doc = {
'close': int(json['last']),
'volume': float(json['vol']),
'timestamp': int(json['timestamp'])
}
res = es.index(index="btcjpy", id=1, body=doc)
print(res['result'])
>> createdres = es.get(index="btcjpy", id=1)
print(res['_source'])
>> {'close': 1015000, 'volume': 554.6966, 'timestamp': 1573085069502}

Dev Toolsからの投入したドキュメントの確認方法です。 GET /btc/jpy/_doc/1id=1 のドキュメントを参照できます。

Second Close data in Elasticsearch

秒足のclose/volume/timestampデータをElasticsearchへ保管したのですが、毎秒データを保存し続けるとストレージも枯渇してくる上に、今必要なSMAの計算では直近のデータだけあれば良いので、ドキュメントを上書いて60個の秒足データだけを btcjpyインデックスへ更新し続けることとしました。つまり1分の秒足データでデータポイントは60個保存されます。60秒経過後の次の秒足データは再度 id=1 として上書きされます。検索する場合には timestampを降順で取得することで直近数秒の秒足データが分かるようになります。

Elasticsearch bitbank.cc API

上記のコードを走らせると、以下のように毎秒bitbank.cc APIから取得したティッカー情報のJSONが表示され、Elasticsearchの btcjpy インデックスへ保管されていきます。

--------------------------------------------------------------------
Elasticsearch collected a second data: created
--------------------------------------------------------------------
{
"close": 1007154,
"volume": 627.2253,
"timestamp": 1573111651233
}
--------------------------------------------------------------------
Elasticsearch collected a second data: created
--------------------------------------------------------------------
{
"close": 1006701,
"volume": 627.6453,
"timestamp": 1573111652808
}
--------------------------------------------------------------------
Elasticsearch collected a second data: created
--------------------------------------------------------------------
{
"close": 1006701,
"volume": 627.6453,
"timestamp": 1573111655369
}

インデックスへ保存されるドキュメント数は以下の docs.count の通り60件で、60件を超えたところからは既存のドキュメントへ上書きされていっています。

$ curl -XGET "localhost:9200/_cat/indices?v"
health status index pri rep docs.count docs.deleted
yellow open btcjpy 1 1 60 0
close/volume/timestamp in a second

秒足において短期・中期・長期のSMA(単純移動平均値)に対するデータポイント数を考える必要があるか分からないのですが、例えば短期移動平均を求める場合に仮に n=5 として直近5秒のデータポイントを取得するには、REST APIで以下のクエリーを投げます。

$ curl -X GET "localhost:9200/btcjpy/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": { "match_all": {} },
"_source": ["close", "volume"],
"from": 0,
"size": 5,
"sort": { "timestamp": { "order": "desc" } }
}
'

この5秒分のデータポイントから終値の平均を計算することで n=5 の単純移動平均値が算出できます。以下のコードをテストしました。 size=5 でデータポイント数を指定しています。

Elasticsearch bitbank.cc API, calculate SMA

実行すると以下のように timestamp の降順で指定したデータポイント数のデータが表示され、そのデータポイントから計算したSMA(単純移動平均値)が表示されます。 elasticsearch-py を試したコードは以下リポジトリへアップロードしてあります。OHLCVデータではないですが、秒足の仮想通貨のデータを使用して仮想通貨の自動取引のコードを書く準備が整いました。以上です。

====================================================================
Total 61 found in 1141ms
--------------------------------------------------------------------
/btcjpy/_doc/40 close: 991648, volume: 670.2622
/btcjpy/_doc/39 close: 991648, volume: 670.2622
/btcjpy/_doc/38 close: 991227, volume: 670.1522
/btcjpy/_doc/37 close: 991227, volume: 670.1868
/btcjpy/_doc/36 close: 991227, volume: 669.7432
====================================================================
5 seconds Simple Moving Avarage: 991395.40

https://github.com/yuyasugano/elasticsearch-py

--

--

Yuya Sugano

Cloud Architect and Blockchain Enthusiast, techflare.blog, Vinyl DJ, Backpacker. ブロックチェーン・クラウド(AWS/Azure)関連の記事をパブリッシュ。バックパッカーとしてユーラシア大陸を陸路横断するなど旅が趣味。