非同期処理の裏側を覗き見:Rustのasync/awaitの場合

Heruoji
Goalist Blog
Published in
23 min readJun 21, 2024

イントロダクション

非同期処理は、タスクの完了を待つことなく多くのタスクをプログラム上で実行することができる強力なツールです。例えば、Webサーバーのリクエスト処理やファイルの読み書きなど、I/Oバウンドタスクの効率的な処理に非常に有用です。

この記事では、Rust公式の「Asynchronous Programming in Rust」の第1章と第2章を参考にし、並行プログラミングモデルや非同期処理の実装例、非同期ランタイムの内部構造までまとめました。

並行プログラミングモデルの比較

並行プログラミングにはさまざまなモデルがあり、それぞれに特有の利点と欠点があります。Asynchronous Programming in Rustの第1章の内容に基づき、OSスレッド、イベント駆動、コルーチン、アクターモデルの各モデルについて例を交えながら説明します。

OSスレッド

OSスレッドは、オペレーティングシステムによって管理されるスレッドを使用する並行プログラミングモデルです。既存の同期コードのプログラミングモデルを大きく変更せず利用できる点で優れており、少数のタスクに対しては簡単に並行処理を表現できます。しかし、スレッド間の同期の難しさやパフォーマンスオーバーヘッドが大きいため、大量のタスクやIOバウンドのワークロードには向いていません。スレッドプールを使用することで一部のコストを軽減できますが、すべての問題を解決するわけではありません。

Javaの例:

public class ThreadExample {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 1; i <= 10; i++) {
System.out.println("Hello from the spawned thread: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

thread.start();

for (int i = 1; i <= 5; i++) {
System.out.println("Hello from the main thread: " + i);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

イベント駆動プログラミング

イベント駆動プログラミングは、イベントが発生したときにそれに対応する処理を実行するモデルです。コールバックやイベントハンドラーを使用して、非同期イベントに応答します。

JavaScriptの例:

const fs = require('fs');

fs.readFile('example.txt', 'utf8', (err, data) => {
if (err) {
console.error(err);
return;
}
console.log(data);
});

console.log('This will run before the file is read');

高いパフォーマンスを実現できる一方でコードが非線形になりがちであり、データフローやエラーの伝搬が追跡しにくいです。

コルーチン

コルーチンは、一時停止と再開が可能な関数のようなものです。自然な制御フローを保ちながら並行処理を行うことができます。

Pythonの例:

import asyncio

async def async_function():
print('Start')
await asyncio.sleep(2)
print('End')

asyncio.run(async_function())

スレッドと同じくプログラミングモデルを変える必要がなく使いやすいですが、低レベルの詳細は抽象化されています。

アクターモデル

アクターモデルは、すべての並行計算をアクターと呼ばれる独立した単位に分割するモデルです。アクターは、メッセージを通じて通信します。

Akkaを使ったJavaの例:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class AkkaExample {

// Pingアクター
static class PingActor extends AbstractActor {
private final ActorRef pongActor;

public PingActor(ActorRef pongActor) {
this.pongActor = pongActor;
}

@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("ping", msg -> {
System.out.println("Ping received");
pongActor.tell("pong", getSelf());
})
.build();
}
}

// Pongアクター
static class PongActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("pong", msg -> {
System.out.println("Pong received");
getSender().tell("ping", getSelf());
})
.build();
}
}

public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("pingpong-system");
final ActorRef pongActor = system.actorOf(Props.create(PongActor.class), "pongActor");
final ActorRef pingActor = system.actorOf(Props.create(PingActor.class, pongActor), "pingActor");

// 初めのメッセージ送信
pingActor.tell("ping", ActorRef.noSender());

// システムを終了させる前に少し待機
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
system.terminate();
}
}
}

アクター間の通信はメッセージを通じて行われるためスレッドセーフな並行処理が実現できますが、フロー制御やリトライロジックなどの実用的な問題を解決する必要があります。

OSスレッドと非同期(Async)の比較

OSスレッドは少数のタスクであれば既存の同期コードをほぼそのまま使える利便性がありますが、CPUとメモリのオーバーヘッドが大きく、大規模なIOバウンドタスクには向きません。

非同期(Async)は軽量で効率的に大量のIOバウンドタスクを処理できますが、非同期用のランタイムがバンドルされるため、バイナリサイズが大きくなる傾向があります。

それぞれ向き不向きがあるため、シナリオに応じて適切に選択することが大切です。

Rustでのasync/.await

非同期処理の例として、Rustの async/.awaitを説明します。以下のポイントに基づいて動作します:

async

  • asyncキーワードは、コードブロックをFutureというトレイトを実装した状態機械に変換します。
  • async fnは非同期関数を定義し、戻り値はFutureです。

await

  • async fn内で、.awaitを使用して他のFutureの完了を待機します。
  • .awaitはスレッドをブロックせずに非同期に待機し、他のタスクが実行されるのを許可します。

Future

Futureは、非同期計算を表現するためのトレイトであり、最終的に値を生成します(値が空の場合もあります)。簡易版のFutureトレイトは以下のようになります。

trait SimpleFuture {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
Ready(T),
Pending,
}

Futurepoll関数を呼び出すことで進行します。pollが呼ばれると、Futureは完了に向けて可能な限り進行し、完了するとPoll::Ready(result)を返します。まだ完了していない場合はPoll::Pendingを返し、Futureが進行可能になったときにwake()関数が呼ばれるように設定します。wake()が呼ばれると、Futureを駆動するExecutorが再度pollを呼び出し、Futureの処理が再度進行します。

Executor

Executorは非同期タスクをスケジュールし、管理します。FutureExecutorにより実行されます。

ブロッキング関数の場合、呼び出すとスレッド全体がブロックされます。一方で、 asyncにより生成されたFutureはブロックされるとスレッドの制御を他のFutureに譲渡します。これにより、スレッドが効率的に利用され、複数のタスクを同時に実行することが可能になります。

以下がコード例です。

use futures::executor::block_on;
use futures::join;
use std::time::Duration;
use futures_timer::Delay;

async fn learn_song() -> String {
println!("Learning the song...");
Delay::new(Duration::from_secs(2)).await;
println!("Learned the song!");
"La La La".to_string()
}

async fn sing_song(song: String) {
println!("Singing the song: {}", song);
Delay::new(Duration::from_secs(2)).await;
println!("Finished singing the song!");
}

async fn learn_and_sing() {
let song = learn_song().await;
sing_song(song).await;
}

async fn dance() {
for _ in 0..3 {
println!("Dancing...");
Delay::new(Duration::from_secs(1)).await;
}
println!("Finished dancing!");
}

async fn async_main() {
let f1 = learn_and_sing();
let f2 = dance();

join!(f1, f2);
}

fn main() {
block_on(async_main());
}

async fn learn_and_sing()は、歌を学んでから歌う非同期関数です。learn_song().awaitで歌を学び、その後sing_song(song).awaitで歌を歌います。

async fn async_main()は、learn_and_singdanceを並行して実行する非同期関数です。futures::join!を使用して、両方のタスクが完了するまで待ちます。

fn main()は、block_on(async_main())を使用して非同期関数を実行します。block_onは、非同期タスクが完了するまで現在のスレッドをブロックします。

このコードを実行すると、以下のような出力が得られます。

Learning the song...
Dancing...
Dancing...
Learned the song!
Singing the song: La La La
Dancing...
Finished dancing!
Finished singing the song!

この結果から、learn_and_singdanceが並行して実行され、異なるタイミングで完了することが確認できます。learn_and_sing 内では、learn_song().awaitにより learn_songが完了するまでsing_songは待機しますが、その間danceは並行して実行されます。これにより、スレッドが効率的に利用され、パフォーマンスが向上します。

非同期処理の裏側

Rust言語において、 asyncawait の裏側でどのような処理が動いているのか確認するために、カスタム非同期ランタイムを実装してみます。Asynchronous Programming in Rustの第2章のカスタムタイマーの例を見ていきます。この例では、ExecutorSpawnerという2つの主要なコンポーネントを持つ非同期タスク実行環境を構築し、TimerFutureを使用してタイマーの非同期タスクを実行します。 lib.rsmain.rs の二つのファイルに実装します。

lib.rs ファイルでは、TimerFutureというカスタム非同期タイマーを実装しています。このタイマーは、指定された期間が経過するまで非同期に待機します。

use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};

pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
completed: bool,
waker: Option<Waker>,
}

impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}

impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));

let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});

TimerFuture { shared_state }
}
}

TimerFutureは、 Futureトレイトの実装クラスです。Rustの asyncキーワードは、コードブロックを Futureトレイトを実装した状態機械に変換します。

TimerFuture構造体

TimerFutureは、 Futureトレイトの実装クラスです。SharedStateの参照を持ちます。

SharedState構造体

SharedState構造体は、タイマーの時間が経過したかどうかの状態と、タスクウェイクアップ用のWakerを保持します。

  • completed: タイマーが完了したかどうかを示すブール値。
  • waker: タスクをウェイクアップするためのWaker

Waker

Waker はタスクの再スケジュールを行うために使用されます。TimerFutureでは、指定された期間が経過した後にタスクを再開するためにWakerを使用します。

Futureトレイトの実装

Futureトレイトを実装することで、TimerFutureは非同期タスクとして扱えるようになります。

  • pollメソッドはエグゼキューターによって実行されるメソッドで、タイマーが完了しているかをチェックします。完了していればPoll::Readyを返し、未完了であればPoll::Pendingを返します。
  • 未完了の場合、後ほどタイマーが完了したときにタスクを再スケジュールするためのWakershared_state.wakerに保存します。

TimerFuture::newメソッド

newメソッドは、新しいTimerFutureを作成します。

  • shared_stateを初期化し、タイマーの完了状態をfalseに設定します。
  • 新しいスレッドを生成し、指定された期間が経過した後にshared_state.completedtrueに設定し、保存されたwakerがあればウェイクアップすることでタスクを再スケジュールします。

main.rs ファイルでは、非同期タスクを管理・実行するカスタム非同期ランタイムを構築します。これには、タスクの生成と実行を管理するエグゼキュータとスポーナーが含まれています。

use futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
};
use std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::Context,
time::Duration,
};

use timer::TimerFuture;

struct Executor {
ready_queue: Receiver<Arc<Task>>,
}

impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&waker);
if future.as_mut().poll(context).is_pending() {
*future_slot = Some(future);
}
}
};
}
}

#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}

impl Spawner {
fn spawn(&self, future: impl Future<Output=()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued");
}
}

struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
task_sender: SyncSender<Arc<Task>>,
}

impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("too many tasks queued");
}
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}

fn main() {
let (executor, spawner) = new_executor_and_spawner();

spawner.spawn(async {
println!("howdy!");
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!")
});

drop(spawner);

executor.run();
}

Executor

Executorは、タスクを実行する責任を持つ構造体です。タスクはReceiver<Arc<Task>>というキューに保存され、順次実行されます。

runメソッドは、キューからタスクを取り出し、タスクのFutureのpollメソッドを実行します。タスクが完了していない場合(Poll::Pendingが返ってきた場合)、タスクを再度キューに戻します。

Spawner

Spawnerは、新しい非同期タスクを生成してエグゼキュータに送信します。

Task

Taskは、個々の非同期タスクを表す構造体です。ArcWakeトレイトを実装しており、タスクがウェイクアップされる際に、wake_by_refによってエグゼキュータにタスクを再度送信します。

main メソッドを実行すると、 howdy! が表示され、2秒後に done! が表示されます。

全体処理の流れ

全体の処理の流れをまとめると、以下のようになります。

  1. ExecutorSpawnerの作成
    new_executor_and_spawner関数で、非同期タスクを管理・実行するためのエグゼキュータとタスクを生成するスポーナーを作成します。
  2. 非同期タスクの生成と送信
    Spawnerが非同期タスクを生成し、エグゼキュータに送信します。このタスクは、TimerFutureを使用して2秒間待機する非同期タスクです。
  3. エグゼキュータの実行
    エグゼキュータはキューからタスクを取り出し、そのタスクのFutureをポーリングします。
  4. TimerFutureのポーリング
    TimerFuture::pollメソッドでタスクが完了していない場合(shared_state.completed = false)、Poll::Pendingを返します。このとき、現在のタスクのWakershared_stateにセットします。
  5. カウント用スレッドの実行
    TimerFuture::newのタイミングで別スレッドでカウントが開始され、指定された時間が経過すると、shared_state.completedtrueに設定し、保存されたWakerを呼び出してタスクをウェイクアップします。
  6. Waker の呼び出し
    Wakerwakeメソッドにより、Task構造体のwake_by_refメソッドが呼び出され、タスクが再度エグゼキュータに送信されます。
  7. エグゼキュータの再ポーリング
    エグゼキュータがキューからタスクを再度取り出し、TimerFuture::pollを実行します。今度はshared_state.completed = trueとなっているため、Poll::Readyを返します。
  8. タスクの完了
    エグゼキュータはタスクが完了したと判定し、続きの処理を実行します。

このように、async/.awaitのコルーチン的な書き方の背後では、Futureの状態機械とExecutorのカスタムランタイムによって、イベント駆動的に非同期処理が動いていることが分かります。

まとめ

非同期プログラミングは、現代のソフトウェア開発において不可欠な技術です。非同期処理を利用することで、効率的なリソース管理と高いパフォーマンスを実現できます。

非同期処理の基礎

非同期処理は、タスクの完了を待たずに他のタスクを実行できるため、I/Oバウンドな作業や大規模な並行処理において特に有用です。

並行プログラミングモデルの比較

  • OSスレッド: シンプルで使いやすいが、リソースのオーバーヘッドが大きく、スケーラビリティに課題がある。
  • イベント駆動: 高性能だが、コードが非線形になりがち。
  • コルーチン: 自然な制御フローを保ちながら並行処理を実現可能。
  • アクターモデル: スレッドセーフな並行処理が可能だが、制御フローが複雑。

Rustでのasync/.await

Rustのasync/.awaitは、非同期プログラミングをシンプルにし、高性能な実装を可能にします。FutureとExecutorの概念を理解することで、Rustの非同期処理の強力さを活用できます。

非同期処理の裏側

自作の非同期ランタイムを実装することで、非同期処理のメカニズムの理解が深まりました。Rustのasync/.awaitにおいては、Future、Waker、Executorの役割を具体的に理解することが重要です。Futureとカスタムランタイムが協力して非同期処理を実現しています。また、async/.awaitのようなコルーチンの裏側では、実際にはイベント駆動の仕組みが使われています。これは、イベントループが非同期タスクを管理し、タスクが完了したときに再開するためのメカニズムを提供します。

非同期プログラミングは一見難解に思えるかもしれませんが(実際に難解だぜ)、基本的な概念とモデルを理解することで、効果的に活用できるようになると思います。今後も、非同期プログラミングの技術を深め、より効率的なソフトウェア開発を目指していきます。

--

--