目次
分散システムにおいて、障害は避けられない。ネットワークは不安定になり、データベースは応答不能になり、外部 API はレート制限される。重要なのは「障害が起きないこと」ではなく、「一部の障害がシステム全体の停止に繋がらないこと」だ。
本記事では、フォールトトレラントなシステム設計に不可欠な 4 つのレジリエンシーパターン——サーキットブレーカー、リトライ、バルクヘッド、タイムアウト——を解説する。これらを適切に組み合わせることで、個別コンポーネントの故障を吸収し、システム全体の可用性を維持できる。
フォールトトレランスの基本概念
障害の伝播——なぜシステムは連鎖停止するのか
【障害の連鎖例:オンライン注文システム】
ユーザー → API ゲートウェイ → 注文サービス → 在庫サービス → データベース
1. データベースが応答遅延(10 秒→)
2. 在庫サービスの接続プールが枯渇
3. 注文サービスがタイムアウト待機
4. API ゲートウェイのリソース不足
5. **全ユーザーが応答不能** ← 1 つの DB 遅延で全体停止
この現象を**カスケード故障(連鎖故障)**と呼ぶ。レジリエンシーパターンの目的は、この連鎖を断ち切ることだ。
レジリエンスとフォールトトレランスの違い
【用語の整理】
- **フォールトトレランス(Fault Tolerance)**
- 目的:一部の故障があってもシステム全体が機能を継続
- 手法:冗長化、フェイルオーバー、エラー吸収
- **レジリエンス(Resilience)**
- 目的:障害から速やかに回復し、正常状態に復帰
- 手法:自己回復、状態復元、グレースフルデグラデーション
- **関係性**
- フォールトトレランスは「耐える」こと
- レジリエンスは「回復する」こと
- 両方を備えて初めて「折れないシステム」が実現する
サーキットブレーカーパターン——過負荷からの保護
サーキットブレーカーの仕組み
**サーキットブレーカー(Circuit Breaker)**は、電気回路のブレーカーから着想を得たパターンだ。連続してエラーが発生した場合、それ以上リクエストを流さず「遮断」状態にする。
【サーキットブレーカーの状態遷移】
エラー閾値超
┌─────────────────────────────────┐
│ ▼
[CLOSED] ──────────────────────> [OPEN]
│ ▲ │
│ │ 半分開く(テスト) │ タイムアウト
│ │ │
│ └──────────────────────────────┘
▼
[HALF-OPEN]
│
│ 成功
▼
[CLOSED]
3 つの状態
| 状態 | 説明 | リクエスト処理 |
|---|---|---|
| CLOSED(閉) | 正常状態 | 通常通り処理 |
| OPEN(開) | 遮断状態 | 即座にエラー返却(フォールバックへ) |
| HALF-OPEN(半開) | テスト状態 | 一部のリクエストで回復テスト |
サーキットブレーカーの実装
import time
import threading
from enum import Enum
from typing import Callable, Any
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5, # エラー閾値
recovery_timeout: float = 30.0, # 回復までの秒数
half_open_max_calls: int = 1 # 半開状態の最大試行
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self._state = CircuitState.CLOSED
self._failure_count = 0
self._last_failure_time: datetime = None
self._half_open_calls = 0
self._lock = threading.Lock()
@property
def state(self) -> CircuitState:
with self._lock:
# OPEN 状態で回復タイムアウト経過 → HALF-OPEN へ移行
if (self._state == CircuitState.OPEN and
self._last_failure_time and
datetime.now() - self._last_failure_time > timedelta(seconds=self.recovery_timeout)):
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
return self._state
def call(self, func: Callable, *args, **kwargs) -> Any:
"""サーキットブレーカー経由で関数を実行"""
current_state = self.state
if current_state == CircuitState.OPEN:
# 遮断中:即座にエラー
raise CircuitBreakerOpenError("サーキットブレーカーが OPEN 状態です")
if current_state == CircuitState.HALF_OPEN:
if self._half_open_calls >= self.half_open_max_calls:
raise CircuitBreakerOpenError("HALF-OPEN 状態の最大コール数超過")
self._half_open_calls += 1
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
with self._lock:
if self._state == CircuitState.HALF_OPEN:
# HALF-OPEN で成功 → CLOSED へ復帰
self._state = CircuitState.CLOSED
self._failure_count = 0
elif self._state == CircuitState.CLOSED:
# 連続成功カウントをリセット(オプション)
self._failure_count = 0
def _on_failure(self):
with self._lock:
self._failure_count += 1
self._last_failure_time = datetime.now()
if self._state == CircuitState.HALF_OPEN:
# HALF-OPEN で失敗 → 即 OPEN へ
self._state = CircuitState.OPEN
elif (self._state == CircuitState.CLOSED and
self._failure_count >= self.failure_threshold):
# エラー閾値超 → OPEN へ
self._state = CircuitState.OPEN
class CircuitBreakerOpenError(Exception):
"""サーキットブレーカーがOPEN のときに発生するエラー"""
pass
使用例
import requests
# サーキットブレーカーの作成
circuit_breaker = CircuitBreaker(
failure_threshold=5, # 5 回連続エラーで OPEN
recovery_timeout=30.0 # 30 秒後に HALF-OPEN
)
# 外部 API 呼び出し
def call_external_api(data: str) -> str:
response = requests.post(
"https://api.example.com/process",
json={"data": data},
timeout=5.0
)
response.raise_for_status()
return response.json()["result"]
# フォールバック付き呼び出し
def resilient_call(data: str) -> str:
try:
return circuit_breaker.call(call_external_api, data)
except CircuitBreakerOpenError:
# フォールバック:キャッシュまたはデフォルト値
print("サーキットブレーカーOPEN:フォールバックへ")
return get_cached_result(data) # キャッシュから取得
except requests.RequestException as e:
print(f"API 呼び出しエラー:{e}")
return get_cached_result(data)
def get_cached_result(data: str) -> str:
"""フォールバック:簡易キャッシュ(実際は Redis 等)"""
cache = {"test": "cached_result"}
return cache.get(data, "default_value")
サーキットブレーカーの効果
# シミュレーション:外部 API が応答不能になった場合
import random
def unstable_api(data: str) -> str:
"""80% の確率で失敗する API"""
if random.random() < 0.8:
raise requests.Timeout("API timeout")
return "success"
cb = CircuitBreaker(failure_threshold=3, recovery_timeout=5.0)
# 連続エラーの連鎖を観測
for i in range(10):
try:
result = cb.call(unstable_api, f"data_{i}")
print(f"試行 {i+1}: 成功 - {result}")
except CircuitBreakerOpenError:
print(f"試行 {i+1}: サーキットブレーカーOPEN(遮断)")
except requests.Timeout:
print(f"試行 {i+1}: タイムアウトエラー")
time.sleep(0.5)
出力例:
試行 1: タイムアウトエラー
試行 2: タイムアウトエラー
試行 3: タイムアウトエラー
試行 4: サーキットブレーカー OPEN(遮断) ← 閾値超で遮断
試行 5: サーキットブレーカー OPEN(遮断)
試行 6: サーキットブレーカー OPEN(遮断)
試行 7: サーキットブレーカー OPEN(遮断)
試行 8: サーキットブレーカー OPEN(遮断)
試行 9: サーキットブレーカー OPEN(遮断)
試行 10: サーキットブレーカー OPEN(遮断)
# 5 秒後→HALF-OPEN 状態へ自動遷移
リトライパターン——一時的障害への対応
リトライが有効なケースと無効なケース
【リトライすべきエラー】
- ✅ 一時的ネットワーク障害(タイムアウト、接続切断)
- ✅ サーバー過負荷(503 Service Unavailable)
- ✅ レート制限(429 Too Many Requests)→ Retry-After ヘッダーに従う
- ✅ 一時的なデータベースロック
【リトライすべきでないエラー】
- ❌ クライアントエラー(400 Bad Request, 404 Not Found)
- ❌ 認証エラー(401 Unauthorized, 403 Forbidden)
- ❌ 永続的なサーバーエラー(500 Internal Server Error の一部)
- ❌ バリデーションエラー
指数バックオフとジッター
単純な固定間隔リトライは**リトライの嵐(retry storm)**を引き起こす。複数のクライアントが同時にリトライすると、サーバー回復のタイミングで一斉にリクエストが集中する。
import random
import time
def calculate_backoff(
attempt: int,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True
) -> float:
"""
指数バックオフ+ジッターの計算
Parameters:
attempt: 試行回数(0 から開始)
base_delay: 基本遅延秒数
max_delay: 最大遅延秒数
exponential_base: 指数の底
jitter: ランダムジッターを追加するか
"""
# 指数バックオフ
delay = min(base_delay * (exponential_base ** attempt), max_delay)
# ジッター(ランダムな揺らぎ)を追加
if jitter:
# フルジッター:0〜delay のランダム
delay = random.uniform(0, delay)
# または等幅ジッター:delay/2 〜 delay のランダム
# delay = delay / 2 + random.uniform(0, delay / 2)
return delay
# 試行ごとの待機時間比較
print("【待機時間の比較】")
print(f"{'試行':<6} {'固定':<8} {'指数':<8} {'指数 + ジッター':<15}")
print("-" * 40)
for attempt in range(6):
fixed = 1.0
exponential = min(1.0 * (2 ** attempt), 60.0)
with_jitter = calculate_backoff(attempt, jitter=True)
print(f"{attempt+1:<6} {fixed:<8.1f} {exponential:<8.1f} {with_jitter:<15.2f}")
出力:
【待機時間の比較】
試行 固定 指数 指数 + ジッター
----------------------------------------
1 1.0 1.0 0.73
2 1.0 2.0 1.45
3 1.0 4.0 2.89
4 1.0 8.0 4.12
5 1.0 16.0 8.67
6 1.0 32.0 18.34
リトライ実装(再帰版)
from functools import wraps
import logging
logger = logging.getLogger(__name__)
def retry(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
retryable_exceptions: tuple = (Exception,)
):
"""
リトライデコレーター
Parameters:
max_retries: 最大リトライ回数
base_delay: 基本遅延秒数
max_delay: 最大遅延秒数
exponential_base: 指数の底
jitter: ジッターを追加するか
retryable_exceptions: リトライ対象の例外タプル
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except retryable_exceptions as e:
last_exception = e
# 最大試行に達したらリタイア
if attempt >= max_retries:
logger.error(f"{func.__name__}: 最大リトライ回数 {max_retries} に達しました")
break
# 待機時間計算
delay = calculate_backoff(
attempt,
base_delay,
max_delay,
exponential_base,
jitter
)
logger.warning(
f"{func.__name__}: 試行 {attempt + 1}/{max_retries} 失敗 - "
f"{e.__class__.__name__}: {e}. {delay:.2f}秒後にリトライ"
)
time.sleep(delay)
# 全ての試行が失敗
raise last_exception
return wrapper
return decorator
# 使用例
@retry(
max_retries=5,
base_delay=1.0,
max_delay=30.0,
retryable_exceptions=(requests.RequestException, ConnectionError)
)
def call_unstable_api(url: str) -> dict:
"""不安定な API をリトライ付きで呼び出す"""
response = requests.get(url, timeout=5.0)
response.raise_for_status()
return response.json()
バルクヘッドパターン——リソース隔離による故障封じ込め
バルクヘッドの概念
**バルクヘッド(Bulkhead)**は、船舶の防水区画から着想を得たパターンだ。船体内部を複数の区画に分割し、一部の区画が浸水しても全体が沈まないようにする。
【バルクヘッドの概念図】
伝統的アーキテクチャ(共有リソース):
┌─────────────────────────────────────┐
│ 全サービスが共有する │
│ 1 つのスレッドプール │
│ [注文][在庫][配送][ユーザー][...] │
│ 全部で 100 スレッド │
└─────────────────────────────────────┘
→ 在庫サービスの遅延が全サービスを巻き込む
バルクヘッド適用(リソース隔離):
┌───────┬───────┬───────┬───────┐
│ 注文用 │ 在庫用 │ 配送用 │ユーザー│
│ 25 枚 │ 25 枚 │ 25 枚 │ 25 枚 │
└───────┴───────┴───────┴───────┘
→ 在庫サービスの遅延は在庫プールのみ影響
スレッドプールによるバルクヘッド実装
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Any
class BulkheadExecutor:
"""
バルクヘッドパターン:リソース隔離のための実行エグゼキューター
"""
def __init__(self, max_workers: int, name: str = "default"):
self.name = name
self.max_workers = max_workers
self._executor = ThreadPoolExecutor(max_workers=max_workers)
self._active_count = 0
self._lock = threading.Lock()
def submit(self, fn: Callable, *args, **kwargs):
"""タスクをサブミット(リソース制限あり)"""
with self._lock:
if self._active_count >= self.max_workers:
raise BulkheadFullError(f"{self.name} バルクヘッドが満杯です")
self._active_count += 1
future = self._executor.submit(self._wrap_task, fn, *args, **kwargs)
return future
def _wrap_task(self, fn: Callable, *args, **kwargs):
"""タスク完了時にカウンタをデクリメント"""
try:
return fn(*args, **kwargs)
finally:
with self._lock:
self._active_count -= 1
def shutdown(self, wait: bool = True):
self._executor.shutdown(wait=wait)
class BulkheadFullError(Exception):
"""バルクヘッドが満杯のときに発生するエラー"""
pass
# 使用例:サービスごとにバルクヘッドを分離
order_bulkhead = BulkheadExecutor(max_workers=20, name="order")
inventory_bulkhead = BulkheadExecutor(max_workers=10, name="inventory")
user_bulkhead = BulkheadExecutor(max_workers=30, name="user")
def process_order(order_id: str) -> dict:
"""注文処理(在庫サービス呼び出しを含む)"""
# 注文用プールで実行(最大 20 並列)
future = order_bulkhead.submit(_do_process_order, order_id)
return future.result()
def check_inventory(product_id: str) -> int:
"""在庫チェック(在庫サービス専用プール)"""
future = inventory_bulkhead.submit(_do_check_inventory, product_id)
return future.result()
# 在庫サービスが遅延しても、注文サービスは影響を受けない
セマンフォによるバルクヘッド
import threading
class SemaphoreBulkhead:
"""
セマンフォを使ったバルクヘッド実装
単純で軽量、ブロッキング動作
"""
def __init__(self, max_concurrent: int):
self._semaphore = threading.Semaphore(max_concurrent)
self._max_concurrent = max_concurrent
def execute(self, func: Callable, *args, **kwargs) -> Any:
"""セマンフォ制御付きで関数を実行"""
self._semaphore.acquire()
try:
return func(*args, **kwargs)
finally:
self._semaphore.release()
def try_execute(self, func: Callable, timeout: float = 0.0) -> Any:
"""タイムアウト付きで獲得試行(失敗時は即エラー)"""
acquired = self._semaphore.acquire(timeout=timeout)
if not acquired:
raise BulkheadFullError("バルクヘッドが満杯です")
try:
return func(*args, **kwargs)
finally:
self._semaphore.release()
# 使用例
api_bulkhead = SemaphoreBulkhead(max_concurrent=50)
def call_external_service(data: str) -> str:
"""外部サービス呼び出し(最大 50 並列に制限)"""
return api_bulkhead.execute(_do_call, data)
def call_with_timeout(data: str, timeout: float = 0.1) -> str:
"""即時失敗許容(待たず即フォールバック)"""
try:
return api_bulkhead.try_execute(_do_call, data, timeout=timeout)
except BulkheadFullError:
# 即座にフォールバック
return get_cached_response(data)
タイムアウトパターン——無限待機の防止
タイムアウトの重要性
タイムアウトは、応答がない呼び出しを強制的に終了するパターンだ。設定しない場合、システムリソースが解放されず、最終的に全体が停止する。
# ❌ 問題:タイムアウトなし
def call_slow_service():
response = requests.get("https://slow-service.com/api")
# 相手が応答しない場合、永遠に待機(または OS の長いタイムアウトまで)
return response.json()
# ✅ 推奨:明示的なタイムアウト
def call_with_timeout():
response = requests.get("https://slow-service.com/api", timeout=5.0)
# 5 秒でタイムアウト、リソース解放
return response.json()
タイムアウトの種類の使い分け
【タイムアウトの種類】
1. **接続タイムアウト(connect timeout)**
- TCP 接続が確立されるまでの待ち時間
- 短めに設定(1-3 秒)
- 例:`requests.get(url, timeout=(3.0, 10.0))` の第 1 引数
2. **読み取りタイムアウト(read timeout)**
- データ受信までの待ち時間
- サービス応答時間に応じて設定(5-30 秒)
- 例:`requests.get(url, timeout=(3.0, 10.0))` の第 2 引数
3. **全体タイムアウト(total timeout)**
- 接続 + 読み取りの合計時間
- クライアント体験時間に基づく
タイムアウト実装(デコレーター)
from functools import wraps
import signal
class TimeoutError(Exception):
"""タイムアウト例外"""
pass
def timeout(seconds: float):
"""
関数実行にタイムアウトを設定するデコレーター
Note: Unix シグナル使用のため Windows では動作しない
Windows では concurrent.futures を使用
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
def handler(signum, frame):
raise TimeoutError(f"{func.__name__} が {seconds}秒でタイムアウト")
# シグナルハンドラ設定
old_handler = signal.signal(signal.SIGALRM, handler)
signal.alarm(int(seconds))
try:
return func(*args, **kwargs)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
return wrapper
return decorator
# Windows 対応版(concurrent.futures 使用)
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
def timeout_windows(seconds: float):
"""Windows 対応タイムアウトデコレーター"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, *args, **kwargs)
try:
return future.result(timeout=seconds)
except FuturesTimeoutError:
raise TimeoutError(f"{func.__name__} が {seconds}秒でタイムアウト")
return wrapper
return decorator
# 使用例
@timeout_windows(5.0)
def slow_database_query(query: str) -> list:
"""5 秒タイムアウトのデータベースクエリ"""
return execute_query(query)
フォールバックパターン——グレースフルデグラデーション
フォールバックの戦略
**フォールバック(Fallback)**は、メインの処理が失敗したときに代替手段を提供するパターンだ。完全な機能を諦め、制限された機能で継続する(グレースフルデグラデーション)。
【フォールバックの例】
1. **キャッシュからの返却**
- 最新性は失われるが、応答は維持
- 例:DB 応答不能→最後に成功したキャッシュデータを返す
2. **デフォルト値の返却**
- 簡素化された結果を返す
- 例:レコメンド機能失敗→人気アイテム一覧を返す
3. **機能の無効化**
- オプション機能をスキップ
- 例:レビュー表示失敗→レビューセクションを非表示
4. **別のサービスへ切り替え**
- セカンダリプロバイダーへフォールバック
- 例:メイン API 失敗→バックアップ API へ
フォールバック実装
from typing import Callable, Any, Optional
import logging
logger = logging.getLogger(__name__)
class FallbackExecutor:
"""
フォールバックチェーンを実行する
メイン→フォールバック 1→フォールバック 2→...→デフォルト
"""
def __init__(self, name: str = "fallback"):
self.name = name
def execute_with_fallback(
self,
primary: Callable,
fallbacks: list[Callable],
default: Any = None,
*args,
**kwargs
) -> Any:
"""
フォールバックチェーンを実行
Parameters:
primary: メインの処理
fallbacks: フォールバック処理のリスト(優先度順)
default: 全てのフォールバックが失敗したときのデフォルト値
args, kwargs: 各関数に渡す引数
"""
# メイン処理を試行
try:
return primary(*args, **kwargs)
except Exception as e:
logger.warning(f"{self.name}: メイン処理失敗 - {e}")
# フォールバックを順に試行
for i, fallback in enumerate(fallbacks):
try:
result = fallback(*args, **kwargs)
logger.info(f"{self.name}: フォールバック {i+1} で成功")
return result
except Exception as e:
logger.warning(f"{self.name}: フォールバック {i+1} 失敗 - {e}")
# 全て失敗:デフォルト値
logger.error(f"{self.name}: 全てのフォールバック失敗、デフォルト値を返却")
return default
# 使用例
fallback = FallbackExecutor(name="product_service")
def get_product_info_primary(product_id: str) -> dict:
"""メイン:データベースから取得"""
return db_query("SELECT * FROM products WHERE id = ?", product_id)
def get_product_info_cache(product_id: str) -> dict:
"""フォールバック 1:キャッシュから取得"""
return redis_cache.get(f"product:{product_id}")
def get_product_info_default(product_id: str) -> dict:
"""フォールバック 2:最小限の情報"""
return {"id": product_id, "name": "商品情報 unavailable", "price": 0}
# 実行
product = fallback.execute_with_fallback(
primary=get_product_info_primary,
fallbacks=[get_product_info_cache],
default={"id": "unknown", "name": "不明", "price": 0},
product_id="P12345"
)
パターンの組み合わせ——現実的な実装
4 パターンの統合
実際のシステムでは、これら 4 つのパターンを組み合わせて使用する。
from dataclasses import dataclass
from typing import Callable, Any
import time
@dataclass
class ResilienceConfig:
"""レジリエンシー設定"""
# サーキットブレーカー
circuit_failure_threshold: int = 5
circuit_recovery_timeout: float = 30.0
# リトライ
retry_max_attempts: int = 3
retry_base_delay: float = 1.0
retry_max_delay: float = 30.0
# バルクヘッド
bulkhead_max_concurrent: int = 50
# タイムアウト
timeout_seconds: float = 10.0
class ResilientExecutor:
"""
4 つのレジリエンシーパターンを統合したエグゼキューター
"""
def __init__(self, config: ResilienceConfig, name: str = "resilient"):
self.config = config
self.name = name
# サーキットブレーカー
self._circuit_breaker = CircuitBreaker(
failure_threshold=config.circuit_failure_threshold,
recovery_timeout=config.circuit_recovery_timeout
)
# バルクヘッド
self._bulkhead = SemaphoreBulkhead(config.bulkhead_max_concurrent)
self._logger = logging.getLogger(name)
def execute(
self,
func: Callable,
*args,
fallback: Callable = None,
fallback_args: tuple = None,
**kwargs
) -> Any:
"""
レジリエントな実行
実行フロー:
1. バルクヘッド(並列数制限)
2. サーキットブレーカー(遮断チェック)
3. リトライ(一時的エラー対応)
4. タイムアウト(無限待機防止)
5. フォールバック(代替処理)
"""
fallback_args = fallback_args or ()
def _execute_with_retry_and_timeout():
"""リトライ+タイムアウトで実行"""
last_error = None
for attempt in range(self.config.retry_max_attempts + 1):
try:
# タイムアウト付き実行
return self._execute_with_timeout(func, *args, **kwargs)
except (TimeoutError, ConnectionError, requests.RequestException) as e:
last_error = e
if attempt >= self.config.retry_max_attempts:
break # 最大試行に達した
# 指数バックオフで待機
delay = calculate_backoff(
attempt,
self.config.retry_base_delay,
self.config.retry_max_delay
)
self._logger.warning(
f"{self.name}: 試行 {attempt+1} 失敗、{delay:.2f}秒後にリトライ: {e}"
)
time.sleep(delay)
except Exception as e:
# リトライしないエラー
last_error = e
break
raise last_error
def _execute_bulkhead():
"""バルクヘッド経由で実行"""
return self._bulkhead.execute(_execute_with_retry_and_timeout)
def _execute_circuit():
"""サーキットブレーカー経由で実行"""
return self._circuit_breaker.call(_execute_bulkhead)
# メイン実行
try:
return _execute_circuit()
except (CircuitBreakerOpenError, BulkheadFullError, TimeoutError, Exception) as e:
self._logger.error(f"{self.name}: メイン実行失敗 - {e}")
# フォールバック
if fallback:
try:
self._logger.info(f"{self.name}: フォールバック実行")
return fallback(*fallback_args, **kwargs)
except Exception as fallback_error:
self._logger.error(f"{self.name}: フォールバック失敗 - {fallback_error}")
# 再スロー
raise
# 使用例
config = ResilienceConfig(
circuit_failure_threshold=5,
circuit_recovery_timeout=30.0,
retry_max_attempts=3,
retry_base_delay=1.0,
retry_max_delay=30.0,
bulkhead_max_concurrent=50,
timeout_seconds=10.0
)
executor = ResilientExecutor(config, name="order_service")
def call_inventory_service(product_id: str) -> int:
"""在庫サービス呼び出し"""
response = requests.get(
f"http://inventory-service/api/stock/{product_id}",
timeout=5.0
)
response.raise_for_status()
return response.json()["stock_count"]
def fallback_inventory(product_id: str) -> int:
"""フォールバック:キャッシュまたはデフォルト値"""
# Redis 等からキャッシュ取得
cached = redis_cache.get(f"inventory:{product_id}")
if cached:
return cached
return 0 # デフォルト:在庫なし
# 実行
try:
stock = executor.execute(
call_inventory_service,
"P12345",
fallback=fallback_inventory,
fallback_args=("P12345",)
)
print(f"在庫数:{stock}")
except Exception as e:
print(f"在庫取得失敗:{e}")
まとめ
レジリエンシーパターンの核心:
サーキットブレーカー: 連続エラーを検知して遮断し、カスケード故障を防止
- 状態遷移:CLOSED → OPEN → HALF-OPEN → CLOSED
- 回復タイムアウトで自動的にテスト再開
リトライ: 一時的エラーを自動回復
- 指数バックオフ: 試行ごとに待機時間を倍増
- ジッター: 複数クライアントの同期を防止
バルクヘッド: リソースを隔離して故障を封じ込め
- サービスごとにスレッドプールを分離
- 一部のサービス遅延が全体に波及しない
タイムアウト: 無限待機を防止
- 接続タイムアウトと読み取りタイムアウトを使い分け
- 全体タイムアウトでクライアント体験を保護
フォールバック: グレースフルデグラデーション
- キャッシュ、デフォルト値、機能無効化を準備
- 完全停止より部分機能継続を選ぶ
重要なのは、これらを単独で使うのではなく組み合わせることだ。サーキットブレーカーだけが有効でも、リトライだけが有効でもない。現実のシステムでは、これらのパターンが相互に補完し合うことで、真のフォールトトレランスが実現する。
実装上の指針:
- まずはタイムアウトから: 最も単純で効果的
- リトライを追加: 指数バックオフ+ジッター必須
- サーキットブレーカーで保護: 過負荷時の連鎖を断つ
- バルクヘッドで隔離: 重要サービスから順に分離
- フォールバックで UX 維持: 完全停止よりマシな体験を
参考資料
- “Release It! 2nd Edition” Michael T. Nygard 著(Pragmatic Bookshelf)
- “Building Microservices” Sam Newman 著(O’Reilly)
- Microsoft Azure Architecture Center - “Transient fault handling”
- Netflix Hystrix GitHub Repository(レガシーだが概念理解に有用)
- App-vNext/Polly - .NET 向けレジリエンシーライブラリ
- resilience4j - Java 向けレジリエンシーライブラリ
- AWS Well-Architected Framework - Reliability Pillar
免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。