Python Async IO mit FastAPI meistern
Emily Parker
Product Engineer · Leapcell

Seit Python eine interpretierte Sprache ist, ist die Antwortzeit bei der Verwendung für die Back-End-Entwicklung, z. B. in der Kombination von Python + Django, im Vergleich zu Java + Spring etwas länger. Solange der Code jedoch vernünftig ist, ist der Unterschied nicht allzu groß. Selbst wenn Django den Multiprozessmodus verwendet, ist seine Fähigkeit zur gleichzeitigen Verarbeitung immer noch viel schwächer. Python bietet einige Lösungen zur Verbesserung der Fähigkeiten zur gleichzeitigen Verarbeitung. Zum Beispiel die Verwendung des asynchronen Frameworks FastAPI, mit dessen asynchronen Fähigkeiten die Fähigkeit zur gleichzeitigen Verarbeitung von E/A-intensiven Aufgaben erheblich verbessert werden kann. FastAPI ist eines der schnellsten Python-Frameworks.
FastAPI als Beispiel
Werfen wir zunächst einen kurzen Blick auf die Verwendung von FastAPI.
Beispiel 1: Standardmäßige asynchrone Netzwerk-E/A
Installation:
pip install fastapi
Einfacher serverseitiger Code:
# main.py
from typing import Union
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def read_root():
return {"Hello": "World"}
Start:
uvicorn main:app --reload
Wir können sehen, dass die Schnittstelle von FastAPI im Vergleich zu anderen Frameworks nur ein zusätzliches Schlüsselwort async
enthält. Das Schlüsselwort async
definiert die Schnittstelle als asynchron. Allein aus dem Rückgabeergebnis können wir keinen Unterschied zwischen FastAPI und anderen Python-Frameworks erkennen. Der Unterschied liegt im gleichzeitigen Zugriff. Wenn die Server-Threads von FastAPI Routenanfragen bearbeiten, z. B. http://127.0.0.1:8000/
, warten sie nicht mehr darauf, wenn sie auf Netzwerk-E/A stoßen, sondern bearbeiten stattdessen andere Anfragen. Wenn die Netzwerk-E/A abgeschlossen ist, wird die Ausführung fortgesetzt. Diese asynchrone Fähigkeit verbessert die Verarbeitungsfähigkeit von E/A-intensiven Aufgaben.
Beispiel 2: Explizite asynchrone Netzwerk-E/A
Sehen wir uns ein weiteres Beispiel an. Im Geschäftscode wird eine explizite asynchrone Netzwerkanfrage initiiert. Für diese Netzwerk-E/A wird FastAPI sie genauso wie Routenanfragen auch asynchron verarbeiten.
# app.py
from fastapi import FastAPI, HTTPException
import httpx
app = FastAPI()
# Beispiel einer asynchronen GET-Anfrage
@app.get("/external-api")
async def call_external_api():
url = "https://leapcell.io"
async with httpx.AsyncClient() as client:
response = await client.get(url)
if response.status_code!= 200:
raise HTTPException(status_code=response.status_code, detail="Failed to fetch data")
return response.json()
Wenn die Datenbank-E/A asynchron sein soll, benötigen Sie die Unterstützung asynchroner Operationen vom Datenbanktreiber oder ORM.
Asynchrone E/A
Die Kernimplementierung der Asynchronität von FastAPI ist asynchrone E/A
. Wir können einen Server mit asynchronen Verarbeitungsfunktionen direkt mithilfe von asynchroner E/A starten, ohne FastAPI zu verwenden.
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(1) # Simulieren einer E/A-Operation
return web.Response(text='{"Hello": "World"}', content_type='application/json')
async def init(loop):
# Verwenden Sie die Ereignisschleife, um Webanfragen zu überwachen
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
# Starten Sie den Server, und die Ereignisschleife überwacht und verarbeitet Webanfragen
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
# Explizites Abrufen einer Ereignisschleife
loop = asyncio.get_event_loop()
# Starten der Ereignisschleife
loop.run_until_complete(init(loop))
loop.run_forever()
Wenn dieses Beispiel gestartet wird, ist das Rückgabeergebnis von http://127.0.0.1:8000/
dasselbe wie in Beispiel 1. Das zugrunde liegende Implementierungsprinzip der asynchronen E/A sind "Koroutinen" und "Ereignisschleifen".
Koroutinen
async def index(request): await asyncio.sleep(1) # Simulieren einer E/A-Operation return web.Response(text='{"Hello": "World"}', content_type='application/json')
Die Funktion index
wird mit async def
definiert, was bedeutet, dass es sich um eine Koroutine handelt. Das Schlüsselwort await
wird vor einer E/A-Operation verwendet, um dem Ausführungsthread mitzuteilen, dass er nicht auf diese E/A-Operation warten soll. Die Aufrufe normaler Funktionen werden über den Stack implementiert, und Funktionen können nur einzeln aufgerufen und ausgeführt werden. Eine Koroutine ist jedoch eine spezielle Art von Funktion (kein kollaborativer Thread). Sie ermöglicht es dem Thread, die Ausführung an der Markierung await
anzuhalten und zur Ausführung anderer Aufgaben zu wechseln. Wenn die E/A-Operation abgeschlossen ist, wird die Ausführung fortgesetzt.
Sehen wir uns die Auswirkungen mehrerer Koroutinen an, die gleichzeitig ausgeführt werden.
import asyncio from datetime import datetime async def coroutine3(): print(f"Coroutine 3 started at {datetime.now()}") await asyncio.sleep(1) # Simulieren einer E/A-Operation print(f"Coroutine 3 finished at {datetime.now()}") async def coroutine2(): print(f"Coroutine 2 started at {datetime.now()}") await asyncio.sleep(1) # Simulieren einer E/A-Operation print(f"Coroutine 2 finished at {datetime.now()}") async def coroutine1(): print(f"Coroutine 1 started at {datetime.now()}") await asyncio.sleep(1) # Simulieren einer E/A-Operation print(f"Coroutine 1 finished at {datetime.now()}") async def main(): print("Main started") # Erstellen von Aufgaben, um Koroutinen gleichzeitig auszuführen task1 = asyncio.create_task(coroutine1()) task2 = asyncio.create_task(coroutine2()) task3 = asyncio.create_task(coroutine3()) # Warten auf den Abschluss aller Aufgaben await task1 await task2 await task3 print("Main finished") # Ausführen der Hauptkoroutine asyncio.run(main())
Ausgabe:
Main started
Coroutine 1 started at 2024-12-27 12:28:01.661251
Coroutine 2 started at 2024-12-27 12:28:01.661276
Coroutine 3 started at 2024-12-27 12:28:01.665012
Coroutine 1 finished at 2024-12-27 12:28:02.665125
Coroutine 2 finished at 2024-12-27 12:28:02.665120
Coroutine 3 finished at 2024-12-27 12:28:02.665120
Main finished
Wir können sehen, dass der Thread die drei Aufgaben nicht nacheinander ausführt. Wenn er auf eine E/A-Operation stößt, wechselt er zur Ausführung anderer Aufgaben. Nachdem die E/A-Operation abgeschlossen ist, wird die Ausführung fortgesetzt. Es ist auch zu erkennen, dass die drei Koroutinen im Wesentlichen gleichzeitig mit dem Warten auf die E/A-Operation beginnen, sodass die endgültigen Ausführungszeiten im Wesentlichen gleich sind. Obwohl die Ereignisschleife hier nicht explizit verwendet wird, wird asyncio.run
sie implizit verwenden.
Generatoren
Koroutinen werden durch Generatoren implementiert. Generatoren können die Ausführung von Funktionen anhalten und auch wieder aufnehmen, was die Eigenschaften von Koroutinen sind.
def simple_generator(): print("First value") yield 1 print("Second value") yield 2 print("Third value") yield 3 # simple_generator ist eine Generatorfunktion, gen ist ein Generator gen = simple_generator() print(next(gen)) # Output: First value \n 1 print(next(gen)) # Output: Second value \n 2 print(next(gen)) # Output: Third value \n 3
Wenn der Generator mit next()
ausgeführt wird, wird er angehalten, wenn er auf yield
stößt. Wenn next()
erneut ausgeführt wird, wird die Ausführung ab dem yield
fortgesetzt, an dem sie zuletzt angehalten wurde. Vor Python 3.5 wurden Koroutinen auch mit "Annotationen" + yeild
geschrieben. Ab Python 3.5 werden async def
+ await
verwendet.
import asyncio from datetime import datetime @asyncio.coroutine def my_coroutine(): print("Start coroutine", datetime.now()) # Asynchroner Aufruf von asyncio.sleep(1): yield from asyncio.sleep(1) print("End coroutine", datetime.now()) # Abrufen der EventLoop loop = asyncio.get_event_loop() # Ausführen der Koroutine loop.run_until_complete(my_coroutine()) loop.close()
Die Pausen- und Fortsetzungsfunktionen von Generatoren können für viele Dinge außer Koroutinen verwendet werden. Beispielsweise können sie während der Schleife Berechnungen durchführen und Algorithmen speichern. Zum Beispiel die Implementierung eines Pascalschen Dreiecks (beide Enden jeder Zeile sind 1, und die Zahlen an anderen Positionen sind die Summe der beiden Zahlen darüber).
def pascal_triangle():
row = [1]
while True:
yield row
new_row = [1] # Das erste Element jeder Zeile ist immer 1
for i in range(1, len(row)):
new_row.append(row[i - 1] + row[i])
new_row.append(1) # Das letzte Element jeder Zeile ist immer 1
row = new_row
# Generieren und Drucken der ersten 5 Zeilen des Pascalschen Dreiecks
triangle = pascal_triangle()
for _ in range(5):
print(next(triangle))
Ausgabe:
[1]
[1, 1]
[1, 2, 1]
[1, 3, 3, 1]
[1, 4, 6, 4, 1]
Ereignisschleifen
Da die Ausführung der Koroutine angehalten werden kann, wann wird die Ausführung der Koroutine fortgesetzt? Dies erfordert die Verwendung einer Ereignisschleife, um dem Ausführungsthread mitzuteilen.
# Abrufen der EventLoop loop = asyncio.get_event_loop() # Die Ereignisschleife führt die Koroutine aus loop.run_until_complete(my_coroutine()) loop.close()
Die Ereignisschleife verwendet die I/O-Multiplexing-Technologie, um ständig zu überprüfen, bei welchen Ereignissen Koroutinen weiterhin ausgeführt werden können. Wenn sie ausgeführt werden können, führt der Thread die Koroutinen weiterhin aus.
I/O-Multiplexing-Technologie
Um I/O-Multiplexing auf einfache Weise zu verstehen: Ich bin der Chef einer Kurierstation. Ich muss nicht jeden Kurier aktiv nach dem Abschluss seiner Aufgaben fragen. Stattdessen kommen die Kuriere von selbst zu mir, nachdem sie ihre Aufgaben erledigt haben. Dies verbessert meine Fähigkeit zur Aufgabenverarbeitung, und ich kann mehr Aufgaben erledigen.
select
, poll
und epoll
können alle I/O-Multiplexing erreichen. Im Vergleich zu select
und poll
bietet epoll
eine bessere Leistung. Linux verwendet im Allgemeinen standardmäßig epoll
, und macOS verwendet kqueue
, das ähnlich wie epoll
ist und eine ähnliche Leistung aufweist.
Socket-Server mit Ereignisschleifen
import selectors
import socket
# Erstellen eines Selektorobjekts, das der Implementierung von epoll entspricht, wenn es unter Linux ausgeführt wird
sel = selectors.DefaultSelector()
# Funktion zur Verarbeitung von Anfrageempfangsereignissen. Neue Verbindungen akzeptieren und Leseereignisse registrieren
def accept(sock, mask):
conn, addr = sock.accept() # Akzeptieren der Verbindung
print('Accepted connection from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read) # Registrieren des Leseereignisses
# Funktion zur Verarbeitung von Anfrageleseereignissen. Anfragedaten lesen und eine HTTP-Antwort senden, dann die Verbindung schließen.
def read(conn, mask):
data = conn.recv(100) # Daten aus der Verbindung lesen
print('response to')
response = "HTTP/1.1 200 OK\r\n" \
"Content-Type: application/json\r\n" \
"Content-Length: 18\r\n" \
"Connection: close\r\n" \
"\r\n" \
"{\"Hello\": \"World\"}"
conn.send(response.encode()) # Echo der Daten
print('Closing connection')
sel.unregister(conn) # Aufheben der Registrierung des Ereignisses
conn.close() # Schließen der Verbindung
# Erstellen eines Server-Sockets
sock = socket.socket()
sock.bind(('localhost', 8000))
sock.listen()
sock.setblocking(False)
# Registrieren des Akzeptanzereignisses
sel.register(sock, selectors.EVENT_READ, accept)
print("Server is running on port 8000...")
# Ereignisschleife
while True:
# Dies blockiert, wenn keine Anforderungen vorhanden sind
events = sel.select() # Auswählen der Dateideskriptoren (Ereignisse), die bereit sind
print("events length: ", len(events))
for key, mask in events:
callback = key.data # Abrufen der Funktion zur Ereignisverarbeitung
print("handler_name:", callback.__name__)
callback(key.fileobj, mask) # Aufrufen der Funktion zur Ereignisverarbeitung
Starten Sie den Server-Socket, um den angegebenen Port zu überwachen. Wenn selectors
auf einem Linux-System ausgeführt wird, verwendet es standardmäßig epoll
als Implementierung. Der Code verwendet epoll
, um ein Anfrageempfangsereignis (Akzeptanzereignis) zu registrieren. Wenn eine neue Anfrage eintrifft, löst epoll
aus und führt die Ereignisverarbeitungsfunktion aus und registriert gleichzeitig ein Leseereignis (Leseereignis), um die Anfragedaten zu verarbeiten und zu beantworten. Wenn über die Web-Seite mit http://127.0.0.1:8000/
darauf zugegriffen wird, ist das Rückgabeergebnis dasselbe wie in Beispiel 1. Server-Ausführungsprotokoll:
Server is running on port 8000...
events length: 1
handler_name: accept
Accepted connection from ('127.0.0.1', 60941)
events length: 1
handler_name: read
response to
Closing connection
Socket-Server
Verwenden Sie Socket direkt, um einen Server zu starten. Wenn über einen Browser unter http://127.0.0.1:8080/
oder mit curl http://127.0.0.1:8080/
darauf zugegriffen wird, wird {"Hello": "World"}
zurückgegeben.
import socket from datetime import datetime # Erstellen eines TCP-Sockets server_socket = socket.socket() # Binden des Sockets an die angegebene IP-Adresse und Portnummer server_socket.bind(('127.0.0.1', 8001)) # Starten des Abfragens auf eingehende Verbindungen server_socket.listen(5) # Schleife zum Akzeptieren von Client-Verbindungen while True: print("%s Waiting for a connection..." % datetime.now()) client_socket, addr = server_socket.accept() # Dies blockiert und wartet auf Client-Verbindungen print(f"{datetime.now()} Got connection from {addr}") # Empfangen von Client-Daten data = client_socket.recv(1024) print(f"Received: {data.decode()}") # Senden von Antwortdaten response = "HTTP/1.1 200 OK\r\n" \ "Content-Type: application/json\r\n" \ "Content-Length: 18\r\n" \ "Connection: close\r\n" \ "\r\n" \ "{\"Hello\": \"World\"}" client_socket.sendall(response.encode()) # Schließen des Client-Sockets client_socket.close()
Wenn mit curl http://127.0.0.1:8001/
darauf zugegriffen wird, Server-Ausführungsprotokoll:
2024-12-27 12:53:36.711732 Waiting for a connection...
2024-12-27 12:54:30.715928 Got connection from ('127.0.0.1', 64361)
Received: GET / HTTP/1.1
Host: 127.0.0.1:8001
User-Agent: curl/8.4.0
Accept: */*
Zusammenfassung
Asynchrone E/A wird auf der untersten Ebene mithilfe von „Koroutinen“ und „Ereignisschleifen“ implementiert. „Koroutinen“ stellen sicher, dass der Thread beim Auftreten markierter E/A-Operationen während der Ausführung nicht auf den Abschluss der E/A warten muss, sondern anhalten und den Thread andere Aufgaben ausführen lassen kann, ohne zu blockieren. „Ereignisschleifen“ verwenden die I/O-Multiplexing-Technologie, um ständig zu überprüfen, bei welchen Ereignissen Koroutinen weiterhin ausgeführt werden können. Wenn sie ausgeführt werden können, führt der Thread die Koroutinen weiterhin aus.
Leapcell: Die ideale Plattform für FastAPI und andere Python-Anwendungen:
Lassen Sie mich abschließend die ideale Plattform für die Bereitstellung von Flask/FastAPI vorstellen: Leapcell.
Leapcell ist eine Cloud-Computing-Plattform, die speziell für moderne verteilte Anwendungen entwickelt wurde. Das Pay-as-you-go-Preismodell stellt sicher, dass keine Kosten für Leerlauf entstehen, was bedeutet, dass Benutzer nur für die Ressourcen zahlen, die sie tatsächlich nutzen.
Die einzigartigen Vorteile von Leapcell für WSGI/ASGI-Anwendungen:
1. Multi-Language-Unterstützung
- Unterstützt die Entwicklung in JavaScript, Python, Go oder Rust.
Kostenlose Bereitstellung unbegrenzter Projekte
- Berechnet nur basierend auf der Nutzung. Keine Gebühren, wenn keine Anfragen vorhanden sind.
2. Unübertroffene Kosteneffizienz
- Pay-as-you-go, ohne Leerlaufgebühren.
- Beispielsweise können 25 US-Dollar 6,94 Millionen Anfragen mit einer durchschnittlichen Antwortzeit von 60 Millisekunden unterstützen.
3. Vereinfachte Entwicklererfahrung
- Intuitive Benutzeroberfläche für einfache Einrichtung.
- Vollautomatische CI/CD-Pipelines und GitOps-Integration.
- Echtzeitmetriken und -protokolle, die umsetzbare Erkenntnisse liefern.
4. Mühelose Skalierbarkeit und hohe Leistung
- Automatische Skalierung zur einfachen Bewältigung hoher Parallelität.
- Keine Betriebskosten, sodass sich Entwickler auf die Entwicklung konzentrieren können.
Erfahren Sie mehr in der Dokumentation!
Leapcell Twitter: https://x.com/LeapcellHQ