Wie man einen Event Bus in Go baut
Grace Collins
Solutions Engineer · Leapcell

Vorwort
In der heutigen Landschaft, in der Microservices und verteilte Systeme weit verbreitet sind, spielt die Event-Driven Architecture (EDA) eine entscheidende Rolle. Dieses Architekturdesign ermöglicht es Diensten, über Ereignisse zu kommunizieren, entweder synchron oder asynchron, anstelle von traditionellen direkten Schnittstellenaufrufen. Der ereignisbasierte Interaktionsmodus fördert eine lose Kopplung zwischen Diensten und verbessert die Skalierbarkeit des Systems.
Das Publish-Subscribe-Muster ist eine Möglichkeit, ereignisgesteuerte Architekturen zu implementieren. Es ermöglicht verschiedenen Komponenten oder Diensten im System, Ereignisse zu veröffentlichen, während andere Komponenten oder Dienste diese Ereignisse abonnieren und basierend auf dem Ereignisinhalt reagieren können. Die meisten Entwickler sind wahrscheinlich mit diesem Muster vertraut; gängige technische Implementierungen umfassen Message Queues (MQ) und die Publish/Subscribe (PUB/SUB)-Funktionen von Redis.
In Go können wir die leistungsstarken Kanäle und Nebenläufigkeitsmechanismen nutzen, um das Publish-Subscribe-Muster zu implementieren. Dieser Artikel befasst sich eingehend mit der Implementierung eines einfachen Event Bus in Go, der eine konkrete Realisierung des Publish-Subscribe-Musters darstellt.
Event Bus
Der Event Bus ist eine konkrete Implementierung des Publish-Subscribe-Musters. Als Middleware zwischen Publishern und Subscribern verwaltet er die Ereignisübermittlung und -verteilung und stellt sicher, dass Ereignisse reibungslos von Publishern zu Subscribern übertragen werden.
Die Hauptvorteile des Event Bus sind:
- Entkopplung: Dienste müssen nicht direkt miteinander kommunizieren, sondern interagieren stattdessen über Ereignisse, wodurch Abhängigkeiten zwischen Diensten reduziert werden.
- Asynchrone Verarbeitung: Ereignisse können asynchron verarbeitet werden, was die Reaktionsfähigkeit und Leistung des Systems verbessert.
- Skalierbarkeit: Neue Abonnenten können Ereignisse problemlos abonnieren, ohne den vorhandenen Publisher-Code zu ändern.
- Fehlerisolation: Fehler bei der Ereignisbehandlung beeinträchtigen nicht direkt den normalen Betrieb anderer Dienste.
Code-Implementierung des Event Bus
Als Nächstes stellen wir vor, wie man einen einfachen Event Bus in Go implementiert, der die folgenden Schlüsselfunktionen umfasst:
- Publish: Ermöglicht es verschiedenen Diensten im System, Ereignisse zu senden.
- Subscribe: Ermöglicht es interessierten Diensten, bestimmte Arten von Ereignissen zu abonnieren und zu empfangen.
- Unsubscribe: Ermöglicht es Diensten, Ereignisse zu entfernen, die sie zuvor abonniert haben.
Event-Datenstrukturdefinition
type Event struct { Payload any }
Event
ist eine Struktur, die ein Ereignis kapselt, wobei Payload
die Kontextinformationen des Ereignisses darstellt und sein Typ any
ist.
Event-Bus-Definition
type ( EventChan chan Event ) type EventBus struct { mu sync.RWMutex subscribers map[string][]EventChan } func NewEventBus() *EventBus { return &EventBus{ subscribers: make(map[string][]EventChan), } }
EventChan
ist ein Typalias, der als Kanal zum Übertragen von Event
-Strukturen definiert ist: chan Event
.
EventBus
ist die Definition des Event Bus. Er enthält zwei Eigenschaften:
- mu: Ein Lese-Schreib-Mutex (
sync.RWMutex
), das verwendet wird, um die gleichzeitige Lese- und Schreibsicherheit für die unten stehendensubscribers
zu gewährleisten. - subscribers: Eine Map, bei der der Schlüssel ein String ist, der das Abonnementthema darstellt, und der Wert ein Slice von
EventChan
ist. Diese Eigenschaft wird verwendet, um alle Abonnenten für jedes Thema zu speichern. Jeder Abonnent empfängt Ereignisse über seinen eigenenEventChan
.
Die Funktion NewEventBus
wird verwendet, um eine neue EventBus
-Instanz zu erstellen.
Event-Bus-Methodenimplementierung
Der Event Bus implementiert drei Methoden: Veröffentlichen von Ereignissen (Publish
), Abonnieren von Ereignissen (Subscribe
) und Abbestellen von Ereignissen (Unsubscribe
).
Publish
func (eb *EventBus) Publish(topic string, event Event) { eb.mu.RLock() defer eb.mu.RUnlock() // Copy a new subscriber list to avoid modifying the list while publishing subscribers := append([]EventChan{}, eb.subscribers[topic]...) go func() { for _, subscriber := range subscribers { subscriber <- event } }() }
Die Methode Publish
wird verwendet, um Ereignisse zu veröffentlichen. Diese Methode empfängt zwei Parameter: topic
(das Thema) und event
(das gekapselte Ereignisobjekt).
In der Implementierung von Publish
wird zuerst über die Eigenschaft mu
eine Lesesperre abgerufen, um sicherzustellen, dass die folgenden Operationen auf subscribers
in nebenläufigen Routinen sicher sind. Dann wird eine Kopie der aktuellen Abonnentenliste für das Thema erstellt. Es wird eine neue Goroutine gestartet, die die kopierte Abonnentenliste durchläuft und das Ereignis über ihren Kanal an jeden Abonnenten sendet. Nachdem diese Operationen abgeschlossen sind, wird die Lesesperre freigegeben.
Warum eine Kopie der Abonnentenliste erstellen?
Antwort: Das Kopieren der Abonnentenliste gewährleistet Datenkonsistenz und -stabilität beim Senden von Ereignissen. Da das Senden von Daten an die Kanäle in einer neuen Goroutine erfolgt, wurde die Lesesperre bereits freigegeben, wenn die Daten gesendet werden, und die ursprüngliche Abonnentenliste hat sich möglicherweise durch das Hinzufügen oder Entfernen von Abonnenten geändert. Wenn Sie die ursprüngliche Abonnentenliste direkt verwenden, können unerwartete Fehler auftreten (z. B. kann das Senden von Daten an einen geschlossenen Kanal eine Panik auslösen).
Subscribe
func (eb *EventBus) Subscribe(topic string) EventChan { eb.mu.Lock() defer eb.mu.Unlock() ch := make(EventChan) eb.subscribers[topic] = append(eb.subscribers[topic], ch) return ch }
Die Methode Subscribe
wird verwendet, um Ereignisse für ein bestimmtes Thema zu abonnieren. Sie akzeptiert einen topic
-Parameter, der das zu abonnierende Thema angibt. Über diese Methode erhalten Sie einen EventChan
-Kanal, um Ereignisse für das Thema zu empfangen.
In der Implementierung von Subscribe
wird zuerst über die Eigenschaft mu
eine Schreibsperre abgerufen, um sicherzustellen, dass die bevorstehenden Lese- und Schreiboperationen auf subscribers
in nebenläufigen Routinen sicher sind. Dann wird ein neuer EventChan
-Kanal ch
erstellt und an den entsprechenden Abonnenten-Slice des Themas angehängt. Nachdem diese Operationen abgeschlossen sind, wird die Schreibsperre freigegeben.
Unsubscribe
func (eb *EventBus) Unsubscribe(topic string, ch EventChan) { eb.mu.Lock() defer eb.mu.Unlock() if subscribers, ok := eb.subscribers[topic]; ok { for i, subscriber := range subscribers { if ch == subscriber { eb.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...) close(ch) // Drain the channel for range ch { } return } } } }
Die Methode Unsubscribe
wird verwendet, um Ereignisse abzubestellen. Sie empfängt zwei Parameter: topic
(das abonnierte Thema) und ch
(der ausgegebene Kanal).
Innerhalb der Methode Unsubscribe
wird zuerst über die Eigenschaft mu
eine Schreibsperre abgerufen, um die gleichzeitige Lese- und Schreibsicherheit für die bevorstehenden Operationen auf subscribers
zu gewährleisten. Dann wird geprüft, ob das Thema entsprechende Abonnenten hat. Wenn dies der Fall ist, durchläuft es den Subscriber-Slice für dieses Thema, sucht den Kanal, der mit ch
übereinstimmt, entfernt ihn aus dem Subscriber-Slice und schließt den Kanal. Dann wird der Kanal geleert. Nach diesen Operationen wird die Schreibsperre freigegeben.
Anwendungsbeispiel
package main import ( "fmt" "time" "demo-eventbus" ) func main() { eventBus := eventbus.NewEventBus() // Subscribe to the "post" topic event subscribe := eventBus.Subscribe("post") go func() { for event := range subscribe { fmt.Println(event.Payload) } }() eventBus.Publish("post", eventbus.Event{Payload: map[string]any{ "postId": 1, "title": "Welcome to Leapcell", "author": "Leapcell", }}) // Topic with no subscribers eventBus.Publish("pay", eventbus.Event{Payload: "pay"}) time.Sleep(time.Second * 2) // Unsubscribe from the "post" topic event eventBus.Unsubscribe("post", subscribe) }
Vorschläge für Erweiterungen
Der in diesem Artikel implementierte Event Bus ist relativ einfach. Wenn Sie die Flexibilität, Zuverlässigkeit und Benutzerfreundlichkeit des Event Bus verbessern möchten, können Sie ihn auf folgende Weise erweitern:
- Ereignispersistenz: Implementieren Sie eine persistente Speicherung für Ereignisse, um sicherzustellen, dass nicht verarbeitete Ereignisse nach einem Systemabsturz wiederhergestellt werden können.
- Platzhalter- und Musterübereinstimmungsabonnements: Ermöglichen Sie die Verwendung von Platzhaltern oder regulären Ausdrücken, um eine Gruppe verwandter Themen anstelle eines einzelnen bestimmten Themas zu abonnieren.
- Lastverteilungs- und Nachrichtenzustellungsstrategien: Verteilen Sie Ereignisse auf mehrere Abonnenten, um einen Lastenausgleich zu erreichen.
- Plugin-Unterstützung: Ermöglichen Sie Funktionserweiterungen durch Plugins, wie z. B. Protokollierung, Nachrichtenfilterung, Transformation usw.
Fazit
Dieser Artikel untersucht eingehend den Prozess der Implementierung eines einfachen Event Bus in Go. Durch die Nutzung der leistungsstarken Funktionen von Go wie Kanäle und Nebenläufigkeitsmechanismen können wir das Publish-Subscribe-Muster einfach implementieren.
Der Artikel beginnt mit der Vorstellung der Vorteile des Event Bus, einschließlich Entkopplung, asynchroner Verarbeitung, Skalierbarkeit und Fehlerisolation. Anschließend wird detailliert erläutert, wie die Ereignisdatenstruktur und die Event-Bus-Struktur definiert werden und wie die Methoden zum Veröffentlichen, Abonnieren und Abbestellen von Ereignissen implementiert werden. Abschließend werden mehrere potenzielle Richtungen für die Erweiterung vorgeschlagen, wie z. B. Ereignispersistenz, Platzhalterabonnements, Lastverteilung und Plugin-Unterstützung, um die Flexibilität und Funktionalität des Event Bus zu verbessern.
Durch das Lesen dieses Artikels können Sie lernen, wie man einen einfachen, aber leistungsstarken Event Bus in Go implementiert und ihn je nach Bedarf erweitern kann.
Wir sind Leapcell, Ihre erste Wahl für das Hosting von Go-Projekten.
Leapcell ist die Serverless-Plattform der nächsten Generation für Webhosting, asynchrone Aufgaben und Redis:
Multi-Language-Unterstützung
- Entwickeln Sie mit Node.js, Python, Go oder Rust.
Stellen Sie unbegrenzt Projekte kostenlos bereit
- zahlen Sie nur für die Nutzung - keine Anfragen, keine Gebühren.
Unschlagbare Kosteneffizienz
- Pay-as-you-go ohne Leerlaufgebühren.
- Beispiel: 25 $ unterstützen 6,94 Mio. Anfragen bei einer durchschnittlichen Reaktionszeit von 60 ms.
Optimierte Entwicklererfahrung
- Intuitive Benutzeroberfläche für mühelose Einrichtung.
- Vollständig automatisierte CI/CD-Pipelines und GitOps-Integration.
- Echtzeit-Metriken und -Protokollierung für umsetzbare Erkenntnisse.
Mühelose Skalierbarkeit und hohe Leistung
- Automatische Skalierung zur einfachen Bewältigung hoher Parallelität.
- Null Betriebsaufwand - konzentrieren Sie sich einfach auf den Aufbau.
Erfahren Sie mehr in der Dokumentation!
Folgen Sie uns auf X: @LeapcellHQ