Real-time Data with async-graphql Subscriptions
Emily Parker
Product Engineer · Leapcell

Introduction: Bridging Static Data and Dynamic User Experiences
In today's fast-paced digital world, users expect instantaneous updates and reactive interfaces. Traditional request-response paradigms, while effective for fetching static data, often fall short when it comes to delivering real-time information such as live sports scores, chat messages, or stock price changes. Continuously polling the server can be inefficient and resource-intensive, leading to wasted bandwidth and increased server load. This is where real-time technologies shine, enabling servers to proactively push data to connected clients as events occur. GraphQL, with its powerful query language, offers an elegant solution for data fetching, but its true real-time potential is unlocked through Subscriptions. This article delves into how we can leverage async-graphql
in Rust to implement GraphQL Subscriptions, transforming our applications from static data providers to dynamic, real-time powerhouses.
Understanding Real-time with GraphQL Subscriptions
Before we dive into the code, let's clarify some fundamental concepts that underpin real-time data updates with GraphQL and async-graphql
.
Core Concepts
- GraphQL: An open-source data query and manipulation language for APIs, and a runtime for fulfilling queries with existing data. It allows clients to request exactly what they need, no more, no less.
- Subscriptions: A type of GraphQL operation that enables a client to receive real-time updates from a server when a specific event occurs. Unlike queries (fetch once) and mutations (modify and fetch once), subscriptions maintain an open connection (typically WebSocket-based) and push data to the client as new events are published on the server.
- WebSockets: A communication protocol that provides full-duplex communication channels over a single TCP connection. This persistent connection is crucial for subscriptions, allowing the server to push updates without needing the client to repeatedly request data.
async-graphql
: A powerful and ergonomic GraphQL server library for Rust. It supports all GraphQL operation types, including subscriptions, and integrates seamlessly with various async runtimes and web frameworks.- Publish-Subscribe (Pub/Sub) Pattern: A messaging pattern where senders (publishers) do not send messages directly to specific receivers (subscribers) but instead categorize published messages into classes. Subscribers express interest in one or more classes and only receive messages that are of interest. This pattern is fundamental to how global events trigger subscription updates.
The Mechanism of GraphQL Subscriptions
At its core, a GraphQL Subscription works as follows:
- Client Initiates Subscription: A client sends a GraphQL subscription query to the server, typically over a WebSocket connection.
- Server Listens for Events: The server, upon receiving the subscription, registers the client's interest in a particular event stream or topic. It then sets up a mechanism to listen for these events.
- Event Occurs and Publishes: When an event occurs on the server-side (e.g., a new chat message is sent, a database record is updated), the server publishes this event to a Pub/Sub system (e.g., an in-memory channel, Redis Pub/Sub, Kafka).
- Server Notifies Subscriber: The Pub/Sub system, or a component listening to it, picks up the published event. The GraphQL server then transforms this event data according to the client's original subscription query and pushes the resulting payload back to the client over the established WebSocket connection.
- Client Receives Update: The client receives the real-time update and can react accordingly, updating its UI or state.
Implementing Subscriptions with async-graphql
Let's walk through an example of implementing a simple chat application where new messages are pushed to subscribed clients in real-time.
First, we need to set up our dependencies in Cargo.toml
:
[dependencies] async-graphql = { version = "7.0", features = ["apollo_tracing", "tracing"] } async-graphql-warp = "7.0" # Or async-graphql-actix-web, async-graphql-poem, etc. tokio = { version = "1.0", features = ["full"] } tokio-stream = "0.1" futures = "0.3" warp = "0.3" # Or actix-web, poem, etc.
Next, let's define our GraphQL schema with a subscription type. We'll use an in-memory channel to simulate a Pub/Sub system for simplicity. For production, consider using Redis or similar.
use async_graphql::{ http::{GraphiQLSource, WebSocket}, Subscription, Schema, Object, futures_util::stream::{SplitSink, SplitStream}, futures_util::{Stream, SinkExt, StreamExt}, Context, EmptyMutation, }; use tokio::sync::broadcast; use tokio_stream::wrappers::BroadcastStream; use futures::channel::mpsc; use std::{pin::Pin, collections::HashMap, sync::Arc, sync::Mutex}; use warp::{Filter, Rejection, Reply}; // A simple Message struct #[derive(Clone, Debug)] struct Message { id: u32, content: String, author: String, } // Our Pub/Sub system: a broadcast channel // Storing the sender in an Arc<Mutex<_>> to allow sharing across threads and mutate type MessageSender = Arc<broadcast::Sender<Message>>; // Our GraphQL Context to hold the sender struct AppContext { message_sender: MessageSender, // For simplicity, we'll store messages in memory messages: Arc<Mutex<Vec<Message>>>, next_id: Arc<Mutex<u32>>, } // Implement methods to add messages impl AppContext { fn add_message(&self, content: String, author: String) -> Message { let mut messages = self.messages.lock().unwrap(); let mut next_id = self.next_id.lock().unwrap(); let id = *next_id; *next_id += 1; let new_message = Message { id, content, author }; messages.push(new_message.clone()); // Publish the new message to the broadcast channel if let Err(e) = self.message_sender.send(new_message.clone()) { eprintln!("Failed to send message: {:?}", e); } new_message } } // The Query type: for fetching existing messages or other data as usual struct Query; #[Object] impl Query { async fn hello(&self) -> String { "world".to_string() } async fn messages(&self, ctx: &Context<'_>) -> Vec<Message> { let app_ctx = ctx.data::<AppContext>().unwrap(); app_ctx.messages.lock().unwrap().clone() } } // The Subscription type: defines what clients can subscribe to struct Subscription; #[Subscription] impl Subscription { // This subscription streams new messages async fn new_messages<'ctx>(&self, ctx: &'ctx Context<'_>) -> impl Stream<Item = Message> + 'ctx { let app_ctx = ctx.data::<AppContext>().unwrap(); let receiver = app_ctx.message_sender.subscribe(); BroadcastStream::new(receiver) .map(|result| result.unwrap_or_else(|e| { eprintln!("Error receiving message from broadcast channel: {:?}", e); // In a real app, you might want to handle this error more gracefully, // perhaps by skipping the item or sending a custom error message. Message { id: 0, content: "Error".to_string(), author: "System".to_string() } })) } } // The Mutation type: for publishing new messages struct Mutation; #[Object] impl Mutation { async fn send_message(&self, ctx: &Context<'_>, content: String, author: String) -> Message { let app_ctx = ctx.data::<AppContext>().unwrap(); app_ctx.add_message(content, author) } } // Function to build the GraphQL schema fn build_schema() -> Schema<Query, Mutation, Subscription> { let (tx, _rx) = broadcast::channel(1024); // Buffer for 1024 messages let message_sender = Arc::new(tx); let app_ctx = AppContext { message_sender: message_sender.clone(), messages: Arc::new(Mutex::new(Vec::new())), next_id: Arc::new(Mutex::new(1)), }; Schema::build(Query, Mutation, Subscription) .data(app_ctx) .finish() } #[tokio::main] async fn main() { let schema = build_schema(); // Define the GraphQL endpoint let graphql_post = async_graphql_warp::graphql(schema.clone()) .and_then(|(schema, request)| async move { Ok::<_, Rejection>(async_graphql_warp::Response::from(schema.execute(request).await)) }); // Define the GraphQL subscription endpoint // This uses `async_graphql_warp::graphql_subscription` for Warp let graphql_ws = async_graphql_warp::graphql_subscription(schema); // Define the GraphiQL IDE endpoint let graphiql = warp::path!("graphiql") .and(warp::get()) .map(|| { warp::http::Response::builder() .header("content-type", "text/html") .body(GraphiQLSource::build().endpoint("/").subscription_endpoint("/").finish()) .unwrap() }); let routes = graphql_post .or(graphql_ws) .or(graphiql); println!("GraphiQL IDE: http://localhost:8000/graphiql"); warp::serve(routes).run(([127, 0, 0, 1], 8000)).await; }
Explanation of the Code Example
Message
Struct: A simpleMessage
struct to represent our chat messages. It needs to beClone
becausetokio::sync::broadcast
requires messages to be cloneable.MessageSender
(Pub/Sub): We usetokio::sync::broadcast::channel
as our in-memory Pub/Sub mechanism. When a message is sent viasender.send(message)
, all active subscribers receive a clone of the message.AppContext
: This struct acts as our GraphQL context, holding shared state like theMessageSender
and a vector to store all messages (for the query). We wrapmessages
andnext_id
inArc<Mutex<T>>
for safe shared mutable access across threads.Query
Type: Defines amessages
field to fetch all historical messages and a simplehello
field.Mutation
Type: Thesend_message
mutation creates a newMessage
, adds it to our in-memory store, and crucially, callsself.message_sender.send(new_message)
to publish it.Subscription
Type:- The
#[Subscription]
attribute marks this as a GraphQL subscription type. - The
new_messages
field is an asynchronous function that returnsimpl Stream<Item = Message>
. This is the core of the subscription. - Inside
new_messages
, we obtain a receiver from ourmessage_sender
viaapp_ctx.message_sender.subscribe()
. BroadcastStream::new(receiver)
converts thetokio::sync::broadcast::Receiver
into afutures::Stream
, whichasync-graphql
understands.- We add a
.map
to handle potential errors from the broadcast channel, unwrapping the result.
- The
main
Function Setup:- We build the
Schema
and provide ourAppContext
to it using.data(app_ctx)
. This makesAppContext
accessible in all resolvers viactx.data::<AppContext>()
. - We set up three Warp routes:
/
: Handles standard GraphQL queries and mutations usingasync_graphql_warp::graphql()
./
: Handles GraphQL over WebSockets for subscriptions usingasync_graphql_warp::graphql_subscription()
. It's common to use the same endpoint path for both HTTP and WebSocket since they communicate over different protocols./graphiql
: Provides a GraphiQL IDE endpoint, configured to interact with our main GraphQL endpoint and subscription endpoint.
- We build the
Testing the Subscription
- Run the application:
cargo run
- Open your browser to
http://localhost:8000/graphiql
. - In the GraphiQL IDE, open two tabs or windows.
Tab 1 (Subscription):
subscription NewMessages { newMessages { id content author } }
Execute this subscription. It will establish a WebSocket connection and start listening.
Tab 2 (Mutation):
mutation SendMessage { sendMessage(content: "Hello from GraphQL!", author: "Alice") { id content author } }
Execute this mutation several times, changing the content and author. You will immediately see the new messages appearing in Tab 1's subscription output.
You can also try querying historical messages in a third tab:
query GetMessages { messages { id content author } }
Application Scenarios
GraphQL Subscriptions are ideal for a wide range of real-time applications:
- Chat Applications: Instant messaging, group chats.
- Live Dashboards: Real-time metrics, system monitoring.
- Collaborative Editing: Google Docs-like applications where changes are reflected instantly.
- Gaming: Updating game states, player positions.
- Financial Applications: Live stock tickers, cryptocurrency prices.
- Notifications: Pushing user notifications immediately.
Conclusion: Empowering Dynamic User Experiences
Implementing GraphQL Subscriptions with async-graphql
in Rust provides a robust and efficient way to build real-time data features into your applications. By leveraging the WebSocket protocol and the publish-subscribe pattern, async-graphql
enables your server to proactively push data to clients as events unfold, dramatically enhancing user experience by transforming static data representations into dynamic, living interfaces. This powerful combination of Rust's performance and async-graphql
's ergonomics allows developers to craft highly responsive and interactive applications with relative ease.