FastAPIでのバックグラウンドタスクと長時間実行オペレーションの管理
Grace Collins
Solutions Engineer · Leapcell

はじめに
現代のWebアプリケーションの世界では、ユーザーエクスペリエンスが最優先されます。遅い、または応答性の低いインターフェースは、すぐにユーザーの不満を招き、離脱させてしまいます。多くの場合、アプリケーションは計算集約型、外部サービスの利用、または単に完了にかなりの時間を要するオペレーションを実行する必要があります。これらのタスクがメインのリクエスト・レスポンスサイクル内で同期的に実行されると、サーバーがブロックされ、ユーザーにとっては遅延やタイムアウトの可能性があります。ここで、バックグラウンドタスクの概念が重要になります。これらの長時間実行オペレーションを非同期に処理するようにオフロードすることで、APIが高速で応答性が高く、即時のユーザーインタラクションに利用可能であることを保証できます。この記事では、Python 3.7+でAPIを構築するためのモダンで高速(高性能)なWebフレームワークであるFastAPIが、単純なファイア・アンド・フォーゲットオペレーションから専用タスクキューを必要とする複雑な長時間実行プロセスまで、これらのタスクを効果的に管理する方法を開発者に提供する方法を探ります。
コアコンセプトの解説
詳細に入る前に、議論の中心となるいくつかの重要な用語を定義しましょう。
- 同期オペレーション: 次のタスクが開始される前に完了する必要があるタスク。Webサーバーのコンテキストでは、リクエストハンドラが同期的な長時間実行オペレーションを実行すると、そのオペレーションが完了するまで他のリクエストを処理するためにサーバーがブロックされます。
- 非同期オペレーション: メインプログラムが他のタスクの完了を待たずに実行を続行できるように、バックグラウンドで独立して実行できるタスク。非同期を利用するWebサーバーは、一部のリクエストがI/Oオペレーションの待機を伴う場合でも、複数のリクエストを同時に処理できます。
- バックグラウンドタスク: Webリクエストのコンテキスト内で開始される非同期オペレーションの特定の種類ですが、HTTPレスポンスがクライアントに送信された後に処理されます。これらは通常、クライアントがタスクの完了を待つ必要のない「ファイア・アンド・フォーゲット」タスクです。
- 長時間実行タスク: 完了にかなりの時間(秒、分、または数時間)がかかるタスク。これらのタスクは、複雑な計算、データ処理、ファイル変換、または外部APIとの統合を伴うことがよくあります。バックグラウンドタスクは長時間実行される可能性がありますが、すべての長時間実行タスクが単純なファイア・アンド・フォーゲットであるわけではありません。一部は進行状況の追跡、エラー処理、および潜在的な再試行を必要とします。
- タスクキュー: タスクを非同期に管理および実行するために設計されたシステム。長時間実行タスクが開始されると、キューに追加されます。その後、別のワーカープロセスがこのキューからタスクを取得して実行します。多くの場合、再試行、スケジューリング、および結果の保存などの機能を提供します。人気のある例としては、CeleryやRQ(Redis Queue)があります。
- ワーカー: タスクキューからタスクを取得して実行する責任のある専用プロセスまたはプロセスのセット。ワーカーはメインAPIサーバーとは独立して動作します。
FastAPIでのタスクの処理
FastAPIは、BackgroundTasks
依存関係を使用して、単純なバックグラウンドタスクの組み込みサポートを提供します。より複雑で真に長時間実行されるオペレーションには、外部非同期タスクキューとの統合が業界標準です。
単純なオフロードのためのFastAPIのBackgroundTasks
FastAPIのBackgroundTasks
機能は、HTTPレスポンスが送信された後に実行する必要がある、軽量でクリティカルではないタスクに最適です。これには、メール通知の送信、アクティビティのログ記録、またはキャッシュの更新などが含まれます。ここでの重要な特徴は、クライアントがタスクの完了を待たず、その失敗が直接的なユーザーエクスペリエンスに影響しないということです。
ユーザーがリソースにアクセスした後にユーザーのアクティビティをログに記録する例で示しましょう。
from fastapi import FastAPI, BackgroundTasks, Depends import uvicorn import time app = FastAPI() def write_log(message: str): with open("app.log", mode="a") as log: log.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}\n") @app.post("/send_email/{email}") async def send_email_background(email: str, background_tasks: BackgroundTasks): background_tasks.add_task(write_log, f"Sending email to {email}") # メール送信プロセスのシミュレーション(別のバックグラウンドタスクや実際の非同期オペレーションになる可能性があります) # email_service.send_async(email, "Welcome!", "Thanks for signing up!") return {"message": "Email sending initiated. Check logs for details."} @app.get("/items/{item_id}") async def read_item(item_id: int, background_tasks: BackgroundTasks): background_tasks.add_task(write_log, f"Accessed item {item_id}") return {"item_id": item_id, "message": "Item accessed successfully."} if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)
この例では:
- タスクをシミュレートする単純な
write_log
関数を定義します。 - APIエンドポイント(
/send_email/{email}
と/items/{item_id}
)で、依存関係としてbackground_tasks: BackgroundTasks
を宣言します。 - 次に、
background_tasks.add_task(function_name, *args, **kwargs)
を使用してwrite_log
関数をスケジュールします。 - FastAPIは、クライアントにHTTPレスポンスが送信された後に
write_log
が実行されることを保証します。クライアントは即座に応答を受け取りますが、ログ記録はバックグラウンドで行われます。
原則: BackgroundTasks
は、レスポンスがストリームされた後に同じイベントループ内で実行されるように呼び出し可能をスケジュールすることによって機能します。これは小さなクイックタスクには効率的ですが、その制限を理解することが重要です。バックグラウンドタスク自体に時間がかかりすぎると、メインイベントループのリソースを依然として占有し、他のリクエストの応答性に影響を与える可能性がありますが、初期応答の後です。また、障害やサーバーの再起動の場合の永続性や再試行メカニズムも提供しません。
真の長時間実行タスクの外部タスクキューでの処理
CPUバウンド、外部サービスとのI/Oバウンド、失敗しやすい、または単にFastAPIプロセス(レスポンス後でも)で実行するには長すぎるタスクの場合、CeleryやRQのような専用の非同期タスクキューが不可欠になります。これらのシステムは、タスク実行とWebサーバーを分離し、堅牢性、スケーラビリティ、およびオブザーバビリティを提供します。
ユーザーが大きな画像をアップロードし、それを処理(リサイズ、フィルターの適用、クラウドストレージへのアップロード)する必要がある例を考えてみましょう。これには数秒または数分かかる場合があります。
アーキテクチャ:
- FastAPIアプリケーション: ユーザーリクエストを受け取り、タスクキュー(例:Redis)にタスクの詳細をプッシュします。
- Redis(またはRabbitMQ): メッセージブローカーとして機能し、タスクをキューに格納します。
- ワーカープロセス(CeleryまたはRQ): キューを継続的に監視し、タスクを取得して実行する、分離された永続的なプロセスです。完了すると、データベースを更新したり、ユーザーに通知したりすることがあります。
ここでは、人気のある堅牢な選択肢であるRedisをブローカーとして使用してCelery
を使用します。
1. CeleryとRedisのセットアップ:
- 必要なパッケージをインストールします:`pip install