Async Programming in Rust: Stream Trait und sein Design
Ethan Miller
Product Engineer · Leapcell

Der Stream
-Trait ist ähnlich dem Future
-Trait. Während Future
die Zustandsänderung eines einzelnen Elements darstellt, kann Stream
, ähnlich dem Iterator
-Trait in der Standardbibliothek, mehrere Werte liefern, bevor er abgeschlossen ist. Oder einfach ausgedrückt: Ein Stream
besteht aus einer Reihe von Futures
, aus denen wir das Ergebnis jedes Future
lesen können, bis der Stream
abgeschlossen ist.
Definition von Stream
Der Future
ist das grundlegendste Konzept in der asynchronen Programmierung. Wenn ein Future
einen einmaligen asynchronen Wert darstellt, dann stellt ein Stream
eine Reihe von asynchronen Werten dar. Future
ist 1, während Stream
0, 1 oder N ist. Die Signatur von Stream
lautet wie folgt:
pub trait Stream { type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; }
Das Konzept von Stream
entspricht dem Iterator
in synchronen Primitiven. Erinnern Sie sich, wie ähnlich sogar ihre Signaturen sind!
pub trait Iterator { type Item; fn next(&mut self) -> Option<Self::Item>; }
Stream wird verwendet, um kontinuierliche Datenquellen zu abstrahieren, obwohl er auch enden kann (wenn poll
None
zurückgibt)
Ein gängiges Beispiel für einen Stream
ist der Consumer Receiver
im Message-Channel des futures
-Crates. Jedes Mal, wenn eine Nachricht von der Send
-Seite gesendet wird, erhält der Empfänger einen Some(val)
-Wert. Sobald die Send
-Seite geschlossen (gedroppt) wird und keine Nachrichten mehr im Kanal vorhanden sind, empfängt er ein None
.
use futures::channel::mpsc; use futures::{executor::block_on, SinkExt, StreamExt}; async fn send_recv() { const BUFFER_SIZE: usize = 10; let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE); println!("tx: Send 1, 2"); tx.send(1).await.unwrap(); tx.send(2).await.unwrap(); drop(tx); // `StreamExt::next` ist ähnlich wie `Iterator::next`, aber anstatt einen Wert zurückzugeben, // gibt es ein `Future<Output = Option<T>>` zurück, daher müssen Sie `.await` verwenden, um den tatsächlichen Wert zu erhalten assert_eq!(Some(1), rx.next().await); assert_eq!(Some(2), rx.next().await); assert_eq!(None, rx.next().await); } fn main() { block_on(send_recv()); }
Unterschiede zwischen Iterator und Stream
Iterator
erlaubt wiederholtes Aufrufen dernext()
-Methode, um neue Werte zu erhalten, bis esNone
zurückgibt.Iterator
ist blockierend: Jeder Aufruf vonnext()
belegt die CPU, bis ein Ergebnis erzielt wird. Im Gegensatz dazu ist der asynchroneStream
nicht-blockierend und gibt die CPU frei, während er wartet.- Die
poll_next()
-Methode vonStream
ist derpoll()
-Methode vonFuture
recht ähnlich, und ihre Funktion ähnelt dernext()
-Methode vonIterator
. Das direkte Aufrufen vonpoll_next()
ist jedoch unpraktisch, da Sie denPoll
-Zustand manuell verwalten müssen, was nicht sehr ergonomisch ist. Aus diesem Grund bietet RustStreamExt
, ein Extension-Trait fürStream
, das einenext()
-Methode bietet, die ein von derNext
-Struktur implementiertesFuture
zurückgibt. Auf diese Weise können Sie direkt mitstream.next().await
über einen Wert iterieren.
Hinweis:
StreamExt
steht für Stream Extension. In Rust ist es üblich, die minimale Trait-Definition (wieStream
) in einer Datei zu belassen und zusätzliche APIs (wieStreamExt
) in einer separaten, verwandten Datei abzulegen.
Hinweis: Im Gegensatz zu
Future
befindet sich derStream
-Trait noch nicht in Rusts Core-Bibliothek (std::core
). Er befindet sich imfutures-util
-Crate, undStreamExtensions
ist ebenfalls nicht Teil der Standardbibliothek. Dies bedeutet, dass verschiedene Bibliotheken möglicherweise widersprüchliche Importe bereitstellen. Beispielsweise bietet Tokio ein eigenesStreamExt
an, das vonfutures-util
getrennt ist. Verwenden Sie nach Möglichkeitfutures-util
, da dies das am häufigsten verwendete Crate für Async/Await ist.
Implementierung der next()
-Methode von StreamExt
und der Next
-Struktur:
pub trait StreamExt: Stream { fn next(&mut self) -> Next<'_, Self> where Self: Unpin { assert_future::<Option<Self::Item>, _>(Next::new(self)) } } // `next` gibt die `Next`-Struktur zurück pub struct Next<'a, St: ?Sized> { stream: &'a mut St, } // Wenn Stream Unpin ist, dann ist Next auch Unpin impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {} impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { Self { stream } } } // Next implementiert Future, jeder poll() pollt im Wesentlichen vom Stream über poll_next() impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { type Output = Option<St::Item>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { self.stream.poll_next_unpin(cx) } }
Streams erstellen
Die futures
-Bibliothek bietet mehrere komfortable Methoden zum Erstellen einfacher Stream
s, wie z. B.:
empty()
: erstellt einen leerenStream
once()
: erstellt einenStream
, der einen einzelnen Wert enthältpending()
: erstellt einenStream
, der niemals einen Wert liefert und immerPoll::Pending
zurückgibtrepeat()
: erstellt einenStream
, der wiederholt denselben Wert liefertrepeat_with()
: erstellt einenStream
, der Werte lazy über eine Closure liefertpoll_fn()
: erstellt einenStream
aus einer Closure, diePoll
zurückgibtunfold()
: erstellt einenStream
aus einem Anfangszustand und einer Closure, die einenFuture
zurückgibt
use futures::prelude::*; #[tokio::main] async fn main() { let mut st = stream::iter(1..10) .filter(|x| future::ready(x % 2 == 0)) .map(|x| x * x); // Iteriere über den Stream while let Some(x) = st.next().await { println!("Got item: {}", x); } }
Im obigen Code generiert stream::iter
einen Stream
, der dann durch filter
- und map
-Operationen geleitet wird. Schließlich wird der Stream iteriert und die resultierenden Daten werden ausgegeben.
Wenn Sie sich nicht um Async/Await kümmern und Ihnen nur das Stream-Verhalten wichtig ist, ist Stream::iter
zum Testen sehr praktisch. Eine weitere interessante Methode ist repeat_with
, mit der Sie eine Closure übergeben können, um Werte bei Bedarf lazy zu generieren, zum Beispiel:
use futures::stream::{self, StreamExt}; // Von der nullten bis zur dritten Potenz von zwei: async fn stream_repeat_with() { let mut curr = 1; let mut pow2 = futures::stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp }); assert_eq!(Some(1), pow2.next().await); assert_eq!(Some(2), pow2.next().await); assert_eq!(Some(4), pow2.next().await); assert_eq!(Some(8), pow2.next().await); }
Einen Stream implementieren
Das Erstellen Ihres eigenen Stream
umfasst zwei Schritte:
- Zuerst definieren Sie eine
struct
, um den Zustand des Streams zu speichern - Dann implementieren Sie den
Stream
-Trait für diesestruct
Erstellen wir einen Stream namens Counter
, der von 1 bis 5 zählt:
#![feature(async_stream)] // Zuerst die Struktur: /// Ein Stream, der von eins bis fünf zählt struct Counter { count: usize, } // Wir möchten, dass der Zähler bei eins beginnt, also fügen wir als Helfer eine `new()`-Methode hinzu. // Dies ist nicht unbedingt erforderlich, aber es ist bequem. // Beachten Sie, dass wir `count` bei Null beginnen - der Grund wird in der Implementierung von `poll_next()` deutlich. impl Counter { fn new() -> Counter { Counter { count: 0 } } } // Dann implementieren wir `Stream` für `Counter`: impl Stream for Counter { // Wir verwenden `usize` zum Zählen type Item = usize; // `poll_next()` ist die einzige erforderliche Methode fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // Erhöhe den Zähler. Deshalb haben wir bei Null angefangen. self.count += 1; // Überprüfe, ob wir mit dem Zählen fertig sind. if self.count < 6 { Poll::Ready(Some(self.count)) } else { Poll::Ready(None) } } }
Stream-Traits
Es gibt verschiedene Traits im Zusammenhang mit Streams in Rust, wie Stream
, TryStream
und FusedStream
.
-
Stream
istIterator
sehr ähnlich. Wenn es jedochNone
zurückgibt, bedeutet dies, dass der Stream erschöpft ist und nicht mehr abgefragt werden sollte. Das fortgesetzte Abfragen eines Streams, nachdem dieserNone
zurückgegeben hat, führt zu undefiniertem Verhalten und kann zu unvorhersehbaren Ergebnissen führen. -
TryStream
ist ein spezialisierter Trait für Streams, dieResult<Wert, Fehler>
-Elemente liefern.TryStream
bietet Funktionen, die das Anpassen und Transformieren der innerenResult
s erleichtern. Sie können es sich als eine API vorstellen, das für Streams entwickelt wurde dieResult
-Elemente erzeugen, was die Arbeit mit Fehlerbehandlungsfällen komfortabler macht. -
FusedStream
ähnelt einem regulären Stream, bietet jedoch zusätzlich die Möglichkeit für Benutzer zu erkennen, ob der Stream nach der Rückgabe vonNone
wirklich erschöpft ist oder ob er sicher erneut abgefragt werden kann. Wenn Sie beispielsweise einen Stream erstellen, der von einem Ringspeicher unterstützt wird, gibt der Stream möglicherweise bei der ersten IterationNone
zurück. MitFusedStream
wäre es jedoch sicher, später erneut abzufragen, um eine neue Runde der Iteration über den Puffer fortzusetzen.
Iteration und Concurrency
Genau wie der Iterator
-Trait unterstützt Stream
auch die Iteration. Sie können beispielsweise Methoden wie map
, filter
, fold
, for_each
, skip
sowie ihre fehlerbewussten Pendants verwenden: try_map
, try_filter
, try_fold
, try_for_each
usw.
Anders als Iterator
können for
-Schleifen jedoch nicht direkt zum Iterieren über einen Stream
verwendet werden. Stattdessen können imperative Schleifen wie while let
oder loop
verwendet werden, wobei next
oder try_next
explizit wiederholt aufgerufen werden. Sie können beispielsweise auf eine der folgenden Arten aus einem Stream lesen:
// Iterationsmuster 1 while let Some(value) = s.next().await {} // Iterationsmuster 2 loop { match s.next().await { Some(value) => {} None => break; } }
Ein Beispiel für das Berechnen der Summe der Werte in einem Stream:
use futures_util::{pin_mut, Stream, stream, StreamExt}; async fn sum(stream: impl Stream<Item=usize>) -> usize { // Vergessen Sie nicht, den Stream vor der Iteration zu pinnen pin_mut!(stream); let mut sum: usize = 0; // Iteriere über den Stream while let Some(item) = stream.next().await { sum = sum + item; } sum }
Wenn Sie jeweils einen Wert verarbeiten, entgehen Ihnen möglicherweise die Vorteile der Concurrency, was den Zweck der asynchronen Programmierung zunichte macht. Um mehrere Werte gleichzeitig aus einem Stream
zu verarbeiten, können Sie for_each_concurrent
und try_for_each_concurrent
verwenden:
use std::{pin::Pin, io}; use futures_util::{Stream, TryStreamExt}; async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> { // Verwende `try_for_each_concurrent` stream.try_for_each_concurrent(100, |num| async move { jump_n_times(num).await?; report_n_jumps(num).await?; Ok(()) }).await?; Ok(()) } async fn jump_n_times(num: i32) -> Result<(), io::Error> { println!("jump_n_times :{}", num + 1); Ok(()) } async fn report_n_jumps(num: i32) -> Result<(), io::Error> { println!("report_n_jumps : {}", num); Ok(()) }
Zusammenfassung
Stream
ist ähnlich wie Future
, aber während Future
die Zustandsänderung eines einzelnen Elements darstellt, verhält sich Stream
eher wie ein Iterator
, der mehrere Werte vor dem Abschluss liefern kann. Oder einfach ausgedrückt: Ein Stream
besteht aus einer Reihe von Futures
, und wir können das Ergebnis jedes Future
aus dem Stream
abrufen, bis er abgeschlossen ist – was ihn zu einem asynchronen Iterator macht.
Die Funktion poll_next
eines Stream
kann einen von drei möglichen Werten zurückgeben:
Poll::Pending
: zeigt an, dass der nächste Wert noch nicht bereit ist und wir noch warten müssen.Poll::Ready(Some(val))
: zeigt an, dass ein Wert bereit ist und erfolgreich zurückgegeben wurde; Sie könnenpoll_next
erneut aufrufen, um den nächsten Wert abzurufen.Poll::Ready(None)
: zeigt an, dass der Stream beendet wurde undpoll_next
nicht mehr aufgerufen werden sollte.
Wir sind Leapcell, Ihre erste Wahl für das Hosten von Rust-Projekten.
Leapcell ist die Serverless-Plattform der nächsten Generation für Webhosting, asynchrone Aufgaben und Redis:
Multi-Language Support
- Entwickeln Sie mit Node.js, Python, Go oder Rust.
Stellen Sie unbegrenzt Projekte kostenlos bereit
- zahlen Sie nur für die Nutzung – keine Anfragen, keine Gebühren.
Unschlagbare Kosteneffizienz
- Pay-as-you-go ohne Leerlaufgebühren.
- Beispiel: 25 US-Dollar unterstützen 6,94 Millionen Anfragen bei einer durchschnittlichen Antwortzeit von 60 ms.
Optimierte Developer Experience
- Intuitive Benutzeroberfläche für mühelose Einrichtung.
- Vollständig automatisierte CI/CD-Pipelines und GitOps-Integration.
- Echtzeitmetriken und -protokollierung für umsetzbare Einblicke.
Mühelose Skalierbarkeit und hohe Leistung
- Automatische Skalierung zur einfachen Bewältigung hoher Concurrency.
- Kein operativer Overhead – konzentrieren Sie sich einfach auf den Aufbau.
Erfahren Sie mehr in der Dokumentation!
Folgen Sie uns auf X: @LeapcellHQ