Redis Verzögerte Warteschlangen Einfach Gemacht
Grace Collins
Solutions Engineer · Leapcell

Eine verzögerte Warteschlange ist im Wesentlichen eine Nachrichtenwarteschlange, die die Ausführung verzögert. In welchen Geschäftsszenarien ist sie nützlich?
Praktische Szenarien
- Wenn eine Auftragszahlung fehlschlägt, erinnere den Benutzer regelmäßig.
- Bei Benutzerkonkurrenz kannst du den Versand einer E-Mail an den Benutzer um 2 Minuten verzögern.
Verwenden von Redis zur Implementierung einer einfachen Nachrichtenwarteschlange
Wie wir wissen, müssen Verbraucher für professionelle Message-Queue-Middleware wie Kafka und RabbitMQ eine Reihe komplexer Schritte durchlaufen, bevor sie Nachrichten verarbeiten können.
In RabbitMQ musst du beispielsweise einen Exchange erstellen, bevor du Nachrichten sendest, dann eine Queue erstellen, die Queue und den Exchange mit einigen Routing-Regeln verknüpfen, einen Routing-Key beim Senden der Nachricht angeben und die Message-Header-Informationen steuern. Doch in den meisten Fällen müssen wir den obigen Prozess durchlaufen, selbst wenn unsere Nachrichtenwarteschlange nur einen einzelnen Verbraucher hat.
Mit Redis werden die Dinge für Nachrichtenwarteschlangen mit nur einer Verbrauchergruppe viel einfacher. Redis ist keine spezialisierte Nachrichtenwarteschlange und es fehlen erweiterte Funktionen - es gibt keine ACK-Garantie. Wenn du also strenge Zuverlässigkeitsanforderungen an Nachrichten hast, ist Redis möglicherweise nicht geeignet.
Grundlegende Implementierung einer asynchronen Nachrichtenwarteschlange
Die list
-Datenstruktur von Redis wird häufig für asynchrone Nachrichtenwarteschlangen verwendet. Du kannst rpush
oder lpush
verwenden, um Elemente in die Warteschlange einzureihen, und lpop
oder rpop
, um sie aus der Warteschlange zu entfernen.
> rpush queue Leapcell_1 Leapcell_2 Leapcell_3 (integer) 3 > lpop queue "Leapcell_1" > llen queue (integer) 2
Problem 1: Was ist, wenn die Warteschlange leer ist?
Der Client holt Nachrichten mit der Pop-Operation ab und verarbeitet sie. Nach der Verarbeitung holt er die nächste Nachricht ab und setzt die Verarbeitung fort. Dieser Zyklus wiederholt sich - das ist der Lebenszyklus eines Queue-Verbrauchers.
Wenn die Warteschlange jedoch leer ist, tritt der Client in eine Endlosschleife von Pop-Operationen ein - er führt immer wieder Pop-Operationen ohne Daten aus. Dies ist eine verschwenderische und ineffiziente Abfrage. Dies erhöht nicht nur die CPU-Auslastung des Clients, sondern auch das Redis-QPS. Wenn Dutzende von Clients auf diese Weise abfragen, kann Redis eine beträchtliche Anzahl langsamer Abfragen erleben.
In der Regel lösen wir dies mit einer Sleep-Operation - lasse den Thread 1 Sekunde lang schlafen. Dies reduziert die CPU-Auslastung auf der Client-Seite und senkt auch das Redis-QPS.
Problem 2: Warteschlangenlatenz
Das Schlafen hilft, das Problem zu lösen, aber wenn es nur einen Verbraucher gibt, beträgt diese Verzögerung 1 Sekunde. Bei mehreren Verbrauchern wird die Verzögerung etwas reduziert, da ihre Schlafzeiten gestaffelt sind.
Gibt es eine Möglichkeit, diese Latenz erheblich zu reduzieren?
Ja - durch die Verwendung von blpop
/brpop
.
Das Präfix b
steht für blocking, d.h. blockierende Lesevorgänge.
Wenn die Warteschlange keine Daten hat, bewirken blockierende Lesevorgänge, dass der Thread sofort schläft, und sobald Daten eintreffen, wacht er sofort auf. Dies reduziert die Nachrichtenlatenz auf nahezu Null. Indem wir lpop
/rpop
durch blpop
/brpop
ersetzen, können wir das obige Problem perfekt lösen.
Problem 3: Idle-Verbindungen werden automatisch getrennt
Es gibt ein weiteres Problem, das wir angehen müssen - Idle-Verbindungen.
Wenn ein Thread zu lange blockiert bleibt, wird die Redis-Client-Verbindung inaktiv. Die meisten Server schließen inaktive Verbindungen aktiv, um die Ressourcennutzung zu reduzieren. Wenn dies geschieht, werfen blpop
/brpop
eine Ausnahme.
Daher solltest du beim Schreiben der clientseitigen Consumer-Logik unbedingt Ausnahmen abfangen und eine Wiederholungslogik implementieren.
Umgang mit verteilten Lock-Konflikten
Was passiert, wenn der Client beim Verarbeiten einer Anfrage keinen verteilten Lock erhält?
In der Regel gibt es drei Strategien, um mit dem Fehlschlagen des Lock-Erwerbs umzugehen:
- Werfe direkt eine Ausnahme und benachrichtige den Benutzer, es später erneut zu versuchen.
- Schlafe eine Weile, bevor du es erneut versuchst.
- Verschiebe die Anfrage in eine verzögerte Warteschlange und versuche es später erneut.
Direkt eine bestimmte Art von Ausnahme werfen
Dieser Ansatz funktioniert gut für vom Benutzer initiierte Anfragen. Wenn der Benutzer ein Fehlerdialogfeld sieht, liest er normalerweise die Meldung und klickt auf "Wiederholen", was auf natürliche Weise eine Verzögerung erzeugt. Für eine bessere Benutzererfahrung kann der Frontend-Code diese Wiederholungsverzögerung übernehmen, anstatt sich auf den Benutzer zu verlassen. Im Wesentlichen gibt diese Strategie die aktuelle Anfrage auf und überlässt es dem Benutzer, zu entscheiden, ob er sie erneut initiieren soll.
Sleep
Die Verwendung von sleep
blockiert den aktuellen Nachrichtenverarbeitungs-Thread, was zu Verzögerungen bei der Verarbeitung nachfolgender Nachrichten in der Warteschlange führt. Wenn Kollisionen häufig auftreten oder die Warteschlange viele Nachrichten enthält, ist sleep
möglicherweise nicht die beste Wahl. Wenn das Fehlschlagen des Lock-Erwerbs durch einen Deadlock-Key verursacht wird, bleibt der Thread vollständig hängen, was die weitere Nachrichtenverarbeitung verhindert.
Verzögerte Warteschlange
Diese Strategie eignet sich besser für die asynchrone Nachrichtenverarbeitung. Du wirfst die konfliktträchtige Anfrage in eine andere Warteschlange, um sie später zu verarbeiten, wodurch sofortige Konflikte vermieden werden.
Implementieren einer verzögerten Warteschlange
Wir können die zset
-Datenstruktur von Redis verwenden und einen Zeitstempel als Score zuweisen, um Elemente zu sortieren. Verwende den Befehl zadd score1 value1 ...
, um kontinuierlich Nachrichten im Speicher zu erzeugen. Verwende dann zrangebyscore
, um alle Aufgaben abzufragen, die zur Verarbeitung bereit sind. Du kannst diese durchlaufen und einzeln ausführen. Du kannst auch zrangebyscore key min max withscores limit 0 1
verwenden, um nur die früheste Aufgabe zur Verarbeitung abzufragen.
private Jedis jedis; public void redisDelayQueueTest() { String key = "delay_queue"; // In real applications, it is recommended to use a business ID and a randomly generated unique ID as the value. // The unique ID ensures message uniqueness, and the business ID avoids carrying too much data in the value. String orderId1 = UUID.randomUUID().toString(); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId1); String orderId2 = UUID.randomUUID().toString(); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId2); new Thread() { @Override public void run() { while (true) { Set<String> resultList; // Get only the first item (non-destructive read) resultList = jedis.zrangebyscore(key, System.currentTimeMillis(), 0, 1); if (resultList.size() == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); break; } } else { // Remove the fetched data if (jedis.zrem(key, resultList.iterator().next()) > 0) { String orderId = resultList.iterator().next(); log.info("orderId = {}", orderId); this.handleMsg(orderId); } } } } }.start(); } public void handleMsg(T msg) { System.out.println(msg); }
Die obige Implementierung funktioniert auch in einem Multithread-Szenario gut. Angenommen, du hast zwei Threads T1 und T2 und möglicherweise noch mehr. Die Logik läuft wie folgt ab, wobei sichergestellt wird, dass nur ein Thread eine Nachricht verarbeitet:
- T1, T2 und andere Threads rufen
zrangebyscore
auf und rufen Nachricht A ab. - T1 beginnt mit dem Löschen von Nachricht A. Da dies eine atomare Operation ist, warten T2 und andere Threads, bis T1
zrem
abgeschlossen hat, bevor sie fortfahren. - T1 löscht Nachricht A erfolgreich und verarbeitet sie.
- T2 und andere versuchen, Nachricht A zu löschen, scheitern jedoch, da sie bereits entfernt wurde - sie geben die Verarbeitung auf.
Stelle außerdem sicher, dass du Ausnahmebehandlung zu handleMsg
hinzufügst, damit eine einzelne fehlerhafte Aufgabe nicht dazu führt, dass die gesamte Verarbeitungsschleife abstürzt.
Weitere Optimierung
Im obigen Algorithmus kann dieselbe Aufgabe von mehreren Prozessen abgerufen werden, und nur einer wird sie mit zrem
erfolgreich löschen. Die anderen haben die Aufgabe vergeblich abgerufen - das ist Verschwendung. Um dies zu verbessern, kannst du Lua-Skripte verwenden, um die Logik zu optimieren, indem du zrangebyscore
und zrem
zu einer einzigen atomaren Operation auf der Serverseite kombinierst. Auf diese Weise führt die Konkurrenz mehrerer Prozesse um dieselbe Aufgabe nicht zu unnötigen Abrufen.
Lua-Skript zur weiteren Optimierung verwenden
Das Lua-Skript sucht nach abgelaufenen Nachrichten, entfernt sie und gibt die Nachricht zurück, wenn das Löschen erfolgreich war. Andernfalls wird eine leere Zeichenfolge zurückgegeben:
String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit', 0, 1)\n" + "if #resultArray > 0 then\n" + " if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" + " return resultArray[1]\n" + " else\n" + " return ''\n" + " end\n" + "else\n" + " return ''\n" + "end"; jedis.eval(luaScript, ScriptOutputType.VALUE, new String[]{key}, String.valueOf(System.currentTimeMillis()));
Vorteile von Redis-basierten verzögerten Warteschlangen
Redis bietet die folgenden Vorteile, wenn es zur Implementierung verzögerter Warteschlangen verwendet wird:
- Redis
zset
bietet eine leistungsstarke Score-basierte Sortierung. - Redis arbeitet im Speicher und ist daher extrem schnell.
- Redis unterstützt Clustering. Wenn es viele Nachrichten gibt, können Cluster die Nachrichtenverarbeitungsgeschwindigkeit und -verfügbarkeit verbessern.
- Redis unterstützt Persistenz. Im Falle eines Fehlers können Daten mit AOF oder RDB wiederhergestellt werden, was die Zuverlässigkeit gewährleistet.
Nachteile von Redis-basierten verzögerten Warteschlangen
Redis-basierte verzögerte Warteschlangen haben jedoch auch einige Einschränkungen:
- Nachrichtenpersistenz und -zuverlässigkeit sind immer noch ein Problem. Während Redis die Persistenz unterstützt, ist sie nicht so zuverlässig wie eine dedizierte MQ.
- Kein Wiederholungsmechanismus - Wenn während der Nachrichtenverarbeitung eine Ausnahme auftritt, bietet Redis keinen eingebauten Wiederholungsmechanismus. Du musst dies selbst implementieren, einschließlich der Verwaltung von Wiederholungszählern.
- Kein ACK-Mechanismus - Wenn beispielsweise ein Client eine Nachricht abruft und löscht, aber während der Verarbeitung abstürzt, geht die Nachricht verloren. Im Gegensatz dazu benötigen Message Queues (MQ) eine Bestätigung, um die erfolgreiche Verarbeitung zu bestätigen, bevor eine Nachricht entfernt wird.
Wenn Nachrichtenzuverlässigkeit entscheidend ist, wird empfohlen, stattdessen eine dedizierte MQ zu verwenden.
Implementieren verzögerter Warteschlangen mit Redisson
Die Redisson-basierte verteilte verzögerte Warteschlange (RDelayedQueue
) basiert auf der RQueue
-Schnittstelle und bietet die Funktionalität, das Hinzufügen von Elementen zur Warteschlange zu verzögern. Dies kann verwendet werden, um geometrisch zunehmende oder abnehmende Nachrichtenzustellungsstrategien zu implementieren.
RQueue<String> destinationQueue = ... RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue); // Send message to queue after 10 seconds delayedQueue.offer("msg1", 10, TimeUnit.SECONDS); // Send message to queue after 1 minute delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
Wenn das Objekt nicht mehr benötigt wird, sollte es aktiv zerstört werden. Nur wenn das zugehörige Redisson-Objekt heruntergefahren wird, ist es akzeptabel, es nicht manuell zu zerstören.
RDelayedQueue<String> delayedQueue = ... delayedQueue.destroy();
Ist das nicht praktisch?
Wir sind Leapcell, deine erste Wahl für das Hosting von Backend-Projekten.
Leapcell ist die Serverless-Plattform der nächsten Generation für Webhosting, asynchrone Aufgaben und Redis:
Multi-Language-Unterstützung
- Entwickle mit Node.js, Python, Go oder Rust.
Unbegrenzt Projekte kostenlos bereitstellen
- Zahle nur für die Nutzung - keine Anfragen, keine Gebühren.
Unschlagbare Kosteneffizienz
- Pay-as-you-go ohne Leerlaufgebühren.
- Beispiel: 25 $ unterstützt 6,94 Millionen Anfragen bei einer durchschnittlichen Antwortzeit von 60 ms.
Optimierte Entwicklererfahrung
- Intuitive Benutzeroberfläche für mühelose Einrichtung.
- Vollautomatische CI/CD-Pipelines und GitOps-Integration.
- Echtzeitmetriken und -protokollierung für verwertbare Erkenntnisse.
Mühelose Skalierbarkeit und hohe Leistung
- Automatische Skalierung zur einfachen Bewältigung hoher Parallelität.
- Null Betriebsaufwand - konzentriere dich einfach auf das Bauen.
Erfahre mehr in der Dokumentation!
Folge uns auf X: @LeapcellHQ