Async Programming in Rust: Stream Trait und sein Design
Ethan Miller
Product Engineer · Leapcell

Der Stream-Trait ist ähnlich dem Future-Trait. Während Future die Zustandsänderung eines einzelnen Elements darstellt, kann Stream, ähnlich dem Iterator-Trait in der Standardbibliothek, mehrere Werte liefern, bevor er abgeschlossen ist. Oder einfach ausgedrückt: Ein Stream besteht aus einer Reihe von Futures, aus denen wir das Ergebnis jedes Future lesen können, bis der Stream abgeschlossen ist.
Definition von Stream
Der Future ist das grundlegendste Konzept in der asynchronen Programmierung. Wenn ein Future einen einmaligen asynchronen Wert darstellt, dann stellt ein Stream eine Reihe von asynchronen Werten dar. Future ist 1, während Stream 0, 1 oder N ist. Die Signatur von Stream lautet wie folgt:
pub trait Stream { type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; }
Das Konzept von Stream entspricht dem Iterator in synchronen Primitiven. Erinnern Sie sich, wie ähnlich sogar ihre Signaturen sind!
pub trait Iterator { type Item; fn next(&mut self) -> Option<Self::Item>; }
Stream wird verwendet, um kontinuierliche Datenquellen zu abstrahieren, obwohl er auch enden kann (wenn poll None zurückgibt)
Ein gängiges Beispiel für einen Stream ist der Consumer Receiver im Message-Channel des futures-Crates. Jedes Mal, wenn eine Nachricht von der Send-Seite gesendet wird, erhält der Empfänger einen Some(val)-Wert. Sobald die Send-Seite geschlossen (gedroppt) wird und keine Nachrichten mehr im Kanal vorhanden sind, empfängt er ein None.
use futures::channel::mpsc; use futures::{executor::block_on, SinkExt, StreamExt}; async fn send_recv() { const BUFFER_SIZE: usize = 10; let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE); println!("tx: Send 1, 2"); tx.send(1).await.unwrap(); tx.send(2).await.unwrap(); drop(tx); // `StreamExt::next` ist ähnlich wie `Iterator::next`, aber anstatt einen Wert zurückzugeben, // gibt es ein `Future<Output = Option<T>>` zurück, daher müssen Sie `.await` verwenden, um den tatsächlichen Wert zu erhalten assert_eq!(Some(1), rx.next().await); assert_eq!(Some(2), rx.next().await); assert_eq!(None, rx.next().await); } fn main() { block_on(send_recv()); }
Unterschiede zwischen Iterator und Stream
Iteratorerlaubt wiederholtes Aufrufen dernext()-Methode, um neue Werte zu erhalten, bis esNonezurückgibt.Iteratorist blockierend: Jeder Aufruf vonnext()belegt die CPU, bis ein Ergebnis erzielt wird. Im Gegensatz dazu ist der asynchroneStreamnicht-blockierend und gibt die CPU frei, während er wartet.- Die
poll_next()-Methode vonStreamist derpoll()-Methode vonFuturerecht ähnlich, und ihre Funktion ähnelt dernext()-Methode vonIterator. Das direkte Aufrufen vonpoll_next()ist jedoch unpraktisch, da Sie denPoll-Zustand manuell verwalten müssen, was nicht sehr ergonomisch ist. Aus diesem Grund bietet RustStreamExt, ein Extension-Trait fürStream, das einenext()-Methode bietet, die ein von derNext-Struktur implementiertesFuturezurückgibt. Auf diese Weise können Sie direkt mitstream.next().awaitüber einen Wert iterieren.
Hinweis:
StreamExtsteht für Stream Extension. In Rust ist es üblich, die minimale Trait-Definition (wieStream) in einer Datei zu belassen und zusätzliche APIs (wieStreamExt) in einer separaten, verwandten Datei abzulegen.
Hinweis: Im Gegensatz zu
Futurebefindet sich derStream-Trait noch nicht in Rusts Core-Bibliothek (std::core). Er befindet sich imfutures-util-Crate, undStreamExtensionsist ebenfalls nicht Teil der Standardbibliothek. Dies bedeutet, dass verschiedene Bibliotheken möglicherweise widersprüchliche Importe bereitstellen. Beispielsweise bietet Tokio ein eigenesStreamExtan, das vonfutures-utilgetrennt ist. Verwenden Sie nach Möglichkeitfutures-util, da dies das am häufigsten verwendete Crate für Async/Await ist.
Implementierung der next()-Methode von StreamExt und der Next-Struktur:
pub trait StreamExt: Stream { fn next(&mut self) -> Next<'_, Self> where Self: Unpin { assert_future::<Option<Self::Item>, _>(Next::new(self)) } } // `next` gibt die `Next`-Struktur zurück pub struct Next<'a, St: ?Sized> { stream: &'a mut St, } // Wenn Stream Unpin ist, dann ist Next auch Unpin impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {} impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { Self { stream } } } // Next implementiert Future, jeder poll() pollt im Wesentlichen vom Stream über poll_next() impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { type Output = Option<St::Item>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { self.stream.poll_next_unpin(cx) } }
Streams erstellen
Die futures-Bibliothek bietet mehrere komfortable Methoden zum Erstellen einfacher Streams, wie z. B.:
empty(): erstellt einen leerenStreamonce(): erstellt einenStream, der einen einzelnen Wert enthältpending(): erstellt einenStream, der niemals einen Wert liefert und immerPoll::Pendingzurückgibtrepeat(): erstellt einenStream, der wiederholt denselben Wert liefertrepeat_with(): erstellt einenStream, der Werte lazy über eine Closure liefertpoll_fn(): erstellt einenStreamaus einer Closure, diePollzurückgibtunfold(): erstellt einenStreamaus einem Anfangszustand und einer Closure, die einenFuturezurückgibt
use futures::prelude::*; #[tokio::main] async fn main() { let mut st = stream::iter(1..10) .filter(|x| future::ready(x % 2 == 0)) .map(|x| x * x); // Iteriere über den Stream while let Some(x) = st.next().await { println!("Got item: {}", x); } }
Im obigen Code generiert stream::iter einen Stream, der dann durch filter- und map-Operationen geleitet wird. Schließlich wird der Stream iteriert und die resultierenden Daten werden ausgegeben.
Wenn Sie sich nicht um Async/Await kümmern und Ihnen nur das Stream-Verhalten wichtig ist, ist Stream::iter zum Testen sehr praktisch. Eine weitere interessante Methode ist repeat_with, mit der Sie eine Closure übergeben können, um Werte bei Bedarf lazy zu generieren, zum Beispiel:
use futures::stream::{self, StreamExt}; // Von der nullten bis zur dritten Potenz von zwei: async fn stream_repeat_with() { let mut curr = 1; let mut pow2 = futures::stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp }); assert_eq!(Some(1), pow2.next().await); assert_eq!(Some(2), pow2.next().await); assert_eq!(Some(4), pow2.next().await); assert_eq!(Some(8), pow2.next().await); }
Einen Stream implementieren
Das Erstellen Ihres eigenen Stream umfasst zwei Schritte:
- Zuerst definieren Sie eine
struct, um den Zustand des Streams zu speichern - Dann implementieren Sie den
Stream-Trait für diesestruct
Erstellen wir einen Stream namens Counter, der von 1 bis 5 zählt:
#![feature(async_stream)] // Zuerst die Struktur: /// Ein Stream, der von eins bis fünf zählt struct Counter { count: usize, } // Wir möchten, dass der Zähler bei eins beginnt, also fügen wir als Helfer eine `new()`-Methode hinzu. // Dies ist nicht unbedingt erforderlich, aber es ist bequem. // Beachten Sie, dass wir `count` bei Null beginnen - der Grund wird in der Implementierung von `poll_next()` deutlich. impl Counter { fn new() -> Counter { Counter { count: 0 } } } // Dann implementieren wir `Stream` für `Counter`: impl Stream for Counter { // Wir verwenden `usize` zum Zählen type Item = usize; // `poll_next()` ist die einzige erforderliche Methode fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // Erhöhe den Zähler. Deshalb haben wir bei Null angefangen. self.count += 1; // Überprüfe, ob wir mit dem Zählen fertig sind. if self.count < 6 { Poll::Ready(Some(self.count)) } else { Poll::Ready(None) } } }
Stream-Traits
Es gibt verschiedene Traits im Zusammenhang mit Streams in Rust, wie Stream, TryStream und FusedStream.
-
StreamistIteratorsehr ähnlich. Wenn es jedochNonezurückgibt, bedeutet dies, dass der Stream erschöpft ist und nicht mehr abgefragt werden sollte. Das fortgesetzte Abfragen eines Streams, nachdem dieserNonezurückgegeben hat, führt zu undefiniertem Verhalten und kann zu unvorhersehbaren Ergebnissen führen. -
TryStreamist ein spezialisierter Trait für Streams, dieResult<Wert, Fehler>-Elemente liefern.TryStreambietet Funktionen, die das Anpassen und Transformieren der innerenResults erleichtern. Sie können es sich als eine API vorstellen, das für Streams entwickelt wurde dieResult-Elemente erzeugen, was die Arbeit mit Fehlerbehandlungsfällen komfortabler macht. -
FusedStreamähnelt einem regulären Stream, bietet jedoch zusätzlich die Möglichkeit für Benutzer zu erkennen, ob der Stream nach der Rückgabe vonNonewirklich erschöpft ist oder ob er sicher erneut abgefragt werden kann. Wenn Sie beispielsweise einen Stream erstellen, der von einem Ringspeicher unterstützt wird, gibt der Stream möglicherweise bei der ersten IterationNonezurück. MitFusedStreamwäre es jedoch sicher, später erneut abzufragen, um eine neue Runde der Iteration über den Puffer fortzusetzen.
Iteration und Concurrency
Genau wie der Iterator-Trait unterstützt Stream auch die Iteration. Sie können beispielsweise Methoden wie map, filter, fold, for_each, skip sowie ihre fehlerbewussten Pendants verwenden: try_map, try_filter, try_fold, try_for_each usw.
Anders als Iterator können for-Schleifen jedoch nicht direkt zum Iterieren über einen Stream verwendet werden. Stattdessen können imperative Schleifen wie while let oder loop verwendet werden, wobei next oder try_next explizit wiederholt aufgerufen werden. Sie können beispielsweise auf eine der folgenden Arten aus einem Stream lesen:
// Iterationsmuster 1 while let Some(value) = s.next().await {} // Iterationsmuster 2 loop { match s.next().await { Some(value) => {} None => break; } }
Ein Beispiel für das Berechnen der Summe der Werte in einem Stream:
use futures_util::{pin_mut, Stream, stream, StreamExt}; async fn sum(stream: impl Stream<Item=usize>) -> usize { // Vergessen Sie nicht, den Stream vor der Iteration zu pinnen pin_mut!(stream); let mut sum: usize = 0; // Iteriere über den Stream while let Some(item) = stream.next().await { sum = sum + item; } sum }
Wenn Sie jeweils einen Wert verarbeiten, entgehen Ihnen möglicherweise die Vorteile der Concurrency, was den Zweck der asynchronen Programmierung zunichte macht. Um mehrere Werte gleichzeitig aus einem Stream zu verarbeiten, können Sie for_each_concurrent und try_for_each_concurrent verwenden:
use std::{pin::Pin, io}; use futures_util::{Stream, TryStreamExt}; async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> { // Verwende `try_for_each_concurrent` stream.try_for_each_concurrent(100, |num| async move { jump_n_times(num).await?; report_n_jumps(num).await?; Ok(()) }).await?; Ok(()) } async fn jump_n_times(num: i32) -> Result<(), io::Error> { println!("jump_n_times :{}", num + 1); Ok(()) } async fn report_n_jumps(num: i32) -> Result<(), io::Error> { println!("report_n_jumps : {}", num); Ok(()) }
Zusammenfassung
Stream ist ähnlich wie Future, aber während Future die Zustandsänderung eines einzelnen Elements darstellt, verhält sich Stream eher wie ein Iterator, der mehrere Werte vor dem Abschluss liefern kann. Oder einfach ausgedrückt: Ein Stream besteht aus einer Reihe von Futures, und wir können das Ergebnis jedes Future aus dem Stream abrufen, bis er abgeschlossen ist – was ihn zu einem asynchronen Iterator macht.
Die Funktion poll_next eines Stream kann einen von drei möglichen Werten zurückgeben:
Poll::Pending: zeigt an, dass der nächste Wert noch nicht bereit ist und wir noch warten müssen.Poll::Ready(Some(val)): zeigt an, dass ein Wert bereit ist und erfolgreich zurückgegeben wurde; Sie könnenpoll_nexterneut aufrufen, um den nächsten Wert abzurufen.Poll::Ready(None): zeigt an, dass der Stream beendet wurde undpoll_nextnicht mehr aufgerufen werden sollte.
Wir sind Leapcell, Ihre erste Wahl für das Hosten von Rust-Projekten.
Leapcell ist die Serverless-Plattform der nächsten Generation für Webhosting, asynchrone Aufgaben und Redis:
Multi-Language Support
- Entwickeln Sie mit Node.js, Python, Go oder Rust.
Stellen Sie unbegrenzt Projekte kostenlos bereit
- zahlen Sie nur für die Nutzung – keine Anfragen, keine Gebühren.
Unschlagbare Kosteneffizienz
- Pay-as-you-go ohne Leerlaufgebühren.
- Beispiel: 25 US-Dollar unterstützen 6,94 Millionen Anfragen bei einer durchschnittlichen Antwortzeit von 60 ms.
Optimierte Developer Experience
- Intuitive Benutzeroberfläche für mühelose Einrichtung.
- Vollständig automatisierte CI/CD-Pipelines und GitOps-Integration.
- Echtzeitmetriken und -protokollierung für umsetzbare Einblicke.
Mühelose Skalierbarkeit und hohe Leistung
- Automatische Skalierung zur einfachen Bewältigung hoher Concurrency.
- Kein operativer Overhead – konzentrieren Sie sich einfach auf den Aufbau.
Erfahren Sie mehr in der Dokumentation!
Folgen Sie uns auf X: @LeapcellHQ



