RustのカスタムFutureによるポーリングの理解
Takashi Yamamoto
Infrastructure Engineer · Leapcell

はじめに
非同期プログラミングは、特にネットワーキング、I/Oバウンドタスク、高同時実行システムといった分野で、高性能で応答性の高いアプリケーションを構築するための不可欠なパラダイムとなっています。Rustは、その強力な型システムと所有権モデルにより、Futureトレイトに基づいた、強力で安全な非同期プログラミングのアプローチを提供します。async/await構文を通じてFutureを操作することが多いですが、これらの抽象化が内部でどのように機能するかを真に理解することは、デバッグ、最適化、さらにはカスタム非同期コンポーネントの設計においても重要です。カスタムFutureを作成するこの詳細な解説は、ポーリングメカニズムを解明し、非同期タスクとエグゼキュータ間の基本的なやり取りを明らかにし、最終的にはRustの非同期機能をより自信を持って精密に使いこなせるようにします。
非同期実行の心臓部:ポーリング
カスタムFutureを構築する前に、関連するコアコンセプトを明確に理解しましょう。
- Futureトレイト: Rustでは、
Futureは、完了している場合もしていない場合もある非同期計算を表すトレイトです。これには単一のメソッドpollがあり、エグゼキュータはFutureの進捗状況を確認するためにこれを繰り返し呼び出します。 - エグゼキュータ: エグゼキュータは、
Futureを取得し、pollメソッドを繰り返し呼び出すことで完了まで駆動する責任を負います。Futureのライフサイクルを管理し、タスクをスケジュールし、タスクが進捗の準備ができたときにそれをウェイクアップさせることを処理します。一般的なエグゼキュータにはtokioやasync-stdがあります。 - ポーリング: これは、エグゼキュータが未完了の
Futureに対してpollメソッドを呼び出す行為です。pollが呼び出されると、Futureは進捗の試みを行います。 Pollenum:pollメソッドはPollenumを返します。これには2つのバリアントがあります。Poll::Ready(T): Futureが正常に完了したことを示し、Tはその計算結果です。Poll::Pending: Futureがまだ完了していないことを示します。Pendingが返された場合、Futureは(Wakerを介して)進捗の準備ができたときにウェイクアップされるように手配しなければなりません。
Waker:Wakerは、エグゼキュータによって提供される低レベルのメカニズムであり、Futureが再度ポーリングされる準備ができたことをシグナルすることができます。FutureがPoll::Pendingを返すと、ContextからWakerを取得してクローンします。後で、ソケットへのデータ到着、タイマーの期限切れなど、Futureのブロックを解除する可能性のあるイベントが発生すると、Futureはwaker.wake_by_ref()を呼び出して、リポーリングするようにエグゼキュータに通知します。Context:pollメソッドに渡されるContextには、Wakerと、Futureがエグゼキュータと対話するのに役立つその他の情報が含まれています。
カスタムFutureの構築:シンプルな遅延
非ブロッキング遅延を導入するカスタムFutureを作成してみましょう。これにより、ポーリングメカニズムを直接観察できます。
deadline(完了すべき時間)とオプションのWaker(タスクをウェイクアップするため)を保持するDelay構造体を定義します。
use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll, Waker}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::thread; // 遅延Futureの状態を表す struct Delay { deadline: Instant, // デッドラインが経過したときにFutureをウェイクアップするためにWakerを格納する必要があります。 // Arc<Mutex<Option<Waker>>>は、スレッド間でWakerを共有し安全に編集することを可能にします。 waker_storage: Arc<Mutex<Option<Waker>>>, // タイマーThreadが一度だけスポーンされることを保証するフラグ。 timer_thread_spawned: bool, } impl Delay { fn new(duration: Duration) -> Self { Delay { deadline: Instant::now() + duration, waker_storage: Arc::new(Mutex::new(None)), timer_thread_spawned: false, } } } // Delay構造体に対するFutureトレイトを実装 impl Future for Delay { // Futureの出力型はユニットです。遅延を表すだけなので。 type Output = (); // Futureの核心:pollメソッド fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // デッドラインが既に経過している場合、Futureは準備完了です。 if Instant::now() >= self.deadline { println!("Delay future: Deadline reached. Returning Ready."); return Poll::Ready(()); } // --- Wakerの格納とタイマーの設定(一度だけ) --- // タイマーThreadがまだスポーンされていない場合、それを設定します。 if !self.timer_thread_spawned { println!("Delay future: First poll. Storing waker and spawning timer thread."); // コンテキストから現在のWakerを格納します。 // このWakerは、タイマースレッドがこのタスクをウェイクアップするために使用します。 let mut waker_guard = self.waker_storage.lock().unwrap(); *waker_guard = Some(cx.waker().clone()); drop(waker_guard); // ロックを早期に解放 // 新しいThreadに渡すためにArcをクローンします。 let waker_storage_clone = self.waker_storage.clone(); let duration_until_deadline = self.deadline.duration_since(Instant::now()); // デッドラインまでスリープし、元のタスクをウェイクアップする新しいThreadをスポーンします。 thread::spawn(move || { thread::sleep(duration_until_deadline); println!("Delay timer thread: Deadline passed. Waking up the task."); // Wakerを取得してタスクをウェイクアップします if let Some(waker) = waker_storage_clone.lock().unwrap().take() { waker.wake(); } }); // 複数回タイマーThreadをスポーンしないように、タイマーThreadがスポーンされたことをマークします。 self.timer_thread_spawned = true; } else { // この部分は、タイマースレッドが既に実行されている場合の後続のポーリングを処理します。 // タスクが移動または再スケジュールされるとエグゼキュータが判断した場合、Wakerを更新することが重要です。 // Wakerが更新されないと、以前のWakerが無効になり、タスクがウェイクアップされなくなる可能性があります。 let mut waker_guard = self.waker_storage.lock().unwrap(); // waker_guard.as_ref().map_or(true, |w| !w.will_wake(cx.waker())) は、Wakerが存在しないか、または新しいWakerが古いWakerとは異なる(will_wakeがfalseを返す)場合にtrueになります。 // これは、タスクが移動されたり、エグゼキュータが異なるWakeerを提供したりした場合にWakerを更新するために必要です。 if waker_guard.as_ref().map_or(true, |w| !w.will_wake(cx.waker())) { println!("Delay future: Waker changed. Updating."); *waker_guard = Some(cx.waker().clone()); } } // デッドラインがまだ経過していない場合、Futureはペンディング状態です。 // タイマーThreadによって`waker.wake()`が呼び出されると、リポーリングされます。 println!("Delay future: Deadline not yet reached. Returning Pending."); Poll::Pending } } #[tokio::main] async fn main() { println!("Main: Starting program."); let delay_future = Delay::new(Duration::from_secs(2)); // 2秒の遅延を作成 println!("Main: Awaiting delay future..."); delay_future.await; // カスタムFutureを待機 println!("Main: Delay completed. Program finished."); }
Delay Futureの説明:
-
struct Delay:deadline: 遅延が終了すべき時点を表すInstant。waker_storage:Arc<Mutex<Option<Waker>>>は不可欠です。WakerはFuture(self.waker_storageを所有している)と、wakeを呼び出す別個のthread::spawnの間で共有される必要があります。Arcは共有所有権を可能にし、MutexはWakerを格納および取得するための安全な内部可変性を提供します。Optionは、Wakerが格納される前の最初のpollで利用可能でない場合があるため使用されます。timer_thread_spawned: タイマー(thread::spawn部分)のみを一度設定することを保証する単純なブールフラグ。
-
impl Future for Delay:type Output = ();: 私たちの遅延Futureは単に完了し、意味のある値は生成しません。poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>: これが核心です。if Instant::now() >= self.deadline: 各ポーリングで、デッドラインが経過したかどうかを確認します。経過していれば、Readyであり、Poll::Ready(())を返します。if !self.timer_thread_spawned: この条件ブロックは、実際のタイマー(thread::spawn部分)を一度だけ設定することを保証します。let mut waker_guard = self.waker_storage.lock().unwrap(); *waker_guard = Some(cx.waker().clone());:waker_storageのロックを取得し、現在のContextからWakerをcloneして格納します。このWakerは、現在ポーリングされているこの特定のタスクを指します。thread::spawn(...): 標準のRustスレッドを起動します。このスレッドは、残りの期間sleep()します。これはヘルパーThreadの観点からはブロッキングsleepですが、別のOS Thread内にあるためエグゼキュータThreadをブロックしません。- スポーンされたThread内で、スリープを完了した後、格納された
Wakerを取得しwaker.wake()を呼び出します。このwake()呼び出しは、非同期ランタイム(mainのTokio)に、このWakerに関連付けられたタスクが再度ポーリングの準備ができたことを通知します。 self.timer_thread_spawned = true;: 複数回タイマーThreadをスポーンしないように、フラグをtrueに設定します。
else { ... }: タイマースレッドが既にスポーンされている場合(つまり、既にペンディング状態のFutureを再度ポーリングする場合)、Context内のWakerが変更されたかどうか(!w.will_wake(cx.waker()))を確認する必要があります。変更された場合は、格納されたWakerを更新します。これは、エグゼキュータがタスクを移動または再スケジュールすることがあり、タスクを正しく通知するために新しいWakerが必要になる場合があるため重要です。Poll::Pending: デッドラインが経過しておらず、タイマーが設定されている場合、Futureはまだ待機中です。Poll::Pendingを返します。エグゼキュータは、waker.wake()が呼び出されるまで、このFutureのポーリングを停止します。
tokio::mainおよびawaitでの動作:
Delay::new(Duration::from_secs(2)):Delayインスタンスが作成されます。delay_future.await: ここが魔法が起こる場所です。- Tokioのエグゼキュータが
delay_futureを受け取ります。 - 最初のポーリング: エグゼキュータは
delay_future.poll(cx)を呼び出します。- デッドラインは満たされていません。
timer_thread_spawnedはfalseです。cxからのWakerがクローンされ、delay_future.waker_storageに格納されます。- 新しい
thread::spawnが作成されます。このスレッドは2秒間スリープを開始します。 timer_thread_spawnedがtrueに設定されます。pollはPoll::Pendingを返します。
Poll::Pending後のエグゼキュータのアクション: エグゼキュータはdelay_futureが準備できていないことを認識します。このタスクを脇に置き、他の準備完了タスク(存在する場合)をポーリングするか、waker.wake()の呼び出しを待ちます。重要なのは、Tokioランタイムスレッドは、私たちのthread::spawnのthread::sleepによってブロックされないことです。- 2秒後:
thread::spawnのthread::sleepが完了します。- 格納された
Wakerを取得し、waker.wake()を呼び出します。
- 格納された
waker.wake()後のエグゼキュータのアクション: エグゼキュータは、delay_futureに関連付けられたタスクのウェイクアップシグナルを受信します。エグゼキュータは、delay_futureを再度ポーリングするようにスケジュールします。- 2番目(またはそれ以降)のポーリング: エグゼキュータは
delay_future.poll(cx)を再度呼び出します。- ここで、
Instant::now() >= self.deadlineがtrueになります。 pollはPoll::Ready(())を返します。
- ここで、
- 完了:
delay_future.await式は最終的に完了し、main関数が続行されます。
- Tokioのエグゼキュータが
結論
カスタムDelay Futureを実装することにより、Rustの非同期ポーリングメカニズムについて実践的な理解を得ました。エグゼキュータによってFuture::pollが繰り返し呼び出されること、Poll::Pendingが未完了の状態を示すこと、そして重要なこととして、Wakerが進捗が可能になったときにエグゼキュータにポーリングを再開するように通知することを、進捗の橋渡しとして機能することを確認しました。Wakerを介したFutureとExecutor間のこの明確なやり取りは、Rustの効率的で非ブロッキングな非同期プログラミングの基盤であり、ブロッキングスレッドのオーバーヘッドなしで高性能でスケーラブルなアプリケーションを可能にします。カスタムFuture実装を習得することは、Rustの強力な非同期エコシステムへのより深い洞察を解き放つ高度なスキルです。