Implementierung von Long Polling mit Streams in Rust-Webdiensten
James Reed
Infrastructure Engineer · Leapcell

Einleitung
In der Welt moderner Webanwendungen ist die Bereitstellung von Echtzeit-Updates an Benutzer von größter Bedeutung. Ob eine Chat-Anwendung, ein Live-Dashboard oder ein Benachrichtigungssystem, Benutzer erwarten aktuelle Daten ohne ständiges Neuladen der Seite. Während WebSockets oft die bevorzugte Lösung für echte bidirektionale Kommunikation sind, können sie für einfachere Szenarien, in denen Clients hauptsächlich Updates empfangen oder wenn mit älterer Infrastruktur gearbeitet wird, manchmal übertrieben sein. Hier glänzt Long Polling. Long Polling bietet einen Mittelweg und bietet ein pseudo-Echtzeit-Erlebnis über HTTP-Anfragen, wodurch die Integration einfacher und in bestimmten Anwendungsfällen oft robuster gegen Netzwerkunterbrechungen ist als kontinuierlich geöffnete WebSocket-Verbindungen. Dieser Artikel befasst sich damit, wie Rusts leistungsstarkes asynchrones Ökosystem und Stream-basierte APIs effektiv genutzt werden können, um robuste und skalierbare Long-Polling-Mechanismen in Webdiensten zu implementieren.
Kernkonzepte und Implementierung
Bevor wir uns mit der Rust-Implementierung befassen, lassen Sie uns einige wichtige Begriffe klären, die für Long Polling und Rusts asynchrone Fähigkeiten relevant sind.
Long Polling: Eine Technik, bei der ein Client eine HTTP-Anfrage an einen Server sendet und der Server diese Anfrage absichtlich offen hält, bis neue Daten verfügbar sind oder ein Timeout auftritt. Sobald Daten verfügbar sind, antwortet der Server, und der Client stellt sofort eine weitere Anfrage. Dies simuliert einen Push-ähnlichen Mechanismus über Standard-HTTP.
Asynchrone Programmierung (in Rust): Rusts async
/await
-Syntax ermöglicht das Schreiben von nebenläufigem Code, der den Ausführungsthread nicht blockiert, während er auf den Abschluss von E/A-Operationen (wie Netzwerkanfragen oder Datenbankabfragen) wartet. Dies ist entscheidend für Hochleistungs-Webdienste, die viele gleichzeitige Verbindungen effizient handhaben müssen.
Streams (in Rust): Im Kontext von futures
und asynchronem Rust ist ein Stream
eine asynchrone Sequenz von Werten, ähnlich einem Iterator
, aber für async
-Kontexte. Er ermöglicht die Verarbeitung von Daten, sobald sie im Laufe der Zeit verfügbar werden, anstatt zu verlangen, dass alle Daten sofort vorhanden sind. Dies ist besonders nützlich für Long Polling, bei dem wir möglicherweise mehrere Datenblöcke senden oder die Verbindung offen halten möchten, während neue Ereignisse eintreffen.
Das Long Polling Prinzip
Die serverseitige Implementierung von Long Polling umfasst typischerweise diese Schritte:
- Client-Anfrage: Ein Client sendet eine HTTP-GET-Anfrage an einen bestimmten Endpunkt (z. B.
/events
). - Server hält Anfrage: Wenn keine neuen Daten sofort verfügbar sind, antwortet der Server nicht. Stattdessen legt er die Client-Anfrage (oder eine Darstellung davon) in eine Warteschlange oder abonniert sie an eine Ereignisquelle.
- Ereignisbenachrichtigung: Wenn neue Daten oder ein Ereignis auftreten, ruft der Server die wartenden Client-Anfragen ab, die an diesem Ereignis interessiert sind.
- Server antwortet: Der Server erstellt eine HTTP-Antwort mit den neuen Daten und sendet sie an den Client.
- Client versucht erneut: Nach Erhalt einer Antwort initiiert der Client sofort eine weitere Long Polling-Anfrage, um weiterhin auf zukünftige Ereignisse zu warten.
Implementierung von Long Polling mit Streams in Rust
Rusts tokio
-Runtime und das axum
-Web-Framework, kombiniert mit tokio::sync::broadcast
-Kanälen und futures::StreamExt
, bieten eine ausgezeichnete Grundlage für den Aufbau eines robusten Long Polling-Dienstes mit Streams.
Betrachten wir ein einfaches Ereignissystem, bei dem sich Benutzer für generische "Ereignisse" registrieren und diese empfangen können.
use axum::* use futures::Stream; use serde::Deserialize; use std::* use tokio::* use tokio_stream::wrappers::BroadcastStream; /// Stellt ein Ereignis dar, das an Clients gesendet werden kann. #[derive(Debug, Clone, serde::Serialize)] struct MyEvent { id: u64, message: String, } /// Unser Anwendungsstatus, der den Broadcast-Sender enthält. #[derive(Clone)] struct AppState { event_sender: broadcast::Sender<MyEvent>, event_counter: Arc<Mutex<u64>>, // Um eindeutige Ereignis-IDs zu generieren } #[tokio::main] async fn main() { let (event_sender, _receiver) = broadcast::channel(16); // Kanal für Ereignisse let app_state = AppState { event_sender: event_sender.clone(), event_counter: Arc::new(Mutex::new(0)), }; // Simuliert Ereignisgenerierung (z. B. aus einem anderen Dienst oder interner Logik) tokio::spawn(generate_dummy_events(event_sender.clone(), app_state.event_counter.clone())); let app = Router::new() .route("/events/long-poll", get(long_poll_handler)) .route("/events/trigger", get(trigger_event)) .with_state(app_state); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await .unwrap(); println!("Listening on {}", listener.local_addr().unwrap()); axum::serve(listener, app).await.unwrap(); } // Handler für den Long-Polling-Endpunkt async fn long_poll_handler( State(app_state): State<AppState>, ) -> Sse<Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>> { // Holt einen Empfänger für neue Ereignisse let mut rx = app_state.event_sender.subscribe(); // Erstellt einen Stream, der Ereignisse aus dem Broadcast-Kanal emittiert. // Wir transformieren unser MyEvent in ein axum::sse::Event. let event_stream = BroadcastStream::new(rx) .map(|event_result| match event_result { Ok(event) => { let json_data = serde_json::to_string(&event).unwrap_or_default(); Ok(Event::default().event("message").data(json_data)) } // Behandelt `RecvError::Lagged`, wenn der Client zu langsam war. // Für Long Polling würden wir normalerweise einfach schließen und den Client neu verbinden lassen. Err(e) => { eprintln!("Broadcast-Empfangsfehler: {:?}", e); // In einer realen Anwendung könnten Sie hier ein Fehlerereignis senden // oder einfach streamen, was den Client zum Neuverbinden zwingt. Err(Infallible) // Infallible für Result<Event, Infallible> bedeutet, dass kein Fehler vom Stream selbst ausgeht. } }) .boxed(); // Die Sse-Antwort behandelt implizit das Offenhalten der Verbindung // und das Senden von Ereignissen, sobald sie im Stream ankommen. // Wir fügen eine kurze Dauer hinzu, um anzuzeigen, dass der Server Daten senden erwartet. // Die Kernlogik für das Timeout von Long Polling liegt jedoch auf Client-Seite. Sse::new(event_stream) } // Ein Endpunkt zum manuellen Auslösen von Ereignissen (zu Testzwecken) #[derive(Deserialize)] struct TriggerParams { message: String, } async fn trigger_event( State(app_state): State<AppState>, Query(params): Query<TriggerParams>, ) -> String { let mut counter = app_state.event_counter.lock().unwrap(); *counter += 1; let new_event = MyEvent { id: *counter, message: params.message.clone(), }; app_state.event_sender.send(new_event.clone()).unwrap(); format!("Ereignis ausgelöst: {:?}", new_event) } // Simuliert ein externes System, das Ereignisse generiert async fn generate_dummy_events( sender: broadcast::Sender<MyEvent>, counter: Arc<Mutex<u64>>, ) { let mut interval = interval(Duration::from_millis(2000)); // Alle 2 Sekunden loop { interval.tick().await; let mut count = counter.lock().unwrap(); *count += 1; let event = MyEvent { id: *count, message: format!("Automatisches Ereignis {}", *count), }; println!("Sende Dummy-Ereignis: {:?}", event); if let Err(e) = sender.send(event) { eprintln!("Fehler beim Senden des Dummy-Ereignisses: {}", e); } } }
Erklärung des Codes:
AppState
: Enthält einentokio::sync::broadcast::Sender
, einen Kanal für mehrere Produzenten und mehrere Konsumenten. Wenn ein Ereignis über diesen Sender gesendet wird, erhalten alle aktiven Empfänger eine Kopie. Dies ist ideal für die Verteilung von Ereignissen an mehrere verbundene Clients.main
-Funktion: Richtet denAppState
ein, startet eine Hintergrundaufgabe (generate_dummy_events
), um die Ereignisgenerierung zu simulieren, und initialisiert denaxum
HTTP-Server.generate_dummy_events
: Eine einfacheasync
-Funktion, die periodischMyEvent
-Instanzen über den Broadcast-Kanal sendet. Dies simuliert das Vorhandensein neuer Daten.long_poll_handler
: Dies ist der Kernendpunkt von Long Polling.- Er abonniert den
event_sender
, um einenbroadcast::Receiver
zu erhalten. tokio_stream::wrappers::BroadcastStream::new(rx)
konvertiert denbroadcast::Receiver
in einenfutures::Stream
. Dies ermöglicht uns, eingehende Ereignisse als asynchrone Sequenz zu behandeln.- Wir
map
dann jedes empfangeneMyEvent
in einaxum::response::sse::Event
. Während dieses Beispiel Server-Sent Events (SSE) für seine Stream-ähnliche Natur und Browserkompatibilität verwendet, gilt das grundlegende Stream-Konzept auch dann, wenn Sie nur eine einzelne JSON-Antwort pro Long-Poll-Anfrage senden würden. SSE eignet sich hervorragend, da es die Aufrechterhaltung der Verbindung und das Pushen mehrerer Ereignisse über eine einzige HTTP-Verbindung auf natürliche Weise handhabt, was gut zu hochfrequentem Long Polling passt. Sse::new(event_stream)
:axum
bietet einenSse
-Antworttyp, der automatisch die Umwandlung einesStream
vonResult<Event, Infallible>
in eine Server-Sent Events-Antwort handhabt. DerInfallible
-Fehlertyp bedeutet, dass wir davon ausgehen, dass die Stream-Verarbeitung selbst keinen Fehler erzeugt, denaxum
speziell für SSE behandeln muss. Wenn ein Ereignis nicht verarbeitet werden kann, würden wir es normalerweise überspringen oder einen Fehler protokollieren.
- Er abonniert den
trigger_event
: Ein einfacher Endpunkt zum manuellen Auslösen eines Ereignisses über eine HTTP-Anfrage, nützlich zum Testen.
Client-seitige Überlegungen
Für den Client kann eine Standard-fetch
- oder XMLHttpRequest
-Anfrage verwendet werden. Da wir vom Server SSE verwenden, ist die EventSource
-API auf Client-Seite die natürlichste Wahl:
const eventSource = new EventSource('http://127.0.0.1:3000/events/long-poll'); eventSource.onmessage = function(event) { console.log("Allgemeine Nachricht empfangen:", event.data); const data = JSON.parse(event.data); console.log("Geparstes Ereignis:", data); }; eventSource.addEventListener('message', function(event) { console.log(""Nachricht"-Ereignis empfangen:", event.data); }); eventSource.onerror = function(err) { console.error("EventSource fehlgeschlagen:", err); // Sie würden hier normalerweise die Wiederverbindung behandeln. // EventSource hat einige automatische Wiederverbindungslogik, kann aber angepasst werden. };
Dieser Client behält die Verbindung offen und empfängt Ereignisse, wenn sie vom Rust-Server gesendet werden. Wenn die Verbindung unterbrochen wird, versucht EventSource
, die Verbindung wiederherzustellen. Für traditionelles Long Polling (ohne SSE) müsste der Client jedes Mal, wenn er eine Antwort erhält, eine neue fetch
-Anfrage stellen.
Anwendungsfälle
- Benachrichtigungssysteme: Push-Benachrichtigungen an Benutzer ohne den Overhead von WebSockets für jeden Client.
- Live-Dashboards/Feeds: Anzeige von Echtzeit-Datenaktualisierungen (z. B. Aktienkurse, Sensormesswerte), bei denen der Client hauptsächlich Informationen konsumiert.
- Chat-Anwendungen (vereinfacht): Für grundlegende Chats, bei denen Nachrichten an einen Server gesendet und dann an die Teilnehmer verteilt werden, kann Long Polling eine einfachere Alternative zu WebSockets sein, insbesondere wenn der Server nur Updates pushen muss.
- Spiel-Lounges: Informieren Sie Spieler über neue Spielstarts oder Spielerankünfte.
Fazit
Die Implementierung von Long Polling in Rust-Webdiensten mit asynchronen Streams bietet eine robuste und effiziente Möglichkeit, Pseudo-Echtzeit-Updates bereitzustellen. Durch die Nutzung von Tokio's Broadcast-Kanälen und den Stream-kompatiblen Sse
-Antworten von axum
können Entwickler skalierbare Systeme aufbauen, die zahlreiche gleichzeitige Verbindungen verarbeiten, ohne Threads zu blockieren. Dieser Ansatz bietet eine leistungsstarke Alternative zu WebSockets für Szenarien, in denen keine bidirektionale Kommunikation erforderlich ist, und trägt zu einer reaktionsschnelleren und interaktiveren Benutzererfahrung bei. Rusts Leistung und Nebenläufigkeitsfunktionen machen es außergewöhnlich gut geeignet für die Architektur solcher Echtzeit-Kommunikationsmuster.