Aufbau skalierbarer Systeme mit CQRS und Event Sourcing in Backend-Frameworks
Wenhao Wang
Dev Intern · Leapcell

Einleitung
In der sich rasant entwickelnden Landschaft der modernen Softwareentwicklung ist der Aufbau robuster, skalierbarer und wartbarer Anwendungen von größter Bedeutung. Wenn Systeme in ihrer Komplexität und dem Benutzerbedarf wachsen, können traditionelle CRUD-Architekturen (Create, Read, Update, Delete) oft zu Engpässen werden und mit Problemen bei der Lese-/Schreibkonflikten, der Datenkonsistenz und Schwierigkeiten bei der Überprüfung historischer Änderungen kämpfen. Hier treten fortgeschrittene Architekturmuster wie Command Query Responsibility Segregation (CQRS) und Event Sourcing als leistungsstarke Lösungen hervor. Durch die grundlegende Neudefinition, wie wir Datenmutationen und Abfragen behandeln, bieten diese Muster einen Weg zur Gestaltung von Systemen, die nicht nur performanter, sondern auch inhärent widerstandsfähig und aufschlussreich sind. Dieser Artikel befasst sich eingehend mit der praktischen Anwendung von CQRS und Event Sourcing in Backend-Frameworks und veranschaulicht, wie sie unseren Ansatz beim Aufbau komplexer, unternehmensgerechter Anwendungen verändern können.
Kernkonzepte und Prinzipien
Bevor wir uns mit der praktischen Implementierung befassen, ist es entscheidend, ein solides Verständnis der Kernkonzepte zu etablieren, die CQRS und Event Sourcing zugrunde liegen.
Command Query Responsibility Segregation (CQRS)
CQRS ist ein Architekturmuster, das die Verantwortung für die Verarbeitung von Befehlen (Operationen, die den Zustand der Anwendung ändern) von der Verantwortung für die Verarbeitung von Abfragen (Operationen, die Daten abrufen) trennt.
- Command Model (Schreibseite): Dieser Teil der Anwendung ist für die Annahme und Verarbeitung von Befehlen, die Validierung von Geschäftsregeln und die Persistenz von Zustandsänderungen verantwortlich. Er verwendet typischerweise einen für Schreibvorgänge optimierten Datenspeicher und konzentriert sich oft auf transaktionale Konsistenz.
- Query Model (Leseseite): Dieser Teil ist für das Lesen von Daten optimiert. Er kann einen denormalisierten Datenspeicher (z. B. eine Berichtsdatenbank, einen Key-Value-Speicher oder einen Suchindex) verwenden, der für bestimmte Abfragemuster zugeschnitten ist und hohe Leistung und Flexibilität bietet.
Warum sie trennen? Traditionelle CRUD-Systeme verwenden oft ein einzelnes Modell für Lese- und Schreibvorgänge, was zu Kompromissen führt. Die Optimierung für das eine beeinträchtigt oft das andere. CQRS ermöglicht die unabhängige Skalierung und Optimierung jeder Seite, was zu verbesserter Leistung, Flexibilität und Wartbarkeit führt.
Event Sourcing
Event Sourcing ist ein Persistenzmuster, bei dem anstelle des aktuellen Zustands eines Aggregats eine Sequenz unveränderlicher Ereignisse gespeichert wird, die jede vorgenommene Änderung an diesem Aggregat beschreiben.
- Events: Repräsentieren Fakten über etwas, das in der Vergangenheit geschehen ist (z. B.
OrderPlacedEvent
,ProductQuantityAdjustedEvent
). Sie sind unveränderlich und nur zum Anhängen bestimmt. - Event Store: Eine spezialisierte Datenbank, die zum Speichern und Abrufen dieser Ereignissequenzen entwickelt wurde. Sie fungiert als einzige Quelle der Wahrheit für den Zustand der Anwendung.
- Aggregate: Eine Ansammlung von Domänenobjekten, die als eine einzige Einheit für Datenänderungen behandelt werden können. Es ist die Grenze für transaktionale Konsistenz und die Anwendung von Ereignissen.
Warum Ereignisse statt Zustand speichern? Event Sourcing bietet eine vollständige, überprüfbare Historie aller Änderungen und ermöglicht leistungsstarke Funktionalitäten wie Time-Travel-Debugging, das Wiederholen von Ereignissen zur Rekonstruktion des Zustands zu jedem Zeitpunkt und die Generierung mehrerer Lesemodelle, die auf verschiedene Verbraucher zugeschnitten sind. Es integriert sich auch natürlich in Publish-Subscribe-Mechanismen für verteilte Systeme.
Beziehung zwischen CQRS und Event Sourcing
CQRS und Event Sourcing sind komplementäre Muster. Event Sourcing fügt sich natürlich in die Schreibseite von CQRS ein: Befehle generieren Ereignisse, die dann im Event Store gespeichert werden. Diese Ereignisse können dann asynchron veröffentlicht werden, um ein oder mehrere Lesemodelle zu erstellen und zu aktualisieren, die die Leseseite von CQRS bilden. Diese Synergie ermöglicht leistungsstarke, hochgradig skalierbare und überprüfbare Systeme.
Implementierung von CQRS und Event Sourcing
Lassen Sie uns die Implementierung von CQRS und Event Sourcing anhand eines hypothetischen E-Commerce-Produktverwaltungsdienstes als Beispiel veranschaulichen. Wir verwenden ein generisches Backend-Framework (z. B. Spring Boot in Java oder FastAPI in Python, aber die Prinzipien sind breit anwendbar), um die Konzepte zu demonstrieren.
Projektstrukturübersicht
├── src
│ ├── main
│ │ ├── java/python/...
│ │ │ └── com
│ │ │ └── example
│ │ │ └── productservice
│ │ │ ├── api // REST-Controller für Befehle und Abfragen
│ │ │ ├── command // Befehlsdefinitionen und Handler
│ │ │ │ ├── model // DTOs für Befehle
│ │ │ │ └── service // Befehlshandler-Logik
│ │ │ ├── query // Abfragedefinitionen und Handler
│ │ │ │ ├── model // DTOs für Abfragen und Lesemodelle
│ │ │ │ └── service // Abfragehandler-Logik
│ │ │ ├── domain // Aggregate, Ereignisse und Geschäftslogik
│ │ │ │ ├── aggregate // ProductAggregate
│ │ │ │ ├── event // Produkt-Ereignisse (z. B. ProductCreatedEvent)
│ │ │ │ └── repository // Event-Store-Interaktion
│ │ │ ├── infrastructure // Event-Store-Konfiguration, Event-Publisher
│ │ │ └── config // Anwendungs-Konfiguration
1. Definieren von Befehlen und Ereignissen
Zuerst definieren wir unsere Befehle und die Ereignisse, die sie für ein Product
-Aggregat erzeugen.
Befehle (Eingabe der Schreibseite):
// Java Beispiel (Command DTOs) public class CreateProductCommand { private String productId; private String name; private double price; private int quantity; // Getter, Setter, Konstruktor } public class UpdateProductPriceCommand { private String productId; private double newPrice; // Getter, Setter, Konstruktor }
# Python Beispiel (Command DTOs mit Pydantic) from pydantic import BaseModel class CreateProductCommand(BaseModel): product_id: str name: str price: float quantity: int class UpdateProductPriceCommand(BaseModel): product_id: str new_price: float
Ereignisse (Systemwahrheit):
// Java Beispiel (Ereignisse) abstract public class ProductEvent { private String productId; private long timestamp; // Getter, Setter, Konstruktor } public class ProductCreatedEvent extends ProductEvent { private String name; private double price; private int quantity; // Getter, Setter, Konstruktor } public class ProductPriceUpdatedEvent extends ProductEvent { private double newPrice; // Getter, Setter, Konstruktor }
# Python Beispiel (Ereignisse mit Pydantic) from datetime import datetime from typing import Optional class Event(BaseModel): product_id: str timestamp: datetime = datetime.utcnow() class ProductCreatedEvent(Event): name: str price: float quantity: int class ProductPriceUpdatedEvent(Event): new_price: float
2. Das Aggregat und die Event-Sourcing-Logik (Schreibseite)
Das ProductAggregate
ist für die Anwendung von Befehlen und die Generierung von Ereignissen verantwortlich. Es rekonstruiert seinen Zustand durch Wiederholen seiner eigenen Ereignisse.
// Java Beispiel (Product Aggregate) public class ProductAggregate { private String productId; private String name; private double price; private int quantity; private long version; // Für optimistische Nebenläufigkeitskontrolle // Konstruktor zum Erstellen eines neuen Aggregats public ProductAggregate(CreateProductCommand command) { applyNewEvent(new ProductCreatedEvent(command.getProductId(), command.getName(), command.getPrice(), command.getQuantity())); } // Konstruktor zum Wiederaufbau aus der Historie public ProductAggregate(List<ProductEvent> history) { history.forEach(this::apply); } // Befehlshandler public void updatePrice(UpdateProductPriceCommand command) { if (command.getNewPrice() <= 0) { throw new IllegalArgumentException("Preis darf nicht negativ oder null sein."); } applyNewEvent(new ProductPriceUpdatedEvent(this.productId, command.getNewPrice())); } // Apply-Methode zur Zustandsänderung basierend auf einem Ereignis private void apply(ProductEvent event) { if (event instanceof ProductCreatedEvent) { ProductCreatedEvent e = (ProductCreatedEvent) event; this.productId = e.getProductId(); this.name = e.getName(); this.price = e.getPrice(); this.quantity = e.getQuantity(); } else if (event instanceof ProductPriceUpdatedEvent) { ProductPriceUpdatedEvent e = (ProductPriceUpdatedEvent) event; this.price = e.getNewPrice(); } this.version++; } // Hilfsmethode zum Anwenden und Speichern neuer Ereignisse private void applyNewEvent(ProductEvent event) { apply(event); // Dieses 'Event' würde gespeichert und möglicherweise veröffentlicht // In einem echten System werden gesammelte Ereignisse an einen Event Store gesendet } // Getter }
# Python Beispiel (Product Aggregate) from typing import List, Dict, Any class ProductAggregate: def __init__(self, product_id: str): self.product_id = product_id self.name: Optional[str] = None self.price: Optional[float] = None self.quantity: Optional[int] = None self.version: int = -1 self._uncommitted_events: List[Event] = [] @classmethod def create(cls, command: CreateProductCommand) -> 'ProductAggregate': aggregate = cls(command.product_id) aggregate._apply_new_event(ProductCreatedEvent( product_id=command.product_id, name=command.name, price=command.price, quantity=command.quantity )) return aggregate @classmethod def from_history(cls, product_id: str, history: List[Event]) -> 'ProductAggregate': aggregate = cls(product_id) for event in history: aggregate._apply(event) return aggregate def update_price(self, command: UpdateProductPriceCommand): if command.new_price <= 0: raise ValueError("Preis darf nicht negativ oder null sein.") self._apply_new_event(ProductPriceUpdatedEvent( product_id=self.product_id, new_price=command.new_price )) def _apply(self, event: Event): if isinstance(event, ProductCreatedEvent): self.name = event.name self.price = event.price self.quantity = event.quantity elif isinstance(event, ProductPriceUpdatedEvent): self.price = event.new_price self.version += 1 def _apply_new_event(self, event: Event): self._apply(event) self._uncommitted_events.append(event) # Gespeicherte Ereignisse sammeln def get_uncommitted_events(self) -> List[Event]: return self._uncommitted_events def clear_uncommitted_events(self): self._uncommitted_events = [] # Getter/Eigenschaften @property def current_state(self) -> Dict[str, Any]: return { "productId": self.product_id, "name": self.name, "price": self.price, "quantity": self.quantity, "version": self.version }
3. Befehlshandler und Event-Store-Interaktion
Der Befehlshandler orchestriert den Prozess: Laden des Aggregats aus dem Event Store, Anwenden des Befehls, Speichern der neu generierten Ereignisse und Veröffentlichen dieser.
// Java Beispiel (Product Command Handler) @Service public class ProductCommandHandler { private final EventStore eventStore; private final EventPublisher eventPublisher; // Zum Veröffentlichen von Ereignissen für Lesemodelle public ProductCommandHandler(EventStore eventStore, EventPublisher eventPublisher) { this.eventStore = eventStore; this.eventPublisher = eventPublisher; } public void handle(CreateProductCommand command) { ProductAggregate aggregate = new ProductAggregate(command); List<ProductEvent> newEvents = // Logik zum Abrufen neuer Ereignisse vom Aggregat eventStore.saveEvents(command.getProductId(), newEvents, aggregate.getVersion()); newEvents.forEach(eventPublisher::publish); } public void handle(UpdateProductPriceCommand command) { List<ProductEvent> history = eventStore.getEventsForAggregate(command.getProductId()); ProductAggregate aggregate = new ProductAggregate(history); // Optimistische Nebenläufigkeitsprüfung (Versionierung) // Wenn die erwartete Version vom Befehl nicht mit aggregate.version übereinstimmt, einen Fehler auslösen // Vereinfacht gehen wir hier von der neuesten Version aus. aggregate.updatePrice(command); List<ProductEvent> newEvents = // Logik zum Abrufen neuer Ereignisse vom Aggregat eventStore.saveEvents(command.getProductId(), newEvents, aggregate.getVersion()); newEvents.forEach(eventPublisher::publish); } } // EventStore und EventPublisher wären Infrastrukturkomponenten. EventStore könnte eine // NoSQL-DB (z. B. Cassandra, MongoDB) oder ein spezialisierter Event Store (z. B. EventStoreDB) sein.
# Python Beispiel (Product Command Handler) from typing import Protocol, List from .domain.aggregate import ProductAggregate from .infrastructure.event_store import EventStore from .infrastructure.event_publisher import EventPublisher from .domain.event import Event class ProductCommandHandler: def __init__(self, event_store: EventStore, event_publisher: EventPublisher): self.event_store = event_store self.event_publisher = event_publisher def handle_create_product(self, command: CreateProductCommand): aggregate = ProductAggregate.create(command) self.event_store.save_events(command.product_id, aggregate.get_uncommitted_events(), aggregate.version) for event in aggregate.get_uncommitted_events(): self.event_publisher.publish(event) aggregate.clear_uncommitted_events() def handle_update_product_price(self, command: UpdateProductPriceCommand): history = self.event_store.get_events_for_aggregate(command.product_id) if not history: raise ValueError(f"Produkt mit ID {command.product_id} nicht gefunden.") aggregate = ProductAggregate.from_history(command.product_id, history) # In einem echten System würden Sie eine expected_version mit dem Befehl übergeben # und sie mit aggregate.version vergleichen, um optimistische Nebenläufigkeit zu behandeln. # Vereinfacht gehen wir hier von der neuesten Version aus. aggregate.update_price(command) self.event_store.save_events(command.product_id, aggregate.get_uncommitted_events(), aggregate.version) for event in aggregate.get_uncommitted_events(): self.event_publisher.publish(event) aggregate.clear_uncommitted_events() # Beispiel EventStore (vereinfacht) class EventStore: def __init__(self): self._stores: Dict[str, List[Event]] = {} # Ein einfacher In-Memory-Speicher zur Demonstration def save_events(self, aggregate_id: str, events: List[Event], expected_version: int): current_events = self._stores.get(aggregate_id, []) # Grundlegende Prüfung auf optimistische Nebenläufigkeit (robuster in der Produktion) if current_events and len(current_events) != expected_version - len(events): raise ValueError("Nebenläufigkeitskonflikt: Aggregat wurde modifiziert.") current_events.extend(events) self._stores[aggregate_id] = current_events print(f"{len(events)} Ereignisse für {aggregate_id} gespeichert. Gesamt: {len(current_events)}") def get_events_for_aggregate(self, aggregate_id: str) -> List[Event]: return self._stores.get(aggregate_id, []) # Beispiel EventPublisher (vereinfacht) class EventPublisher: def publish(self, event: Event): print(f"Veröffentliche Ereignis: {event.__class__.__name__} für Produkt {event.product_id}") # In einem echten System würde dies an einen Message Broker (z. B. Kafka, RabbitMQ) gesendet.
4. Lesemodelle und Abfragehandler (Leseseite)
Ereignisse, die von der Befehlsseite veröffentlicht werden, werden von Projektoren (oder Ereignis-Handlern) verbraucht, die denormalisierte Lesemodelle aktualisieren. Diese Lesemodelle werden dann abgefragt.
Lesemodell (Denormalisierte Ansicht):
// Java Beispiel (Product Overview Read Model) public class ProductSummary { private String productId; private String name; private double currentPrice; private int quantityOnHand; // Potenziell andere berechnete Felder wie 'lastUpdated', 'totalOrders' // Getter, Setter, Konstruktor }
# Python Beispiel (Product Overview Read Model mit Pydantic) class ProductSummary(BaseModel): product_id: str name: str current_price: float quantity_on_hand: int last_updated: datetime = datetime.utcnow() # Berechnetes Feld
Ereignisverbraucher / Projektor:
// Java Beispiel (Product Read Model Projector) @Service public class ProductReadModelProjector { private final ProductSummaryRepository productSummaryRepository; // Persistiert ProductSummary public ProductReadModelProjector(ProductSummaryRepository productSummaryRepository) { this.productSummaryRepository = productSummaryRepository; } @EventListener // Spring's Methode zum Abhören von Anwendungsereignissen public void handle(ProductCreatedEvent event) { ProductSummary summary = new ProductSummary(event.getProductId(), event.getName(), event.getPrice(), event.getQuantity()); productSummaryRepository.save(summary); } @EventListener public void handle(ProductPriceUpdatedEvent event) { ProductSummary summary = productSummaryRepository.findByProductId(event.getProductId()) .orElseThrow(() -> new RuntimeException("Produktübersicht nicht gefunden")); summary.setCurrentPrice(event.getNewPrice()); summary.setLastUpdated(event.getTimestamp()); // Letzte Aktualisierungszeit aktualisieren productSummaryRepository.save(summary); } }
# Python Beispiel (Product Read Model Projector) from .infrastructure.read_model_db import ProductSummaryRepository from .query.model import ProductSummary class ProductReadModelProjector: def __init__(self, product_summary_repo: ProductSummaryRepository): self.product_summary_repo = product_summary_repo def handle_product_created_event(self, event: ProductCreatedEvent): summary = ProductSummary( product_id=event.product_id, name=event.name, current_price=event.price, quantity_on_hand=event.quantity, last_updated=event.timestamp ) self.product_summary_repo.save(summary) print(f"Projektor: Zusammenfassung für Produkt {summary.product_id} erstellt") def handle_product_price_updated_event(self, event: ProductPriceUpdatedEvent): summary = self.product_summary_repo.find_by_product_id(event.product_id) if not summary: raise ValueError(f"Produktübersicht für {event.product_id} nicht gefunden.") summary.current_price = event.new_price summary.last_updated = event.timestamp self.product_summary_repo.save(summary) print(f"Projektor: Preis für Produkt {summary.product_id} auf {summary.current_price} aktualisiert") # Beispiel Read Model Repository (vereinfacht, könnte eine separate Datenbank wie PostgreSQL sein) class ProductSummaryRepository: def __init__(self): self._store: Dict[str, ProductSummary] = {} def save(self, summary: ProductSummary): self._store[summary.product_id] = summary def find_by_product_id(self, product_id: str) -> Optional[ProductSummary]: return self._store.get(product_id) def find_all(self) -> List[ProductSummary]: return list(self._store.values())
Abfragehandler und API-Endpunkte:
// Java Beispiel (Product Query Handler und REST-Endpunkt) @Service public class ProductQueryHandler { private final ProductSummaryRepository productSummaryRepository; public ProductQueryHandler(ProductSummaryRepository productSummaryRepository) { this.productSummaryRepository = productSummaryRepository; } public ProductSummary getProductSummary(String productId) { return productSummaryRepository.findByProductId(productId) .orElseThrow(() -> new ProductNotFoundException(productId)); } public List<ProductSummary> getAllProductSummaries() { return productSummaryRepository.findAll(); } } @RestController @RequestMapping("/api/products") public class ProductQueryController { private final ProductQueryHandler queryHandler; public ProductQueryController(ProductQueryHandler queryHandler) { this.queryHandler = queryHandler; } @GetMapping("/{productId}") public ResponseEntity<ProductSummary> getProduct(@PathVariable String productId) { return ResponseEntity.ok(queryHandler.getProductSummary(productId)); } @GetMapping public ResponseEntity<List<ProductSummary>> getAllProducts() { return ResponseEntity.ok(queryHandler.getAllProductSummaries()); } }
# Python Beispiel (Product Query Handler und FastAPI-Endpunkt) from fastapi import APIRouter, HTTPException from typing import List product_query_router = APIRouter() class ProductQueryHandler: def __init__(self, product_summary_repo: ProductSummaryRepository): self.product_summary_repo = product_summary_repo def get_product_summary(self, product_id: str) -> ProductSummary: summary = self.product_summary_repo.find_by_product_id(product_id) if not summary: raise HTTPException(status_code=404, detail=f"Produkt mit ID {product_id} nicht gefunden.") return summary def get_all_product_summaries(self) -> List[ProductSummary]: return self.product_summary_repo.find_all() # FastAPI-Endpunkt-Setup (angenommener App-Kontext/Dependency Injection für Handler) @product_query_router.get("/{product_id}", response_model=ProductSummary) async def get_product( product_id: str, query_handler: ProductQueryHandler # Eingefügt über Depends von FastAPI ): return query_handler.get_product_summary(product_id) @product_query_router.get("/", response_model=List[ProductSummary]) async def get_all_products( query_handler: ProductQueryHandler ): return query_handler.get_all_product_summaries() # In Ihrer Hauptanwendung: # app = FastAPI() # app.include_router(product_query_router, prefix="/api/products")
Anwendungsfälle
CQRS und Event Sourcing eignen sich besonders gut für:
- Komplexe Domänenmodelle: In denen Geschäftsregeln kompliziert sind und Zustandsänderungen überprüfbar sein müssen.
- Hochleistungsfähige Lese-/Schreibsysteme: Systeme mit signifikant unterschiedlichen Lese- und Schreibmustern, die eine unabhängige Skalierung ermöglichen.
- Auditierbarkeit und Compliance: Jede Änderung wird als Ereignis aufgezeichnet und bietet eine vollständige Historie für Überprüfungszwecke.
- Historische Analyse und Business Intelligence: Ereignisse können für verschiedene analytische Modelle wiederholt oder transformiert werden.
- Microservices-Architektur: Ereignisse erleichtern natürlich die Kommunikation und die letztendliche Konsistenz zwischen Diensten.
- Echtzeit-Dashboards und Projektionen: Ereignisse können Echtzeitansichten und Berichte aktualisieren.
Fazit
CQRS und Event Sourcing sind leistungsstarke Architekturmuster, die bei richtiger Anwendung zu hochgradig skalierbaren, robusten und wartbaren Backend-Systemen führen können. Durch die klare Trennung der Verantwortlichkeiten für die Befehlsverarbeitung und die Abfragebehandlung und durch die Persistenz von Änderungen als unveränderliche Ereignissequenz können Entwickler Anwendungen erstellen, die überlegene Leistung, eine umfassende Audit-Spur und unübertroffene Flexibilität für sich entwickelnde Geschäftsanforderungen bieten. Obwohl sie eine gewisse Komplexität mit sich bringen, überwiegen die langfristigen Vorteile in bestimmten Problembereichen oft die anfängliche Lernkurve und befähigen Teams, wirklich robuste und aufschlussreiche Software zu entwickeln. Diese Muster verschieben unsere Perspektive grundlegend vom Speichern des aktuellen Zustands zum Verstehen der kausalen Abfolge von Änderungen, die zu ihm geführt haben.