Warum SQLAlchemy 2.0 das mächtigste Python ORM bisher ist
Min-jun Kim
Dev Intern · Leapcell

SQLAlchemy Tutorial
SQLAlchemy ist das beliebteste Object Relational Mapping (ORM) im Python-Ökosystem. Es hat ein elegantes Design und ist in zwei Teile unterteilt: den zugrunde liegenden Core und das traditionelle ORM der oberen Ebene. In den meisten ORMs in Python und sogar in anderen Sprachen wurde kein gutes hierarchisches Design implementiert. Zum Beispiel sind im ORM von Django die Datenbankverbindung und das ORM selbst vollständig miteinander vermischt.
Warum brauchen wir den Core?
Die Core-Schicht implementiert hauptsächlich den Client-Verbindungspool. Als Kern moderner Webanwendungen ist die Fähigkeit von relationalen Datenbanken, gleichzeitige Verbindungen zu verwalten, oft nicht stark. Es wird generell nicht empfohlen, eine große Anzahl von Kurzzeitverbindungen zu verwenden, und in den meisten Fällen ist ein Verbindungspool erforderlich. Es gibt ungefähr zwei Arten von Verbindungspools:
- Serverseitiger Verbindungspool: Eine spezielle Verbindungspool-Middleware, die jedes Mal eine lange Verbindung zur Wiederverwendung für eine kurze Verbindung zuweist.
- Clientseitiger Verbindungspool: Wird im Allgemeinen als Drittanbieterbibliothek in den Code eingeführt.
Der Verbindungspool von SQLAlchemy gehört zum clientseitigen Verbindungspool. In diesem Verbindungspool verwaltet SQLAlchemy eine bestimmte Anzahl von Langzeitverbindungen. Wenn connect
aufgerufen wird, wird tatsächlich eine Verbindung aus dem Pool abgerufen; wenn close
aufgerufen wird, wird die Verbindung tatsächlich an den Pool zurückgegeben.
Erstellen einer Verbindung
In SQLAlchemy verwenden Sie create_engine
, um eine Verbindung (Pool) zu erstellen. Der Parameter von create_engine
ist die URL der Datenbank.
from sqlalchemy import create_engine # MySQL-Verbindungsbeispiel engine = create_engine( "mysql://user:password@localhost:3306/dbname", echo=True, # Wenn echo auf True gesetzt ist, wird das tatsächlich ausgeführte SQL ausgegeben, was die Fehlersuche erleichtert future=True, # Verwenden Sie die SQLAlchemy 2.0 API, die abwärtskompatibel ist pool_size=5, # Die Größe des Verbindungspools beträgt standardmäßig 5. Wenn Sie ihn auf 0 setzen, gibt es keine Beschränkung für die Verbindung pool_recycle=3600 # Stellen Sie die Zeit ein, um die automatische Trennung der Datenbank zu begrenzen ) # Erstellen Sie eine In-Memory-SQLite-Datenbank. Sie müssen check_same_thread=False hinzufügen, da sie sonst nicht in einer Multithread-Umgebung verwendet werden kann engine = create_engine("sqlite:///:memory:", echo=True, future=True, connect_args={"check_same_thread": False}) # Eine andere Möglichkeit, sich mit MySQL zu verbinden # pip install mysqlclient engine = create_engine('mysql+mysqldb://user:password@localhost/foo?charset=utf8mb4')
Die Core-Schicht -- SQL direkt verwenden
CRUD
from sqlalchemy import text with engine.connect() as conn: result = conn.execute(text("select * from users")) print(result.all()) # Das Ergebnis kann iteriert werden, und jedes Zeilenergebnis ist ein Row-Objekt for row in result: # Das Row-Objekt unterstützt drei Zugriffsmethoden print(row.x, row.y) print(row[0], row[1]) print(row["x"], row["y"]) # Parameter übergeben, verwenden Sie `:var` zum Übergeben result = conn.execute( text("SELECT x, y FROM some_table WHERE y > :y"), {"y": 2} ) # Sie können die Parameter auch vorkompilieren stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6) # Beim Einfügen können Sie direkt mehrere Zeilen einfügen conn.execute( text("INSERT INTO some_table (x, y) VALUES (:x, :y)"), [{"x": 11, "y": 12}, {"x": 13, "y": 14}] )
Transaktionen und Commit
SQLAlchemy bietet zwei Möglichkeiten zum Commit, eine ist das manuelle commit
und die andere ist das halbautomatische commit
. Die offizielle Dokumentation empfiehlt die Verwendung von engine.begin()
. Es gibt auch eine vollständig automatische autocommit
-Methode, die einmal für jede Zeile einen Commit ausführt, was nicht empfohlen wird.
# "Commit as you go" erfordert einen manuellen Commit with engine.connect() as conn: conn.execute(text("CREATE TABLE some_table (x int, y int)")) conn.execute( text("INSERT INTO some_table (x, y) VALUES (:x, :y)"), [{"x": 1, "y": 1}, {"x": 2, "y": 4}] ) conn.commit() # Beachten Sie hier den Commit # "Begin once" halbautomatischer Commit with engine.begin() as conn: conn.execute( text("INSERT INTO some_table (x, y) VALUES (:x, :y)"), [{"x": 6, "y": 8}, {"x": 9, "y": 10}] )
ORM
Session
Die Session
ist nicht threadsicher. Aber im Allgemeinen sollte das Web-Framework am Anfang jeder Anfrage eine Session
abrufen, also ist es auch kein Problem.
from sqlalchemy.orm import Session with Session(engine) as session: session.add(foo) session.commit() # Sie können auch sessionmaker verwenden, um eine Factory-Funktion zu erstellen, sodass Sie nicht jedes Mal Parameter eingeben müssen from sqlalchemy.orm import sessionmaker new_session = sessionmaker(engine) with new_session() as session: ...
Deklarative API
- Verwenden Sie
__tablename__
, um den Namen der Datenbanktabelle anzugeben. - Verwenden Sie
Mapped
und native Typen, um jedes Feld zu deklarieren. - Verwenden Sie
Integer
,String
usw., um den Feldtyp anzugeben. - Verwenden Sie den Parameter
index
, um den Index anzugeben. - Verwenden Sie den Parameter
unique
, um den eindeutigen Index anzugeben. - Verwenden Sie
__table_args__
, um andere Attribute anzugeben, z. B. zusammengesetzte Indizes.
from datetime import datetime from sqlalchemy import Integer, String, func, UniqueConstraint from sqlalchemy.orm import relationship, mapped_column, Mapped from sqlalchemy.orm import DeclarativeBase class Base(DeclarativeBase): pass class User(Base): __tablename__ = "users" # Es muss ein Tupel sein, keine Liste __table_args__ = (UniqueConstraint("name", "time_created"),) id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = mapped_column(String(30), index=True) fullname: Mapped[str] = mapped_column(String, unique=True) # Für besonders große Felder können Sie auch deferred verwenden, sodass dieses Feld standardmäßig nicht geladen wird description: Mapped[str] = mapped_column(Text, deferred=True) # Standardwert, beachten Sie, dass eine Funktion übergeben wird, nicht die aktuelle Zeit time_created: Mapped[datetime] = mapped_column(DateTime(Timezone=True), default=datetime.now) # Oder verwenden Sie den Serverstandardwert, der jedoch beim Erstellen der Tabelle festgelegt werden muss und Teil des Tabellenschemas wird time_created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) time_updated: Mapped[datetime] = mapped_column(DateTime(timezone=True), onupdate=func.now()) class Address(Base): __tablename__ = "address" id: Mapped[int] = mapped_column(Integer, primary_key=True) email_address: Mapped[str] = mapped_column(String, nullable=False) # Rufen Sie create_all auf, um alle Modelle zu erstellen Base.metadata.create_all(engine) # Wenn Sie nur ein Modell erstellen müssen User.__table__.create(engine)
Fremdschlüssel
Verwenden Sie relationship
, um die Zuordnungsbeziehung zwischen Modellen anzugeben.
Bidirektionale Zuordnung einer Eins-zu-Viele-Beziehung
from sqlalchemy import create_engine, Integer, String, ForeignKey from sqlalchemy.orm import DeclarativeBase, relationship, Session, Mapped, mapped_column class Group(Base): __tablename__ = 'groups' id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = mapped_column(String) # Die entsprechenden mehreren Benutzer verwenden hier den Modellnamen als Parameter members = relationship('User') class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String) # group_id ist der eigentliche Name des Fremdschlüssels in der Datenbank, und das zweite Feld ForeignKey wird verwendet, um die entsprechende ID anzugeben group_id = Column(Integer, ForeignKey('groups.id')) # Das entsprechende Gruppenfeld im Modell muss deklarieren, mit welchem Feld im entsprechenden Modell es sich überschneidet group = relationship('Group', overlaps="members")
Viele-zu-Viele-Zuordnung, eine Zuordnungstabelle ist erforderlich
# Zuordnungstabelle class UserPermissions(Base): __tablename__ = 'user_permissions' id: Mapped[int] = mapped_column(Integer, primary_key=True) # Verwenden Sie auch den Fremdschlüssel, um den Fremdschlüssel anzugeben user_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id')) permission_id: Mapped[str] = mapped_column(String, ForeignKey('permissions.id')) class User(Base): __tablename__ = 'users' id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = Column(String) # Verwenden Sie secondary, um die Zuordnungstabelle anzugeben, und verwenden Sie auch overlaps, um das entsprechende Feld im Modell anzugeben permissions = relationship('Permission', secondary="user_permissions", overlaps="users") class Permission(Base): __tablename__ = 'permissions' id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = Column(String) # Das gleiche wie oben users = relationship('User', secondary="user_permissions", overlaps="permissions") user1 = User(name='user1', group_id=1) user2 = User(name='user2') group1 = Group(name='group1') group2 = Group(name='group2', members=[user2]) permission1 = Permission(name="open_file") permission2 = Permission(name="save_file") user1.permissions.append(permission1) db.add_all([user1, user2, group1, group2, permission1, permission2]) db.commit() print(user1.permissions[0].id)
In den meisten anderen Tutorials wird backref
verwendet, um die Attribute des entsprechenden Modells zu generieren. Hier ist es jedoch vorzuziehen, die zugänglichen Attribute im entsprechenden Modell explizit zu deklarieren.
CRUD
Anders als bei der 1.x-API wird in der 2.0-API nicht mehr query
verwendet, sondern select
, um Daten abzufragen.
from sqlalchemy import select # Der Parameter von where ist ein Ausdruck, der aus `==` besteht. Der Vorteil ist, dass beim Schreiben von Code Rechtschreibfehler erkannt werden stmt = select(User).where(User.name == "john").order_by(User.id) # filter_by verwendet **kwargs als Parameter stmt = select(User).filter_by(name="some_user") # order_by kann auch User.id.desc() verwenden, um die umgekehrte Sortierung darzustellen result = session.execute(stmt) # Im Allgemeinen sollte beim Auswählen des gesamten Objekts die scalars-Methode verwendet werden, da andernfalls ein Tupel mit einem Objekt zurückgegeben wird for user in result.scalars(): print(user.name) # Beim Abfragen eines einzelnen Attributs des Modells ist es nicht erforderlich, scalars zu verwenden result = session.execute(select(User.name)) for row in result: print(row.name) # Es gibt auch eine Verknüpfung zum Abfragen nach ID: user = session.get(User, pk=1) # Um Daten zu aktualisieren, muss die Update-Anweisung verwendet werden from sqlalchemy import update # synchronize_session hat drei Optionen: false, "fetch", "evaluate", und der Standardwert ist evaluate # false bedeutet, das Objekt in Python überhaupt nicht zu aktualisieren # fetch bedeutet, ein Objekt aus der Datenbank neu zu laden # evaluate bedeutet, dass beim Aktualisieren der Datenbank auch versucht wird, die gleiche Operation so weit wie möglich am Objekt in Python auszuführen stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch") session.execute(stmt) # Oder weisen Sie dem Attribut direkt einen Wert zu user.name = "John" session.commit() # Es gibt hier einen Ort, der eine Race Condition (竞态条件) verursachen kann # Falsch! Wenn zwei Prozesse diesen Wert gleichzeitig aktualisieren, kann dies dazu führen, dass nur ein Wert aktualisiert wird. # Beide weisen den Wert zu, von dem sie glauben, dass er korrekt ist, nämlich 2, aber der tatsächlich korrekte Wert ist 1 + 1 + 1 = 3 # Entsprechendes SQL: Update users set visit_count = 2 where user.id = 1 user.visit_count += 1 # Korrekter Ansatz: Beachten Sie das große U, d. h. die Verwendung des Attributs des Modells, und das generierte SQL soll 1 auf der SQL-Serverseite hinzufügen # Entsprechendes SQL: Update users set visit_count = visit_count + 1 where user.id = 1 user.visit_count = User.visit_count + 1 # Um ein Objekt hinzuzufügen, verwenden Sie direkt die session.add-Methode session.add(user) # Oder add_all session.add_all([user1, user2, group1]) # Wenn Sie die eingefügte ID abrufen möchten, können Sie sie natürlich auch nach dem Commit lesen session.flush() # flush ist kein Commit, und die Transaktion wurde nicht ausgeführt. Es sollte wiederholtes Lesen sein, was mit der Isolationsebene der Datenbank zusammenhängt. print(user.id) # Zum Löschen verwenden Sie session.delete session.delete(user)
Laden zugehöriger Modelle
Wenn Sie nach dem Lesen einer Liste von N Datensätzen einzeln die spezifischen Werte jedes Elements in der Datenbank lesen, werden N+1 Abfragen generiert. Dies ist der häufigste Fehler in der Datenbank: das N+1-Problem.
Standardmäßig werden die den Fremdschlüssel zugeordneten Modelle in der Abfrage nicht geladen. Sie können die Option selectinload
verwenden, um die Fremdschlüssel zu laden und so das N+1-Problem zu vermeiden.
# Fremdschlüssel nicht geladen session.execute(select(User)).scalars().all() # Fremdschlüssel geladen session.execute(select(User).options(selectinload(User.groups))).scalars().all()
Das Prinzip von Selectinload
ist die Verwendung der select in
-Unterabfrage. Zusätzlich zu selectinload
kann auch das herkömmliche joinedload
verwendet werden, dessen Prinzip die gebräuchlichste join table
ist.
# Verwenden Sie joinedload, um Fremdschlüssel zu laden. Beachten Sie, dass die eindeutige Methode verwendet werden muss, die in 2.0 angegeben ist. session.execute(select(User).options(joinedload(User.groups))).unique().scalars().all()
In 2.0 wird empfohlen, eher selectinload
als joinedload
zu verwenden. Im Allgemeinen ist selectinload
besser und es ist nicht erforderlich, unique
zu verwenden.
Schreiben von Fremdschlüsseln
In SQLAlchemy können Sie Fremdschlüssel direkt wie Arrays behandeln.
user.permissions.append(open_permission) # Hinzufügen user.permissions.remove(save_permission) # Entfernen # Löschen Sie alle Fremdschlüssel user.permissions.clear() user.permissions = []
Besondere Behandlung von JSON-Feldern
Die meisten Datenbanken unterstützen jetzt JSON-Felder. In SQLAlchemy können Sie ein JSON-Objekt direkt aus einem Feld lesen oder ein JSON-Objekt hineinschreiben. Führen Sie jedoch niemals direkt ein Update
für dieses JSON-Objekt durch und erwarten Sie, dass es in die Datenbank zurückgeschrieben wird, was unzuverlässig ist. Stellen Sie sicher, dass Sie es kopieren, lesen und schreiben und es dann wieder zuweisen.
import copy article = session.get(Article, 1) tags = copy.copy(article.tags) tags.append("iOS") article.tags = tags session.commit()
Batch-Einfügung
Wenn eine große Datenmenge eingefügt werden muss, wird viel Zeit für die Interaktion mit der Datenbank verschwendet, wenn die Methode des einzelnen Einfügens verwendet wird, und die Effizienz ist sehr gering. Die meisten Datenbanken wie MySQL stellen die Batch-Einfügungs-API insert ... values (...), (...) ...
bereit, und diese kann auch in SQLAlchemy gut genutzt werden.
# Verwenden Sie session.bulk_save_objects(...), um direkt mehrere Objekte einzufügen from sqlalchemy.orm import Session s = Session() objects = [ User(name="u1"), User(name="u2"), User(name="u3") ] s.bulk_save_objects(objects) s.commit() # Die Verwendung von bulk_insert_mappings kann den Aufwand für das Erstellen von Objekten sparen und direkt Wörterbücher einfügen users = [ {"name": "u1"}, {"name": "u2"}, {"name": "u3"}, ] s.bulk_insert_mappings(User, users) s.commit() # Mithilfe von bulk_update_mappings können Objekte in Batches aktualisiert werden. Die ID im Wörterbuch wird als where-Bedingung verwendet, # und alle anderen Felder werden für die Aktualisierung verwendet session.bulk_update_mappings(User, users)
DeclarativeBase
Nutzen Sie das native Python-Typsystem voll aus
from sqlalchemy.orm import DeclarativeBase class Base(DeclarativeBase): pass from sqlalchemy.orm import mapped_column, MappedColumn id: Mapped[int] = mapped_column(Integer, primary_key=True) fullname: Mapped[Optional[str]]
Asyncio
One AsyncSession per task
. Das AsyncSession
-Objekt ist ein veränderliches, zustandsbehaftetes Objekt, das eine laufende einzelne, zustandsbehaftete Datenbanktransaktion darstellt. Wenn Sie asyncio
für gleichzeitige Aufgaben verwenden, z. B. APIs wie asyncio.gather()
, sollte jede einzelne Aufgabe eine separate AsyncSession
verwenden.
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession engine = create_async_engine(url, echo=True) session = async_sessionmaker(engine) # Objekte erstellen async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # Daten einfügen async with session() as db: db.add(...) await db.commit() # Daten abfragen async with session() as db: stmt = select(A) row = await db.execute(stmt) for obj in row.scalars(): print(obj.id) await engine.dispose()
Verwendung in einer Multiprocessing-Umgebung
Aufgrund des Global Interpreter Lock (GIL) in Python muss Multiprocessing verwendet werden, um Multi-Core-Prozessoren zu nutzen. In einer Multiprocessing-Umgebung können Ressourcen nicht gemeinsam genutzt werden. Entsprechend SQLAlchemy kann der Verbindungspool nicht gemeinsam genutzt werden. Wir müssen dieses Problem manuell lösen.
Im Allgemeinen ist es am besten, nicht zu versuchen, dieselbe Session
unter mehreren Prozessen gemeinsam zu nutzen. Es ist am besten, eine Session
zu erstellen, wenn jeder Prozess initialisiert wird.
Hinzufügen von Where-Bedingungen nur, wenn ein Wert festgelegt ist
In der URL ist es oft notwendig, entsprechende Ergebnisse zurückzugeben, je nachdem, welche Optionen der Benutzer angegeben hat.
query = select(User) if username is not None: query = query.where(User.username == username) if password is not None: query = query.where(User.password == password)
Leapcell: Die Next-Gen Serverless-Plattform für Webhosting, Async-Aufgaben und Redis
Abschließend möchte ich Ihnen die Plattform empfehlen, die sich am besten für die Bereitstellung von Python-Diensten eignet: Leapcell
1. Multi-Sprachen Unterstützung
- Entwickeln Sie mit JavaScript, Python, Go oder Rust.
2. Stellen Sie unbegrenzt Projekte kostenlos bereit
- Zahlen Sie nur für die Nutzung – keine Anfragen, keine Gebühren.
3. Unschlagbare Kosteneffizienz
- Pay-as-you-go ohne Leerlaufgebühren.
- Beispiel: 25 US-Dollar unterstützen 6,94 Millionen Anfragen bei einer durchschnittlichen Antwortzeit von 60 ms.
4. Optimierte Entwicklererfahrung
- Intuitive Benutzeroberfläche für mühelose Einrichtung.
- Vollautomatische CI/CD-Pipelines und GitOps-Integration.
- Echtzeitmetriken und -protokollierung für umsetzbare Erkenntnisse.
5. Mühelose Skalierbarkeit und hohe Leistung
- Automatische Skalierung zur einfachen Bewältigung hoher Parallelität.
- Kein Betriebsaufwand – konzentrieren Sie sich einfach auf das Bauen.
Erfahren Sie mehr in der Dokumentation!
Leapcell Twitter: https://x.com/LeapcellHQ