データの取り込みから配信まで

Ryan Lingo
99P Labs: 日本語バージョン
7 min readNov 2, 2023

作者:Ben Davis

過去数回のブログ投稿 (1, 2) で、多量のクエリ結果を素早く届けるにはどうしたらよいかを調べる為、パフォーマンスを測定しその実現可能性を評価したさまざまなデータパイプラインコンポーネントについて話しました。ここでは、それらをまとめて、いくつかの重要なコンポーネントとテクノロジーについて話したいと思います。

データ取り込み (Data Ingestion)

問題の前半は、生のソース データを取り込み、それをクエリ可能な構造化された何らかの形式に変換することです。そのプロセスは幾つかのステージで構成され、それぞれの段階に複数のworkers が入ります。このステージ間でデータを調整および移動させる際に、Apache Kafkaを使います。

データ取り込み(Data Ingestion)ボックスは、生の入力ソース データを行テーブルの構造に変換するために必要なプロセスを表し、ETL ボックスは、ストレージにレファレンスや派生データセットを変換して書き込むプロセスを表します。 すべてのプロセスは、Kafka トピックを発行およびサブスクライブできる Kafka クライアントです。 何故ここでカフカを使用するのでしょうか? これまでの経験で、複雑さが最大の問題の 1 つであることが分かりました。 データ取り込みプロセスを一度作り上げるのは簡単ですが、それを後で変更するのは難しくなります。 ここでKafka を使用することで、問題を疎結合のシンプルな作業単位に崩し、その後workersを追加するだけでスケールアップすることが簡単にできるようになります。 もう 1 つの重要な点は、ワークフローにある全てのparticipantsが従うスキーマです。 これには、Protobufs を使用し、Kafka 内すべてのペイロードは、シリアライズされた Protobuf メッセージとなります。 最終的には、データ イベントに応答する取り込みプロセスが成果物です。 予定されているわけではなく、新しいデータが作られ、数分または数秒で構造化されたレコードがクエリに利用可能となります。

データ配信

ここから、 どのようにクエリを実行して結果を生み出すかと言うチャレンジの後半に入ります。 上記の取り込みプロセスにより、約 4,000 万レコードを基にサンプル イベント シリーズ データセットを一つ作成しました。 ファイルは Hudi で管理され、日ごとに分けられ、Hive カタログに同期されます。 当初はクエリ エンジンとして Presto を使用していましたが、ストレージをまとめる必要があったため、最終的に Trino に切り替えました。 主なユースケースは、ある一定期間のイベントでできた大きなデータフレームを作成する Python クライアントです。 実行する為、とりあえずAPI と SDK を作りました。

API

API は従来の JSON/REST サービスと gRPC ストリーミング サービスの2 種類のサービスを提供します。 REST サービスは、ベアラー トークンでアクセスを制御するアプリケーションゲートウェイに隠れています。 ゲートウェイは現在 gRPC をサポートしていないため、トークンを検証する為のストリーミングインターセプターを搭載して同じ機能を果たしています。 クライアント側では、Interceptor がベアラー トークンをリクエスト ヘッダーに追加する役割を果たします。 API の観点では、トークンを基にリクエストの受理·拒否をトークンを基にだけです。 が、クライアントが認証し、適切にフォーマットされたリクエストを作成し、レスポンスからデータフレームを生成することは、重大なタスクとなります。 ここで SDK が登場します。

SDK

SDK は単一のパッケージであり、ID ブローカーおよび REST と gRPC 両方のエンドポイントと通信するように事前構成されています。 ここで、REST と gRPC のどちらのサービスを使用すればよいかという疑問が出てきます。 バックエンドでは、API がリクエスト (REST または gRPC) を受け取ると、クエリを Trino に送信します。 REST クライアントの場合、API サーバーは行を シングルJSON 応答に蓄積してから、クライアントに送り返します。 gRPC クライアントの場合、Protobuf でエンコードされた行が Trino から到着すると一まとめになりストリームされます。 ペースは、クライアントによって設定されます。 ここでは1000 レコードから 5M レコードまでの一連のテストを実行しました。

REST グラフは、緑色のクエリが比較的早く終了していることを示しています。 青い線は、クライアントの待機時間のほとんどが、API サーバーからクライアントへのデータ転送に費やされていることを示しています。 これを gRPC と比較してください。

gRPC グラフは、クライアントとクエリがほぼ同時に終了していることを示しています。 API はクライアントと Trino の間の導管として機能するため、クライアントがサポートできる速さで行が要求されます。

どちらが速いでしょうか? 少ないレコード数で REST と gRPC を比較すると、両者はほぼ同じで、約 500K レコードで分岐し始めます。 この相違が発生するポイントは、クライアントのコンピューティングとネットワーク帯域幅の容量で変わるので、クライアントごとに異なります。

結論

Kafka が提供する機能は、いくつかのハイレベルの目標に対応しています。

受信データは、収集後できるだけ早く配信したいので、 スケジュールに左右されるのではなく、イベントに反応するプロセスを設計する必要があります。 すべてのユースケースで必要になるわけではありませんが、フレームワークは必要に応じてその機能をサポートできます。

プラットフォームが変化に対応できることが望ましいのですが、実際はすべてが変わってしまいます。 フレームワークは、システムの残りの部分を壊すことなくコンポーネントを追加、変更、または交換できるように対応できる必要があります。

スケーリングもできる必要がありますが、テクノロジーだけでは解決できません。 より高速な CPU とより多くのメモリに依存するのをやめ、ワークロードを小さな独立した作業単位に分解し、より多くの同時実行によるスケーリングをする必要があります。

レコード数が数百万に達すると、最終的にユーザーのデータ待機時間の限界に達します。 gRPC はすでに 6 年前に作られまだ広く採用されているようには見えませんが、膨大な量のデータ処理の為に別のアプローチを取らざるを得なくなりました。 ストリーミングは待ち時間を短縮するだけでなく、レコードが到着したら一まとめで処理でき、 データセット全体のダウンロードが完了するまで待つ必要はありません。 シリアル化された Protobuf メッセージは人間が判読できませんが、非常にコンパクトです。 また、型指定されているため、異なる言語で記述されたプロセス間でデータを渡すときに発生する可能性のある曖昧さと型変換エラーを回避できます。つまり、 Kafka、Protobufs、および gRPC はパイプラインの基盤として適しているのです。

LinkedInMedium にご参加ください。 research@99plabs.com でご連絡いただく事も可能です。 ご不明な点や99P Labs について、お気軽にお問い合わせください。

--

--

Ryan Lingo
99P Labs: 日本語バージョン

🚀Dev Advocate @99P Labs | Unraveling future mobility & data science | Insights on #AI #LLMs #DataScience #FutureMobility 🤖💻🚗📊🌟