Event Sourcing with a Single Database Table: A Simplified Approach
Ethan Miller
Product Engineer · Leapcell

Introduction
In the evolving landscape of modern software architecture, the need for robust, auditable, and scalable systems is paramount. Event Sourcing, a powerful pattern that records all changes to application state as a sequence of immutable events, has gained significant traction. It offers unparalleled benefits for auditing, debugging, and building systems with complex business rules. Traditionally, implementing Event Sourcing often involves sophisticated messaging systems like Kafka to serve as an event log. While highly effective, integrating and managing Kafka can introduce considerable operational overhead and complexity, especially for smaller projects or teams without dedicated DevOps resources. This article delves into an alternative, simplified approach: implementing Event Sourcing using nothing more than a single relational database table as our "event log." This method aims to demystify Event Sourcing and make it accessible, proving that its core benefits can be harnessed without the overhead of external message brokers, an approach that is particularly appealing when operational simplicity is a priority.
Understanding the Core Concepts
Before we dive into the implementation, let's clarify some fundamental concepts crucial to understanding Event Sourcing.
- Event Sourcing: This architectural pattern dictates that instead of storing the current state of an entity, we store every change to that state as an immutable event. The current state is then derived by replaying these events in chronological order.
- Event: A record of something that happened in the past. Events are immutable facts and are named in the past tense (e.g., `OrderPlaced`, `InventoryAdjusted`, `UserRegistered`). Each event typically contains metadata like `timestamp`, `event_type`, `aggregate_id`, and `version`, along with a `payload` representing the data relevant to that specific change.
- Aggregate: A cluster of domain objects that can be treated as a single unit for data changes. It's the consistency boundary in Domain-Driven Design. Events are always associated with a specific aggregate instance (identified by `aggregate_id`).
- State Reconstruction: The process of replaying an aggregate's historical events to reconstruct its current state (a minimal sketch follows this list).
 - Snapshot: A pre-computed state of an aggregate at a specific point in time. Snapshots are used to optimize state reconstruction by avoiding replaying all historical events from the very beginning, especially for aggregates with a long event history.
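
To make state reconstruction concrete, here is a minimal, generic sketch: the current state is just a left fold of the event history, optionally seeded from a snapshot. The `apply` callback and the dict-based initial state are illustrative assumptions, not part of any specific framework.

```python
from functools import reduce
from typing import Any, Callable, Iterable, Optional

def reconstruct_state(
    events: Iterable[Any],
    apply: Callable[[Any, Any], Any],
    snapshot: Optional[Any] = None,
) -> Any:
    # Start from the latest snapshot if one exists, else an empty state,
    # then fold the (chronologically ordered) events into it.
    initial = snapshot if snapshot is not None else {}
    return reduce(apply, events, initial)
```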
 
Implementing Event Sourcing with a Single Database Table
The core idea is simple: all events, regardless of their type or the aggregate they belong to, are stored in a single, append-only database table. This table acts as our central, authoritative event log.
Schema Design for the Event Log Table
Let's consider an `events` table that will store all our domain events.
```sql
CREATE TABLE events (
    id             SERIAL PRIMARY KEY,
    aggregate_id   UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    version        INT NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    payload        JSONB NOT NULL,
    timestamp      TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    metadata       JSONB,
    UNIQUE (aggregate_id, version) -- Ensures event order and prevents duplicate events for an aggregate
);

CREATE INDEX idx_events_aggregate_id ON events (aggregate_id);
CREATE INDEX idx_events_timestamp ON events (timestamp);
```
- `id`: A unique identifier for the event itself (e.g., a serial primary key).
- `aggregate_id`: The ID of the aggregate instance to which this event applies. Critical for filtering events.
- `aggregate_type`: The type or name of the aggregate (e.g., 'Order', 'UserAccount'). Useful for querying and understanding the event stream.
- `version`: The version of the aggregate after this event was applied. This is crucial for optimistic concurrency control.
- `event_type`: The specific type of event (e.g., 'OrderCreated', 'ItemAddedToOrder').
- `payload`: A JSONB column to store the actual event data. JSONB is preferred for its efficient storage and querying capabilities.
- `timestamp`: When the event occurred.
- `metadata`: Optional JSONB column for additional operational metadata (e.g., the `user_id` who initiated the action, or the `ip_address`).
Core Operations
There are two primary operations in an Event Sourced system:
- Appending Events: When a business operation occurs, new events are generated and appended to the event log.
 - Loading Aggregate State: To perform operations on an aggregate, its current state must be reconstructed by replaying its events.
 
Let's illustrate these with a conceptual Order aggregate in Python.
1. Defining Events and Aggregate
```python
import uuid
import datetime
import json
from typing import List, Dict, Any, Type

# --- Event Definitions ---
class Event:
    def __init__(self, aggregate_id: uuid.UUID, version: int,
                 timestamp: datetime.datetime, payload: Dict[str, Any]):
        self.aggregate_id = aggregate_id
        self.version = version
        self.timestamp = timestamp
        self.payload = payload

    @property
    def event_type(self) -> str:
        return self.__class__.__name__

    def to_dict(self) -> Dict[str, Any]:
        return {
            "aggregate_id": str(self.aggregate_id),
            "version": self.version,
            "event_type": self.event_type,
            "timestamp": self.timestamp.isoformat(),
            "payload": self.payload
        }

class OrderCreated(Event):
    def __init__(self, aggregate_id: uuid.UUID, version: int, customer_id: uuid.UUID,
                 order_items: List[Dict[str, Any]], timestamp: datetime.datetime = None):
        super().__init__(aggregate_id, version,
                         timestamp or datetime.datetime.now(datetime.timezone.utc),
                         {"customer_id": str(customer_id), "order_items": order_items})

class ItemAddedToOrder(Event):
    def __init__(self, aggregate_id: uuid.UUID, version: int, item_id: str,
                 quantity: int, price: float, timestamp: datetime.datetime = None):
        super().__init__(aggregate_id, version,
                         timestamp or datetime.datetime.now(datetime.timezone.utc),
                         {"item_id": item_id, "quantity": quantity, "price": price})

class OrderShipped(Event):
    def __init__(self, aggregate_id: uuid.UUID, version: int,
                 shipping_date: datetime.datetime, timestamp: datetime.datetime = None):
        super().__init__(aggregate_id, version,
                         timestamp or datetime.datetime.now(datetime.timezone.utc),
                         {"shipping_date": shipping_date.isoformat()})

# --- Aggregate Definition ---
class OrderAggregate:
    def __init__(self, order_id: uuid.UUID = None, current_version: int = 0):
        self.order_id = order_id or uuid.uuid4()
        self.customer_id: uuid.UUID = None
        self.items: List[Dict[str, Any]] = []
        self.status: str = "PENDING"
        self.current_version = current_version

    def apply(self, event: Event):
        if isinstance(event, OrderCreated):
            self._apply_order_created(event)
        elif isinstance(event, ItemAddedToOrder):
            self._apply_item_added_to_order(event)
        elif isinstance(event, OrderShipped):
            self._apply_order_shipped(event)
        self.current_version = event.version  # Update aggregate version after applying event

    def _apply_order_created(self, event: OrderCreated):
        self.customer_id = uuid.UUID(event.payload["customer_id"])
        self.items = event.payload["order_items"]
        self.status = "CREATED"

    def _apply_item_added_to_order(self, event: ItemAddedToOrder):
        self.items.append({
            "item_id": event.payload["item_id"],
            "quantity": event.payload["quantity"],
            "price": event.payload["price"]
        })

    def _apply_order_shipped(self, event: OrderShipped):
        self.status = "SHIPPED"

    # --- Commands that generate events ---
    def create_order(self, customer_id: uuid.UUID,
                     order_items: List[Dict[str, Any]]) -> List[Event]:
        # Business logic goes here (e.g., validate items)
        new_version = self.current_version + 1
        event = OrderCreated(self.order_id, new_version, customer_id, order_items)
        self.apply(event)  # Apply to self for internal state update
        return [event]

    def add_item(self, item_id: str, quantity: int, price: float) -> List[Event]:
        if self.status != "CREATED":
            raise ValueError("Cannot add items to an order that is not in CREATED status.")
        if self._item_exists(item_id):
            raise ValueError("Item already exists in order. Use update item quantity instead.")
        new_version = self.current_version + 1
        event = ItemAddedToOrder(self.order_id, new_version, item_id, quantity, price)
        self.apply(event)
        return [event]

    def ship_order(self) -> List[Event]:
        if self.status != "CREATED":  # Assuming order needs to be created to be shipped
            raise ValueError("Cannot ship an order that is not in CREATED status.")
        new_version = self.current_version + 1
        event = OrderShipped(self.order_id, new_version,
                             datetime.datetime.now(datetime.timezone.utc))
        self.apply(event)
        return [event]

    def _item_exists(self, item_id: str) -> bool:
        return any(item['item_id'] == item_id for item in self.items)

    def __repr__(self):
        return (f"Order(id={self.order_id}, status={self.status}, "
                f"version={self.current_version}, items={self.items})")
```
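
Before wiring in persistence, the aggregate can be exercised on its own. The item values below are arbitrary examples:

```python
order = OrderAggregate()
order.create_order(uuid.uuid4(), [{"item_id": "product-A", "quantity": 2, "price": 10.0}])
order.add_item("product-B", 1, 25.0)
print(order)  # Order(id=..., status=CREATED, version=2, items=[...])
```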
2. Event Store (Persistence Layer)
Now, let's create a simplified `EventStore` class to interact with our database. We'll use `psycopg2` for PostgreSQL.
```python
import psycopg2
from psycopg2 import extras

class EventStore:
    def __init__(self, db_config: Dict[str, str]):
        self.db_config = db_config
        self.event_type_map: Dict[str, Type[Event]] = {
            "OrderCreated": OrderCreated,
            "ItemAddedToOrder": ItemAddedToOrder,
            "OrderShipped": OrderShipped,
            # ... other event types
        }

    def _get_connection(self):
        return psycopg2.connect(**self.db_config)

    def save_events(self, aggregate_id: uuid.UUID, aggregate_type: str,
                    expected_version: int, events: List[Event]):
        conn = None
        cur = None
        try:
            conn = self._get_connection()
            cur = conn.cursor()
            # Optimistic concurrency check: the latest version stored for this
            # aggregate must match the version the caller loaded it at.
            # If `expected_version` is 0, this is a new aggregate.
            cur.execute(
                "SELECT version FROM events WHERE aggregate_id = %s ORDER BY version DESC LIMIT 1;",
                (str(aggregate_id),)
            )
            latest_version_record = cur.fetchone()
            current_db_version = latest_version_record[0] if latest_version_record else 0
            if current_db_version != expected_version:
                raise ConcurrencyException(
                    f"Concurrency conflict for aggregate {aggregate_id}: "
                    f"Expected version {expected_version}, but found {current_db_version}."
                )

            for event in events:
                cur.execute(
                    """
                    INSERT INTO events (aggregate_id, aggregate_type, version, event_type, payload, timestamp)
                    VALUES (%s, %s, %s, %s, %s, %s);
                    """,
                    (
                        str(event.aggregate_id),
                        aggregate_type,
                        event.version,
                        event.event_type,
                        json.dumps(event.payload),
                        event.timestamp
                    )
                )
            conn.commit()
        except psycopg2.IntegrityError as e:
            conn.rollback()
            if "duplicate key value violates unique constraint" in str(e):
                raise ConcurrencyException(
                    "Attempted to save an event with a duplicate aggregate_id and version. "
                    "Concurrent modification likely."
                ) from e
            raise  # Re-raise other integrity errors
        except Exception:
            if conn:
                conn.rollback()
            raise
        finally:
            if cur:
                cur.close()
            if conn:
                conn.close()

    def load_events(self, aggregate_id: uuid.UUID) -> List[Event]:
        conn = None
        cur = None
        events: List[Event] = []
        try:
            conn = self._get_connection()
            # DictCursor allows access to columns by name.
            cur = conn.cursor(cursor_factory=extras.DictCursor)
            cur.execute(
                "SELECT aggregate_id, version, event_type, payload, timestamp "
                "FROM events WHERE aggregate_id = %s ORDER BY version ASC;",
                (str(aggregate_id),)
            )
            for record in cur.fetchall():
                event_type_name = record['event_type']
                if event_type_name not in self.event_type_map:
                    raise ValueError(f"Unknown event type: {event_type_name}")
                EventClass = self.event_type_map[event_type_name]
                # Reconstruct the event object from the database record. The
                # payload was serialized to JSON, so values that are not
                # JSON-native must be coerced back to their Python types.
                payload = dict(record['payload'])
                if EventClass is OrderShipped:
                    payload['shipping_date'] = datetime.datetime.fromisoformat(payload['shipping_date'])
                event = EventClass(
                    aggregate_id=uuid.UUID(record['aggregate_id']),
                    version=record['version'],
                    timestamp=record['timestamp'],
                    **payload  # Unpack payload dictionary into the event constructor
                )
                events.append(event)
            return events
        finally:
            if cur:
                cur.close()
            if conn:
                conn.close()

    def get_aggregate_by_id(self, aggregate_id: uuid.UUID,
                            aggregate_type: Type[OrderAggregate]) -> OrderAggregate:
        events = self.load_events(aggregate_id)
        if not events:
            return None  # Aggregate not found
        aggregate = aggregate_type(order_id=aggregate_id, current_version=0)  # Initialize with ID
        for event in events:
            aggregate.apply(event)
        return aggregate

class ConcurrencyException(Exception):
    """Custom exception for concurrency conflicts."""
    pass
```
Note on event reconstruction: the `Event` base class stores a payload dictionary, while concrete constructors such as `OrderCreated` take attributes directly (e.g., `customer_id`), which is why `load_events` unpacks the payload and coerces non-JSON-native values (like `shipping_date`) back to Python types. For a real-world system, you might want a more sophisticated event serialization/deserialization mechanism to map database payload dictionaries to well-defined event objects. The example above simplifies this for clarity.
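
One such mechanism is to let each event class own its deserialization, for example via a `from_record` classmethod that the store dispatches to instead of unpacking payload keys generically. This is a hypothetical convention, not part of the code above; a minimal sketch:

```python
# Sketch: each event class knows how to rebuild itself from a DB record.
class OrderShippedWithCodec(OrderShipped):
    @classmethod
    def from_record(cls, record: Dict[str, Any]) -> "OrderShippedWithCodec":
        payload = record["payload"]
        return cls(
            aggregate_id=uuid.UUID(record["aggregate_id"]),
            version=record["version"],
            shipping_date=datetime.datetime.fromisoformat(payload["shipping_date"]),
            timestamp=record["timestamp"],
        )

# load_events could then call EventClass.from_record(record) uniformly.
```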
Putting it to Work
```python
# Database configuration (replace with your actual details)
DB_CONFIG = {
    "host": "localhost",
    "database": "event_sourcing_db",
    "user": "your_user",
    "password": "your_password"
}

event_store = EventStore(DB_CONFIG)
customer_id = uuid.uuid4()
order_id = uuid.uuid4()

try:
    # 1. Create a new Order
    print(f"Creating a new order with ID: {order_id}")
    new_order = OrderAggregate(order_id=order_id, current_version=0)
    created_events = new_order.create_order(
        customer_id=customer_id,
        order_items=[{"item_id": "product-A", "quantity": 2, "price": 10.0}]
    )
    # Expected version for a new aggregate is 0
    event_store.save_events(new_order.order_id, "Order", 0, created_events)
    print(f"Order created. State: {new_order}")

    # 2. Load the order and add an item
    print(f"\nLoading order {order_id} to add an item...")
    loaded_order = event_store.get_aggregate_by_id(order_id, OrderAggregate)
    if loaded_order:
        print(f"Loaded order state: {loaded_order}")
        added_item_events = loaded_order.add_item("product-B", 1, 25.0)
        # `apply` already advanced current_version, so the version we loaded
        # at is current_version minus the number of new events.
        event_store.save_events(loaded_order.order_id, "Order",
                                loaded_order.current_version - len(added_item_events),
                                added_item_events)
        print(f"Item added. Current state: {loaded_order}")

    # 3. Load the order again and ship it
    print(f"\nLoading order {order_id} to ship it...")
    final_order = event_store.get_aggregate_by_id(order_id, OrderAggregate)
    if final_order:
        print(f"Loaded order state: {final_order}")
        shipped_events = final_order.ship_order()
        event_store.save_events(final_order.order_id, "Order",
                                final_order.current_version - len(shipped_events),
                                shipped_events)
        print(f"Order shipped. Final state: {final_order}")

    # 4. Demonstrate a concurrency conflict (optional run)
    # This simulation tries to save two events from slightly different starting points.
    # You would typically run this in two separate processes/threads to see it in action.
    # order_for_conflict_1 = event_store.get_aggregate_by_id(order_id, OrderAggregate)
    # order_for_conflict_2 = event_store.get_aggregate_by_id(order_id, OrderAggregate)
    # # Process 1 (attempts to add an item)
    # conflict_events_1 = order_for_conflict_1.add_item("product-C", 3, 5.0)
    # event_store.save_events(order_for_conflict_1.order_id, "Order",
    #                         order_for_conflict_1.current_version - len(conflict_events_1),
    #                         conflict_events_1)
    # print("\nConflict attempt 1 saved.")
    # # Process 2 (attempts to add another item, but from the old state BEFORE process 1's save)
    # try:
    #     conflict_events_2 = order_for_conflict_2.add_item("product-D", 1, 15.0)
    #     event_store.save_events(order_for_conflict_2.order_id, "Order",
    #                             order_for_conflict_2.current_version - len(conflict_events_2),
    #                             conflict_events_2)
    #     print("This should not be printed if concurrency control works.")
    # except ConcurrencyException as e:
    #     print(f"\nSuccessfully caught concurrency conflict: {e}")

except ConcurrencyException as e:
    print(f"Application error due to concurrency: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
```
Key Considerations and Best Practices
- Concurrency Control: The `UNIQUE (aggregate_id, version)` constraint on the `events` table, combined with checking `expected_version` before saving, is the cornerstone of optimistic concurrency control. If two commands try to modify the same aggregate concurrently, only one will succeed; the other will fail due to a version mismatch or a unique constraint violation, prompting a retry (see the retry sketch after this list).
- Performance of State Reconstruction: For aggregates with thousands of events, replaying all events every time can become slow. Two standard mitigations:
  - Snapshots: Periodically save a snapshot of the aggregate's current state to a separate `snapshots` table. When loading an aggregate, load the latest snapshot and then apply only the events recorded after that snapshot's version.
  - Denormalized Read Models (Projections): This is the query side of CQRS (Command Query Responsibility Segregation). Build separate, highly optimized read models (e.g., summary tables, search indexes) by asynchronously processing the event stream from your `events` table. This is often done by a separate "projector" service that polls the `events` table for new entries, processes them, and updates the read models.
 - Event Immutability: Once an event is written, it should never be changed or deleted. This is fundamental to Event Sourcing.
 - Event Schema Evolution: As your application evolves, event schemas may change. Plan for versioning events and provide mechanisms to handle older event formats during state reconstruction.
- Querying: The `events` table itself is generally poor for complex queries about the current state of aggregates (e.g., "give me all orders over $100"); this is where read models become vital. For auditing or debugging, however, querying the `events` table by `aggregate_id`, `event_type`, or `timestamp` is highly effective.
- Atomic Operations: Ensure that new events and any synchronously updated read models are saved within the same transaction, or use the outbox pattern for eventual consistency if pushing events to an external system. For this single-table approach, the `save_events` method handles the atomicity of event persistence.
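
When a `ConcurrencyException` surfaces, the usual response is to reload the aggregate and retry the command against fresh state. Here is a minimal sketch using the classes above; the `command` callback and the retry count are illustrative choices, not prescribed by the pattern:

```python
def execute_with_retry(event_store, aggregate_id, command, retries=3):
    # Reload-and-retry loop for optimistic concurrency conflicts.
    for attempt in range(retries):
        aggregate = event_store.get_aggregate_by_id(aggregate_id, OrderAggregate)
        new_events = command(aggregate)  # e.g., lambda agg: agg.add_item(...)
        # `apply` already advanced the version, so subtract the new events
        # to recover the version we loaded at.
        expected = aggregate.current_version - len(new_events)
        try:
            event_store.save_events(aggregate.order_id, "Order", expected, new_events)
            return aggregate
        except ConcurrencyException:
            if attempt == retries - 1:
                raise  # Give up after the final attempt

# Usage:
# execute_with_retry(event_store, order_id, lambda o: o.add_item("product-C", 1, 5.0))
```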
Application Scenarios
This simplified Event Sourcing approach is particularly well-suited for:
- Auditing and Compliance: Every single change is recorded, providing a complete, immutable audit trail.
 - Debugging and Troubleshooting: Replay events to understand exactly how a system reached a particular erroneous state.
 - Complex Domain Logic: When business rules are complex and dependent on history (e.g., "a user can only do X if they haven't done Y in the last Z days").
 - Temporal Queries: Ask questions like "What was the state of the order yesterday at 2 PM?".
 - Small to Medium-Sized Applications: Where the full operational overhead of Kafka or other message brokers is overkill, but the benefits of Event Sourcing are desired.
 - Prototyping and Learning: A great way to get started with Event Sourcing without a steep learning curve related to infrastructure.
- Read Model Reconstruction: If a read model becomes corrupted or needs re-architecting, you can always rebuild it entirely by replaying all historical events from the `events` table (a minimal projector sketch follows this list).
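
Building or rebuilding a read model boils down to a projector loop over the `events` table. The sketch below assumes a hypothetical `order_totals` read-model table and keeps its checkpoint in memory; a production projector would persist the checkpoint so it can resume after a restart:

```python
import time

def run_projector(conn, poll_interval=1.0):
    """Poll the events table and project new events into a read model.

    Assumes a hypothetical read-model table:
      CREATE TABLE order_totals (aggregate_id UUID PRIMARY KEY, total NUMERIC);
    """
    last_seen_id = 0  # In production, persist this checkpoint.
    while True:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, aggregate_id, event_type, payload FROM events "
                "WHERE id > %s ORDER BY id ASC LIMIT 100;",
                (last_seen_id,),
            )
            rows = cur.fetchall()
            for event_id, aggregate_id, event_type, payload in rows:
                if event_type == "ItemAddedToOrder":
                    # Accumulate a per-order total in the read model.
                    delta = payload["quantity"] * payload["price"]
                    cur.execute(
                        "INSERT INTO order_totals (aggregate_id, total) VALUES (%s, %s) "
                        "ON CONFLICT (aggregate_id) DO UPDATE "
                        "SET total = order_totals.total + EXCLUDED.total;",
                        (aggregate_id, delta),
                    )
                last_seen_id = event_id
            conn.commit()
        if not rows:
            time.sleep(poll_interval)
```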
Conclusion
Implementing Event Sourcing with just a single database table for your event log is a pragmatic and powerful approach. It strips away the complexity of external message brokers, allowing developers to focus on the core benefits of the pattern: an immutable, auditable history of all changes, robust concurrency control, and the ability to rebuild application state. While it necessitates careful consideration of performance for highly active aggregates and complex querying, these challenges can be effectively addressed through snapshots and dedicated read models, showcasing that resilient, history-rich systems are achievable with surprisingly simple infrastructure. This pattern offers a compelling blend of simplicity and power, making Event Sourcing accessible to a wider range of projects.