Pythonで`websockets`とASGIを使って超高速スタンドアロンWebSocketサーバーを構築する
Lukas Schneider
DevOps Engineer · Leapcell

リアルタイム通信の紹介
今日の相互接続されたデジタルランドスケープでは、リアルタイム通信はもはや贅沢ではなく、基本的な期待となっています。共同ドキュメント編集やライブチャットアプリケーションから、金融取引プラットフォームやIoTデバイスの監視まで、情報を即座に継続的に交換できる能力が最も重要です。従来のHTTPリクエスト・レスポンスサイクルは、多くのユースケースに優れていますが、永続的で低遅延、双方向の通信が必要な場合にはしばしば不十分です。ここでWebSocketが活躍します。WebSocketは、単一のTCP接続上でフルデュプレックス通信チャネルを提供し、繰り返しのHTTPポーリングと比較してオーバーヘッドを大幅に削減します。多くのPython WebフレームワークがWebSocket統合を提供していますが、スタンドアロンで高性能なWebSocketサーバー、つまりフル機能のWebフレームワークから分離されたものが最適なソリューションとなるシナリオがあります。この記事では、Pythonの強力なwebsockets
ライブラリと非同期サーバーゲートウェイインターフェイス(ASGI)仕様を使用して、そのようなサーバーを構築し、効率的なリアルタイム機能を実現する方法を掘り下げます。
コアコンポーネントの理解
実装の詳細に入る前に、高性能WebSocketサーバーの基盤となる主要なテクノロジーを明確にしましょう。
WebSockets: 前述のように、WebSocketは単一のTCP接続上でフルデュプレックス通信を可能にします。これは、クライアントとサーバーの両方が、各交換のために新しい接続を確立することなく、同時にメッセージを送受信できることを意味します。この永続的な接続は、遅延とオーバーヘッドを大幅に削減し、リアルタイムインタラクションに最適です。
ASGI (Asynchronous Server Gateway Interface): ASGIは、Pythonの非同期Webサーバー、フレームワーク、およびアプリケーションの仕様です。非同期Webサーバー(UvicornやHypercornなど)と非同期Python Webアプリケーション間の通信のための標準インターフェイスを定義します。ASGIアプリケーションは、基本的に、スコープ辞書(リクエストの詳細を含む)を受け取り、sendおよびreceive関数を介してイベントを送受信する非同期呼び出し可能オブジェクトです。この標準化により、異なるASGIサーバーとフレームワーク間の相互運用性が可能になり、堅牢で柔軟なエコシステムが促進されます。
websockets
ライブラリ: これは、WebSocketサーバーおよびクライアントを構築するためのクリーンで強力なAPIを提供する素晴らしいPythonライブラリです。ハンドシェイク、フレーミング、エラー処理を含む低レベルのWebSocketプロトコルの詳細を処理し、開発者がアプリケーションロジックに集中できるようにします。その非同期性質は、ASGIおよび最新のPythonのasyncio
パラダイムと完全に一致しています。
これらを組み合わせて使用する原則は、ASGIサーバーをエントリーポイントとして使用し、WebSocketリクエストをwebsockets
アプリケーションにディスパッチすることです。websockets
ライブラリ自体もスタンドアロンサーバーとして機能できますが、UvicornのようなASGIサーバーと統合することで、特に他のASGI互換プロトコルを処理したり、ASGIミドルウェアと統合したりする必要がある場合に、より大きな柔軟性が得られます。
高性能WebSocketサーバーの構築
私たちの目標は、多数の同時WebSocket接続を効率的に処理できるサーバーを作成することです。これには、非同期プログラミングと慎重なリソース管理が伴います。
シンプルなエコーサーバー
まず、基本的なエコーサーバーから始めましょう。クライアントがメッセージを送信すると、サーバーはそれをそのまま返します。これは、コアの送受信機能を示しています。
# echo_server.py import asyncio import websockets async def echo(websocket, path): """ WebSocket接続のための非同期ハンドラ。 クライアントから受信したメッセージをエコーバックします。 """ print(f"Client connected: {websocket.remote_address}") try: async for message in websocket: print(f"Received message from {websocket.remote_address}: {message}") await websocket.send(f"Echo: {message}") except websockets.exceptions.ConnectionClosedOK: print(f"Client {websocket.remote_address} disconnected gracefully") except websockets.exceptions.ConnectionClosedError as e: print(f"Client {websocket.remote_address} disconnected with error: {e}") finally: print(f"Client disconnected: {websocket.remote_address}") async def main(): """ WebSocketサーバーを起動します。 """ # localhost、ポート8765でWebSocketサーバーを起動 async with websockets.serve(echo, "localhost", 8765): await asyncio.Future() # 無限に実行 if __name__ == "__main__": print("Starting WebSocket echo server on ws://localhost:8765") asyncio.run(main())
これを実行するには、echo_server.py
として保存し、python echo_server.py
を実行するだけです。その後、ブラウザのコンソールで簡単なJavaScriptクライアントを使用してテストできます。
const ws = new WebSocket("ws://localhost:8765"); ws.onopen = () => console.log("Connected"); ws.onmessage = (event) => console.log("Received:", event.data); ws.send("Hello, WebSocket!");
ASGIとの統合による柔軟性の向上
websockets.serve
関数はスタンドアロンWebSocketアプリケーションに最適ですが、UvicornのようなASGIサーバーと統合することで、次のような利点が得られます。
*
- 同じサーバー上で他のASGIアプリケーション(例:REST API)を実行できる。
- ASGIミドルウェアを活用できる。
- 本番環境対応のASGIサーバーによって提供される、より優れたプロセス管理とスケーリング機能。
以下は、WebSocketハンドラをASGIアプリケーションにラップする方法です。websockets
ライブラリは、これを簡素化するためにwebsockets.ASGIHandler
を提供しています。
# asgi_websocket_server.py import asyncio import websockets from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError from websockets.sync.server import serve import uvicorn async def websocket_application(scope, receive, send): """ エコーサーバー用のASGI互換WebSocketアプリケーション。 """ if scope['type'] == 'websocket': async def handler(websocket): print(f"ASGI Client connected: {websocket.remote_address}") try: async for message in websocket: print(f"ASGI Received message from {websocket.remote_address}: {message}") await websocket.send(f"ASGI Echo: {message}") except ConnectionClosedOK: print(f"ASGI Client {websocket.remote_address} disconnected gracefully") except ConnectionClosedError as e: print(f"ASGI Client {websocket.remote_address} disconnected with error: {e}") finally: print(f"ASGI Client disconnected: {websocket.remote_address}") # ASGI用のwebsockets.server.serveプロトコルを使用 # これは接続のためにAsyncWebSocketServerProtocolインスタンスを作成します await websockets.server.serve_websocket(handler, scope, receive, send) else: # 必要に応じて他のタイプの要求を処理します(例:ヘルスチェック用のHTTP) #純粋なWebSocketサーバーの場合、これらは単なるエラーである可能性があります。 response_start = {'type': 'http.response.start', 'status': 404, 'headers': []} response_body = {'type': 'http.response.body', 'body': b'Not Found'} await send(response_start) await send(response_body) # Uvicornで実行するには: # uvicorn asgi_websocket_server:websocket_application --port 8000 --ws websockets
このASGI互換サーバーを実行するには:
- Uvicornをインストールします:
pip install uvicorn websockets
- コマンドを実行します:
uvicorn asgi_websocket_server:websocket_application --port 8000 --ws websockets
--ws websockets
フラグは、UvicornにWebSocket接続の処理にwebsockets
を使用するように指示し、アプリケーションとの互換性を保証します。これで、JavaScriptクライアントはws://localhost:8000
を指す必要があります。
実例:シンプルなチャットルーム
エコーサーバーを基本的なチャットルームに拡張して、複数のクライアントの処理とメッセージのブロードキャストを実演しましょう。
# chat_server.py import asyncio import websockets from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError import json CONNECTED_CLIENTS = set() # アクティブなWebSocket接続を保存 async def register(websocket): """新しいクライアント接続を登録します。""" CONNECTED_CLIENTS.add(websocket) print(f"Client connected: {websocket.remote_address}. Total clients: {len(CONNECTED_CLIENTS)}") async def unregister(websocket): """クライアント接続を登録解除します。""" CONNECTED_CLIENTS.remove(websocket) print(f"Client disconnected: {websocket.remote_address}. Total clients: {len(CONNECTED_CLIENTS)}") async def broadcast_message(message): """接続されているすべてのクライアントにメッセージを送信します。""" if CONNECTED_CLIENTS: # 送信するクライアントがいることを確認 await asyncio.wait([client.send(message) for client in CONNECTED_CLIENTS]) async def chat_handler(websocket, path): """ チャットルームで個々のクライアント接続を処理します。 """ await register(websocket) try: user_name = None async for message_str in websocket: try: message_data = json.loads(message_str) message_type = message_data.get("type") if message_type == "join": user_name = message_data.get("name", "Anonymous") join_msg = json.dumps({"type": "status", "message": f"{user_name} joined the chat."}) await broadcast_message(join_msg) elif message_type == "chat" and user_name: chat_msg = message_data.get("message", "") full_msg = json.dumps({"type": "chat", "sender": user_name, "message": chat_msg}) await broadcast_message(full_msg) else: await websocket.send(json.dumps({"type": "error", "message": "Invalid message format or not joined." })) except json.JSONDecodeError: print(f"Received invalid JSON from {websocket.remote_address}: {message_str}") await websocket.send(json.dumps({"type": "error", "message": "Invalid JSON format." })) except Exception as e: print(f"Error handling message from {websocket.remote_address}: {e}") await websocket.send(json.dumps({"type": "error", "message": f"Server error: {e}" })) except ConnectionClosedOK: print(f"Client {websocket.remote_address} disconnected gracefully") except ConnectionClosedError as e: print(f"Client {websocket.remote_address} disconnected with error: {e}") finally: if user_name: leave_msg = json.dumps({"type": "status", "message": f"{user_name} left the chat."}) await broadcast_message(leave_msg) await unregister(websocket) async def main_chat_server(): """チャットWebSocketサーバーを起動します。""" async with websockets.serve(chat_handler, "localhost", 8766): await asyncio.Future() # 無限に実行 if __name__ == "__main__": print("Starting WebSocket chat server on ws://localhost:8766") asyncio.run(main_chat_server())
このチャットサーバーは、ユーザーが名前で参加し、全員にブロードキャストされるメッセージを送信できるようにします。アクティブな接続を追跡するためにグローバルセットCONNECTED_CLIENTS
を使用し、効率的なブロードキャストのためにasyncio.wait
を使用しています。
アプリケーションシナリオ
websockets
およびASGIで構築されたスタンドアロンで高性能なWebSocketサーバーは、以下に最適です。
*
- リアルタイムダッシュボード: ライブデータ更新(株価、センサー値、分析)の表示。
- マルチプレイヤーゲーム: ゲーム状態同期のための低遅延通信。
- ライブチャットおよびメッセージング: フレームワークのオーバーヘッドなしでカスタムチャットアプリケーションを構築。
- IoTデバイス通信: 接続されたデバイスからのリアルタイムデータストリームの受信。
- 通知システム: クライアントへのインスタント通知のプッシュ。
結論:Pythonリアルタイムアプリケーションの強化
websockets
ライブラリの堅牢性とASGIの柔軟性および標準化を組み合わせることで、Python開発者は要求の厳しいリアルタイム通信ニーズに対応できる強力なスタンドアロンWebSocketサーバーを構築できます。このアプローチは、きめ細かな制御、優れたパフォーマンス、およびスケーリングのための明確なパスを提供し、最新のインタラクティブWebサービスにとって貴重なパターンとなります。これらのツールを活用することで、Pythonはリアルタイムアプリケーションの分野で強力な競合相手として立つことができます。