Echtzeitdaten mit async-graphql-Abonnements
Emily Parker
Product Engineer · Leapcell

Einleitung: Statische Daten und dynamische Benutzererfahrungen verbinden
In der heutigen schnelllebigen digitalen Welt erwarten Benutzer sofortige Aktualisierungen und reaktionsfähige Schnittstellen. Traditionelle Anfrage-Antwort-Paradigmen, die zwar für das Abrufen statischer Daten wirksam sind, reichen oft nicht aus, wenn es darum geht, Echtzeitinformationen wie Live-Sporteregebnisse, Chat-Nachrichten oder Aktienkursänderungen bereitzustellen. Ständiges Abfragen des Servers kann ineffizient und ressourcenintensiv sein, was zu verschwendeter Bandbreite und erhöhter Serverlast führt. Hier glänzen Echtzeit-Technologien, die es Servern ermöglichen, Daten proaktiv an verbundene Clients zu pushen, sobald Ereignisse eintreten. GraphQL bietet mit seiner leistungsstarken Abfragesprache eine elegante Lösung für den Datenabruf, aber sein wahres Echtzeit-Potenzial wird durch Abonnements (Subscriptions) erschlossen. Dieser Artikel befasst sich damit, wie wir async-graphql
in Rust nutzen können, um GraphQL-Abonnements zu implementieren, und unsere Anwendungen von statischen Datenanbietern zu dynamischen Echtzeit-Kraftpaketen umwandeln.
Echtzeit mit GraphQL-Abonnements verstehen
Bevor wir uns mit dem Code befassen, klären wir einige grundlegende Konzepte, die Echtzeit-Datenaktualisierungen mit GraphQL und async-graphql
untermauern.
Kernkonzepte
- GraphQL: Eine Open-Source-Sprache zur Abfrage und Manipulation von Daten für APIs und eine Laufzeitumgebung zur Erfüllung von Abfragen mit vorhandenen Daten. Sie ermöglicht es Clients, genau das anzufordern, was sie benötigen, nicht mehr und nicht weniger.
- Abonnements (Subscriptions): Eine Art von GraphQL-Operation, die es einem Client ermöglicht, Echtzeit-Updates von einem Server zu erhalten, wenn ein bestimmtes Ereignis eintritt. Im Gegensatz zu Abfragen (einmal abrufen) und Mutationen (ändern und einmal abrufen) beibehalten Abonnements eine offene Verbindung (typischerweise WebSocket-basiert) und pushen Daten an den Client, sobald neue Ereignisse auf dem Server veröffentlicht werden.
- WebSockets: Ein Kommunikationsprotokoll, das bidirektionale Kommunikationskanäle über eine einzige TCP-Verbindung bereitstellt. Diese persistente Verbindung ist für Abonnements von entscheidender Bedeutung und ermöglicht es dem Server, Updates zu pushen, ohne dass der Client wiederholt Daten anfordern muss.
async-graphql
: Eine leistungsstarke und ergonomische GraphQL-Serverbibliothek für Rust. Sie unterstützt alle GraphQL-Operationstypen, einschließlich Abonnements, und lässt sich nahtlos in verschiedene asynchrone Laufzeiten und Web-Frameworks integrieren.- Publish-Subscribe (Pub/Sub)-Muster: Ein Nachrichtenmuster, bei dem Absender (Publisher) Nachrichten nicht direkt an bestimmte Empfänger (Subscriber) senden, sondern veröffentlichte Nachrichten in Klassen kategorisieren. Subscriber drücken Interesse an einer oder mehreren Klassen aus und erhalten nur Nachrichten, die von Interesse sind. Dieses Muster ist grundlegend dafür, wie globale Ereignisse Abonnement-Updates auslösen.
Der Mechanismus von GraphQL-Abonnements
Im Kern funktioniert ein GraphQL-Abonnement wie folgt:
- Client initiiert Abonnement: Ein Client sendet eine GraphQL-Abonnementabfrage an den Server, typischerweise über eine WebSocket-Verbindung.
- Server lauscht auf Ereignisse: Der Server registriert nach Erhalt des Abonnements das Interesse des Clients an einem bestimmten Ereignisstrom oder Thema. Anschließend richtet er einen Mechanismus zum Lauschen auf diese Ereignisse ein.
- Ereignis tritt ein und wird veröffentlicht: Wenn auf der Serverseite ein Ereignis eintritt (z. B. eine neue Chat-Nachricht gesendet wird, ein Datenbankeintrag aktualisiert wird), veröffentlicht der Server dieses Ereignis in einem Pub/Sub-System (z. B. einem In-Memory-Kanal, Redis Pub/Sub, Kafka).
- Server benachrichtigt Abonnenten: Das Pub/Sub-System oder eine daran lauschende Komponente holt das veröffentlichte Ereignis ab. Der GraphQL-Server wandelt dann diese Ereignisdaten entsprechend der ursprünglichen Abonnementabfrage des Clients um und pusht die resultierende Nutzlast über die bestehende WebSocket-Verbindung zurück an den Client.
- Client erhält Update: Der Client empfängt das Echtzeit-Update und kann entsprechend reagieren, indem er seine UI oder seinen Zustand aktualisiert.
Implementierung von Abonnements mit async-graphql
Lassen Sie uns ein Beispiel für die Implementierung einer einfachen Chat-Anwendung durchgehen, bei der neue Nachrichten in Echtzeit an abonnierte Clients weitergeleitet werden.
Zuerst müssen wir unsere Abhängigkeiten in Cargo.toml
einrichten:
[dependencies] async-graphql = { version = "7.0", features = ["apollo_tracing", "tracing"] } async-graphql-warp = "7.0" # Oder async-graphql-actix-web, async-graphql-poem, etc. tokio = { version = "1.0", features = ["full"] } tokio-stream = "0.1" futures = "0.3" warp = "0.3" # Oder actix-web, poem, etc.
Als Nächstes definieren wir unser GraphQL-Schema mit einem Abonnementtyp. Wir verwenden einen In-Memory-Kanal, um ein Pub/Sub-System zu simulieren. Für die Produktion sollten Sie Redis oder ähnliches in Betracht ziehen.
use async_graphql::{ http::{GraphiQLSource, WebSocket}, Subscription, Schema, Object, futures_util::stream::{SplitSink, SplitStream}, futures_util::{Stream, SinkExt, StreamExt}, Context, EmptyMutation, }; tokio::sync::broadcast; tokio_stream::wrappers::BroadcastStream; futures::channel::mpsc; use std::{pin::Pin, collections::HashMap, sync::Arc, sync::Mutex}; use warp::{Filter, Rejection, Reply}; // Eine einfache Message-Struktur #[derive(Clone, Debug)] struct Message { id: u32, content: String, author: String, } // Unser Pub/Sub-System: ein Broadcast-Kanal // Speichern des Senders in einem Arc<Mutex<_>>, um die gemeinsame Nutzung über Threads hinweg und Mutationen zu ermöglichen type MessageSender = Arc<broadcast::Sender<Message>>; // Unser GraphQL-Kontext zum Halten des Senders struct AppContext { message_sender: MessageSender, // Der Einfachheit halber speichern wir Nachrichten im Speicher messages: Arc<Mutex<Vec<Message>>>, next_id: Arc<Mutex<u32>>, } // Implementieren von Methoden zum Hinzufügen von Nachrichten impl AppContext { fn add_message(&self, content: String, author: String) -> Message { let mut messages = self.messages.lock().unwrap(); let mut next_id = self.next_id.lock().unwrap(); let id = *next_id; *next_id += 1; let new_message = Message { id, content, author }; messages.push(new_message.clone()); // Veröffentlichen der neuen Nachricht an den Broadcast-Kanal if let Err(e) = self.message_sender.send(new_message.clone()) { eprintln!("Nachricht konnte nicht gesendet werden: {:?}", e); } new_message } } // Die Query-Typ: zum Abrufen bestehender Nachrichten oder anderer Daten wie gewohnt struct Query; #[Object] impl Query { async fn hello(&self) -> String { "world".to_string() } async fn messages(&self, ctx: &Context<'_>) -> Vec<Message> { let app_ctx = ctx.data::<AppContext>().unwrap(); app_ctx.messages.lock().unwrap().clone() } } // Der Subscription-Typ: definiert, was Clients abonnieren können struct Subscription; #[Subscription] impl Subscription { // Dieses Abonnement streamt neue Nachrichten async fn new_messages<'ctx>(&self, ctx: &'ctx Context<'_>) -> impl Stream<Item = Message> + 'ctx { let app_ctx = ctx.data::<AppContext>().unwrap(); let receiver = app_ctx.message_sender.subscribe(); BroadcastStream::new(receiver) .map(|result| result.unwrap_or_else(|e| { eprintln!("Fehler beim Empfang der Nachricht vom Broadcast-Kanal: {:?}", e); // In einer realen App möchten Sie diesen Fehler möglicherweise gnädiger behandeln, // indem Sie das Element überspringen oder eine benutzerdefinierte Fehlermeldung senden. Message { id: 0, content: "Error".to_string(), author: "System".to_string() } })) } } // Der Mutation-Typ: zum Veröffentlichen neuer Nachrichten struct Mutation; #[Object] impl Mutation { async fn send_message(&self, ctx: &Context<'_>, content: String, author: String) -> Message { let app_ctx = ctx.data::<AppContext>().unwrap(); app_ctx.add_message(content, author) } } // Funktion zum Erstellen des GraphQL-Schemas fn build_schema() -> Schema<Query, Mutation, Subscription> { let (tx, _rx) = broadcast::channel(1024); // Puffer für 1024 Nachrichten let message_sender = Arc::new(tx); let app_ctx = AppContext { message_sender: message_sender.clone(), messages: Arc::new(Mutex::new(Vec::new())), next_id: Arc::new(Mutex::new(1)), }; Schema::build(Query, Mutation, Subscription) .data(app_ctx) .finish() } #[tokio::main] async fn main() { let schema = build_schema(); // Definieren des GraphQL-Endpunkts let graphql_post = async_graphql_warp::graphql(schema.clone()) .and_then(|(schema, request)| async move { Ok::<_, Rejection>(async_graphql_warp::Response::from(schema.execute(request).await)) }); // Definieren des GraphQL-Abonnement-Endpunkts // Dies verwendet `async_graphql_warp::graphql_subscription` für Warp let graphql_ws = async_graphql_warp::graphql_subscription(schema); // Definieren des GraphiQL IDE-Endpunkts let graphiql = warp::path!("graphiql") .and(warp::get()) .map(|| { warp::http::Response::builder() .header("content-type", "text/html") .body(GraphiQLSource::build().endpoint("/").subscription_endpoint("/").finish()) .unwrap() }); let routes = graphql_post .or(graphql_ws) .or(graphiql); println!("GraphiQL IDE: http://localhost:8000/graphiql"); warp::serve(routes).run(([127, 0, 0, 1], 8000)).await; }
Erklärung des Codebeispiels
Message
-Struktur: Eine einfacheMessage
-Struktur zur Darstellung unserer Chat-Nachrichten. Sie mussClone
sein, datokio::sync::broadcast
erforderliche Nachrichten klonbar macht.MessageSender
(Pub/Sub): Wir verwendentokio::sync::broadcast::channel
als unser In-Memory-Pub/Sub-Mechanismus. Wenn eine Nachricht übersender.send(message)
gesendet wird, erhalten alle aktiven Abonnenten eine Kopie der Nachricht.AppContext
: Diese Struktur fungiert als unser GraphQL-Kontext und speichert gemeinsam genutzten Zustand wie denMessageSender
und einen Vektor zum Speichern aller Nachrichten (für die Abfrage). Wir wickelnmessages
undnext_id
inArc<Mutex<T>>
für einen sicheren, gemeinsam genutzten, beschreibbaren Zugriff über Threads hinweg ein.Query
-Typ: Definiert einmessages
-Feld zum Abrufen aller historischen Nachrichten und ein einfacheshello
-Feld.Mutation
-Typ: Diesend_message
-Mutation erstellt eine neueMessage
, fügt sie unserem In-Memory-Speicher hinzu und ruft entscheidendself.message_sender.send(new_message)
auf, um sie zu veröffentlichen.Subscription
-Typ:- Das Attribut
#[Subscription]
markiert dies als GraphQL-Abonnementtyp. - Das Feld
new_messages
ist eine asynchrone Funktion, dieimpl Stream<Item = Message>
zurückgibt. Dies ist der Kern des Abonnements. - Innerhalb von
new_messages
erhalten wir einen Empfänger aus unseremmessage_sender
überapp_ctx.message_sender.subscribe()
. BroadcastStream::new(receiver)
konvertiert dentokio::sync::broadcast::Receiver
in einenfutures::Stream
, denasync-graphql
versteht.- Wir fügen eine
.map
-Operation hinzu, um potenzielle Fehler vom Broadcast-Kanal zu behandeln und das Ergebnis zu entpacken.
- Das Attribut
main
-Funktions-Setup:- Wir erstellen das
Schema
und stellen ihm unserenAppContext
über.data(app_ctx)
zur Verfügung. Dies machtAppContext
in allen Resolvern überctx.data::<AppContext>()
zugänglich. - Wir richten drei Warp-Routen ein:
/
: Verarbeitet Standard-GraphQL-Abfragen und -Mutationen mithilfe vonasync_graphql_warp::graphql()
./
: Verarbeitet GraphQL nur über WebSockets für Abonnements mithilfe vonasync_graphql_warp::graphql_subscription()
. Es ist üblich, denselben Endpunktpfad sowohl für HTTP als auch für WebSocket zu verwenden, da sie über unterschiedliche Protokolle kommunizieren./graphiql
: Bietet einen GraphiQL-IDE-Endpunkt, der für die Interaktion mit unserem Haupt-GraphQL-Endpunkt und dem Abonnement-Endpunkt konfiguriert ist.
- Wir erstellen das
Testen des Abonnements
- Führen Sie die Anwendung aus:
cargo run
- Öffnen Sie Ihren Browser unter
http://localhost:8000/graphiql
. - Öffnen Sie in der GraphiQL-IDE zwei Tabs oder Fenster.
Tab 1 (Abonnement):
subscription NewMessages { newMessages { id content author } }
Führen Sie dieses Abonnement aus. Es wird eine WebSocket-Verbindung herstellen und mit dem Lauschen beginnen.
Tab 2 (Mutation):
mutation SendMessage { sendMessage(content: "Hello from GraphQL!", author: "Alice") { id content author } }
Führen Sie diese Mutation mehrmals aus und ändern Sie Inhalt und Autor. Sie werden sofort die neuen Nachrichten in der Abonnementausgabe von Tab 1 sehen.
Sie können auch den Abruf historischer Nachrichten in einem dritten Tab versuchen:
query GetMessages { messages { id content author } }
Anwendungsfälle
GraphQL-Abonnements eignen sich ideal für eine Vielzahl von Echtzeit-Anwendungen:
- Chat-Anwendungen: Sofortnachrichten, Gruppenchats.
- Live-Dashboards: Echtzeit-Metriken, Systemüberwachung.
- Kollaborative Bearbeitung: Google Docs-ähnliche Anwendungen, bei denen Änderungen sofort widergespiegelt werden.
- Spiele: Aktualisieren von Spielzuständen, Spielerpositionen.
- Finanzanwendungen: Live-Aktienkurse, Kryptowährungspreise.
- Benachrichtigungen: Sofortiges Pushen von Benutzerbenachrichtigungen.
Fazit: Dynamische Benutzererfahrungen ermöglichen
Die Implementierung von GraphQL-Abonnements mit async-graphql
in Rust bietet eine robuste und effiziente Möglichkeit, Echtzeit-Datenfunktionen in Ihre Anwendungen zu integrieren. Durch die Nutzung des WebSocket-Protokolls und des Publish-Subscribe-Musters ermöglicht async-graphql
Ihrem Server, proaktiv Daten an Clients zu pushen, sobald Ereignisse eintreten. Dies verbessert die Benutzererfahrung dramatisch, indem statische Datendarstellungen in dynamische, lebendige Schnittstellen umgewandelt werden. Diese leistungsstarke Kombination aus Rusts Leistung und der Ergonomie von async-graphql
ermöglicht es Entwicklern, äußerst reaktionsschnelle und interaktive Anwendungen mit relativer Leichtigkeit zu erstellen.