Async Programmierung in Rust: Futures mit join!, try_join! und select! zusammensetzen
Olivia Novak
Dev Intern · Leapcell

Wenn nur ein Future
ausgeführt wird, können Sie .await
direkt innerhalb einer Async-Funktion async fn
oder eines Async-Codeblocks async {}
verwenden. Wenn jedoch mehrere Futures
gleichzeitig ausgeführt werden müssen, blockiert die direkte Verwendung von .await
gleichzeitige Aufgaben, bis ein bestimmtes Future
abgeschlossen ist – wodurch sie effektiv seriell ausgeführt werden. Das futures
Crate bietet viele nützliche Werkzeuge für die gleichzeitige Ausführung von Futures
, wie z. B. die Makros join!
und select!
.
Hinweis: Das Modul
futures::future
bietet eine Reihe von Funktionen zum Arbeiten mitFutures
(viel umfassender als die Makros). Siehe:
Das join!
-Makro
Das join!
-Makro ermöglicht das Warten auf den Abschluss mehrerer verschiedener Futures
gleichzeitig und kann diese gleichzeitig ausführen.
Betrachten wir zunächst zwei falsche Beispiele mit .await
:
struct Book; struct Music; async fn enjoy_book() -> Book { /* ... */ Book } async fn enjoy_music() -> Music { /* ... */ Music } // Falsche Version 1: Führt Aufgaben sequenziell innerhalb der Async-Funktion anstelle von gleichzeitig aus async fn enjoy1_book_and_music() -> (Book, Music) { // Führt tatsächlich sequenziell innerhalb der Async-Funktion aus let book = enjoy_book().await; // await löst blockierende Ausführung aus let music = enjoy_music().await; // await löst blockierende Ausführung aus (book, music) } // Falsche Version 2: Auch sequenzielle Ausführung innerhalb der Async-Funktion anstelle von gleichzeitig async fn enjoy2_book_and_music() -> (Book, Music) { // Führt tatsächlich sequenziell innerhalb der Async-Funktion aus let book_future = enjoy_book(); // Async-Funktionen sind lazy und werden nicht sofort ausgeführt let music_future = enjoy_music(); // Async-Funktionen sind lazy und werden nicht sofort ausgeführt (book_future.await, music_future.await) }
Die beiden obigen Beispiele scheinen asynchron ausgeführt zu werden, aber tatsächlich müssen Sie zuerst das Buch fertig lesen, bevor Sie die Musik hören können. Das heißt, die Aufgaben innerhalb der Async-Funktion werden sequenziell (nacheinander) und nicht gleichzeitig ausgeführt.
Dies liegt daran, dass in Rust Futures
lazy sind – sie beginnen erst mit der Ausführung, wenn .await
aufgerufen wird. Und da die beiden await
-Aufrufe in der Code-Reihenfolge erfolgen, werden sie sequenziell ausgeführt.
Um zwei Futures
korrekt gleichzeitig auszuführen, versuchen wir es mit dem futures::join!
-Makro:
use futures::join; // Die Verwendung von `join!` gibt ein Tupel zurück, das die von jedem Future ausgegebenen Werte enthält, sobald es abgeschlossen ist. async fn enjoy_book_and_music() -> (Book, Music) { let book_fut = enjoy_book(); let music_fut = enjoy_music(); // Das join!-Makro muss warten, bis alle verwalteten Futures abgeschlossen sind, bevor es selbst abgeschlossen ist join!(book_fut, music_fut) } fn main() { futures::executor::block_on(enjoy_book_and_music()); }
Wenn Sie mehrere Async-Aufgaben in einem Array gleichzeitig ausführen möchten, können Sie die Methode futures::future::join_all
verwenden.
Das try_join!
-Makro
Da join!
warten muss, bis alle von ihm verwalteten Futures
abgeschlossen sind, können Sie try_join!
verwenden, wenn Sie die Ausführung aller Futures
sofort stoppen möchten, wenn einer von ihnen fehlschlägt – besonders nützlich, wenn die Futures
Result
zurückgeben.
Hinweis: Alle an try_join!
übergebenen Futures
müssen denselben Fehlertyp aufweisen. Wenn sich die Fehlertypen unterscheiden, können Sie die Methoden map_err
und err_into
aus dem Modul futures::future::TryFutureExt
verwenden, um die Fehler zu konvertieren:
use futures::{ future::TryFutureExt, try_join, }; struct Book; struct Music; async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) } async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) } /** * Alle an try_join! übergebenen Futures müssen denselben Fehlertyp aufweisen. * Wenn sich die Fehlertypen unterscheiden, sollten Sie map_err oder err_into verwenden * aus dem Modul futures::future::TryFutureExt, um sie zu konvertieren. */ async fn get_book_and_music() -> Result<(Book, Music), String> { let book_fut = get_book().map_err(|()| "Unable to get book".to_string()); let music_fut = get_music(); // Wenn ein Future fehlschlägt, stoppt try_join! sofort die gesamte Ausführung try_join!(book_fut, music_fut) } async fn get_into_book_and_music() -> (Book, Music) { get_book_and_music().await.unwrap() } fn main() { futures::executor::block_on(get_into_book_and_music()); }
Das select!
-Makro
Mit dem join!
-Makro können Sie Ergebnisse erst verarbeiten, nachdem alle Futures
abgeschlossen sind. Im Gegensatz dazu wartet das select!
-Makro auf mehrere Futures
, und sobald einer von ihnen abgeschlossen ist, kann er sofort behandelt werden:
use futures::{ future::FutureExt, // for `.fuse()` pin_mut, select, }; async fn task_one() { /* ... */ } async fn task_two() { /* ... */ } /** * Race-Modus: führt t1 und t2 gleichzeitig aus. * Wer zuerst fertig ist, beendet die Funktion und auf die andere Aufgabe wird nicht gewartet. */ async fn race_tasks() { // .fuse() ermöglicht es dem Future, das FusedFuture-Trait zu implementieren let t1 = task_one().fuse(); let t2 = task_two().fuse(); // Das pin_mut-Makro verleiht den Futures das Unpin-Trait pin_mut!(t1, t2); // Verwenden Sie select!, um auf mehrere Futures zu warten und das zu verarbeiten, das zuerst abgeschlossen wird select! { () = t1 => println!("Aufgabe 1 zuerst abgeschlossen"), () = t2 => println!("Aufgabe 2 zuerst abgeschlossen"), } }
Der obige Code führt t1
und t2
gleichzeitig aus. Wer zuerst fertig ist, löst seine entsprechende println!
-Ausgabe aus. Die Funktion wird dann beendet, ohne auf den Abschluss der anderen Aufgabe zu warten.
Hinweis: Anforderungen für select!
– FusedFuture + Unpin
Die Verwendung von select!
erfordert, dass die Futures
sowohl FusedFuture
als auch Unpin
implementieren, was über die Methode .fuse()
und das Makro pin_mut!
erreicht wird.
- Die Methode
.fuse()
ermöglicht es einemFuture
, dasFusedFuture
-Trait zu implementieren. - Das Makro
pin_mut!
ermöglicht es demFuture
, dasUnpin
-Trait zu implementieren.
Hinweis:
select!
erfordert zwei Trait-Grenzen:FusedStream + Unpin
:
- Unpin: Da
select
nicht das Eigentum desFuture
verbraucht, greift es über eine mutable Referenz darauf zu. Dadurch kann dasFuture
wiederverwendet werden, wenn es nach Abschluss vonselect
noch nicht abgeschlossen ist.- FusedFuture: Sobald ein
Future
abgeschlossen ist, sollteselect
es nicht mehr abfragen. „Fuse“ bedeutet Kurzschließen – dasFuture
gibt sofortPoll::Pending
zurück, wenn es nach dem Beenden erneut abgefragt wird.
Nur durch die Implementierung von FusedFuture
kann select!
korrekt innerhalb einer Schleife funktionieren. Ohne dies könnte ein abgeschlossenes Future
dennoch kontinuierlich von select
abgefragt werden.
Für Stream
wird ein etwas anderes Trait namens FusedStream
verwendet. Durch den Aufruf von .fuse()
(oder die manuelle Implementierung) wird ein Stream
zu einem FusedStream
, sodass Sie .next()
oder .try_next()
darauf aufrufen und ein Future
erhalten können, das FusedFuture
implementiert.
use futures::{ stream::{Stream, StreamExt, FusedStream}, select, }; async fn add_two_streams() -> u8 { // mut s1: impl Stream<Item = u8> + FusedStream + Unpin, // mut s2: impl Stream<Item = u8> + FusedStream + Unpin, // Die Methode `.fuse()` ermöglicht es Stream, das FusedStream-Trait zu implementieren let s1 = futures::stream::once(async { 10 }).fuse(); let s2 = futures::stream::once(async { 20 }).fuse(); // Das pin_mut-Makro ermöglicht es Stream, das Unpin-Trait zu implementieren pin_mut!(s1, s2); let mut total = 0; loop { let item = select! { x = s1.next() => x, x = s2.next() => x, complete => break, default => panic!(), // Dieser Zweig wird niemals ausgeführt, da Futures zuerst priorisiert werden, dann complete }; if let Some(next_num) = item { total += next_num; } } println!("add_two_streams, total = {total}"); total } fn main() { executor::block_on(add_two_streams()); }
Hinweis: Das
select!
-Makro unterstützt auch die Zweigedefault
undcomplete
:
- complete branch: Wird nur ausgeführt, wenn alle
Futures
undStreams
abgeschlossen sind. Es wird oft mit einerloop
verwendet, um sicherzustellen, dass alle Aufgaben abgeschlossen sind.- default branch: Wenn sich keine der
Futures
oderStreams
in einemReady
-Zustand befindet, wird dieser Zweig sofort ausgeführt.
Empfohlene Dienstprogramme für die Verwendung mit select!
Bei der Verwendung des select!
-Makros sind zwei besonders nützliche Funktionen/Typen:
Fuse::terminated()
-Funktion: Wird verwendet, um ein leeresFuture
(implementiert bereitsFusedFuture
) in einerselect
-Schleife zu erstellen und später nach Bedarf zu füllen.FuturesUnordered
-Typ: Ermöglicht einemFuture
mehrere Kopien, die alle gleichzeitig ausgeführt werden können.
use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, FuturesUnordered, Stream, StreamExt}, pin_mut, select, }; async fn future_in_select() { // Erstellen Sie ein leeres Future, das bereits FusedFuture implementiert let fut = Fuse::terminated(); // Erstellen Sie einen FuturesUnordered-Container, der mehrere gleichzeitige Futures enthalten kann let mut async_tasks: FuturesUnordered<Pin<Box<dyn Future<Output = i32>>>> = FuturesUnordered::new(); async_tasks.push(Box::pin(async { 1 })); pin_mut!(fut); let mut total = 0; loop { select! { // select_next_some: verarbeitet nur die Some(_)-Werte aus dem Stream und ignoriert None num = async_tasks.select_next_some() => { println!("first num is {num} and total is {total}"); total += num; println!("total is {total}"); if total >= 10 { break; } // Überprüfen Sie, ob fut beendet wurde if fut.is_terminated() { // Füllen Sie bei Bedarf ein neues Future aus fut.set(async { 1 }.fuse()); } }, num = fut => { println!("second num is {num} and total is {total}"); total += num; println!("now total is {total}"); async_tasks.push(Box::pin(async { 1 })); }, complete => break, default => panic!(), }; } println!("total finally is {total}"); } fn main() { executor::block_on(future_in_select()); }
Zusammenfassung
Das futures
Crate bietet viele praktische Werkzeuge für die gleichzeitige Ausführung von Futures
, darunter:
join!
-Makro: Führt mehrere verschiedeneFutures
gleichzeitig aus und wartet, bis alle abgeschlossen sind, bevor es beendet wird. Dies kann als ein Muss-alles-abschließen-Concurrency-Modell verstanden werden.try_join!
-Makro: Führt mehrere verschiedeneFutures
gleichzeitig aus, aber wenn eines von ihnen einen Fehler zurückgibt, stoppt es sofort die Ausführung allerFutures
. Dies ist nützlich, wennFutures
Result
zurückgeben und ein vorzeitiger Ausstieg erforderlich ist – ein Fail-Fast-Concurrency-Modell.select!
-Makro: Führt mehrere verschiedeneFutures
gleichzeitig aus und sobald eines von ihnen abgeschlossen ist, kann es sofort verarbeitet werden. Dies kann als Race-Concurrency-Modell betrachtet werden.- Anforderungen für die Verwendung von
select!
:FusedFuture
+Unpin
, die über die Methode.fuse()
und das Makropin_mut!
implementiert werden können.
Wir sind Leapcell, Ihre erste Wahl für das Hosting von Rust-Projekten.
Leapcell ist die Next-Gen Serverless Platform für Web Hosting, Async-Aufgaben und Redis:
Multi-Language Support
- Entwickeln Sie mit Node.js, Python, Go oder Rust.
Stellen Sie unbegrenzt Projekte kostenlos bereit
- Zahlen Sie nur für die Nutzung – keine Anfragen, keine Gebühren.
Unschlagbare Kosteneffizienz
- Pay-as-you-go ohne Leerlaufgebühren.
- Beispiel: 25 $ unterstützen 6,94 Mio. Anfragen bei einer durchschnittlichen Antwortzeit von 60 ms.
Optimierte Entwicklererfahrung
- Intuitive Benutzeroberfläche für mühelose Einrichtung.
- Vollständig automatisierte CI/CD-Pipelines und GitOps-Integration.
- Echtzeitmetriken und -protokollierung für umsetzbare Erkenntnisse.
Mühelose Skalierbarkeit und hohe Leistung
- Automatische Skalierung zur einfachen Bewältigung hoher Parallelität.
- Kein Betriebsaufwand – konzentrieren Sie sich einfach auf das Bauen.
Erfahren Sie mehr in der Dokumentation!
Folgen Sie uns auf X: @LeapcellHQ