Aufbau von Hochleistungs-Async-APIs mit FastAPI, SQLAlchemy 2.0 und Asyncpg
James Reed
Infrastructure Engineer · Leapcell

Einführung
In der sich rasant entwickelnden Landschaft der Webentwicklung ist der Aufbau von Anwendungen, die nicht nur leistungsfähig, sondern auch skalierbar sind, von größter Bedeutung. Herkömmliche synchrone E/A-Operationen können oft zu einem Engpass werden, insbesondere bei Datenbankinteraktionen, was zu einer beeinträchtigten Benutzererfahrung und einem reduzierten Serverdurchsatz führt. Hier glänzt die asynchrone Programmierung und bietet einen Paradigmenwechsel, der es Anwendungen ermöglicht, nicht-blockierende Operationen durchzuführen und somit ein höheres Volumen gleichzeitiger Anfragen effizient zu bearbeiten. FastAPI hat sich mit seinen inhärenten asynchronen Fähigkeiten zu einem bevorzugten Framework für die Erstellung von Hochleistungs-APIs entwickelt. Selbst in einem asynchronen Framework kann jedoch die Datenbankebene blockieren, wenn sie nicht korrekt gehandhabt wird. Dieser Artikel befasst sich mit der leistungsstarken Synergie von FastAPI, der asynchronen Engine von SQLAlchemy 2.0 und dem asyncpg
-Treiber und zeigt, wie eine robuste und hochgradig parallele API erstellt wird, die das volle Potenzial asynchroner Datenbankinteraktionen ausschöpft.
Kernkomponenten für asynchronen Datenbankzugriff
Bevor wir uns mit den Implementierungsdetails befassen, sollten wir ein klares Verständnis der Schlüsseltechnologien gewinnen, die für den asynchronen Datenbankzugriff innerhalb einer FastAPI-Anwendung erforderlich sind.
- FastAPI: Ein modernes, schnelles (hochleistungsfähiges) Webframework zum Erstellen von APIs mit Python 3.7+ auf Basis von Standard-Python-Typ-Hinweisen. Seine native asynchrone Unterstützung macht es zur idealen Wahl für I/O-gebundene Anwendungen.
- SQLAlchemy 2.0: Ein leistungsstarkes und flexibles ORM (Object Relational Mapper) für Python. Version 2.0 bringt signifikante Fortschritte, insbesondere die First-Class-Unterstützung für asynchrone Datenbankoperationen, die es besser auf moderne asynchrone Frameworks abstimmt.
- Asynchrone Engine (SQLA 2.0): Dies bezieht sich auf
create_async_engine
von SQLAlchemy und zugehörige asynchrone Konstrukte. Anstatt die Event-Schleife zu blockieren, während auf Datenbankantworten gewartet wird, ermöglichen diese asynchronen Komponenten der Anwendung den Wechsel zu anderen Aufgaben und verbessern so die Nebenläufigkeit. - asyncpg: Ein schneller PostgreSQL-Datenbanktreiber für Python. Er wurde von Grund auf für asynchrone E/A mit
asyncio
entwickelt und bietet im Vergleich zu herkömmlichen Treibern im asynchronen Kontext eine überlegene Leistung. Er ist der empfohlene Treiber für den asynchronen PostgreSQL-Backend von SQLAlchemy. async/await
: Python-Schlüsselwörter zum Definieren und Ausführen von Coroutinen, d. h. Funktionen, die ihre Ausführung pausieren und später fortsetzen können, um nicht-blockierende E/A zu ermöglichen.
Prinzipien der asynchronen Datenbankinteraktion
Das Kernprinzip der asynchronen Datenbankinteraktion mit SQLAlchemy 2.0 und asyncpg
besteht darin, sicherzustellen, dass alle Datenbankoperationen nicht blockierend sind. Wenn eine async
-Funktion eine Datenbankabfrage ausführt, gibt sie die Kontrolle an die Event-Schleife zurück, anstatt auf die Antwort der Datenbank zu warten. Die Event-Schleife kann dann andere ausstehende Aufgaben ausführen (z. B. eine andere eingehende Anfrage bearbeiten), bis die Datenbankantwort bereit ist. Sobald die Antwort eintrifft, wird die ursprüngliche async
-Funktion fortgesetzt.
Einrichtung der asynchronen Engine und Sitzung
Zuerst müssen wir SQLAlchemy so konfigurieren, dass es seine asynchronen Fähigkeiten mit asyncpg
nutzt.
Stellen Sie zunächst sicher, dass Sie die erforderlichen Bibliotheken installiert haben:
pip install fastapi "uvicorn[standard]" sqlalchemy "asyncpg<0.29.0"
Hinweis: Ab der SQLAlchemy 2.0.x-Serie können asyncpg
-Versionen 0.29.0 und höher Probleme mit create_async_engine
verursachen. Es ist sicherer, es vorerst auf <0.29.0
zu beschränken.
Als Nächstes richten wir die asynchrone Engine und eine Sitzungsfabrik ein:
# database.py from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession from sqlalchemy.orm import declarative_base # Ersetzen Sie dies durch Ihre PostgreSQL-Verbindungszeichenfolge # Das Präfix 'postgresql+asyncpg' weist SQLAlchemy an, asyncpg zu verwenden DATABASE_URL = "postgresql+asyncpg://user:password@host:port/dbname" # Erstellen Sie die asynchrone Engine # pool_pre_ping=True hilft bei der Aufrechterhaltung von Verbindungen für lang laufende Anwendungen engine = create_async_engine(DATABASE_URL, echo=True, pool_pre_ping=True) # Konfigurieren Sie den asynchronen Sessionmaker # expire_on_commit=False verhindert, dass Objekte nach einem Commit abgelaufen (getrennt) werden, # was nützlich sein kann, um Objekte für weitere Operationen mit der Sitzung verbunden zu halten. AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) Base = declarative_base() # Abhängigkeit zum Abrufen einer asynchronen Datenbanksitzung async def get_db(): async with AsyncSessionLocal() as session: yield session
In dieser Einrichtung:
create_async_engine
initialisiert eine asynchrone Engine, die mithilfe vonasyncpg
mit unserer PostgreSQL-Datenbank verbunden ist.async_sessionmaker
stellt eine Fabrik zum Erstellen vonAsyncSession
-Objekten bereit.get_db
ist eine FastAPI-Abhängigkeit, die eineAsyncSession
liefert und so die ordnungsgemäße Verwaltung des Sitzungslebenszyklus (Erstellung und Schließung) sicherstellt.
Modelle definieren und Migrationen durchführen
Genau wie bei der synchronen SQLAlchemy definieren Sie Ihre Modelle mit declarative_base
.
# models.py from sqlalchemy import Column, Integer, String from database import Base class Item(Base): __tablename__ = "items" id = Column(Integer, primary_key=True, index=True) name = Column(String, index=True) description = Column(String) def __repr__(self): return f"<Item(id={self.id}, name='{self.name}')>" # Beispiel für das Erstellen von Tabellen (normalerweise über Alembic für Migrationen) async def create_db_and_tables(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # Dies würden Sie normalerweise einmal ausführen, um Ihre Datenbank einzurichten # import asyncio # async def main(): # await create_db_and_tables() # # if __name__ == "__main__": # asyncio.run(main())
Integration in FastAPI-Endpunkte
Nun integrieren wir diese Komponenten in eine FastAPI-Anwendung, um CRUD-Operationen asynchron durchzuführen.
# main.py from fastapi import FastAPI, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from database import get_db, create_db_and_tables, engine, AsyncSessionLocal from models import Item from schemas import ItemCreate, ItemResponse # Angenommen, diese Pydantic-Modelle sind definiert app = FastAPI() @app.on_event("startup") async def startup_event(): # Beispiel: Tabellen beim Start erstellen (für einfache Apps Alembic für die Produktion verwenden) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) print("Datenbanktabellen erstellt (falls nicht vorhanden).") @app.post("/items/", response_model=ItemResponse, status_code=status.HTTP_201_CREATED) async def create_item(item: ItemCreate, db: AsyncSession = Depends(get_db)): db_item = Item(**item.dict()) db.add(db_item) await db.commit() # Asynchrones Commit await db.refresh(db_item) # Aktualisieren, um potenziell generierte IDs/Standardwerte zu erhalten return db_item @app.get("/items/", response_model=list[ItemResponse]) async def read_items(skip: int = 0, limit: int = 10, db: AsyncSession = Depends(get_db)): # Asynchrone Abfrageausführung result = await db.execute(select(Item).offset(skip).limit(limit)) items = result.scalars().all() return items @app.get("/items/{item_id}", response_model=ItemResponse) async def read_item(item_id: int, db: AsyncSession = Depends(get_db)): result = await db.execute(select(Item).filter(Item.id == item_id)) item = result.scalars().first() if item is None: raise HTTPException(status_code=404, detail="Item not found") return item # Beispiel Pydantic Modelle (schemas.py) # from pydantic import BaseModel # class ItemBase(BaseModel): # name: str # description: str | None = None # # class ItemCreate(ItemBase): # pass # # class ItemResponse(ItemBase): # id: int # # class Config: # orm_mode = True # Für SQLAlchemy ORM-Kompatibilität
Im main.py
-Beispiel:
- Das
startup_event
verwendetconn.run_sync(Base.metadata.create_all)
, dacreate_all
eine synchrone Methode vonBase.metadata
ist.conn.run_sync
überbrückt diesen synchronen Aufruf in einem asynchronen Kontext und führt ihn in einem Thread-Pool aus. Dies ist ein gängiges Muster für die Integration synchroner Operationen in einen asynchronen Ablauf. - Alle Datenbankoperationen (
db.add
,db.commit
,db.execute
) sind mitawait
präfixiert, was ihre asynchrone Natur kennzeichnet. select(Item)
ist die moderne Methode zum Erstellen von Abfragen in SQLAlchemy 2.0, die eine explizitere Kontrolle und eine bessere Typinferenz bietet.result.scalars().all()
ruft effizient alle Skalarergebnisse aus der Abfrage ab.
Anwendung in realen Szenarien
Dieses Muster ist sehr gut geeignet für:
- Microservices: Erstellung unabhängiger, leistungsfähiger Dienste, die mit Datenbanken interagieren.
- Echtzeit-APIs: Wo geringe Latenzzeiten und hohe Nebenläufigkeit entscheidend sind, wie z. B. bei Chat-Anwendungen, Gaming-Backends oder Finanzhandelsplattformen.
- Datenerfassungs-APIs: Effizientes Empfangen und Speichern großer Datenmengen.
- Webhooks: Verarbeitung eingehender Daten von externen Diensten, ohne zu blockieren.
Durch die Nutzung von asyncpg
innerhalb der asynchronen Engine von SQLAlchemy können FastAPI-Anwendungen eine signifikant höhere Nebenläufigkeit und einen höheren Durchsatz erzielen als mit herkömmlichen synchronen Setups. Die hochoptimierte C-Implementierung von asyncpg
für PostgreSQL-Interaktionen in Verbindung mit der Event-Loop-Verwaltung von asyncio
minimiert Leerlaufzeiten und stellt sicher, dass Ihre API effizient eine größere Anzahl gleichzeitiger Client-Verbindungen verarbeiten kann.
Fazit
Die Kombination aus FastAPI, der asynchronen Engine von SQLAlchemy 2.0 und dem asyncpg
-Treiber stellt einen leistungsstarken und modernen Stack für die Erstellung von Hochleistungs- und skalierbaren Web-APIs in Python dar. Durch die Nutzung asynchroner Programmierung von der Framework-Ebene bis hinunter zum Datenbanktreiber können Entwickler Anwendungen erstellen, die unter hoher Last schnell reagieren und die Systemressourcen effizient nutzen. Dieser Ansatz verbessert nicht nur die Benutzererfahrung, sondern bietet auch eine robuste Grundlage für zukünftige Skalierbarkeit. Die Erstellung asynchroner nativen Anwendungen mit diesen Werkzeugen ist ein entscheidender Schritt in Richtung moderner, effizienter Webentwicklung.