Rust Webサービスにおけるストリームを使用したロングポーリングの実装
James Reed
Infrastructure Engineer · Leapcell

はじめに
現代のWebアプリケーションの世界では、ユーザーにリアルタイムの更新を配信することが最も重要です。チャットアプリケーション、ライブダッシュボード、通知システムなど、ユーザーはページを常にリフレッシュすることなく最新のデータを期待しています。双方向通信の常套手段としてWebSocketがよく利用されますが、クライアントが主に更新を受信するだけのシンプルなシナリオや、レガシーインフラストラクチャを扱う場合には、過剰になることがあります。ここでロングポーリングが役立ちます。ロングポーリングは、HTTPリクエストを使用して疑似リアルタイム体験を提供し、特定のユースケースでは、 WebSocket接続を継続的に開いておくよりも、統合がシンプルで、ネットワークの中断に対する堅牢性が高いことがよくあります。この記事では、Rustの強力な非同期エコシステムとストリームベースAPIを効果的に活用して、Webサービスで堅牢かつスケーラブルなロングポーリングメカニズムを実装する方法について詳しく説明します。
コアコンセプトと実装
Rustでの実装に入る前に、ロングポーリングとRustの非同期機能に関連するいくつかの重要な用語を明確にしましょう。
ロングポーリング (Long Polling): クライアントがサーバーにHTTPリクエストを送信し、サーバーが新しいデータが利用可能になるかタイムアウトが発生するまで、そのリクエストを意図的に開いたままにします。データが利用可能になると、サーバーは応答し、クライアントはすぐに別のリクエストを送信します。これは、標準HTTP over プッシュのようなメカニズムをシミュレートします。
非同期プログラミング (Rustにおける): Rustのasync
/await
構文は、I/O操作(ネットワークリクエストやデータベースクエリなど)が完了するのを待っている間、実行スレッドをブロックしない並行コードを記述できます。これは、多数の同時接続を効率的に処理する必要がある高性能Webサービスにとって不可欠です。
ストリーム (Rustにおける): futures
と非同期Rustのコンテキストでは、Stream
は値の非同期シーケンスであり、Iterator
に似ていますが、async
コンテキスト用です。すべてのデータが一度に利用可能である必要はなく、データが利用可能になるにつれて処理できます。これは、複数のデータチャンクを送信したり、新しいイベントが到着するまで接続を維持したいロングポーリングに特に役立ちます。
ロングポーリングの原則
ロングポーリングのサーバーサイド実装には、通常、次の手順が含まれます。
- クライアントリクエスト: クライアントは、特定のエンドポイント(例:
/events
)にHTTP GETリクエストを送信します。 - サーバーリクエスト保留: 新しいデータがすぐに利用できない場合、サーバーは応答しません。代わりに、クライアントのリクエスト(またはその表現)をキューに入れ、イベントソースにサブスクライブします。
- イベント通知: 新しいデータまたはイベントが発生すると、サーバーは待機中のクライアントリクエストで、このイベントに関心のあるものを取得します。
- サーバー応答: サーバーは、新しいデータを含むHTTPレスポンスを構築し、クライアントに送信します。
- クライアント再試行: レスポンスを受信すると、クライアントはすぐに別のロングポーリングリクエストを開始し、将来のイベントを継続してリッスンします。
Rustにおけるストリームを使用したロングポーリングの実装
Rustのtokio
ランタイムとaxum
Webフレームワークは、tokio::sync::broadcast
チャネルとfutures::StreamExt
と組み合わせて、ストリームを使用した堅牢なロングポーリングサービスを構築するための優れた基盤を提供します。
ユーザーが一般的な「イベント」を購読して受信できるシンプルなイベントシステムを考えてみましょう。
use axum::{ extract::{Query, State}, response::sse::{Event, Sse}, routing::get, Router, }; use futures::Stream; use serde::Deserialize; use std::{ convert::Infallible, pin::Pin, sync::{Arc, Mutex}, time::Duration, }; use tokio:: sync::broadcast, time::interval, use tokio_stream::wrappers::BroadcastStream; /// クライアントに送信できるイベントを表します。 #[derive(Debug, Clone, serde::Serialize)] struct MyEvent { id: u64, message: String, } /// アプリケーションの状態。ブロードキャスト送信者を保持します。 #[derive(Clone)] struct AppState { event_sender: broadcast::Sender<MyEvent>, event_counter: Arc<Mutex<u64>>, // 一意のイベントIDを生成するため } #[tokio::main] async fn main() { let (event_sender, _receiver) = broadcast::channel(16); // イベント用チャネル let app_state = AppState { event_sender: event_sender.clone(), event_counter: Arc::new(Mutex::new(0)), }; // ダミーイベントの生成をシミュレート(例:他のサービスや内部ロジックから) tokio::spawn(generate_dummy_events(event_sender.clone(), app_state.event_counter.clone())); let app = Router::new() .route("/events/long-poll", get(long_poll_handler)) .route("/events/trigger", get(trigger_event)) .with_state(app_state); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await .unwrap(); println!("Listening on 127.0.0.1:3000 {}", listener.local_addr().unwrap()); axum::serve(listener, app).await.unwrap(); } // ロングポーリングエンドポイントのハンドラ async fn long_poll_handler( State(app_state): State<AppState>, ) -> Sse<Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>> { // 新しいイベントのレシーバーを取得します。 let mut rx = app_state.event_sender.subscribe(); // ブロードキャストチャネルからのイベントを発行するストリームを作成します。 // MyEventをaxum::sse::Eventに変換します。 let event_stream = BroadcastStream::new(rx) .map(|event_result| match event_result { Ok(event) => { let json_data = serde_json::to_string(&event).unwrap_or_default(); Ok(Event::default().event("message").data(json_data)) } // クライアントが遅すぎた場合の`RecvError::Lagged`を処理します。 // ロングポーリングでは、通常、単に閉じてクライアントを再接続させます。 Err(e) => { eprintln!("Broadcast receive error: {:?}", e); // 実際のアプリケーションでは、ここでエラーイベントを送信するか、 // ストリームを終了させてクライアントに再接続を強制することができます。 Err(Infallible) // Result<Event, Infallible>におけるInfallibleは、ストリーム自体からのエラーがないことを意味します。 } }) .boxed(); // Sseレスポンスは、接続を開いたままにし、ストリームで到着したイベントを送信することを // 暗黙的に処理します。 // サーバーがデータ送信を期待していることを示すために短い期間を追加します。 // ただし、コアなロングポーリングタイムアウトロジックはクライアントサイドのものです。 Sse::new(event_stream) } // 手動でイベントをトリガーするエンドポイント(テスト目的) #[derive(Deserialize)] struct TriggerParams { message: String, } async fn trigger_event( State(app_state): State<AppState>, Query(params): Query<TriggerParams>, ) -> String { let mut counter = app_state.event_counter.lock().unwrap(); *counter += 1; let new_event = MyEvent { id: *counter, message: params.message.clone(), }; app_state.event_sender.send(new_event.clone()).unwrap(); format!("Event triggered: {:?}", new_event) } // イベントを生成する外部システムをシミュレートします async fn generate_dummy_events( sender: broadcast::Sender<MyEvent>, counter: Arc<Mutex<u64>>, ) { let mut interval = interval(Duration::from_millis(2000)); // 2秒ごと loop { interval.tick().await; let mut count = counter.lock().unwrap(); *count += 1; let event = MyEvent { id: *count, message: format!("Automatic event {}", *count), }; println!("Sending dummy event: {:?}", event); if let Err(e) = sender.send(event) { eprintln!("Failed to send dummy event: {}", e); } } }
コードの説明:
AppState
:tokio::sync::broadcast::Sender
を保持します。これはマルチプロデューサー、マルチコンシューマーチャネルです。イベントがこの送信者を通じて送信されると、アクティブなすべてのレシーバーがコピーを受け取ります。これは、複数の接続されたクライアントにイベントをブロードキャストするのに理想的です。main
関数:AppState
を設定し、イベント生成をシミュレートするバックグラウンドタスク(generate_dummy_events
)を開始し、axum
HTTPサーバーを初期化します。generate_dummy_events
:MyEvent
インスタンスを定期的にブロードキャストチャネルに送信するシンプルなasync
関数です。これは新しいデータが利用可能になることをシミュレートします。long_poll_handler
: これはロングポーリングエンドポイントの中核です。event_sender
にサブスクライブしてbroadcast::Receiver
を取得します。tokio_stream::wrappers::BroadcastStream::new(rx)
はbroadcast::Receiver
をfutures::Stream
に変換します。これにより、受信したイベントを非同期シーケンスとして扱えます。- 次に、受信した各
MyEvent
をaxum::response::sse::Event
にmap
します。この例では、ストリームの性質とブラウザ互換性のためにServer-Sent Events(SSE)を使用していますが、基本的なストリームの概念は、単一のJSONレスポンスをロングポーリングリクエストごとに送信する場合でも適用されます。SSEは、接続を開いたままにし、単一のHTTP接続 over 複数のイベントをプッシュすることを本質的に処理するため、高頻度のロングポーリングとよく一致します。 Sse::new(event_stream)
:axum
はStream
ofResult<Event, Infallible>
をServer-Sent Eventsレスポンスに自動的に変換するSse
レスポンスタイプを提供します。Infallible
エラータイプは、ストリーム処理自体がaxum
がSSE用に特別に処理する必要のあるエラーを生成しないことを主張することを意味します。イベントの処理に失敗した場合、通常はスキップするか、エラーをログに記録します。
trigger_event
: HTTPリクエストを介してイベントを手動でトリガーするシンプルなエンドポイントで、テストに便利です。
クライアントサイドの考慮事項
クライアント側では、標準のfetch
またはXMLHttpRequest
を使用できます。サーバーからSSEを使用しているため、クライアントサイドのEventSource
APIが最も自然な選択です。
const eventSource = new EventSource('http://127.0.0.1:3000/events/long-poll'); eventSource.onmessage = function(event) { console.log("Received general message:", event.data); const data = JSON.parse(event.data); console.log("Parsed event:", data); }; eventSource.addEventListener('message', function(event) { console.log("Received 'message' event:", event.data); }); eventSource.onerror = function(err) { console.error("EventSource failed:", err); // 通常、ここでは再接続を処理します。 // EventSourceには自動再接続ロジックがありますが、カスタマイズ可能です。 };
このクライアントは接続を開いたままにし、Rustサーバーから送信されたイベントを受信します。接続が切断された場合、EventSource
は再接続を試みます。従来のロングポーリング(SSEを使用しない)の場合、クライアントはレスポンスを受け取るたびに新しいfetch
リクエストを作成する必要があります。
アプリケーションシナリオ
- 通知システム: すべてのクライアントにWebSocketのオーバーヘッドなしでユーザー通知をプッシュします。
- ライブダッシュボード/フィード: クライアントが主に情報を消費する場所で、リアルタイムのデータ更新(例: 株価、センサー値)を表示します。
- チャットアプリケーション(簡易版): メッセージがサーバーに送信され、参加者にブロードキャストされるチャットの場合、ロングポーリングは、サーバーが更新をプッシュする必要しかない場合、WebSocketのシンプルな代替手段となり得ます。
- ゲームロビー: 新しいゲームの開始やプレイヤーの到着をプレイヤーに通知します。
結論
Rust Webサービスで非同期ストリームを使用したロングポーリングを実装することは、疑似リアルタイムの更新を配信するための堅牢で効率的な方法を提供します。tokio
のブロードキャストチャネルとaxum
のストリーム互換Sse
レスポンスを活用することで、開発者はスレッドをブロックすることなく多数の同時接続を処理するスケーラブルなシステムを構築できます。このアプローチは、双方向通信が厳密に必要とされないシナリオ for WebSocket の強力な代替手段を提供し、より応答性のあるインタラクティブなユーザーエクスペリエンスに貢献します。Rustのパフォーマンスと並行処理機能は、このようなリアルタイム通信パターンをアーキテクチャ化するのに特に適しています。