Event Sourcing mit einer einzigen Datenbanktabelle: Ein vereinfachter Ansatz
Ethan Miller
Product Engineer · Leapcell

Einleitung
In der sich entwickelnden Landschaft moderner Softwarearchitektur ist die Notwendigkeit robuster, auditierbarer und skalierbarer Systeme von größter Bedeutung. Event Sourcing, ein leistungsstarkes Muster, das alle Zustandsänderungen einer Anwendung als eine Sequenz unveränderlicher Ereignisse aufzeichnet, hat erhebliche Verbreitung erfahren. Es bietet unübertroffene Vorteile für Auditing, Debugging und den Aufbau von Systemen mit komplexen Geschäftsregeln. Traditionell beinhaltet die Implementierung von Event Sourcing oft hochentwickelte Nachrichtensysteme wie Kafka, die als Ereignisprotokoll dienen. Obwohl hochwirksam, können die Integration und Verwaltung von Kafka einen erheblichen betrieblichen Overhead und Komplexität einführen, insbesondere für kleinere Projekte oder Teams ohne dedizierte DevOps-Ressourcen. Dieser Artikel befasst sich mit einem alternativen, vereinfachten Ansatz: der Implementierung von Event Sourcing, indem nichts weiter als eine einzige relationale Datenbanktabelle als unser "Ereignisprotokoll" verwendet wird. Diese Methode zielt darauf ab, Event Sourcing zu entmystifizieren und zugänglich zu machen, und beweist, dass seine Kernvorteile ohne den Overhead externer Nachrichtenbroker genutzt werden können – ein Ansatz, der besonders dann attraktiv ist, wenn operative Einfachheit Priorität hat.
Die Kernkonzepte verstehen
Bevor wir uns mit der Implementierung befassen, wollen wir einige grundlegende Konzepte klären, die für das Verständnis von Event Sourcing entscheidend sind.
- Event Sourcing: Dieses Architekturmuster schreibt vor, dass wir anstatt den aktuellen Zustand einer Entität zu speichern, jede Änderung dieses Zustands als unveränderliches Ereignis speichern. Der aktuelle Zustand wird dann durch das Wiedergeben dieser Ereignisse in chronologischer Reihenfolge abgeleitet.
 - Ereignis (Event): Eine Aufzeichnung von etwas, das in der Vergangenheit geschehen ist. Ereignisse sind unveränderliche Fakten. Sie werden im Vergangenheitsform benannt (z. B. 
OrderPlaced,InventoryAdjusted,UserRegistered). Jedes Ereignis enthält typischerweise Metadaten wietimestamp,event_type,aggregate_idundversionzusammen mit derpayload, die die für diese spezifische Änderung relevanten Daten darstellen. - Aggregat (Aggregate): Eine Gruppe von Domänenobjekten, die als eine einzige Einheit für Datenänderungen behandelt werden können. Es ist die Konsistenzgrenze im Domain-Driven Design. Ereignisse sind immer mit einer bestimmten Aggregatinstanz (identifiziert durch 
aggregate_id) verbunden. - Zustandsrekonstruktion (State Reconstruction): Der Prozess des Wiedergebens der historischen Ereignisse eines Aggregats, um seinencurrent state zu rekonstruieren.
 - Snapshot: Ein vorab berechneter Zustand eines Aggregats zu einem bestimmten Zeitpunkt. Snapshots werden verwendet, um die Zustandsrekonstruktion zu optimieren, indem das Wiedergeben aller historischen Ereignisse von Anfang an vermieden wird, insbesondere für Aggregate mit einer langen Ereignisgeschichte.
 
Implementierung von Event Sourcing mit einer einzigen Datenbanktabelle
Die Kernidee ist einfach: Alle Ereignisse, unabhängig von ihrem Typ oder dem Aggregat, zu dem sie gehören, werden in einer einzigen, nur anzuhängenden Datenbanktabelle gespeichert. Diese Tabelle fungiert als unser zentrales, maßgebliches Ereignisprotokoll.
Schemadesign für die Event Log-Tabelle
Betrachten wir eine events-Tabelle, die alle unsere Domänenereignisse speichern wird.
CREATE TABLE events ( id SERIAL PRIMARY KEY, aggregate_id UUID NOT NULL, aggregate_type VARCHAR(255) NOT NULL, version INT NOT NULL, event_type VARCHAR(255) NOT NULL, payload JSONB NOT NULL, timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, metadata JSONB, UNIQUE (aggregate_id, version) -- Stellt die Ereignisreihenfolge sicher und verhindert doppelte Ereignisse für ein Aggregat ); CREATE INDEX idx_events_aggregate_id ON events (aggregate_id); CREATE INDEX idx_events_timestamp ON events (timestamp);
id: Eine eindeutige Kennung für das Ereignis selbst (z. B. ein serieller Primärschlüssel).aggregate_id: Die ID der Aggregatinstanz, auf die sich dieses Ereignis bezieht. Kritisch für das Filtern von Ereignissen.aggregate_type: Der Typ oder Name des Aggregats (z. B. 'Order', 'UserAccount'). Nützlich für Abfragen und das Verständnis des Ereignisstroms.version: Die Version des Aggregats nach Anwendung dieses Ereignisses. Dies ist entscheidend für die optimistische Nebenläufigkeitskontrolle.event_type: Der spezifische Ereignistyp (z. B. 'OrderCreated', 'ItemAddedToOrder').payload: Eine JSONB-Spalte zur Speicherung der tatsächlichen Ereignisdaten. JSONB wird wegen seiner effizienten Speicher- und Abfragefähigkeiten bevorzugt.timestamp: Wann das Ereignis aufgetreten ist.metadata: Optionale JSONB-Spalte für zusätzliche operative Metadaten (z. B.user_id, der die Aktion initiiert hat,ip_address).
Kernoperationen
Es gibt zwei Hauptoperationen in einem Event Sourced System:
- Ereignisse anhängen: Wenn eine Geschäftsoperation stattfindet, werden neue Ereignisse generiert und an das Ereignisprotokoll angehängt.
 - Aggregatzustand laden: Um Operationen auf einem Aggregat auszuführen, muss dessen aktueller Zustand durch Wiederholen seiner Ereignisse rekonstruiert werden.
 
Lassen Sie uns diese mit einem konzeptionellen Order-Aggregat in Python veranschaulichen.
1. Definition von Ereignissen und Aggregat
import uuid import datetime import json from typing import List, Dict, Any, Type # --- Ereignisdefinitionen --- class Event: def __init__(self, aggregate_id: uuid.UUID, version: int, timestamp: datetime.datetime, payload: Dict[str, Any]): self.aggregate_id = aggregate_id self.version = version self.timestamp = timestamp self.payload = payload @property def event_type(self) -> str: return self.__class__.__name__ def to_dict(self) -> Dict[str, Any]: return { "aggregate_id": str(self.aggregate_id), "version": self.version, "event_type": self.event_type, "timestamp": self.timestamp.isoformat(), "payload": self.payload } class OrderCreated(Event): def __init__(self, aggregate_id: uuid.UUID, version: int, customer_id: uuid.UUID, order_items: List[Dict[str, Any]], timestamp: datetime.datetime = None): super().__init__(aggregate_id, version, timestamp or datetime.datetime.now(datetime.timezone.utc), { "customer_id": str(customer_id), "order_items": order_items }) class ItemAddedToOrder(Event): def __init__(self, aggregate_id: uuid.UUID, version: int, item_id: str, quantity: int, price: float, timestamp: datetime.datetime = None): super().__init__(aggregate_id, version, timestamp or datetime.datetime.now(datetime.timezone.utc), { "item_id": item_id, "quantity": quantity, "price": price }) class OrderShipped(Event): def __init__(self, aggregate_id: uuid.UUID, version: int, shipping_date: datetime.datetime, timestamp: datetime.datetime = None): super().__init__(aggregate_id, version, timestamp or datetime.datetime.now(datetime.timezone.utc), { "shipping_date": shipping_date.isoformat() }) # --- Aggregatdefinition --- class OrderAggregate: def __init__(self, order_id: uuid.UUID = None, current_version: int = 0): self.order_id = order_id or uuid.uuid4() self.customer_id: uuid.UUID = None self.items: List[Dict[str, Any]] = [] self.status: str = "PENDING" self.current_version = current_version def apply(self, event: Event): if isinstance(event, OrderCreated): self._apply_order_created(event) elif isinstance(event, ItemAddedToOrder): self._apply_item_added_to_order(event) elif isinstance(event, OrderShipped): self._apply_order_shipped(event) self.current_version = event.version # Aktualisiert die Aggregatversion nach Anwendung des Ereignisses def _apply_order_created(self, event: OrderCreated): self.customer_id = uuid.UUID(event.payload["customer_id"]) self.items = event.payload["order_items"] self.status = "CREATED" def _apply_item_added_to_order(self, event: ItemAddedToOrder): self.items.append({ "item_id": event.payload["item_id"], "quantity": event.payload["quantity"], "price": event.payload["price"] }) def _apply_order_shipped(self, event: OrderShipped): self.status = "SHIPPED" # --- Befehle, die Ereignisse generieren --- def create_order(self, customer_id: uuid.UUID, order_items: List[Dict[str, Any]]) -> List[Event]: # Geschäftslogik hier (z. B. Artikel validieren) new_version = self.current_version + 1 event = OrderCreated(self.order_id, new_version, customer_id, order_items) self.apply(event) # Auf sich selbst anwenden für interne Zustandsaktualisierung return [event] def add_item(self, item_id: str, quantity: int, price: float) -> List[Event]: if self.status != "CREATED": raise ValueError("Kann Artikel nicht zu einer Bestellung hinzufügen, die nicht im Status CREATED ist.") if self._item_exists(item_id): raise ValueError("Artikel bereits in der Bestellung vorhanden. Verwenden Sie stattdessen die Aktualisierung der Artikelmenge.") new_version = self.current_version + 1 event = ItemAddedToOrder(self.order_id, new_version, item_id, quantity, price) self.apply(event) return [event] def ship_order(self) -> List[Event]: if self.status != "CREATED": # Annahme: Bestellung muss erstellt sein, um versendet zu werden raise ValueError("Kann eine Bestellung, die nicht im Status CREATED ist, nicht versenden.") new_version = self.current_version + 1 event = OrderShipped(self.order_id, new_version, datetime.datetime.now(datetime.timezone.utc)) self.apply(event) return [event] def _item_exists(self, item_id: str) -> bool: return any(item['item_id'] == item_id for item in self.items) def __repr__(self): return f"Order(id={self.order_id}, status={self.status}, version={self.current_version}, items={self.items})"
2. Event Store (Persistenzschicht)
Nun erstellen wir eine vereinfachte EventStore-Klasse, um mit unserer Datenbank zu interagieren. Wir verwenden psycopg2 für PostgreSQL.
import psycopg2 from psycopg2 import extras class EventStore: def __init__(self, db_config: Dict[str, str]): self.db_config = db_config self.event_type_map: Dict[str, Type[Event]] = { "OrderCreated": OrderCreated, "ItemAddedToOrder": ItemAddedToOrder, "OrderShipped": OrderShipped, # ... andere Ereignistypen } def _get_connection(self): return psycopg2.connect(**self.db_config) def save_events(self, aggregate_id: uuid.UUID, aggregate_type: str, expected_version: int, events: List[Event]): conn = None try: conn = self._get_connection() cur = conn.cursor() # Optimistische Nebenläufigkeitsprüfung: # Prüfen, ob die aktuelle Version mit expected_version + zu speichernde_ereignisse - 1 übereinstimmt # Oder einfacher: Zählen Sie die Ereignisse für aggregate_id. Das ist entscheidend. # Wenn `expected_version` 0 ist, bedeutet dies ein neues Aggregat. cur.execute( "SELECT version FROM events WHERE aggregate_id = %s ORDER BY version DESC LIMIT 1;", (str(aggregate_id),) ) latest_version_record = cur.fetchone() current_db_version = latest_version_record[0] if latest_version_record else 0 if current_db_version != expected_version: raise ConcurrencyException( f"Nebenläufigkeitskonflikt für Aggregat {aggregate_id}: " f"Erwartete Version {expected_version}, aber fand {current_db_version}." ) for event in events: cur.execute( """ INSERT INTO events (aggregate_id, aggregate_type, version, event_type, payload, timestamp) VALUES (%s, %s, %s, %s, %s, %s); """, ( str(event.aggregate_id), aggregate_type, event.version, event.event_type, json.dumps(event.payload), event.timestamp ) ) conn.commit() except psycopg2.IntegrityError as e: conn.rollback() if "duplicate key value violates unique constraint" in str(e): raise ConcurrencyException( "Versuch, ein Ereignis mit doppelter aggregate_id und Version zu speichern. Wahrscheinlich parallele Modifikation." ) from e raise # Andere Integritätsfehler erneut auslösen except Exception as e: if conn: conn.rollback() raise finally: if conn: cur.close() conn.close() def load_events(self, aggregate_id: uuid.UUID) -> List[Event]: conn = None events: List[Event] = [] try: conn = self._get_connection() cur = conn.cursor(cursor_factory=extras.DictCursor) # DictCursor für einfacheren Zugriff nach Spaltennamen verwenden cur.execute( "SELECT aggregate_id, version, event_type, payload, timestamp FROM events WHERE aggregate_id = %s ORDER BY version ASC;", (str(aggregate_id),) ) for record in cur.fetchall(): event_type_name = record['event_type'] if event_type_name not in self.event_type_map: raise ValueError(f"Unbekannter Ereignistyp: {event_type_name}") EventClass = self.event_type_map[event_type_name] # Ereignisobjekt aus Datenbankdatensatz rekonstruieren # Hinweis: Payload kann als String gespeichert werden, spezifische Handhabung erforderlich, wenn benutzerdefinierte Deserialisierung benötigt wird event = EventClass( aggregate_id=uuid.UUID(record['aggregate_id']), version=record['version'], timestamp=record['timestamp'], **record['payload'] # Payload-Dictionary direkt an den Ereigniskonstruktor übergeben, falls zutreffend ) events.append(event) return events except Exception as e: raise finally: if conn: cur.close() conn.close() def get_aggregate_by_id(self, aggregate_id: uuid.UUID, aggregate_type: Type[OrderAggregate]) -> OrderAggregate: events = self.load_events(aggregate_id) if not events: return None # Aggregat nicht gefunden aggregate = aggregate_type(aggregate_id=aggregate_id, current_version=0) # Mit ID initialisieren for event in events: aggregate.apply(event) return aggregate class ConcurrencyException(Exception): """Benutzerdefinierte Ausnahme für Nebenläufigkeitskonflikte.""" pass
Hinweis zur Ereignisrekonstruktion: Die Basisklasse Event erwartet payload direkt, und die Konstruktoren von OrderCreated usw. nehmen Attribute direkt an (z. B. customer_id). Für ein reales System hätten Sie möglicherweise einen ausgefeilteren Mechanismus zur Ereignisserialisierung/-deserialisierung, um Datenbank-Payload-Dictionaries auf gut definierte Ereignisobjekte abzubilden. Das obige Beispiel vereinfacht dies zur besseren Übersichtlichkeit.
Zusammenfassung der Funktionsweise
# Datenbankkonfiguration (mit Ihren tatsächlichen Zugangsdaten ersetzen) DB_CONFIG = { "host": "localhost", "database": "event_sourcing_db", "user": "your_user", "password": "your_password" } event_store = EventStore(DB_CONFIG) customer_id = uuid.uuid4() order_id = uuid.uuid4() try: # 1. Eine neue Bestellung erstellen print(f"Erstelle neue Bestellung mit ID: {order_id}") new_order = OrderAggregate(order_id=order_id, current_version=0) created_events = new_order.create_order( customer_id=customer_id, order_items=[{"item_id": "product-A", "quantity": 2, "price": 10.0}] ) event_store.save_events(new_order.order_id, "Order", 0, created_events) # Erwartete Version für ein neues Aggregat ist 0 print(f"Bestellung erstellt. Zustand: {new_order}") # 2. Die Bestellung laden und einen Artikel hinzufügen print(f"\nLade Bestellung {order_id}, um einen Artikel hinzuzufügen...") loaded_order = event_store.get_aggregate_by_id(order_id, OrderAggregate) if loaded_order: print(f"Geladener Bestellzustand: {loaded_order}") added_item_events = loaded_order.add_item("product-B", 1, 25.0) event_store.save_events(loaded_order.order_id, "Order", loaded_order.current_version - len(added_item_events), added_item_events) print(f"Artikel hinzugefügt. Aktueller Zustand: {loaded_order}") # 3. Die Bestellung erneut laden und versenden print(f"\nLade Bestellung {order_id}, um sie zu versenden...") final_order = event_store.get_aggregate_by_id(order_id, OrderAggregate) if final_order: print(f"Geladener Bestellzustand: {final_order}") shipped_events = final_order.ship_order() event_store.save_events(final_order.order_id, "Order", final_order.current_version - len(shipped_events), shipped_events) print(f"Bestellung versandt. Endzustand: {final_order}") # 4. Nebenläufigkeitskonflikt demonstrieren (optionaler Lauf) # Diese Simulation versucht, zwei Ereignisse von leicht unterschiedlichen Startpunkten aus zu speichern # Sie würden dies normalerweise in separaten Prozessen/Threads ausführen, um es in Aktion zu sehen. # order_for_conflict_1 = event_store.get_aggregate_by_id(order_id, OrderAggregate) # order_for_conflict_2 = event_store.get_aggregate_by_id(order_id, OrderAggregate) # # Prozess 1 (versucht, Artikel hinzuzufügen) # conflict_events_1 = order_for_conflict_1.add_item("product-C", 3, 5.0) # event_store.save_events(order_for_conflict_1.order_id, "Order", order_for_conflict_1.current_version - len(conflict_events_1), conflict_events_1) # print(f"\nVersuch 1 des Nebenläufigkeitskonflikts gespeichert.") # # Prozess 2 (versucht, einen weiteren Artikel hinzuzufügen, aber aus dem alten Zustand VOR Prozess 1 Save) # try: # conflict_events_2 = order_for_conflict_2.add_item("product-D", 1, 15.0) # event_store.save_events(order_for_conflict_2.order_id, "Order", order_for_conflict_2.current_version - len(conflict_events_2), conflict_events_2) # print(f"Dies sollte nicht gedruckt werden, wenn die Nebenläufigkeit funktioniert.") # except ConcurrencyException as e: # print(f"\nNebenläufigkeitskonflikt erfolgreich abgefangen: {e}") except ConcurrencyException as e: print(f"Anwendungsfehler aufgrund von Nebenläufigkeit: {e}") except Exception as e: print(f"Ein unerwarteter Fehler ist aufgetreten: {e}")
Wichtige Überlegungen und Best Practices
- Nebenläufigkeitskontrolle: Die 
UNIQUE (aggregate_id, version)-Beschränkung in derevents-Tabelle, kombiniert mit der Prüfung vonexpected_versionvor dem Speichern, ist das Fundament der optimistischen Nebenläufigkeitskontrolle. Wenn zwei Befehle versuchen, dasselbe Aggregat gleichzeitig zu ändern, wird nur einer erfolgreich sein, und der andere wird aufgrund eines Versionskonflikts oder eines Verstosses gegen die eindeutige Beschränkung fehlschlagen, was einen erneuten Versuch auslöst. - Leistung der Zustandsrekonstruktion: Für Aggregate mit Tausenden von Ereignissen kann das Wiedergeben aller Ereignisse jedes Mal langsam werden.
- Snapshots: Speichern Sie regelmäßig einen Snapshot des aktuellen Zustands des Aggregats in einer separaten 
snapshots-Tabelle. Beim Laden eines Aggregats wird der neueste Snapshot geladen und dann nur Ereignisse nach der Version dieses Snapshots angewendet. - Denormalisierte Lesemodelle (Projektionen): Dies ist der entscheidende Teil von CQRS (Command Query Responsibility Segregation). Erstellen Sie separate, hochoptimierte Lesemodelle (z. B. Zusammenfassungstabellen, Suchindizes), indem Sie den Ereignisstrom aus Ihrer 
events-Tabelle asynchron verarbeiten. Dies wird oft von einem separaten "Projector"-Dienst übernommen, der dieevents-Tabelle auf neue Einträge abfragt, sie verarbeitet und die Lesemodelle aktualisiert. 
 - Snapshots: Speichern Sie regelmäßig einen Snapshot des aktuellen Zustands des Aggregats in einer separaten 
 - Ereignisunveränderlichkeit: Sobald ein Ereignis geschrieben wurde, darf es niemals geändert oder gelöscht werden. Dies ist grundlegend für Event Sourcing.
 - Ereignisschema-Evolution: Wenn sich Ihre Anwendung weiterentwickelt, können sich Ereignisschemata ändern. Planen Sie die Versionierung von Ereignissen und stellen Sie Mechanismen bereit, um ältere Ereignisformate während der Zustandsrekonstruktion zu verarbeiten.
 - Abfragen: Die 
events-Tabelle selbst ist generell schlecht für komplexe Abfragen nach dem aktuellen Zustand von Aggregaten geeignet (z. B. "Gib mir alle Bestellungen über 100 €"). Hier werden Lesemodelle unerlässlich. Für Auditing oder Debugging ist die Abfrage derevents-Tabelle nachaggregate_id,event_typeodertimestampsehr effektiv. - Atomare Operationen: Stellen Sie sicher, dass das Speichern neuer Ereignisse und die Aktualisierung von Lesemodellen (falls synchron durchgeführt) innerhalb derselben Transaktion erfolgen, oder verwenden Sie das "Outbox-Pattern" für die eventual consistency, wenn Ereignisse an ein externes System gesendet werden. Für diesen "Single Table"-Ansatz behandelt die Methode 
save_eventsdie Atomizität für die Ereignispersistenz. 
Anwendungsszenarien
Dieser vereinfachte Event Sourcing-Ansatz eignet sich besonders gut für:
- Auditing und Compliance: Jede einzelne Änderung wird protokolliert und bietet einen vollständigen, unveränderlichen Audit-Trail.
 - Debugging und Fehlerbehebung: Wiederholen Sie Ereignisse, um genau zu verstehen, wie ein System einen bestimmten fehlerhaften Zustand erreicht hat.
 - Komplexe Domänenlogik: Wenn Geschäftsregeln komplex sind und von der Historie abhängen (z. B. "Ein Benutzer kann X nur tun, wenn er in den letzten Z Tagen nicht Y getan hat").
 - Zeitliche Abfragen: Stellen Sie Fragen wie "Wie war der Zustand der Bestellung gestern um 14 Uhr?"
 - Kleine bis mittelgroße Anwendungen: Wo der volle operative Overhead von Kafka oder anderen Nachrichtenbrokern übertrieben ist, die Vorteile von Event Sourcing jedoch gewünscht sind.
 - Prototyping und Lernen: Eine großartige Möglichkeit, mit Event Sourcing zu beginnen, ohne eine steile Lernkurve in Bezug auf die Infrastruktur.
 - Rekonstruktion von Lesemodellen: Wenn ein Lesemodell beschädigt wird oder neu gestaltet werden muss, können Sie es jederzeit durch Wiederholen aller historischen Ereignisse aus der 
events-Tabelle vollständig neu aufbauen. 
Fazit
Die Implementierung von Event Sourcing mit nur einer einzigen Datenbanktabelle für Ihr Ereignisprotokoll ist ein pragmatischer und leistungsfähiger Ansatz. Sie eliminiert die Komplexität externer Nachrichtenbroker und ermöglicht es Entwicklern, sich auf die Kernvorteile des Musters zu konzentrieren: eine unveränderliche, auditierbare Historie aller Änderungen, robuste Nebenläufigkeitskontrolle und die Fähigkeit, den Anwendungszustand wiederherzustellen. Obwohl es eine sorgfältige Betrachtung der Leistung für sehr aktive Aggregate und komplexe Abfragen erfordert, können diese Herausforderungen durch Snapshots und dedizierte Lesemodelle effektiv gelöst werden, was zeigt, dass widerstandsfähige, historiesreiche Systeme mit überraschend einfacher Infrastruktur erreichbar sind. Dieses Muster bietet eine überzeugende Mischung aus Einfachheit und Leistung und macht Event Sourcing für eine breitere Palette von Projekten zugänglich.

