目次

分散システムにおいて、障害は避けられない。ネットワークは不安定になり、データベースは応答不能になり、外部 API はレート制限される。重要なのは「障害が起きないこと」ではなく、「一部の障害がシステム全体の停止に繋がらないこと」だ。

本記事では、フォールトトレラントなシステム設計に不可欠な 4 つのレジリエンシーパターン——サーキットブレーカーリトライバルクヘッドタイムアウト——を解説する。これらを適切に組み合わせることで、個別コンポーネントの故障を吸収し、システム全体の可用性を維持できる。

フォールトトレランスの基本概念

障害の伝播——なぜシステムは連鎖停止するのか

【障害の連鎖例:オンライン注文システム】

ユーザー → API ゲートウェイ → 注文サービス → 在庫サービス → データベース

1. データベースが応答遅延(10 秒→)
2. 在庫サービスの接続プールが枯渇
3. 注文サービスがタイムアウト待機
4. API ゲートウェイのリソース不足
5. **全ユーザーが応答不能** ← 1 つの DB 遅延で全体停止

この現象を**カスケード故障(連鎖故障)**と呼ぶ。レジリエンシーパターンの目的は、この連鎖を断ち切ることだ。

レジリエンスとフォールトトレランスの違い

【用語の整理】

- **フォールトトレランス(Fault Tolerance)**
  - 目的:一部の故障があってもシステム全体が機能を継続
  - 手法:冗長化、フェイルオーバー、エラー吸収

- **レジリエンス(Resilience)**
  - 目的:障害から速やかに回復し、正常状態に復帰
  - 手法:自己回復、状態復元、グレースフルデグラデーション

- **関係性**
  - フォールトトレランスは「耐える」こと
  - レジリエンスは「回復する」こと
  - 両方を備えて初めて「折れないシステム」が実現する

サーキットブレーカーパターン——過負荷からの保護

サーキットブレーカーの仕組み

**サーキットブレーカー(Circuit Breaker)**は、電気回路のブレーカーから着想を得たパターンだ。連続してエラーが発生した場合、それ以上リクエストを流さず「遮断」状態にする。

【サーキットブレーカーの状態遷移】

                    エラー閾値超
    ┌─────────────────────────────────┐
    │                                 ▼
[CLOSED] ──────────────────────> [OPEN]
  │  ▲                              │
  │  │  半分開く(テスト)             │  タイムアウト
  │  │                              │
  │  └──────────────────────────────┘
  ▼
[HALF-OPEN]
  │
  │  成功
  ▼
[CLOSED]

3 つの状態

状態説明リクエスト処理
CLOSED(閉)正常状態通常通り処理
OPEN(開)遮断状態即座にエラー返却(フォールバックへ)
HALF-OPEN(半開)テスト状態一部のリクエストで回復テスト

サーキットブレーカーの実装

import time
import threading
from enum import Enum
from typing import Callable, Any
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,      # エラー閾値
        recovery_timeout: float = 30.0,   # 回復までの秒数
        half_open_max_calls: int = 1      # 半開状態の最大試行
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time: datetime = None
        self._half_open_calls = 0
        self._lock = threading.Lock()

    @property
    def state(self) -> CircuitState:
        with self._lock:
            # OPEN 状態で回復タイムアウト経過 → HALF-OPEN へ移行
            if (self._state == CircuitState.OPEN and
                self._last_failure_time and
                datetime.now() - self._last_failure_time > timedelta(seconds=self.recovery_timeout)):
                self._state = CircuitState.HALF_OPEN
                self._half_open_calls = 0
            return self._state

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """サーキットブレーカー経由で関数を実行"""
        current_state = self.state

        if current_state == CircuitState.OPEN:
            # 遮断中:即座にエラー
            raise CircuitBreakerOpenError("サーキットブレーカーが OPEN 状態です")

        if current_state == CircuitState.HALF_OPEN:
            if self._half_open_calls >= self.half_open_max_calls:
                raise CircuitBreakerOpenError("HALF-OPEN 状態の最大コール数超過")
            self._half_open_calls += 1

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                # HALF-OPEN で成功 → CLOSED へ復帰
                self._state = CircuitState.CLOSED
                self._failure_count = 0
            elif self._state == CircuitState.CLOSED:
                # 連続成功カウントをリセット(オプション)
                self._failure_count = 0

    def _on_failure(self):
        with self._lock:
            self._failure_count += 1
            self._last_failure_time = datetime.now()

            if self._state == CircuitState.HALF_OPEN:
                # HALF-OPEN で失敗 → 即 OPEN へ
                self._state = CircuitState.OPEN
            elif (self._state == CircuitState.CLOSED and
                  self._failure_count >= self.failure_threshold):
                # エラー閾値超 → OPEN へ
                self._state = CircuitState.OPEN

class CircuitBreakerOpenError(Exception):
    """サーキットブレーカーがOPEN のときに発生するエラー"""
    pass

使用例

import requests

# サーキットブレーカーの作成
circuit_breaker = CircuitBreaker(
    failure_threshold=5,      # 5 回連続エラーで OPEN
    recovery_timeout=30.0     # 30 秒後に HALF-OPEN
)

# 外部 API 呼び出し
def call_external_api(data: str) -> str:
    response = requests.post(
        "https://api.example.com/process",
        json={"data": data},
        timeout=5.0
    )
    response.raise_for_status()
    return response.json()["result"]

# フォールバック付き呼び出し
def resilient_call(data: str) -> str:
    try:
        return circuit_breaker.call(call_external_api, data)
    except CircuitBreakerOpenError:
        # フォールバック:キャッシュまたはデフォルト値
        print("サーキットブレーカーOPEN:フォールバックへ")
        return get_cached_result(data)  # キャッシュから取得
    except requests.RequestException as e:
        print(f"API 呼び出しエラー:{e}")
        return get_cached_result(data)

def get_cached_result(data: str) -> str:
    """フォールバック:簡易キャッシュ(実際は Redis 等)"""
    cache = {"test": "cached_result"}
    return cache.get(data, "default_value")

サーキットブレーカーの効果

# シミュレーション:外部 API が応答不能になった場合
import random

def unstable_api(data: str) -> str:
    """80% の確率で失敗する API"""
    if random.random() < 0.8:
        raise requests.Timeout("API timeout")
    return "success"

cb = CircuitBreaker(failure_threshold=3, recovery_timeout=5.0)

# 連続エラーの連鎖を観測
for i in range(10):
    try:
        result = cb.call(unstable_api, f"data_{i}")
        print(f"試行 {i+1}: 成功 - {result}")
    except CircuitBreakerOpenError:
        print(f"試行 {i+1}: サーキットブレーカーOPEN(遮断)")
    except requests.Timeout:
        print(f"試行 {i+1}: タイムアウトエラー")

    time.sleep(0.5)

出力例:

試行 1: タイムアウトエラー
試行 2: タイムアウトエラー
試行 3: タイムアウトエラー
試行 4: サーキットブレーカー OPEN(遮断) ← 閾値超で遮断
試行 5: サーキットブレーカー OPEN(遮断)
試行 6: サーキットブレーカー OPEN(遮断)
試行 7: サーキットブレーカー OPEN(遮断)
試行 8: サーキットブレーカー OPEN(遮断)
試行 9: サーキットブレーカー OPEN(遮断)
試行 10: サーキットブレーカー OPEN(遮断)
# 5 秒後→HALF-OPEN 状態へ自動遷移

リトライパターン——一時的障害への対応

リトライが有効なケースと無効なケース

【リトライすべきエラー】

- ✅ 一時的ネットワーク障害(タイムアウト、接続切断)
- ✅ サーバー過負荷(503 Service Unavailable)
- ✅ レート制限(429 Too Many Requests)→ Retry-After ヘッダーに従う
- ✅ 一時的なデータベースロック

【リトライすべきでないエラー】

- ❌ クライアントエラー(400 Bad Request, 404 Not Found)
- ❌ 認証エラー(401 Unauthorized, 403 Forbidden)
- ❌ 永続的なサーバーエラー(500 Internal Server Error の一部)
- ❌ バリデーションエラー

指数バックオフとジッター

単純な固定間隔リトライは**リトライの嵐(retry storm)**を引き起こす。複数のクライアントが同時にリトライすると、サーバー回復のタイミングで一斉にリクエストが集中する。

import random
import time

def calculate_backoff(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> float:
    """
    指数バックオフ+ジッターの計算

    Parameters:
        attempt: 試行回数(0 から開始)
        base_delay: 基本遅延秒数
        max_delay: 最大遅延秒数
        exponential_base: 指数の底
        jitter: ランダムジッターを追加するか
    """
    # 指数バックオフ
    delay = min(base_delay * (exponential_base ** attempt), max_delay)

    # ジッター(ランダムな揺らぎ)を追加
    if jitter:
        # フルジッター:0〜delay のランダム
        delay = random.uniform(0, delay)
        # または等幅ジッター:delay/2 〜 delay のランダム
        # delay = delay / 2 + random.uniform(0, delay / 2)

    return delay

# 試行ごとの待機時間比較
print("【待機時間の比較】")
print(f"{'試行':<6} {'固定':<8} {'指数':<8} {'指数 + ジッター':<15}")
print("-" * 40)

for attempt in range(6):
    fixed = 1.0
    exponential = min(1.0 * (2 ** attempt), 60.0)
    with_jitter = calculate_backoff(attempt, jitter=True)
    print(f"{attempt+1:<6} {fixed:<8.1f} {exponential:<8.1f} {with_jitter:<15.2f}")

出力:

【待機時間の比較】
試行   固定     指数     指数 + ジッター
----------------------------------------
1      1.0      1.0      0.73
2      1.0      2.0      1.45
3      1.0      4.0      2.89
4      1.0      8.0      4.12
5      1.0      16.0     8.67
6      1.0      32.0     18.34

リトライ実装(再帰版)

from functools import wraps
import logging

logger = logging.getLogger(__name__)

def retry(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    retryable_exceptions: tuple = (Exception,)
):
    """
    リトライデコレーター

    Parameters:
        max_retries: 最大リトライ回数
        base_delay: 基本遅延秒数
        max_delay: 最大遅延秒数
        exponential_base: 指数の底
        jitter: ジッターを追加するか
        retryable_exceptions: リトライ対象の例外タプル
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)

                except retryable_exceptions as e:
                    last_exception = e

                    # 最大試行に達したらリタイア
                    if attempt >= max_retries:
                        logger.error(f"{func.__name__}: 最大リトライ回数 {max_retries} に達しました")
                        break

                    # 待機時間計算
                    delay = calculate_backoff(
                        attempt,
                        base_delay,
                        max_delay,
                        exponential_base,
                        jitter
                    )

                    logger.warning(
                        f"{func.__name__}: 試行 {attempt + 1}/{max_retries} 失敗 - "
                        f"{e.__class__.__name__}: {e}. {delay:.2f}秒後にリトライ"
                    )
                    time.sleep(delay)

            # 全ての試行が失敗
            raise last_exception

        return wrapper
    return decorator

# 使用例
@retry(
    max_retries=5,
    base_delay=1.0,
    max_delay=30.0,
    retryable_exceptions=(requests.RequestException, ConnectionError)
)
def call_unstable_api(url: str) -> dict:
    """不安定な API をリトライ付きで呼び出す"""
    response = requests.get(url, timeout=5.0)
    response.raise_for_status()
    return response.json()

バルクヘッドパターン——リソース隔離による故障封じ込め

バルクヘッドの概念

**バルクヘッド(Bulkhead)**は、船舶の防水区画から着想を得たパターンだ。船体内部を複数の区画に分割し、一部の区画が浸水しても全体が沈まないようにする。

【バルクヘッドの概念図】

伝統的アーキテクチャ(共有リソース):
┌─────────────────────────────────────┐
│      全サービスが共有する           │
│         1 つのスレッドプール         │
│  [注文][在庫][配送][ユーザー][...]  │
│         全部で 100 スレッド          │
└─────────────────────────────────────┘
→ 在庫サービスの遅延が全サービスを巻き込む

バルクヘッド適用(リソース隔離):
┌───────┬───────┬───────┬───────┐
│ 注文用 │ 在庫用 │ 配送用 │ユーザー│
│ 25 枚  │ 25 枚  │ 25 枚  │ 25 枚  │
└───────┴───────┴───────┴───────┘
→ 在庫サービスの遅延は在庫プールのみ影響

スレッドプールによるバルクヘッド実装

from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Any

class BulkheadExecutor:
    """
    バルクヘッドパターン:リソース隔離のための実行エグゼキューター
    """
    def __init__(self, max_workers: int, name: str = "default"):
        self.name = name
        self.max_workers = max_workers
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._active_count = 0
        self._lock = threading.Lock()

    def submit(self, fn: Callable, *args, **kwargs):
        """タスクをサブミット(リソース制限あり)"""
        with self._lock:
            if self._active_count >= self.max_workers:
                raise BulkheadFullError(f"{self.name} バルクヘッドが満杯です")
            self._active_count += 1

        future = self._executor.submit(self._wrap_task, fn, *args, **kwargs)
        return future

    def _wrap_task(self, fn: Callable, *args, **kwargs):
        """タスク完了時にカウンタをデクリメント"""
        try:
            return fn(*args, **kwargs)
        finally:
            with self._lock:
                self._active_count -= 1

    def shutdown(self, wait: bool = True):
        self._executor.shutdown(wait=wait)

class BulkheadFullError(Exception):
    """バルクヘッドが満杯のときに発生するエラー"""
    pass

# 使用例:サービスごとにバルクヘッドを分離
order_bulkhead = BulkheadExecutor(max_workers=20, name="order")
inventory_bulkhead = BulkheadExecutor(max_workers=10, name="inventory")
user_bulkhead = BulkheadExecutor(max_workers=30, name="user")

def process_order(order_id: str) -> dict:
    """注文処理(在庫サービス呼び出しを含む)"""
    # 注文用プールで実行(最大 20 並列)
    future = order_bulkhead.submit(_do_process_order, order_id)
    return future.result()

def check_inventory(product_id: str) -> int:
    """在庫チェック(在庫サービス専用プール)"""
    future = inventory_bulkhead.submit(_do_check_inventory, product_id)
    return future.result()

# 在庫サービスが遅延しても、注文サービスは影響を受けない

セマンフォによるバルクヘッド

import threading

class SemaphoreBulkhead:
    """
    セマンフォを使ったバルクヘッド実装
    単純で軽量、ブロッキング動作
    """
    def __init__(self, max_concurrent: int):
        self._semaphore = threading.Semaphore(max_concurrent)
        self._max_concurrent = max_concurrent

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """セマンフォ制御付きで関数を実行"""
        self._semaphore.acquire()
        try:
            return func(*args, **kwargs)
        finally:
            self._semaphore.release()

    def try_execute(self, func: Callable, timeout: float = 0.0) -> Any:
        """タイムアウト付きで獲得試行(失敗時は即エラー)"""
        acquired = self._semaphore.acquire(timeout=timeout)
        if not acquired:
            raise BulkheadFullError("バルクヘッドが満杯です")
        try:
            return func(*args, **kwargs)
        finally:
            self._semaphore.release()

# 使用例
api_bulkhead = SemaphoreBulkhead(max_concurrent=50)

def call_external_service(data: str) -> str:
    """外部サービス呼び出し(最大 50 並列に制限)"""
    return api_bulkhead.execute(_do_call, data)

def call_with_timeout(data: str, timeout: float = 0.1) -> str:
    """即時失敗許容(待たず即フォールバック)"""
    try:
        return api_bulkhead.try_execute(_do_call, data, timeout=timeout)
    except BulkheadFullError:
        # 即座にフォールバック
        return get_cached_response(data)

タイムアウトパターン——無限待機の防止

タイムアウトの重要性

タイムアウトは、応答がない呼び出しを強制的に終了するパターンだ。設定しない場合、システムリソースが解放されず、最終的に全体が停止する。

# ❌ 問題:タイムアウトなし
def call_slow_service():
    response = requests.get("https://slow-service.com/api")
    # 相手が応答しない場合、永遠に待機(または OS の長いタイムアウトまで)
    return response.json()

# ✅ 推奨:明示的なタイムアウト
def call_with_timeout():
    response = requests.get("https://slow-service.com/api", timeout=5.0)
    # 5 秒でタイムアウト、リソース解放
    return response.json()

タイムアウトの種類の使い分け

【タイムアウトの種類】

1. **接続タイムアウト(connect timeout)**
   - TCP 接続が確立されるまでの待ち時間
   - 短めに設定(1-3 秒)
   - 例:`requests.get(url, timeout=(3.0, 10.0))` の第 1 引数

2. **読み取りタイムアウト(read timeout)**
   - データ受信までの待ち時間
   - サービス応答時間に応じて設定(5-30 秒)
   - 例:`requests.get(url, timeout=(3.0, 10.0))` の第 2 引数

3. **全体タイムアウト(total timeout)**
   - 接続 + 読み取りの合計時間
   - クライアント体験時間に基づく

タイムアウト実装(デコレーター)

from functools import wraps
import signal

class TimeoutError(Exception):
    """タイムアウト例外"""
    pass

def timeout(seconds: float):
    """
    関数実行にタイムアウトを設定するデコレーター

    Note: Unix シグナル使用のため Windows では動作しない
    Windows では concurrent.futures を使用
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            def handler(signum, frame):
                raise TimeoutError(f"{func.__name__}{seconds}秒でタイムアウト")

            # シグナルハンドラ設定
            old_handler = signal.signal(signal.SIGALRM, handler)
            signal.alarm(int(seconds))

            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)

        return wrapper
    return decorator

# Windows 対応版(concurrent.futures 使用)
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError

def timeout_windows(seconds: float):
    """Windows 対応タイムアウトデコレーター"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(func, *args, **kwargs)
                try:
                    return future.result(timeout=seconds)
                except FuturesTimeoutError:
                    raise TimeoutError(f"{func.__name__}{seconds}秒でタイムアウト")

        return wrapper
    return decorator

# 使用例
@timeout_windows(5.0)
def slow_database_query(query: str) -> list:
    """5 秒タイムアウトのデータベースクエリ"""
    return execute_query(query)

フォールバックパターン——グレースフルデグラデーション

フォールバックの戦略

**フォールバック(Fallback)**は、メインの処理が失敗したときに代替手段を提供するパターンだ。完全な機能を諦め、制限された機能で継続する(グレースフルデグラデーション)。

【フォールバックの例】

1. **キャッシュからの返却**
   - 最新性は失われるが、応答は維持
   - 例:DB 応答不能→最後に成功したキャッシュデータを返す

2. **デフォルト値の返却**
   - 簡素化された結果を返す
   - 例:レコメンド機能失敗→人気アイテム一覧を返す

3. **機能の無効化**
   - オプション機能をスキップ
   - 例:レビュー表示失敗→レビューセクションを非表示

4. **別のサービスへ切り替え**
   - セカンダリプロバイダーへフォールバック
   - 例:メイン API 失敗→バックアップ API へ

フォールバック実装

from typing import Callable, Any, Optional
import logging

logger = logging.getLogger(__name__)

class FallbackExecutor:
    """
    フォールバックチェーンを実行する
    メイン→フォールバック 1→フォールバック 2→...→デフォルト
    """
    def __init__(self, name: str = "fallback"):
        self.name = name

    def execute_with_fallback(
        self,
        primary: Callable,
        fallbacks: list[Callable],
        default: Any = None,
        *args,
        **kwargs
    ) -> Any:
        """
        フォールバックチェーンを実行

        Parameters:
            primary: メインの処理
            fallbacks: フォールバック処理のリスト(優先度順)
            default: 全てのフォールバックが失敗したときのデフォルト値
            args, kwargs: 各関数に渡す引数
        """
        # メイン処理を試行
        try:
            return primary(*args, **kwargs)
        except Exception as e:
            logger.warning(f"{self.name}: メイン処理失敗 - {e}")

        # フォールバックを順に試行
        for i, fallback in enumerate(fallbacks):
            try:
                result = fallback(*args, **kwargs)
                logger.info(f"{self.name}: フォールバック {i+1} で成功")
                return result
            except Exception as e:
                logger.warning(f"{self.name}: フォールバック {i+1} 失敗 - {e}")

        # 全て失敗:デフォルト値
        logger.error(f"{self.name}: 全てのフォールバック失敗、デフォルト値を返却")
        return default

# 使用例
fallback = FallbackExecutor(name="product_service")

def get_product_info_primary(product_id: str) -> dict:
    """メイン:データベースから取得"""
    return db_query("SELECT * FROM products WHERE id = ?", product_id)

def get_product_info_cache(product_id: str) -> dict:
    """フォールバック 1:キャッシュから取得"""
    return redis_cache.get(f"product:{product_id}")

def get_product_info_default(product_id: str) -> dict:
    """フォールバック 2:最小限の情報"""
    return {"id": product_id, "name": "商品情報 unavailable", "price": 0}

# 実行
product = fallback.execute_with_fallback(
    primary=get_product_info_primary,
    fallbacks=[get_product_info_cache],
    default={"id": "unknown", "name": "不明", "price": 0},
    product_id="P12345"
)

パターンの組み合わせ——現実的な実装

4 パターンの統合

実際のシステムでは、これら 4 つのパターンを組み合わせて使用する。

from dataclasses import dataclass
from typing import Callable, Any
import time

@dataclass
class ResilienceConfig:
    """レジリエンシー設定"""
    # サーキットブレーカー
    circuit_failure_threshold: int = 5
    circuit_recovery_timeout: float = 30.0

    # リトライ
    retry_max_attempts: int = 3
    retry_base_delay: float = 1.0
    retry_max_delay: float = 30.0

    # バルクヘッド
    bulkhead_max_concurrent: int = 50

    # タイムアウト
    timeout_seconds: float = 10.0

class ResilientExecutor:
    """
    4 つのレジリエンシーパターンを統合したエグゼキューター
    """
    def __init__(self, config: ResilienceConfig, name: str = "resilient"):
        self.config = config
        self.name = name

        # サーキットブレーカー
        self._circuit_breaker = CircuitBreaker(
            failure_threshold=config.circuit_failure_threshold,
            recovery_timeout=config.circuit_recovery_timeout
        )

        # バルクヘッド
        self._bulkhead = SemaphoreBulkhead(config.bulkhead_max_concurrent)

        self._logger = logging.getLogger(name)

    def execute(
        self,
        func: Callable,
        *args,
        fallback: Callable = None,
        fallback_args: tuple = None,
        **kwargs
    ) -> Any:
        """
        レジリエントな実行

        実行フロー:
        1. バルクヘッド(並列数制限)
        2. サーキットブレーカー(遮断チェック)
        3. リトライ(一時的エラー対応)
        4. タイムアウト(無限待機防止)
        5. フォールバック(代替処理)
        """
        fallback_args = fallback_args or ()

        def _execute_with_retry_and_timeout():
            """リトライ+タイムアウトで実行"""
            last_error = None

            for attempt in range(self.config.retry_max_attempts + 1):
                try:
                    # タイムアウト付き実行
                    return self._execute_with_timeout(func, *args, **kwargs)

                except (TimeoutError, ConnectionError, requests.RequestException) as e:
                    last_error = e

                    if attempt >= self.config.retry_max_attempts:
                        break  # 最大試行に達した

                    # 指数バックオフで待機
                    delay = calculate_backoff(
                        attempt,
                        self.config.retry_base_delay,
                        self.config.retry_max_delay
                    )
                    self._logger.warning(
                        f"{self.name}: 試行 {attempt+1} 失敗、{delay:.2f}秒後にリトライ: {e}"
                    )
                    time.sleep(delay)

                except Exception as e:
                    # リトライしないエラー
                    last_error = e
                    break

            raise last_error

        def _execute_bulkhead():
            """バルクヘッド経由で実行"""
            return self._bulkhead.execute(_execute_with_retry_and_timeout)

        def _execute_circuit():
            """サーキットブレーカー経由で実行"""
            return self._circuit_breaker.call(_execute_bulkhead)

        # メイン実行
        try:
            return _execute_circuit()

        except (CircuitBreakerOpenError, BulkheadFullError, TimeoutError, Exception) as e:
            self._logger.error(f"{self.name}: メイン実行失敗 - {e}")

            # フォールバック
            if fallback:
                try:
                    self._logger.info(f"{self.name}: フォールバック実行")
                    return fallback(*fallback_args, **kwargs)
                except Exception as fallback_error:
                    self._logger.error(f"{self.name}: フォールバック失敗 - {fallback_error}")

            # 再スロー
            raise

# 使用例
config = ResilienceConfig(
    circuit_failure_threshold=5,
    circuit_recovery_timeout=30.0,
    retry_max_attempts=3,
    retry_base_delay=1.0,
    retry_max_delay=30.0,
    bulkhead_max_concurrent=50,
    timeout_seconds=10.0
)

executor = ResilientExecutor(config, name="order_service")

def call_inventory_service(product_id: str) -> int:
    """在庫サービス呼び出し"""
    response = requests.get(
        f"http://inventory-service/api/stock/{product_id}",
        timeout=5.0
    )
    response.raise_for_status()
    return response.json()["stock_count"]

def fallback_inventory(product_id: str) -> int:
    """フォールバック:キャッシュまたはデフォルト値"""
    # Redis 等からキャッシュ取得
    cached = redis_cache.get(f"inventory:{product_id}")
    if cached:
        return cached
    return 0  # デフォルト:在庫なし

# 実行
try:
    stock = executor.execute(
        call_inventory_service,
        "P12345",
        fallback=fallback_inventory,
        fallback_args=("P12345",)
    )
    print(f"在庫数:{stock}")
except Exception as e:
    print(f"在庫取得失敗:{e}")

まとめ

レジリエンシーパターンの核心:

  1. サーキットブレーカー: 連続エラーを検知して遮断し、カスケード故障を防止

    • 状態遷移:CLOSED → OPEN → HALF-OPEN → CLOSED
    • 回復タイムアウトで自動的にテスト再開
  2. リトライ: 一時的エラーを自動回復

    • 指数バックオフ: 試行ごとに待機時間を倍増
    • ジッター: 複数クライアントの同期を防止
  3. バルクヘッド: リソースを隔離して故障を封じ込め

    • サービスごとにスレッドプールを分離
    • 一部のサービス遅延が全体に波及しない
  4. タイムアウト: 無限待機を防止

    • 接続タイムアウトと読み取りタイムアウトを使い分け
    • 全体タイムアウトでクライアント体験を保護
  5. フォールバック: グレースフルデグラデーション

    • キャッシュ、デフォルト値、機能無効化を準備
    • 完全停止より部分機能継続を選ぶ

重要なのは、これらを単独で使うのではなく組み合わせることだ。サーキットブレーカーだけが有効でも、リトライだけが有効でもない。現実のシステムでは、これらのパターンが相互に補完し合うことで、真のフォールトトレランスが実現する。

実装上の指針:

  1. まずはタイムアウトから: 最も単純で効果的
  2. リトライを追加: 指数バックオフ+ジッター必須
  3. サーキットブレーカーで保護: 過負荷時の連鎖を断つ
  4. バルクヘッドで隔離: 重要サービスから順に分離
  5. フォールバックで UX 維持: 完全停止よりマシな体験を

参考資料

  • “Release It! 2nd Edition” Michael T. Nygard 著(Pragmatic Bookshelf)
  • “Building Microservices” Sam Newman 著(O’Reilly)
  • Microsoft Azure Architecture Center - “Transient fault handling”
  • Netflix Hystrix GitHub Repository(レガシーだが概念理解に有用)
  • App-vNext/Polly - .NET 向けレジリエンシーライブラリ
  • resilience4j - Java 向けレジリエンシーライブラリ
  • AWS Well-Architected Framework - Reliability Pillar

免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。