Asynchrone Programmierung in Rust: Futures, Executors und Task Scheduling
Daniel Hayes
Full-Stack Engineer · Leapcell

Definition von Future
Das Future
ist der Kern der asynchronen Programmierung in Rust. Hier ist die Definition des Future
Traits:
#[must_use = "futures do nothing unless you `.await` or poll them"] #[lang = "future_trait"] pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } #[must_use = "this `Poll` may be a `Pending` variant, which should be handled"] pub enum Poll<T> { Ready(T), Pending, }
Ein Future
hat einen assoziierten Typ Output
und eine Methode poll()
, die ein Poll<Self::Output>
zurückgibt. Poll
ist ein Enum mit zwei Varianten: Ready
und Pending
. Durch Aufrufen der Methode poll()
kann ein Future
weiter in Richtung Fertigstellung gebracht werden, bis die Aufgabe erledigt und ausgetauscht wird.
Wenn das
Future
in einem aktuellenpoll
-Aufruf abgeschlossen ist, gibt esPoll::Ready(result)
zurück, was bedeutet, dass der Wert desFuture
zurückgegeben wird. Wenn dasFuture
noch nicht abgeschlossen ist, gibt esPoll::Pending()
zurück. An diesem Punkt wird dasFuture
ausgesetzt und wartet darauf, durch ein Ereignis (über eine Wake-Funktion) aufgeweckt zu werden.
Executor (Ausführungs-Scheduler)
Ein Executor ist ein Scheduler für ein Future
. Während das Betriebssystem für die Planung von Threads verantwortlich ist, plant es keine User-Space-Koroutinen (wie Future
s). Daher benötigt jedes Programm, das Koroutinen für die Nebenläufigkeit verwendet, einen Executor zur Handhabung der Planung.
Rusts Future
s sind lazy – sie werden nur ausgeführt, wenn sie gepollt werden. Eine Möglichkeit, sie anzutreiben, besteht darin, eine andere Async-Funktion innerhalb einer Async-Funktion mit .await
aufzurufen, aber das löst das Problem nur innerhalb von Async-Funktionen selbst. Die äußersten Async-Funktionen müssen immer noch von einem Executor angetrieben werden.
Executor-Laufzeit
Obwohl Rust Koroutinen wie Future
s bereitstellt, stellt es keinen Executor auf Sprachebene bereit. Wenn keine Koroutinen verwendet werden, ist es nicht erforderlich, eine Laufzeitumgebung einzubinden. Wenn sie jedoch benötigt werden, bietet das Ökosystem eine Vielzahl von Executoren zur Auswahl.
Hier sind 4 gängige Executoren in Rust:
futures
: Diese Bibliothek enthält einen einfachen integrierten Executor.tokio
: Bietet einen Executor; die Verwendung von#[tokio::main]
schließt implizit den Tokio-Executor ein.async-std
: Bietet einen Executor ähnlich wie Tokio.smol
: Bietetasync-executor
und macht hauptsächlichblock_on
verfügbar.
Wake-Benachrichtigungsmechanismus
Ein Executor verwaltet eine Gruppe von Future
s (in der Regel die äußersten Async-Funktionen) und rückt sie voran, indem er sie kontinuierlich pollt, bis sie abgeschlossen sind. Zunächst pollt der Executor ein Future
einmal. Danach wird es nicht mehr aktiv pollen. Wenn die poll
-Methode Poll::Pending
zurückgibt, wird das Future
ausgesetzt, bis ein Ereignis das Aufwecken über die Funktion wake()
auslöst. Das Future
kann dann den Executor aktiv benachrichtigen und ihn auffordern, das Polling fortzusetzen und die Aufgabe weiter auszuführen. Dieser Wake-Then-Poll-Zyklus wird wiederholt, bis das Future
abgeschlossen ist.
Der Waker
stellt eine wake()
-Methode bereit, die dem Executor mitteilt, dass die zugehörige Aufgabe bereit ist, fortgesetzt zu werden, sodass der Executor das entsprechende Future
erneut pollenn kann.
Context
ist ein Wrapper um Waker
. Sehen wir uns die Context
-Struktur an, die in der poll
-Methode verwendet wird:
pub struct Context<'a> { waker: &'a Waker, _marker: PhantomData<fn(&'a ()) -> &'a ()>, }
Die Definition und Implementierung von Waker
sind recht abstrakt. Intern verwendet es eine virtuelle Funktionstabelle (Vtable), um eine Vielzahl von Waker
-Verhaltensweisen zu ermöglichen:
pub struct RawWakerVTable { clone: unsafe fn(*const ()) -> RawWaker, wake: unsafe fn(*const ()), wake_by_ref: unsafe fn(*const ()), drop: unsafe fn(*const ()), }
Rust selbst bietet keine Async-Laufzeit – es definiert nur grundlegende Schnittstellen in der Standardbibliothek und überlässt das Laufzeitverhalten der Implementierung durch Drittanbieter-Laufzeiten. In der Standardbibliothek sehen Sie also nur die Schnittstellendefinitionen und einige High-Level-Implementierungen. Beispielsweise delegiert die wake()
-Methode für Waker
den Aufruf einfach an die entsprechende Funktion in der Vtable:
impl Waker { /// Wake up the task associated with this `Waker`. #[inline] pub fn wake(self) { // The actual wakeup call is delegated through a virtual function call // to the implementation which is defined by the executor. let wake = self.waker.vtable.wake; let data = self.waker.data; // Don't call `drop` -- the waker will be consumed by `wake`. crate::mem::forget(self); // SAFETY: This is safe because `Waker::from_raw` is the only way // to initialize `wake` and `data` requiring the user to acknowledge // that the contract of `RawWaker` is upheld. unsafe { (wake)(data) }; } ... }
Die tatsächliche Implementierung der Vtable befindet sich nicht in der Standardbibliothek – sie wird von Async-Laufzeiten von Drittanbietern bereitgestellt, z. B. der im futures
-Crate.
Erstellen eines Timers
Verwenden wir ein Timer-Beispiel, um den Planungsmechanismus eines Future
besser zu verstehen. Das Ziel ist: Wenn ein Timer erstellt wird, wird ein neuer Thread erzeugt, der für eine bestimmte Dauer schläft, und sobald das Zeitfenster verstrichen ist, signalisiert er das Timer-Future
.
Hinweis: Dies erfordert den ArcWake
-Trait aus dem futures
-Crate, der eine bequeme Möglichkeit zum Erstellen eines Waker
bietet. Bearbeiten Sie Cargo.toml
und fügen Sie die folgende Abhängigkeit hinzu:
[dependencies] futures = "0.3"
Vollständiger Code für das Timer-Future
:
// future_timer.rs use futures; use std::{ future::Future, pin::Pin, sync::{ Arc, Mutex }, task::{ Context, Poll, Waker }, thread, time::Duration, }; pub struct TimerFuture { shared_state: Arc<Mutex<SharedState>>, } /// Shared state between the Future and the sleeping thread struct SharedState { /// Indicates whether the timer (sleep) has completed completed: bool, /// When sleep ends, the thread can use this `waker` to notify `TimerFuture` to wake up the task waker: Option<Waker>, } impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Check shared state to determine if the timer has completed let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { println!("future ready. execute poll to return."); Poll::Ready(()) } else { println!("future not ready, tell the future task how to wakeup to executor"); // Set the `waker` so that the new thread can wake up the task once the sleep completes, // allowing the `Future` to be polled again. // The `clone` here happens on every `poll`, but ideally should only happen once. // The reason we clone each time is because the `TimerFuture` might move between tasks // in the executor; a single `waker` instance might be altered and point to the wrong task, // resulting in the executor running the wrong task. shared_state.waker = Some(cx.waker().clone()); Poll::Pending } } } impl TimerFuture { /// Create a new `TimerFuture` which completes after the specified duration pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // Spawn a new thread let thread_shared_state = shared_state.clone(); thread::spawn(move || { // Sleep for the specified duration to simulate a timer thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // Notify the executor that the timer is done and the corresponding `Future` can be polled again shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { println!("detect future is ready, wakeup the future task to executor."); waker.wake() } }); TimerFuture { shared_state } } } fn main() { // We haven't implemented our own executor yet, so we use the one from the `futures` crate futures::executor::block_on(TimerFuture::new(Duration::new(10, 0))); }
Ausführungsergebnis:
future not ready, tell the future task how to wakeup to executor detect future is ready, wakeup the future task to executor. future ready. execute poll to return.
Wie oben gezeigt, ist der 10-Sekunden-Timer zunächst noch nicht abgeschlossen und befindet sich im Status Pending
. An diesem Punkt müssen wir der Aufgabe mitteilen, wie sie sich selbst aufwecken soll, wenn sie bereit ist. Nach 10 Sekunden ist der Timer abgeschlossen und verwendet den zuvor festgelegten Waker
, um die Future
-Aufgabe zur Ausführung zu aktivieren.
Erstellen eines Executors
Im vorherigen Code haben wir unseren eigenen Scheduler nicht implementiert, sondern den vom futures
-Crate bereitgestellten verwendet. Erstellen wir nun einen benutzerdefinierten Executor, um zu verstehen, wie er unter der Haube funktioniert. In der realen asynchronen Rust-Programmierung würden Sie jedoch normalerweise die tokio
-Bibliothek verwenden. Hier erstellen wir eine von Grund auf neu, um besser zu verstehen, wie Async funktioniert.
Schlüsselcode:
// future_executor.rs use { futures::{ future::{ BoxFuture, FutureExt }, task::{ waker_ref, ArcWake }, }, std::{ future::Future, sync::mpsc::{ sync_channel, Receiver, SyncSender }, sync::{ Arc, Mutex }, task::Context, time::Duration, }, }; mod future_timer; // Import the previously implemented timer module use future_timer::TimerFuture; /// Task executor: responsible for receiving tasks from a channel and executing them struct Executor { ready_queue: Receiver<Arc<Task>>, } /// `Spawner` is responsible for creating new `Future`s and sending them to the task channel #[derive(Clone)] struct Spawner { task_sender: SyncSender<Arc<Task>>, } /// A `Future` that can schedule itself (by sending itself to the task channel), and wait to be executed struct Task { /// The in-progress `Future` that will complete at some point in the future /// /// Technically, a `Mutex` here is unnecessary because we're executing everything on a single thread. /// But Rust isn't smart enough to know that the `Future` is not being shared across threads, /// so we use `Mutex` to satisfy the compiler's requirements for thread safety. /// /// A production-grade executor wouldn't use a `Mutex` here because it introduces overhead. /// Instead, it would use `UnsafeCell`. future: Mutex<Option<BoxFuture<'static, ()>>>, /// Allows this task to re-submit itself into the task queue, waiting for the executor to `poll` it task_sender: SyncSender<Arc<Task>>, } fn new_executor_and_spawner() -> (Executor, Spawner) { // Maximum number of buffered tasks in the task channel (queue length) // This implementation is simplified; real-world executors handle this differently const MAX_QUEUED_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); (Executor { ready_queue }, Spawner { task_sender }) } impl Spawner { fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); println!("first dispatch the future task to executor."); self.task_sender.send(task).expect("too many tasks queued."); } } /// Implements `ArcWake` to define how to wake a task and schedule it for execution impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { // Wake is implemented by sending the task back into the task channel, // so the executor will `poll` it again. let cloned = arc_self.clone(); arc_self .task_sender .send(cloned) .expect("too many tasks queued"); } } impl Executor { /// Actually run the `Future` tasks by continuously receiving and executing them fn run(&self) { let mut count = 0; while let Ok(task) = self.ready_queue.recv() { count += 1; println!("received task. {}", count); // Retrieve the future; if it hasn't finished (still Some), then `poll` it and try to complete it let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // Create a `LocalWaker` based on the task itself let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // `BoxFuture<T>` is an alias for `Pin<Box<dyn Future<Output = T> + Send + 'static>>` // `as_mut` converts it to `Pin<&mut dyn Future + Send + 'static>` if future.as_mut().poll(context).is_pending() { println!("executor run the future task, but is not ready, create a future again."); // Future is not yet done, so put it back and wait for the next poll *future_slot = Some(future); } else { println!("executor run the future task, is ready. the future task is done."); } } } } } fn main() { let (executor, spawner) = new_executor_and_spawner(); // Wrap the TimerFuture in a task and dispatch it to the scheduler for execution spawner.spawn(async { println!("TimerFuture await"); // Create a timer Future and wait for it to complete TimerFuture::new(Duration::new(10, 0)).await; println!("TimerFuture Done"); }); // Drop the spawner so the executor knows no more tasks will be submitted drop(spawner); // Run the executor until the task queue is empty // Once the task runs, it will print "howdy!", pause for 2 seconds, then print "done!" executor.run(); }
Ausführungsergebnis:
first dispatch the future task to executor. received task. 1 TimerFuture await future not ready, tell the future task how to wakeup to executor executor run the future task, but is not ready, create a future again. detect future is ready, wakeup the future task to executor. received task. 2 future ready. execute poll to return. TimerFuture Done executor run the future task, is ready. the future task is done.
Beim ersten Planungsversuch ist die Aufgabe noch nicht bereit und gibt Pending
zurück. Die Aufgabe wird dann darüber informiert, wie sie aufgeweckt werden soll, wenn sie bereit ist. Später, wenn das Ereignis bereit ist, wird die Aufgabe wie angewiesen aufgeweckt und zur Ausführung geplant.
Asynchroner Verarbeitungsablauf
Das Reactor-Pattern ist ein klassisches Entwurfsmuster, das zum Erstellen von leistungsstarken, ereignisgesteuerten Systemen verwendet wird. Der Executor und der Reactor sind Komponenten dieses Musters. Das Reactor-Pattern besteht aus drei Hauptteilen:
- Task: Eine auszuführende Arbeitseinheit. Aufgaben können angehalten und die Steuerung an den Executor übergeben werden, um später neu geplant zu werden.
- Executor: Ein Scheduler, der die Aufgaben verwaltet, die zur Ausführung bereit sind (die Ready Queue) und die Aufgaben, die blockiert sind (die Wait Queue).
- Reactor: Verwaltet eine Ereigniswarteschlange. Wenn ein Ereignis eintritt, benachrichtigt es den Executor, eine bestimmte Aufgabe zum Ausführen zu aktivieren.
Der Executor plant Aufgaben zur Ausführung. Wenn eine Aufgabe nicht fortgesetzt werden kann, aber noch nicht abgeschlossen ist, wird sie ausgesetzt und eine geeignete Aufweckbedingung registriert. Wenn der Reactor später ein Ereignis empfängt, das die Aufweckbedingung erfüllt, weckt er die ausgesetzte Aufgabe. Der Executor kann dann das Polling dieser Aufgabe fortsetzen. Dieser Zyklus wird wiederholt, bis die Aufgabe abgeschlossen ist.
Die asynchrone Verarbeitung von Rust über Future
ist eine typische Implementierung des Reactor Patterns.
Am Beispiel von
tokio
:async/await
bietet Unterstützung auf Syntaxebene undFuture
ist die Datenstruktur, die asynchrone Aufgaben darstellt. Wenn.await
aufgerufen wird, plant und führt der Executor die Aufgabe aus.
Der Scheduler von Tokio läuft über mehrere Threads. Jeder Thread führt Aufgaben aus seiner eigenen Ready Queue aus. Wenn die Warteschlange eines Threads leer ist, kann er Aufgaben aus den Warteschlangen anderer Threads stehlen (eine Strategie, die als Work-Stealing bezeichnet wird). Wenn eine Aufgabe nicht mehr vorankommen kann und
Poll::Pending
zurückgibt, setzt der Scheduler sie aus und legt mithilfe einesWaker
eine geeignete Aufweckbedingung fest. Der Reactor verwendet die asynchronen E/A-Mechanismen des Betriebssystems (z. B.epoll
,kqueue
oderIOCP
), um E/A-Ereignisse zu überwachen. Wenn ein relevantes Ereignis ausgelöst wird, ruft der ReactorWaker::wake()
auf, wodurch das angehalteneFuture
reaktiviert wird. DasFuture
wird zurück in die Ready Queue gestellt und wartet auf die Ausführung.
Zusammenfassung
Future
ist die Kernabstraktion im asynchronen Programmiermodell von Rust und stellt Vorgänge dar, die irgendwann in der Zukunft abgeschlossen werden. Rusts Future
s sind lazy – sie benötigen einen Executor, um sie anzutreiben. Diese Ausführung wird durch Polling implementiert:
- Wenn ein
Future
in einem aktuellen Poll-Zyklus abgeschlossen ist, gibt esPoll::Ready(result)
zurück und stellt den endgültigen Wert bereit. - Wenn das
Future
noch nicht abgeschlossen ist, gibt esPoll::Pending()
zurück. An diesem Punkt wird dasFuture
ausgesetzt und wartet auf ein externes Ereignis, um es über einenWaker
aufzuwecken.
Der Waker
stellt eine wake()
-Methode bereit, um den Executor zu benachrichtigen, welche Aufgabe fortgesetzt werden soll. Wenn wake()
aufgerufen wird, weiß der Executor, dass die dem Waker
zugeordnete Aufgabe bereit ist, Fortschritte zu erzielen, und pollt das Future
erneut. Dieser Wake → Poll → Suspend-Zyklus wird fortgesetzt, bis das Future
schließlich abgeschlossen ist.
Jede asynchrone Aufgabe durchläuft im Allgemeinen drei Phasen:
- Polling-Phase: Der Executor initiiert das Polling auf einem
Future
. Wenn es einen Punkt erreicht, an dem es keine weiteren Fortschritte erzielen kann (Poll::Pending
), wird die Aufgabe ausgesetzt und wechselt in die Wartephase. - Wartephase: Die Ereignisquelle (normalerweise als Reactor bezeichnet) registriert einen
Waker
, um auf ein Ereignis zu warten. Wenn das Ereignis eintritt, löst es denWaker
aus, um das zugehörigeFuture
zu reaktivieren und in die Reaktivierungsphase überzugehen. - Reaktivierungsphase: Wenn das Ereignis eintritt, wird das entsprechende
Future
von seinemWaker
reaktiviert. Der Executor plant die Aufgabe erneut zur Abfrage ein. Die Aufgabe schreitet fort, bis sie entweder abgeschlossen ist oder einen weiterenPending
-Punkt erreicht. Dieser Zyklus wird wiederholt, bis die Aufgabe vollständig abgeschlossen ist.
Wir sind Leapcell, Ihre erste Wahl für das Hosten von Rust-Projekten.
Leapcell ist die Serverless-Plattform der nächsten Generation für Webhosting, asynchrone Aufgaben und Redis:
Multi-Sprachen-Unterstützung
- Entwickeln Sie mit Node.js, Python, Go oder Rust.
Stellen Sie unbegrenzt Projekte kostenlos bereit
- Zahlen Sie nur für die Nutzung – keine Anfragen, keine Gebühren.
Unschlagbare Kosteneffizienz
- Pay-as-you-go ohne Leerlaufgebühren.
- Beispiel: 25 $ unterstützen 6,94 Millionen Anfragen bei einer durchschnittlichen Antwortzeit von 60 ms.
Optimierte Entwicklererfahrung
- Intuitive Benutzeroberfläche für mühelose Einrichtung.
- Vollständig automatisierte CI/CD-Pipelines und GitOps-Integration.
- Echtzeitmetriken und -protokollierung für verwertbare Einblicke.
Mühelose Skalierbarkeit und hohe Leistung
- Automatische Skalierung zur mühelosen Bewältigung hoher Parallelität.
- Kein operativer Overhead – konzentrieren Sie sich einfach auf das Bauen.
Erfahren Sie mehr in der Dokumentation!
Folgen Sie uns auf X: @LeapcellHQ