Rustの非同期処理をasync/awaitとTokioで解き明かす
Min-jun Kim
Dev Intern · Leapcell

並行Rustプログラミング入門
高性能で応答性の高いアプリケーションが求められる現代において、並行処理は単なる贅沢ではなく、必要不可欠なものです。数千もの同時接続を処理するWebサーバーから複雑なデータ処理パイプラインまで、複数のタスクを同時に実行できる能力は、ユーザーエクスペリエンスとリソース利用率に直接影響します。タスクが完了するまでプログラム全体をブロックする従来の同期プログラミングは、このようなシナリオではすぐにボトルネックとなります。ここで、非同期プログラミングが登場し、ネットワークリクエストやファイルI/Oのような長時間実行される操作が完了するのを待っている間に、プログラムが有用な作業を実行できるようにするパラダイムシフトを提供します。
Rustは、パフォーマンス、安全性、並行処理に重点を置いているため、非同期プログラミングをファーストクラスの機能として採用しています。初期の非同期Rustは複雑な手動のFutureコンビネータが特徴でしたが、async/await
構文の導入により、非同期コードがより同期的で直感的に感じられるようになり、状況は一変しました。しかし、async/await
だけでは並行処理は魔法のように実現しません。これらのノンブロッキング操作をスケジュールして実行するための非同期ランタイムが必要です。利用可能なさまざまなランタイムの中で、TokioはRustエコシステムにおける事実上の標準として登場し、堅牢でスケーラブルな非同期アプリケーションを構築するための包括的なツールキットを提供しています。この記事は、Rustにおける非同期プログラミングを分かりやすく説明し、async/await
のコアコンセプトを探求し、効率的で並列なRustプログラムを構築するためにTokioランタイムを活用する方法を実践的に実証することを目的としています。
非同期Rustの解説
Rustにおける非同期プログラミングは、その核心においてFuturesの概念に基づいています。Future
は、将来のある時点で利用可能になる可能性のある値を表すトレイトです。それは基本的にステートマシンであり、ポーリングされると、値が準備できたことを示すか、まだ準備ができていないため後で再度ポーリングする必要があることを示します。このノンブロッキングの性質が、単一のスレッドが多くの同時操作を管理することを可能にします。
主要用語の解説
例に入る前に、いくつかの重要な用語を明確にしましょう。
Future
: 前述の通り、これは完了時に値を生成する非同期計算を表すトレイトです。そのコアメソッドはpoll
であり、エグゼキュータは計算を前進させるために繰り返し呼び出します。async fn
: Rustにおけるこの特別な構文は、非同期関数を宣言します。async fn
を呼び出すと、内部のコードがすぐに実行されるわけではありません。代わりにFuture
を返します。実際の実行は、このFuture
がエグゼキュータによってポーリングされたときにのみ開始されます。await
: このキーワードはasync fn
またはasync
ブロック内でのみ使用できます。Future
をawait
すると、現在のasync
関数の実行は、待機中のFuture
が完了するまで一時停止します。この一時停止中に、エグゼキュータは他のFuture
を実行するために切り替えることができ、スレッドがブロックされるのを防ぎます。- エグゼキュータ/ランタイム: これは、
async fn
から返されたFuture
を取得し、それらをポーリングし、実行のためにスケジュールするエンジンです。ポーリングループの管理、依存関係が準備できたとき(例:ネットワークソケットにデータが到着したとき)にFuture
をウェイクアップさせること、そして効率的なリソース利用を保証することに責任を負います。Tokioはこのようなエグゼキュータ/ランタイムの著名な例です。 Pin
:Pin
はより高度な概念ですが、Future
が開始された後もメモリ内で移動する必要がないようにasync/await
がどのように機能するかを理解するためには不可欠です。Pin
は、値が現在のメモリ位置から移動されないことを保証します。これは、Future
内でしばしば見られる自己参照構造にとって重要です。
async/await
メカニズム
async/await
構文シュガーは、Future
の操作を大幅に簡素化します。ファイルから読み取る同期関数を考えてみましょう。
// 同期ファイル読み取り fn read_file_sync(path: &str) -> std::io::Result<String> { std::fs::read_to_string(path) }
この関数は、ファイル全体が読み取られるまで現在のスレッドをブロックします。次に、async/await
を使用した非同期の同等機能を見てみましょう。
// async_stdまたはtokio::fsを使用した非同期ファイル読み取り async fn read_file_async(path: &str) -> std::io::Result<String> { tokio::fs::read_to_string(path).await // read_to_stringによって返されたFutureをawait }
read_file_async
が呼び出されると、すぐにFuture
を返します。tokio::fs::read_to_string(path)
の呼び出しもFuture
を返します。この内部Future
をawait
すると、read_file_async
Future
はエグゼキュータに制御を戻します。エグゼキュータは、他のFuture
を実行するために切り替えることができます。tokio::fs::read_to_string
が完了したとき(例:ファイルが読み取られたとき)、エグゼキュータはread_file_async
Future
をウェイクアップし、await
ポイントの直後から実行を再開します。この協調的なマルチタスクが、協調的な非同期プログラミングの本質です。
Tokioランタイムの紹介
Tokioは単なるエグゼキュータではありません。
- スケジューラ:
Future
を管理および実行します。これにより、Future
の実行を実際に並列化するために複数のスレッド(ワーカー スレッド)を利用できますが、各Future
自体は単一のスレッドで実行されます。 - 非同期I/O: 標準ライブラリI/O操作(例:
TcpStream
、UdpSocket
、File
)のノンブロッキングバージョン。これらは、高性能ネットワークサービスを構築するために不可欠です。 - タイマー: 特定の時刻または遅延(例:
tokio::time::sleep
)でタスクをスケジュールするため。 - 同期プリミティブ: 標準ライブラリのミューテックス、セマフォ、チャネルなどの非同期バージョン(例:
tokio::sync::Mutex
、tokio::sync::mpsc
)。 - ユーティリティ: 一般的な非同期パターン(例:タスクの結合(
tokio::join!
)、複数のFuture間の選択(tokio::select!
)、バックグラウンドタスクのスポーン(tokio::spawn
))のための豊富なヘルパーセット。
実践例:Tokioを使ったシンプルなエコーサーバー
Tokioとasync/await
がどのように連携するかを示すために、基本的なTCPエコーサーバーを構築しましょう。
// Cargo.toml // [dependencies] // tokio = { version = "1", features = ["full"] } use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, // 非同期読み取り/書き込み用 net::{TcpListener, TcpStream}, // TCPネットワーク用 }; async fn handle_connection(mut stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> { println!("Handling connection from {:?}", stream.peer_addr()?); let mut buf = vec![0; 1024]; // エコー用の小さなバッファ loop { // クライアントから非同期にデータを読み込む let n = stream.read(&mut buf).await?; if n == 0 { // クライアントが接続を閉じた println!("Client disconnected from {:?}", stream.peer_addr()?); return Ok(()) } // 受信したデータをクライアントに非同期にエコーバックする stream.write_all(&buf[0..n]).await?; } } #[tokio::main] // Tokioランタイムのエントリポイント async fn main() -> Result<(), Box<dyn std::error::Error>> { let listener = TcpListener::bind("127.0.0.1:8080").await?; println!("Echo server listening on 127.0.0.1:8080"); loop { // 新しいクライアント接続を非同期に受け入れる let (stream, _addr) = listener.accept().await?; // この接続を処理するための新しい非同期タスクをスポーンする。 // `tokio::spawn`は、handle_connectionによって返されたFutureが // Tokioランタイムによって並列に実行されることを保証します。 tokio::spawn(async move { if let Err(e) = handle_connection(stream).await { eprintln!("Error handling connection: {}", e); } }); } }
解説:
#[tokio::main]
: このマクロは、Tokioランタイムのセットアップと実行を容易にします。async fn main
を受け取り、Tokioランタイムインスタンス内で自動的に実行します。これがないと、手動でTokioランタイムを作成し、それにブロックする必要があるでしょう。TcpListener::bind("127.0.0.1:8080").await?
: これはノンブロッキングTCPリスナーを作成します。await
は、バインドに時間がかかる場合(bind
自体ではまれですが、例示的です)、それが完了するまでmain
関数が制御を譲ることを意味します。listener.accept().await?
: これはノンブロッキングサーバーロジックのコアです。accept()
は、新しいクライアント接続が確立されたときに完了するFuture
を返します。接続を待っている間、Tokioは他のFuture
(既に接続されているクライアントからのものなど)を実行できます。tokio::spawn(async move { ... })
: これが複数のFuture
を並列に実行する方法です。tokio::spawn
はFuture
(この場合はasync move
ブロック)を受け取り、それをTokioランタイムで実行するようにスケジュールします。各スポーンされたタスクは独立して実行されます。もしspawn
しなければ、accept
はhandle_connection
が完了するまでブロックし、サーバーは同期的に動作し、複数のクライアントを並列に処理できなくなります。stream.read(&mut buf).await?
およびstream.write_all(&buf[0..n]).await?
:handle_connection
内では、これらはTokioの非同期I/Oメソッドです。これらはスレッドをブロックすることなくTCPストリームから読み書きします。読み取るデータがない場合、read
は制御を譲ります。書き込みバッファがいっぱいの場合、write_all
は制御を譲ります。
この例は、async/await
が、Tokioランタイムとペアリングされたときに、どのようにして並列的な動作を提供する、シーケンシャルに見えるコードを書くことを可能にするかを明確に示しています。各handle_connection
タスクは、Tokioのスケジューラによって並列に管理される個別のFuture
であり、サーバーが多数のクライアントを同時に単一の(または少数)スレッドで処理できるようになります。
高度なTokio機能:SelectとJoin
Tokioは、Futureを結合および管理するための強力なマクロを提供します。
-
tokio::join!
: 複数のFuture
が同時に完了するのを待ち、その結果を収集します。すべてのFuture
は並列にポーリングされます。async fn fetch_data_from_api_a() -> String { /* ... */ "Data A".to_string() } async fn fetch_data_from_api_b() -> String { /* ... */ "Data B".to_string() } async fn get_all_data() { let (data_a, data_b) = tokio::join!( fetch_data_from_api_a(), fetch_data_from_api_b() ); println!("Received: {} and {}", data_a, data_b); }
-
tokio::select!
: 複数のFuture
を互いに競わせ、最初に完了したFuture
に対応するブランチを実行します。use tokio::time::{sleep, Duration}; async fn timeout_op() { // 長い処理をシミュレート sleep(Duration::from_secs(5)).await; println!("Long operation finished!"); } async fn early_exit() { sleep(Duration::from_secs(2)).await; println!("Early exit condition met!"); } async fn race_example() { tokio::select! { _ = timeout_op() => { println!("Timeout operation won the race!"); }, _ = early_exit() => { println!("Early exit won the race!"); }, // 最初に準備ができたブランチがない場合でもデフォルトの動作を追加できます。 } }
これらのマクロは、複雑な非同期ワークフローをオーケストレーションするのに非常に役立ち、開発者は高度な並行処理パターンを簡潔に表現できます。
結論
async/await
およびTokioランタイムを使用した非同期プログラミングは、Rustにおける並列アプリケーション開発に革命をもたらしました。Future
トレイトとそのノンブロッキング哲学を採用し、Tokioの堅牢なエグゼキュータ、非同期I/O、および豊富なユーティリティセットを活用することで、開発者はメモリ安全性とパフォーマンスの高い言語で、非常に効率的でスケーラブル、かつ応答性の高いアプリケーションを構築できます。async/await
構文は、このような並列コードの記述を容易にし、RustプログラムがI/Oバウンドなシナリオで真に輝くことを可能にし、現代のネットワークサービス、データパイプライン、および高性能コンピューティングに優れた選択肢となります。Rustの非同期エコシステムは、開発者が自信を持って速度と安全性の両方を達成しながら、素晴らしいシステムを構築することを可能にします。