Sagaパターンによるマイクロサービストランザクションのオーケストレーション
Wenhao Wang
Dev Intern · Leapcell

はじめに
現代のソフトウェア開発の進化する状況において、マイクロサービスは、スケーラビリティ、回復力、および独立した開発の点で比類のない利点を提供する、支配的なアーキテクチャスタイルとして登場しました。しかし、このモジュール性により、重大な課題が生じます。それは、複数のサービスにわたるトランザクションの一貫性を管理することです。単一のデータベースによってACID特性が本質的に保証されるモノリシックアプリケーションとは異なり、マイクロサービスはしばしば別々のデータベースに依存するため、分散トランザクションは非常に複雑になります。Eコマースの注文プロセスを想像してみてください。注文の作成、在庫の更新、支払いの処理です。いずれかのステップが失敗した場合、データ整合性を維持するために、ワークフロー全体を正しく元に戻す必要があります。分散環境におけるこのような堅牢なトランザクション整合性に対する重要なニーズにより、このまさにその問題を解決するために設計された強力なアプローチであるSagaパターンが登場し、ビジネスプロセスが異なるサービスにまたがっていても一貫性を保つことを保証します。
分散トランザクションのジレンマとSagaソリューション
Sagaパターン自体について詳しく掘り下げる前に、その必要性と運用を支えるいくつかの基本的な概念を明確にしましょう。
主要な用語
- マイクロサービスアーキテクチャ: アプリケーションを、それぞれが独立して開発、デプロイ、スケーリングされる、疎結合なサービスのコレクションとして構造化するアーキテクチャスタイル。
- 分散トランザクション: 複数の独立したシステムまたはサービスが関与し、それらを横断して操作を実行するトランザクション。ローカルトランザクションとは異なり、標準的なACID保証を直接維持することは困難または不可能です。
- CAP定理: 分散データストアが、整合性、可用性、およびパーティション耐性の3つの保証のうち2つ以上を同時に提供することは不可能であると述べる、分散コンピューティングにおける基本的な定理です。マイクロサービスアーキテクチャは、しばしば可用性とパーティション耐性を優先し、結果整合性につながります。
- 結果整合性: データ項目に新しい更新が行われない場合、最終的にその項目へのすべてのアクセスが最後に更新された値を返す一貫性モデルです。これは分散システムにおける一般的なトレードオフです。
- 補償トランザクション: 前の操作を意味的に元に戻す操作です。操作を直接元に戻すのではなく、その効果を補償する新しい操作を作成します。たとえば、アカウントから金額が差し引かれた場合、補償トランザクションはその金額をアカウントに戻します。
Sagaパターンの説明
Sagaパターンは、各サービスが独自のデータベースを維持する、複数のサービスにまたがる分散トランザクションを管理する方法です。すべてのサービスにまたがる単一の、アトミックなトランザクション(マイクロサービスでは問題になりがち)の代わりに、Sagaはトランザクションをローカルトランザクションのシーケンスに分割します。各ローカルトランザクションは、自身のサービスのデータベースを更新し、イベントを発行して、シーケンス内の次のローカルトランザクションをトリガーします。ローカルトランザクションが失敗した場合、Sagaは、先行する成功したローカルトランザクションの効果を元に戻すために、一連の補償トランザクションを実行します。
Sagaを調整するには、主に2つの方法があります。
-
Choreography-based Saga(振り付けベースのSaga):
- 各サービスは、ローカルトランザクションを実行するかどうか、およびいつ実行するかを決定するために、イベントを生成および消費します。
- 中央のコーディネーターは存在しません。サービスはイベントをリッスンし、作業を実行してから、新しいイベントを発行します。
- 長所: 疎結合で、単純なワークフローでは実装が簡単です。
- 短所: 長期間実行されるSagaの監視とデバッグは、特にサービス数が増加すると困難になることがあります。全体的なフローの透明性が低くなります。
-
Orchestration-based Saga(オーケストレーションベースのSaga):
- 中央のオーケストレーター(専用サービス)がSagaの調整を担当します。各参加サービスにどのローカルトランザクションを実行すべきかを指示します。
- オーケストレーターはSagaの状態を維持し、補償トランザクションの実行を含む次のステップを決定します。
- 長所: プロセス全体をより良く制御でき、Sagaの進行状況の監視が容易で、複雑なワークフローの管理が簡単です。
- 短所: オーケストレーターが、注意深く設計されていない場合、単一障害点またはボトルネックになる可能性があります。また、管理対象の追加サービスが増えます。
実践的な実装例(オーケストレーションベース)
Python風の擬似コードを使用して、オーケストレーションベースのSagaを単純化された注文処理の例で示しましょう。Order Service
、Inventory Service
、Payment Service
の3つのサービスを使用します。
シナリオ:顧客が注文を行う
- Order Service: 新しい注文を
PENDING
状態に作成します。 - Inventory Service: 要求されたアイテムを予約します。
- Payment Service: 支払い処理を行います。
- Order Service: 注文を
APPROVED
もしくはREJECTED
に更新します。
Saga Orchestrator
# KafkaまたはRabbitMQのようなメッセージキューを通信に使用すると仮定 class OrderCreationOrchestrator: def __init__(self, order_id): self.order_id = order_id self.state = "INITIATED" self.context = {"order_id": order_id, "items": [], "total_amount": 0.0} # サービス間で必要な注文詳細を保存 def start_saga(self, order_details): print(f"Orchestrator: Starting Saga for Order {self.order_id}") self.context.update(order_details) self.state = "CREATE_ORDER" self._send_command_to_order_service(self.context) def _send_command_to_order_service(self, payload): # Order Serviceへのコマンド送信をシミュレート print(f"Orchestrator: Sending 'create_order' command to Order Service with data: {payload}") # 実際のシステムでは、これはキューにメッセージを発行するでしょう self._simulate_order_service_response(payload) def _send_command_to_inventory_service(self, payload): # Inventory Serviceへのコマンド送信をシミュレート print(f"Orchestrator: Sending 'reserve_inventory' command to Inventory Service with data: {payload}") self._simulate_inventory_service_response(payload) def _send_command_to_payment_service(self, payload): # Payment Serviceへのコマンド送信をシミュレート print(f"Orchestrator: Sending 'process_payment' command to Payment Service with data: {payload}") self._simulate_payment_service_response(payload) def _simulate_order_service_response(self, payload): # Order Serviceが注文を作成し、イベントを発行するのをシミュレート print(f"Order Service: Order {payload['order_id']} created in PENDING state.") # 成功した場合、オーケストレーターは続行します self.handle_event("order_created", {"order_id": payload["order_id"], "items": payload["items"]}) def _simulate_inventory_service_response(self, payload, success=True): if success: print(f"Inventory Service: Items {payload['items']} reserved for Order {payload['order_id']}.") self.handle_event("inventory_reserved", {"order_id": payload["order_id"]}) else: print(f"Inventory Service: Failed to reserve inventory for Order {payload['order_id']}!") self.handle_event("inventory_reservation_failed", {"order_id": payload["order_id"]}) def _simulate_payment_service_response(self, payload, success=True): if success: print(f"Payment Service: Payment processed for Order {payload['order_id']} with amount {payload['total_amount']}.") self.handle_event("payment_processed", {"order_id": payload["order_id"]}) else: print(f"Payment Service: Failed to process payment for Order {payload['order_id']}!") self.handle_event("payment_failed", {"order_id": payload["order_id"]}) def _send_compensate_order_service(self, payload): print(f"Order Service: Compensating - Canceling Order {payload['order_id']}.") # 実際のシステムでは、注文ステータスを 'CANCELED' に変更します pass def _send_compensate_inventory_service(self, payload): print(f"Inventory Service: Compensating - Unreserving items for Order {payload['order_id']}.") # 実際のシステムでは、予約されたアイテムを解放します pass def _send_compensate_payment_service(self, payload): print(f"Payment Service: Compensating - Refunding payment for Order {payload['order_id']}.") # 実際のシステムでは、返金を処理します pass def handle_event(self, event_type, event_data): print(f"Orchestrator: Received event: {event_type} for Order {self.order_id}. Current state: {self.state}") if event_type == "order_created" and self.state == "CREATE_ORDER": self.state = "RESERVE_INVENTORY" self._send_command_to_inventory_service(self.context) elif event_type == "inventory_reserved" and self.state == "RESERVE_INVENTORY": self.state = "PROCESS_PAYMENT" self._send_command_to_payment_service(self.context) elif event_type == "payment_processed" and self.state == "PROCESS_PAYMENT": self.state = "SAGA_COMPLETED" print(f"Orchestrator: Saga for Order {self.order_id} completed successfully!") # Order Serviceで注文を最終化します(例:ステータスを 'APPROVED' に設定) print(f"Order Service: Order {self.order_id} status updated to APPROVED.") elif event_type == "inventory_reservation_failed": self.state = "SAGA_FAILED_INVENTORY" print(f"Orchestrator: Inventory reservation failed. Initiating compensation.") self._send_compensate_order_service(self.context) # 注文作成の補償 print(f"Orchestrator: Saga for Order {self.order_id} failed and compensated.") elif event_type == "payment_failed": self.state = "SAGA_FAILED_PAYMENT" print(f"Orchestrator: Payment failed. Initiating compensation.") self._send_compensate_inventory_service(self.context) # 在庫予約の補償 self._send_compensate_order_service(self.context) # 注文作成の補償 print(f"Orchestrator: Saga for Order {self.order_id} failed and compensated.") # --- Sagaの実行 --- if __name__ == "__main__": order_id = "ORDER-XYZ-123" order_details = { "customer_id": "CUST-001", "items": [{"item_id": "ITEM-A", "quantity": 2}, {"item_id": "ITEM-B", "quantity": 1}], "total_amount": 150.00 } orchestrator = OrderCreationOrchestrator(order_id) orchestrator.start_saga(order_details) print("\n--- シナリオの失敗をシミュレートします(例:支払い失敗) ---") orchestrator_failure = OrderCreationOrchestrator("ORDER-XYZ-FAIL") order_details_failure = { "customer_id": "CUST-002", "items": [{"item_id": "ITEM-C", "quantity": 1}], "total_amount": 50.00 } # 失敗と補償を実演するために、手動でイベントをシミュレートします orchestrator_failure.start_saga(order_details_failure) # この時点では、order_createdイベントは正常に処理されます # 次に inventory_reserved イベントが正常に処理されます # ここで、支払い失敗を直接シミュレートします orchestrator_failure.handle_event("payment_failed", {"order_id": "ORDER-XYZ-FAIL", "reason": "Insufficient funds"})
コード例の説明:
OrderCreationOrchestrator
: これはSagaオーケストレーターとして機能します。全体的なトランザクションの状態(self.state
)と、後続のステップに必要なコンテキスト(self.context
)を維持します。start_saga
:Order Service
への最初のコマンドを送信して、ワークフローを開始します。_send_command_to_X_service
: これらのメソッドは、さまざまなマイクロサービスへのメッセージ(コマンド)の送信をシミュレートします。実際のアプリケーションでは、メッセージブローカー(例:Kafka、RabbitMQ)にメッセージを発行することになります。_simulate_X_service_response
: これらのメソッドは、個々のマイクロサービスがローカルトランザクションを完了した後の応答をシミュレートします。これらは、オーケストレーターが処理するイベントを発行します。handle_event
: これはオーケストレーターの中核ロジックです。受信したイベントと現在のSagaの状態に基づいて、次のアクションを決定します。それは、次のステップに進む、Sagaを完了する、または補償ワークフローを開始することです。- 補償ロジック:
payment_failed
のようなイベントが受信されると、handle_event
メソッドは_send_compensate_X_service
呼び出しのシーケンスをトリガーします。これらの呼び出しは、以前に成功したサービスに、そのアクションを意味的に元に戻すように指示します。
アプリケーションシナリオ
Sagaパターンは、特に以下のようなマイクロサービスアーキテクチャのシナリオに適しています。
- ビジネスプロセスが複数のサービスとデータベースにまたがる場合: Eコマースの注文処理、ホテルの予約システム、航空券の予約。
- すべてのサービスにわたる強い整合性がリアルタイムで厳密に必要とされるわけではないが、最終整合性とアトミックな保証が最重要である場合: システムが最終的に一貫した状態に達するか、完全にロールバックされる限り、システムが一時的に不整合な状態であっても許容されます。
- 従来の分散トランザクション(XAトランザクション)ソリューションが実行不可能であるか、過剰なオーバーヘッドを導入する場合: XAトランザクションは、しばしば緊密に結合され、高度に分散され、自律的なマイクロサービス環境ではパフォーマンスが悪くなります。
- サービスが疎結合を維持する必要がある場合: Sagaパターンにより、データベーストランザクションを直接調整することなく、サービスが独立して進化できます。
主要な考慮事項
- 冪等性: すべてのコマンドと補償トランザクションは冪等である必要があります。同じコマンドを複数回送信しても、一度送信した場合と同じ効果があるべきです。これはメッセージ駆動型システムでの回復力にとって重要です。
- 監視とオブザーバビリティ: Sagaは長期間実行される可能性があり、多くのステップが関与します。堅牢な監視、ログ記録、およびトレースは、Sagaの状態を理解し、障害を診断するために不可欠です。
- エラーハンドリングとリトライ: サービスが一時的な障害をどのように処理するかを検討してください。オーケストレーターまたは個々のサービスには、リトライメカニズムが必要になる場合があります。
- 状態管理: オーケストレーターは、クラッシュからの回復と実行の継続のために、Sagaの状態を永続化する必要があります。
- タイムアウト: サービスが応答しないためにSagaが無限にハングしないように、Sagaの各ステップのタイムアウトを実装することが重要です。
結論
Sagaパターンは、複雑なマイクロサービスアーキテクチャにおける分散トランザクションを管理するための、実用的で強力なソリューションを提供します。アトミックな操作をローカルで独立したトランザクションのシーケンスに分割し、堅牢な補償メカニズムを提供することにより、マイクロサービスの利点を犠牲にすることなく、異なるサービスにわたるデータの一貫性を保証します。調整とエラーハンドリングに関する独自の複雑さを導入しますが、回復力がありスケーラブルな方法でトランザクション整合性を維持できる機能により、Sagaは堅牢な分散システムを構築するための不可欠なツールとなります。これは、インテリジェントな設計が分散コンピューティングの固有の課題をどのように克服できるかを示すものであり、最終整合性と運用保証を最新のアプリケーション開発の最前線にもたらします。