Lab AI 分散システムの基礎——可用性、フォールトトレランス、_consensus の仕組み
目次

「分散システムとは、あなたが存在しないことに気づかないコンピューターの集合である」——この皮肉は、分散システムの複雑さを端的に表している。本稿では、分散システムの基礎概念を数理的な理解とともに解説する。

分散システムの特徴

分散システムは複数のコンピューターがネットワークを通じて協調して動作するシステムだ。

特徴 説明 課題
並行性 複数のプロセスが同時に実行 競合状態、デッドロック
故障の独立性 一部のノードが故障しても全体は継続 フォールトトレランス
メッセージ遅延 ネットワーク遅延は不確定 タイムアウト、再送
時計の非同期性 正確な時刻同期は不可能 イベント順序の判定
状態の整合性 データの複製間で整合性維持 コンセンサス

可用性(Availability)

可用性はシステムが正常に動作する時間の割合だ。

可用性 = 稼働時間 / (稼働時間 + 停止時間) × 100

可用性のレベル

可用性 年間停止時間 別名
99% 3.65 日 Two Nines
99.9% 8.76 時間 Three Nines
99.99% 52.6 分 Four Nines
99.999% 5.26 分 Five Nines
99.9999% 31.5 秒 Six Nines
def calculate_availability(uptime, downtime):
    """可用性の計算"""
    total_time = uptime + downtime
    if total_time == 0:
        return 1.0
    return uptime / total_time

def calculate_downtime_from_availability(availability, period_hours=8760):
    """可用性から年間停止時間を逆算"""
    return period_hours * (1 - availability)

# 例:稼働時間 8750 時間、停止時間 10 時間
availability = calculate_availability(8750, 10)
print(f"可用性:{availability*100:.3f}%")  # 99.886%

# 各可用性レベルの停止時間
for nines in [99, 99.9, 99.99, 99.999, 99.9999]:
    downtime = calculate_downtime_from_availability(nines / 100)
    print(f"{nines}%: 年間{downtime*60:.1f}分")

シリーズ構成とパラレル構成

def series_availability(components):
    """直列構成の可用性(全コンポーネントが正常な必要がある)"""
    availability = 1.0
    for comp in components:
        availability *= comp
    return availability

def parallel_availability(components):
    """並列構成の可用性(いずれかのコンポーネントが正常ならよい)"""
    # 全てが故障する確率
    all_fail_prob = 1.0
    for comp in components:
        all_fail_prob *= (1 - comp)
    return 1 - all_fail_prob

# 例:可用性 99% のコンポーネント 3 個
components = [0.99, 0.99, 0.99]

series_avail = series_availability(components)
parallel_avail = parallel_availability(components)

print(f"直列構成:{series_avail*100:.3f}%")    # 97.029%
print(f"並列構成:{parallel_avail*100:.6f}%")  # 99.9999%

重要な洞察:

  • 直列構成:可用性は低下する(弱いリンクに依存)
  • 並列構成:可用性は向上する(冗長化の効果)

フォールトトレランス

フォールトトレランスは一部のコンポーネントが故障してもシステム全体として機能を継続する性質だ。

故障の種類

故障タイプ 説明 対策
クラッシュ故障 ノードが停止 再起動、フェイルオーバー
ビザンチン故障 任意の誤動作(悪意含む) ビザンチン障害耐性
ネットワーク分割 ノード間通信が断絶 ネットワークパーティション
タイミング故障 応答が時間内に返らない タイムアウト、再送

冗長化パターン

class RedundancyPattern:
    """冗長化パターンのベースクラス"""

    def __init__(self, nodes, required_for_success):
        self.nodes = nodes  # 各ノードの可用性
        self.k = required_for_success  # 成功に必要なノード数

    def system_availability(self):
        """システム全体の可用性を計算"""
        from itertools import combinations
        import math

        n = len(self.nodes)
        total_avail = 0.0

        # k 個以上のノードが正常な確率
        for k in range(self.k, n + 1):
            for combo in combinations(range(n), k):
                # この組み合わせが正常で、他が故障する確率
                prob = 1.0
                for i in range(n):
                    if i in combo:
                        prob *= self.nodes[i]
                    else:
                        prob *= (1 - self.nodes[i])
                total_avail += prob

        return total_avail

# 例:可用性 95% のノード 3 個、2 個以上正常なら OK
nodes = [0.95, 0.95, 0.95]
redundancy = RedundancyPattern(nodes, required_for_success=2)
print(f"2-out-of-3 システム:{redundancy.system_availability()*100:.3f}%")

# 1-out-of-3(1 個でも正常なら OK)
redundancy_any = RedundancyPattern(nodes, required_for_success=1)
print(f"1-out-of-3 システム:{redundancy_any.system_availability()*100:.6f}%")

CAP 定理

CAP 定理は分散システムが同時に満たせるのは以下の 3 つのうち 2 つまでと述べる。

性質 説明
Consistency(一貫性) 全てのノードが同じ時刻のデータを見る
Availability(可用性) 全てのリクエストが応答を返す
Partition Tolerance(分割耐性) ネットワーク分割が発生してもシステムは動作
       Consistency(一貫性)
           /    \
          /      \
         /        \
        /          \
       /            \
CP ───                ──── AP
(一貫性 + 分割耐性)   (可用性 + 分割耐性)
       \            /
        \          /
         \        /
          \      /
           \    /
       Partition Tolerance(分割耐性)

CAP のトレードオフ例

システム CAP 選択 使用例
RDBMS(シングルノード) CA 銀行取引(分割発生時は停止)
MongoDB(デフォルト) CP 一貫性重視、リーダー選出時に利用不可
Cassandra AP 可用性重視、最終整合性
DynamoDB AP 可用性重視、コンフリクト解決
Redis Cluster CP 一貫性重視
class CAPTradeoff:
    """CAP 定理のトレードオフをシミュレーション"""

    def __init__(self, consistency_level, availability_target):
        self.consistency_level = consistency_level  # 1-5(5 が最強)
        self.availability_target = availability_target  # 目標可用性

    def evaluate(self, network_partition=False):
        """ネットワーク分割発生時の挙動"""
        if network_partition:
            if self.consistency_level >= 4:
                # CP: 一貫性重視 → 可用性を犠牲
                return {"consistent": True, "available": False, "status": "読み取り不可(一貫性維持)"}
            else:
                # AP: 可用性重視 → 最終整合性
                return {"consistent": False, "available": True, "status": "古いデータ返却(最終整合性)"}
        else:
            # 分割なし:両方満たせる
            return {"consistent": True, "available": True, "status": "正常動作"}

# CP システム(一貫性レベル 5)
cp_system = CAPTradeoff(consistency_level=5, availability_target=0.99)
print("CP システム(ネットワーク分割時):")
print(cp_system.evaluate(network_partition=True))

# AP システム(一貫性レベル 2)
ap_system = CAPTradeoff(consistency_level=2, availability_target=0.999)
print("\nAP システム(ネットワーク分割時):")
print(ap_system.evaluate(network_partition=True))

コンセンサスアルゴリズム

コンセンサスは分散システムの複数ノードが合意形成するプロセスだ。

Paxos アルゴリズム

Paxos は多数決による合意形成アルゴリズムだ。

役割:
- Proposer: 値を提案
- Acceptor: 提案を受け入れ・拒否
- Learner: 合意値を学習

フェーズ:
1. Prepare フェーズ: Proposer が Accepter に Prepare 要求
2. Promise フェーズ: Acceptor が Promise 返答
3. Accept フェーズ: Proposer が Accept 要求
4. Accepted フェーズ: Acceptor が Accept 返答

過半数(Quorum)が Accept すれば合意成立
class PaxosSimulator:
    """Paxos アルゴリズムの簡易シミュレーション"""

    def __init__(self, num_acceptors):
        self.num_acceptors = num_acceptors
        self.promises = {}
        self.accepts = {}

    def propose(self, proposal_id, value):
        """提案を実行"""
        # フェーズ 1: Prepare
        promise_count = 0
        for i in range(self.num_acceptors):
            if self._send_prepare(i, proposal_id):
                promise_count += 1

        if promise_count <= self.num_acceptors // 2:
            return None  # Quorum 未達

        # フェーズ 2: Accept
        accept_count = 0
        for i in range(self.num_acceptors):
            if self._send_accept(i, proposal_id, value):
                accept_count += 1

        if accept_count > self.num_acceptors // 2:
            return value  # 合意成立
        return None

    def _send_prepare(self, acceptor_id, proposal_id):
        """Prepare 送信(簡易化:80% の確率で成功)"""
        import random
        return random.random() < 0.8

    def _send_accept(self, acceptor_id, proposal_id, value):
        """Accept 送信(簡易化:80% の確率で成功)"""
        import random
        return random.random() < 0.8

# 例:5 ノードの Paxos
paxos = PaxosSimulator(num_acceptors=5)
result = paxos.propose(proposal_id=1, value="X")
print(f"Paxos 合意結果:{result}")

Raft アルゴリズム

Raft はPaxos より理解しやすいコンセンサスアルゴリズムだ。

状態:
- Leader: 全ての書き込みを処理
- Follower: リーダーに追従
- Candidate: リーダー選出中

ターム(時代):
- 各ノードが現在のタームを保持
- タイムアウトすると Candidate になり、選挙開始

リーダー選出:
1. Candidate が Vote Request を送信
2. Follower が Vote Grant で応答(過半数で当選)
3. 当選した Candidate が Leader になる

ログ複製:
1. Leader が Append Entries を Follower に送信
2. 過半数が ACK すればコミット
class RaftSimulator:
    """Raft アルゴリズムの簡易シミュレーション"""

    def __init__(self, num_nodes):
        self.num_nodes = num_nodes
        self.current_term = 0
        self.leader_id = None
        self.log = []

    def start_election(self, candidate_id):
        """選挙開始"""
        self.current_term += 1
        votes = 1  # 自分への投票

        for node_id in range(self.num_nodes):
            if node_id != candidate_id:
                if self._request_vote(node_id, candidate_id):
                    votes += 1

        if votes > self.num_nodes // 2:
            self.leader_id = candidate_id
            return True
        return False

    def _request_vote(self, voter_id, candidate_id):
        """投票依頼(簡易化:70% の確率で賛成)"""
        import random
        return random.random() < 0.7

    def append_entry(self, entry):
        """ログ追加(リーダーのみ可能)"""
        if self.leader_id is None:
            return False

        # リーダーがログに追加
        self.log.append({'term': self.current_term, 'entry': entry})

        # Follower に複製
        ack_count = 1
        for node_id in range(self.num_nodes):
            if node_id != self.leader_id:
                if self._replicate_to_follower(node_id, entry):
                    ack_count += 1

        if ack_count > self.num_nodes // 2:
            return True  # コミット成立
        return False

    def _replicate_to_follower(self, follower_id, entry):
        """複製(簡易化:90% の確率で成功)"""
        import random
        return random.random() < 0.9

# 例:5 ノードの Raft
raft = RaftSimulator(num_nodes=5)
if raft.start_election(candidate_id=0):
    print(f"リーダー選出:ノード 0(ターム {raft.current_term})")
    if raft.append_entry("SET x=1"):
        print("ログコミット成功")

ビザンチン障害耐性(PBFT)

PBFT(Practical Byzantine Fault Tolerance)は悪意のあるノードが存在しても合意形成可能だ。

必要なノード数:n >= 3f + 1
(f 個のビザンチンノードに耐えるには 3f+1 ノードが必要)

例:
- f=1(1 個の悪意ノード): 4 ノード必要
- f=2(2 個の悪意ノード): 7 ノード必要
def byzantine_fault_tolerance(n, f):
    """ビザンチン障害耐性の判定"""
    required = 3 * f + 1
    if n >= required:
        return True, f"{n}ノードで{f}個のビザンチン故障に耐えられます"
    else:
        return False, f"{n}ノードでは不十分です(最低{required}ノード必要)"

# 例
for f in range(1, 4):
    for n in [3, 4, 5, 6, 7, 10]:
        result, msg = byzantine_fault_tolerance(n, f)
        if result:
            print(f"f={f}, n={n}: {msg}")
            break

整合性モデル

強い整合性 vs 最終整合性

整合性モデル 説明 使用例
线性化可能性(Linearizability) 全ての操作がアトミック、即時反映 銀行システム
順序整合性(Sequential) 全ノードで同じ操作順序 分散データベース
因果整合性(Causal) 因果関係のある操作は順序維持 SNS の投稿とコメント
最終整合性(Eventual) 十分時間が経てば全て同じ値 DNS、CDN
class ConsistencyModel:
    """整合性モデルのシミュレーション"""

    def __init__(self, model_type):
        self.model_type = model_type
        self.data = {}
        self.replicas = {}

    def write(self, key, value, timestamp):
        """書き込み"""
        self.data[key] = {'value': value, 'timestamp': timestamp}

        if self.model_type == "linearizable":
            # 全レプリカに即時反映
            for replica_id in self.replicas:
                self.replicas[replica_id][key] = {'value': value, 'timestamp': timestamp}

        elif self.model_type == "eventual":
            # リーダーのみ更新、非同期で複製
            pass  # 簡易化

    def read(self, key, replica_id=0):
        """読み取り"""
        if self.model_type == "linearizable":
            # 最新値を返す
            return self.data.get(key)

        elif self.model_type == "eventual":
            # レプリカの値を返す(古い可能性あり)
            return self.replicas.get(replica_id, {}).get(key)

分散システムのパターン

リトライパターン

import random
import time

class RetryPattern:
    """リトライパターン(指数バックオフ)"""

    def __init__(self, max_retries=3, base_delay=0.1, max_delay=10.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    def execute(self, func, *args, **kwargs):
        """リトライ付き実行"""
        last_exception = None

        for attempt in range(self.max_retries + 1):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                last_exception = e

                if attempt < self.max_retries:
                    # 指数バックオフ(jitter 付き)
                    delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                    jitter = delay * 0.2 * random.random()
                    time.sleep(delay + jitter)

        raise last_exception

# 例:一時的な失敗をリトライ
def flaky_operation():
    if random.random() < 0.7:
        raise ConnectionError("一時的な接続エラー")
    return "成功"

retry = RetryPattern(max_retries=3, base_delay=0.1)
result = retry.execute(flaky_operation)
print(f"結果:{result}")

サーキットブレーカーパターン

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"      # 正常
    OPEN = "open"          # 遮断
    HALF_OPEN = "half_open"  # 復旧テスト

class CircuitBreaker:
    """サーキットブレーカーパターン"""

    def __init__(self, failure_threshold=5, recovery_timeout=30, success_threshold=2):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """サーキットブレーカー付き呼び出し"""
        if self.state == CircuitState.OPEN:
            if datetime.utcnow() > self.last_failure_time + timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        """成功時の処理"""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED

    def _on_failure(self):
        """失敗時の処理"""
        self.failure_count += 1
        self.last_failure_time = datetime.utcnow()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

バルヘッドパターン

import threading
import time

class BulkheadPattern:
    """バルヘッドパターン(リソース隔離)"""

    def __init__(self, max_concurrent):
        self.max_concurrent = max_concurrent
        self.semaphore = threading.Semaphore(max_concurrent)

    def execute(self, func, *args, **kwargs):
        """バルヘッド付き実行"""
        acquired = self.semaphore.acquire(timeout=5.0)
        if not acquired:
            raise Exception("Resource pool exhausted")

        try:
            return func(*args, **kwargs)
        finally:
            self.semaphore.release()

# 例:最大 5 同時実行
bulkhead = BulkheadPattern(max_concurrent=5)

def slow_operation():
    time.sleep(1)
    return "完了"

分散トレーシング

import uuid
from datetime import datetime

class Span:
    """トレーススパン"""

    def __init__(self, trace_id, span_id, operation_name, parent_id=None):
        self.trace_id = trace_id
        self.span_id = span_id
        self.operation_name = operation_name
        self.parent_id = parent_id
        self.start_time = datetime.utcnow()
        self.end_time = None
        self.tags = {}
        self.logs = []

    def set_tag(self, key, value):
        self.tags[key] = value

    def log(self, message):
        self.logs.append({'timestamp': datetime.utcnow(), 'message': message})

    def finish(self):
        self.end_time = datetime.utcnow()

    @property
    def duration(self):
        if self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return None

class DistributedTracer:
    """分散トレーサー"""

    def __init__(self, service_name):
        self.service_name = service_name
        self.spans = []

    def start_trace(self, operation_name):
        """新しいトレース開始"""
        trace_id = str(uuid.uuid4())
        span_id = str(uuid.uuid4())
        span = Span(trace_id, span_id, operation_name)
        return span

    def start_span(self, operation_name, parent_span):
        """子スパン開始"""
        span_id = str(uuid.uuid4())
        span = Span(parent_span.trace_id, span_id, operation_name, parent_span.span_id)
        return span

    def report(self, span):
        """スパンをレポート"""
        span.finish()
        self.spans.append(span)
        print(f"[{self.service_name}] {span.operation_name}: {span.duration*1000:.2f}ms")

# 例:マイクロサービス間トレーシング
# API Gateway
gateway_tracer = DistributedTracer("api-gateway")
gateway_span = gateway_tracer.start_trace("GET /users")
gateway_span.set_tag("http.method", "GET")
gateway_span.set_tag("http.url", "/users")

# User Service
user_tracer = DistributedTracer("user-service")
user_span = user_tracer.start_span("get_users", gateway_span)
user_span.set_tag("db.query", "SELECT * FROM users")

# Database
db_tracer = DistributedTracer("postgres")
db_span = db_tracer.start_span("SELECT", user_span)
db_tracer.report(db_span)

user_tracer.report(user_span)
gateway_tracer.report(gateway_span)

まとめ

分散システムの核心を整理する:

  • 可用性: 並列構成で向上、直列構成で低下
  • フォールトトレランス: 冗長化と障害検出・復旧
  • CAP 定理: 一貫性、可用性、分割耐性の 2 つまで
  • コンセンサス: Paxos、Raft、PBFT(ビザンチン耐性)
  • 整合性モデル: 強整合性から最終整合性まで用途で選択
  • パターン: リトライ、サーキットブレーカー、バルヘッド
  • 可観測性: 分散トレーシングでエンドツーエンドの可視化

分散システムの設計では「故障は避けられないもの」として受け入れ、故障を前提とした設計が求められる。

免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。