非同期Rust Webサービスにおける同期ブロッキングの処理
Emily Parker
Product Engineer · Leapcell

はじめに
modernenweb開発の世界では、速度と応答性が最優先されます。ユーザーは、アプリケーションが機敏で、顕著な遅延なしに多数のリクエストを並行して処理することを期待しています。Rustは、その強力な非同期機能により、高性能Webサービス構築の有力な候補として浮上しています。Actix-webやTokioなどのフレームワークにより、開発者はシステムリソースを効率的に利用する、非常に並行性の高いコードを書くことができます。
しかし、すべての操作が非同期であるべき、あるいはそうあるべきものとは限りません。暗号学的ハッシュ(例:Argon2やBcryptによるパスワードハッシュ)、複雑なデータ処理、またはレガシー同期ライブラリとの対話など、一部のタスクは本質的にブロッキングです。これらのブロッキング操作が非同期コンテキストで直接実行されると、スレッド全体をブロックし、他のすべての並行タスクの進行を停止させ、サービスのパフォーマンスを著しく低下させます。この記事では、応答性と効率性を維持するために、これらの同期ブロッキング操作を非同期Rust Webサービスに正しく統合するという重要な課題を掘り下げていきます。
コアコンセプトの理解
ソリューションを詳しく掘り下げる前に、関連する主要な概念を明確に理解しましょう。
- 非同期プログラミング: Rust(および他の多くの言語)では、非同期プログラミングは、プログラムが新しいタスクを開始するのを待つことなく、多数のタスクを開始できるようにします。非同期タスクがI/O操作(ネットワークリクエストやディスク読み取りなど)に遭遇すると、スレッドをブロックする代わりに、ランタイムに制御を「譲り」、他のタスクの実行を許可します。I/O操作が完了すると、タスクは再開できます。これは、
async/await構文とタスクスケジューリングを管理するexecutor(Tokioなど)によって実現されます。 - ブロッキング操作: ブロッキング操作とは、実行されたときに、完了するまで呼び出し元に制御を返さない操作です。この間、操作を実行しているスレッドは「ブロック」され、他の作業を実行できません。例としては、CPUバウンドな計算(パスワードハッシュなど)、同期ファイルI/O、またはブロッキングデータベース呼び出しが挙げられます。
- Tokioランタイム: TokioはRustで最も人気のある非同期ランタイムです。イベントループ、タスクスケジューラ、協調的マルチタスクのためのツールなど、非同期アプリケーションを構築するために必要なすべてのコンポーネントを提供します。通常、
asyncタスクを実行するために、固定数のワーカー・スレッド(多くの場合、CPUコアごとに1つ)を使用します。 - スレッドプール: スレッドプールは、タスクの実行に使用できる事前起動されたスレッドのコレクションです。すべてのタスクに新しいスレッドを起動する代わりに、タスクはプールに送信され、利用可能なスレッドがそれらを取得します。これにより、スレッドの作成と破棄のオーバーヘッドが削減されます。
Tokioのワーカー・スレッドのいずれかでブロッキング操作が直接実行されると、問題が発生します。ワーカー・スレッドはブロックされているため、他のasyncタスクを実行できず、サービスの並行処理の一部が効果的に停止します。
ブロッキングコードの処理戦略
根本的な解決策は、ブロッキング操作をメインの非同期ランタイムのワーカー・スレッドから移動させることです。これにより、プライマリ・イベントループは、非ブロッキング・タスクのスケジューリングと実行を自由に行えるようになります。
1. tokio::task::spawn_blockingの使用
Tokioベースのアプリケーションでブロッキング操作を処理する最も簡単で推奨される方法は、tokio::task::spawn_blockingを使用することです。この関数は、提供されたブロッキング・フューチャーまたはクロージャを、ブロッキング・タスク専用にTokioによって管理される、動的にサイズ変更可能なスレッドプールにオフロードします。
以下に、パスワードハッシュの例を使用して、実際的な方法を示します。
use actix_web::{web, App, HttpServer, HttpResponse, Responder}; use tokio::time::{sleep, Duration}; use argon2::{password_hash::SaltString, Argon2, PasswordHasher}; use rand_core::OsRng; // 暗号学的な乱数生成器 async fn hash_password_handler(password: web::Path<String>) -> impl Responder { let password_str = password.into_inner(); // これがArgon2パスワードハッシュのようなCPU集約型操作だと想像してください。 // 直接実行すると、Actix-webワーカー・スレッドをブロックします。 let hashed_password = tokio::task::spawn_blocking(move || { let salt = SaltString::generate(&mut OsRng); // Argon2は時間がかかります、特に強力なパラメータを使用する場合 let argon2 = Argon2::default(); argon2.hash_password(password_str.as_bytes(), &salt) .map(|hash| hash.to_string()) .expect("Failed to hash password") }) .await; match hashed_password { Ok(hash) => HttpResponse::Ok().body(format!("Hashed password: {}", hash)), Err(e) => { eprintln!("Failed to hash password in blocking thread: {:?}", e); HttpResponse::InternalServerError().body("Failed to process password") } } } async fn hello() -> impl Responder { // これは非ブロッキング操作であり、並行して実行できます sleep(Duration::from_millis(100)).await; HttpResponse::Ok().body("Hello world!") } #[actix_web::main] async fn main() -> std::io::Result<()> { HttpServer::new(|| { App::new() .route("/", web::get().to(hello)) .route("/hash/{password}", web::get().to(hash_password_handler)) }) .bind(("127.0.0.1", 8080))? .run() .await }
この例では:
hash_password_handlerはasync関数ですが、実際のパスワードハッシュロジックはtokio::task::spawn_blockingに渡されたクロージャ内に配置されています。spawn_blockingはJoinHandleを返します。これをawaitします。このawaitポイントが重要です:hash_password_handler自体は、他のスレッドでのハッシュ完了を待っている間、非ブロッキングです。- ハッシュ・クロージャは、Tokioのブロッキング・スレッドプールからの専用スレッドで実行されます。これは、非同期ランタイムのコア・スレッドとは別のプールです。
helloエンドポイントは、完全に非同期であるため、複数のパスワードハッシュ要求が進行中でも、引き続き迅速に応答できます。
spawn_blockingを使用する時期:
- CPUバウンドな計算:パスワードハッシュ、画像処理、重いデータ変換。
- 同期I/O:レガシーライブラリまたは非同期APIを提供しないファイルとの対話。
- 明示的に制御を譲らない、かなりの時間がかかる任意のコード。
2. 専用スレッドプール(例:rayon)
より複雑な、または非常に一般的なCPUバウンド・ワークロードの場合、rayonのような専用スレッドプールライブラリの使用を検討できます。Rayonは、CPUバウンド・タスクの並列処理に優れたデータ・パラレリズム・フレームワークを提供し、カスタム・スレッド管理よりも優れていることがよくあります。
rayon自体はtokio::task::spawn_blockingほど直接的にasync/awaitと統合されていませんが、それでも両者を連携させることができます。
use actix_web::{web, App, HttpServer, HttpResponse, Responder}; use tokio::time::{sleep, Duration}; use argon2::{password_hash::SaltString, Argon2, PasswordHasher}; use rand_core::OsRng; use once_cell::sync::Lazy; // スレッドプールの遅延静的初期化用 use rayon::ThreadPoolBuilder; // 集中的なCPUタスク専用のグローバルRayonスレッドプールを作成します。 // スレッド数は、アプリケーションのニーズとサーバーのCPUコア数に基づいて調整してください。 static CPU_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| { ThreadPoolBuilder::new() .num_threads(num_cpus::get()) // 通常、すべてのCPUコアを使用します .build() .expect("Failed to build Rayon thread pool") }); async fn hash_password_rayon_handler(password: web::Path<String>) -> impl Responder { let password_str = password.into_inner(); let hashed_password = tokio::task::spawn_blocking(move || { // ここで、ブロッキングTokioスレッド内でRayonのプールに送信できます CPU_POOL.install(move || { let salt = SaltString::generate(&mut OsRng); let argon2 = Argon2::default(); argon2.hash_password(password_str.as_bytes(), &salt) .map(|hash| hash.to_string()) .expect("Failed to hash password") }) }) .await; match hashed_password { Ok(hash) => HttpResponse::Ok().body(format!("Hashed password (Rayon): {}", hash)), Err(e) => { eprintln!("Failed to hash password with Rayon: {:?}", e); HttpResponse::InternalServerError().body("Failed to process password") } } } // ... async fn hello() と main関数は以前と同様に、新しいルートを追加します ... #[actix_web::main] async fn main() -> std::io::Result<()> { HttpServer::new(|| { App::new() .route("/", web::get().to(hello)) .route("/hash/{password}", web::get().to(hash_password_handler)) // spawn_blockingを使用 .route("/hash_rayon/{password}", web::get().to(hash_password_rayon_handler)) // spawn_blockingを介してrayonを使用 }) .bind(("127.0.0.1", 8080))? // バインド .run() .await }
この強化された例では:
rayon::ThreadPoolは遅延静的グローバルとして作成され、一度だけ初期化されることを保証します。hash_password_rayon_handlerは依然としてtokio::task::spawn_blockingを使用します。これは重要です:rayon::ThreadPoolは独自の構成済みスレッドで実行されます。async関数内で直接CPU_POOL.installを呼び出した場合、それでもTokioの非同期ワーカー・スレッドをブロックします。CPU_POOL.installはクロージャを受け取り、それがRayonのスレッドのいずれかで実行されることを保証します。ここで実際のCPUバウンド・ワークが発生します。
Rayonのような専用スレッドプールを使用する時期:
- 小さな独立した単位に分割できる、高度にCPUバウンドでデータ集約型のタスクを並列化する場合。
- Tokioのブロッキングプールとは異なる、特定のCPU負荷の高いワークロード専用のスレッド数をより細かく制御したい場合。
- 非同期ランタイムから離れた場所でRayonの並列計算を安全に実行するために、
spawn_blockingと組み合わせて使用されることがよくあります。
3. 外部ライブラリを非同期化する
場合によっては、ブロッキング操作が外部ライブラリ(同期APIのみを提供するデータベースドライバなど)から来るものです。
-
ラッパーライブラリ:
asyncラッパーやライブラリのフォークを探してください。たとえば、sqlxはRust用の非同期ORMで、非ブロッキングであるように特別に設計されています。同期diesel接続からsqlxに移行すると、データベース操作が真に非同期になります。 -
手動オフロード: 非同期の代替が存在しない場合は、
tokio::task::spawn_blockingにフォールバックして、ブロッキング呼び出しをラップする必要があります。// 例:ブロッキングデータベース呼び出し(仮想) async fn fetch_user_blocking(user_id: u32) -> Result<String, String> { let user_data = tokio::task::spawn_blocking(move || { // ブロッキングデータベース呼び出しをシミュレート std::thread::sleep(Duration::from_secs(1)); if user_id == 1 { Ok(format!("User data for ID {}", user_id)) } else { Err("User not found".to_string()) } }).await; user_data.expect("Blocking task failed").map_err(|e| e.to_string()) }
結論
非同期Rust Webサービスに同期ブロッキングコードを統合するには、応答性とパフォーマンスを維持するために慎重な検討が必要です。鉄則は、非同期ランタイムのコア・ワーカー・スレッドでブロッキング操作を実行させないことです。tokio::task::spawn_blockingを活用することで、これらのCPUバウンドまたはブロッキングI/O操作をTokioが管理する別のスレッドプールに効果的にオフロードできます。高度に並列化可能なCPUバウンド・タスクの場合は、spawn_blockingとrayonのような専用ライブラリを組み合わせることで、さらにきめ細かな制御が可能になります。これらのプラクティスに従うことで、ユーザーエクスペリエンスを損なうことなく、あらゆる種類のワークロードを適切に処理できる、堅牢でパフォーマンスの高い非同期Rustアプリケーションを構築できます。サービスの機敏性を維持するために、常にブロッキング作業をオフロードしてください。