Lab AI 分散システムの基礎——可用性、フォールトトレランス、_consensus の仕組み
目次
「分散システムとは、あなたが存在しないことに気づかないコンピューターの集合である」——この皮肉は、分散システムの複雑さを端的に表している。本稿では、分散システムの基礎概念を数理的な理解とともに解説する。
分散システムの特徴
分散システムは複数のコンピューターがネットワークを通じて協調して動作するシステムだ。
| 特徴 | 説明 | 課題 |
|---|---|---|
| 並行性 | 複数のプロセスが同時に実行 | 競合状態、デッドロック |
| 故障の独立性 | 一部のノードが故障しても全体は継続 | フォールトトレランス |
| メッセージ遅延 | ネットワーク遅延は不確定 | タイムアウト、再送 |
| 時計の非同期性 | 正確な時刻同期は不可能 | イベント順序の判定 |
| 状態の整合性 | データの複製間で整合性維持 | コンセンサス |
可用性(Availability)
可用性はシステムが正常に動作する時間の割合だ。
可用性 = 稼働時間 / (稼働時間 + 停止時間) × 100
可用性のレベル
| 可用性 | 年間停止時間 | 別名 |
|---|---|---|
| 99% | 3.65 日 | Two Nines |
| 99.9% | 8.76 時間 | Three Nines |
| 99.99% | 52.6 分 | Four Nines |
| 99.999% | 5.26 分 | Five Nines |
| 99.9999% | 31.5 秒 | Six Nines |
def calculate_availability(uptime, downtime):
"""可用性の計算"""
total_time = uptime + downtime
if total_time == 0:
return 1.0
return uptime / total_time
def calculate_downtime_from_availability(availability, period_hours=8760):
"""可用性から年間停止時間を逆算"""
return period_hours * (1 - availability)
# 例:稼働時間 8750 時間、停止時間 10 時間
availability = calculate_availability(8750, 10)
print(f"可用性:{availability*100:.3f}%") # 99.886%
# 各可用性レベルの停止時間
for nines in [99, 99.9, 99.99, 99.999, 99.9999]:
downtime = calculate_downtime_from_availability(nines / 100)
print(f"{nines}%: 年間{downtime*60:.1f}分")
シリーズ構成とパラレル構成
def series_availability(components):
"""直列構成の可用性(全コンポーネントが正常な必要がある)"""
availability = 1.0
for comp in components:
availability *= comp
return availability
def parallel_availability(components):
"""並列構成の可用性(いずれかのコンポーネントが正常ならよい)"""
# 全てが故障する確率
all_fail_prob = 1.0
for comp in components:
all_fail_prob *= (1 - comp)
return 1 - all_fail_prob
# 例:可用性 99% のコンポーネント 3 個
components = [0.99, 0.99, 0.99]
series_avail = series_availability(components)
parallel_avail = parallel_availability(components)
print(f"直列構成:{series_avail*100:.3f}%") # 97.029%
print(f"並列構成:{parallel_avail*100:.6f}%") # 99.9999%
重要な洞察:
- 直列構成:可用性は低下する(弱いリンクに依存)
- 並列構成:可用性は向上する(冗長化の効果)
フォールトトレランス
フォールトトレランスは一部のコンポーネントが故障してもシステム全体として機能を継続する性質だ。
故障の種類
| 故障タイプ | 説明 | 対策 |
|---|---|---|
| クラッシュ故障 | ノードが停止 | 再起動、フェイルオーバー |
| ビザンチン故障 | 任意の誤動作(悪意含む) | ビザンチン障害耐性 |
| ネットワーク分割 | ノード間通信が断絶 | ネットワークパーティション |
| タイミング故障 | 応答が時間内に返らない | タイムアウト、再送 |
冗長化パターン
class RedundancyPattern:
"""冗長化パターンのベースクラス"""
def __init__(self, nodes, required_for_success):
self.nodes = nodes # 各ノードの可用性
self.k = required_for_success # 成功に必要なノード数
def system_availability(self):
"""システム全体の可用性を計算"""
from itertools import combinations
import math
n = len(self.nodes)
total_avail = 0.0
# k 個以上のノードが正常な確率
for k in range(self.k, n + 1):
for combo in combinations(range(n), k):
# この組み合わせが正常で、他が故障する確率
prob = 1.0
for i in range(n):
if i in combo:
prob *= self.nodes[i]
else:
prob *= (1 - self.nodes[i])
total_avail += prob
return total_avail
# 例:可用性 95% のノード 3 個、2 個以上正常なら OK
nodes = [0.95, 0.95, 0.95]
redundancy = RedundancyPattern(nodes, required_for_success=2)
print(f"2-out-of-3 システム:{redundancy.system_availability()*100:.3f}%")
# 1-out-of-3(1 個でも正常なら OK)
redundancy_any = RedundancyPattern(nodes, required_for_success=1)
print(f"1-out-of-3 システム:{redundancy_any.system_availability()*100:.6f}%")
CAP 定理
CAP 定理は分散システムが同時に満たせるのは以下の 3 つのうち 2 つまでと述べる。
| 性質 | 説明 |
|---|---|
| Consistency(一貫性) | 全てのノードが同じ時刻のデータを見る |
| Availability(可用性) | 全てのリクエストが応答を返す |
| Partition Tolerance(分割耐性) | ネットワーク分割が発生してもシステムは動作 |
Consistency(一貫性)
/ \
/ \
/ \
/ \
/ \
CP ─── ──── AP
(一貫性 + 分割耐性) (可用性 + 分割耐性)
\ /
\ /
\ /
\ /
\ /
Partition Tolerance(分割耐性)
CAP のトレードオフ例
| システム | CAP 選択 | 使用例 |
|---|---|---|
| RDBMS(シングルノード) | CA | 銀行取引(分割発生時は停止) |
| MongoDB(デフォルト) | CP | 一貫性重視、リーダー選出時に利用不可 |
| Cassandra | AP | 可用性重視、最終整合性 |
| DynamoDB | AP | 可用性重視、コンフリクト解決 |
| Redis Cluster | CP | 一貫性重視 |
class CAPTradeoff:
"""CAP 定理のトレードオフをシミュレーション"""
def __init__(self, consistency_level, availability_target):
self.consistency_level = consistency_level # 1-5(5 が最強)
self.availability_target = availability_target # 目標可用性
def evaluate(self, network_partition=False):
"""ネットワーク分割発生時の挙動"""
if network_partition:
if self.consistency_level >= 4:
# CP: 一貫性重視 → 可用性を犠牲
return {"consistent": True, "available": False, "status": "読み取り不可(一貫性維持)"}
else:
# AP: 可用性重視 → 最終整合性
return {"consistent": False, "available": True, "status": "古いデータ返却(最終整合性)"}
else:
# 分割なし:両方満たせる
return {"consistent": True, "available": True, "status": "正常動作"}
# CP システム(一貫性レベル 5)
cp_system = CAPTradeoff(consistency_level=5, availability_target=0.99)
print("CP システム(ネットワーク分割時):")
print(cp_system.evaluate(network_partition=True))
# AP システム(一貫性レベル 2)
ap_system = CAPTradeoff(consistency_level=2, availability_target=0.999)
print("\nAP システム(ネットワーク分割時):")
print(ap_system.evaluate(network_partition=True))
コンセンサスアルゴリズム
コンセンサスは分散システムの複数ノードが合意形成するプロセスだ。
Paxos アルゴリズム
Paxos は多数決による合意形成アルゴリズムだ。
役割:
- Proposer: 値を提案
- Acceptor: 提案を受け入れ・拒否
- Learner: 合意値を学習
フェーズ:
1. Prepare フェーズ: Proposer が Accepter に Prepare 要求
2. Promise フェーズ: Acceptor が Promise 返答
3. Accept フェーズ: Proposer が Accept 要求
4. Accepted フェーズ: Acceptor が Accept 返答
過半数(Quorum)が Accept すれば合意成立
class PaxosSimulator:
"""Paxos アルゴリズムの簡易シミュレーション"""
def __init__(self, num_acceptors):
self.num_acceptors = num_acceptors
self.promises = {}
self.accepts = {}
def propose(self, proposal_id, value):
"""提案を実行"""
# フェーズ 1: Prepare
promise_count = 0
for i in range(self.num_acceptors):
if self._send_prepare(i, proposal_id):
promise_count += 1
if promise_count <= self.num_acceptors // 2:
return None # Quorum 未達
# フェーズ 2: Accept
accept_count = 0
for i in range(self.num_acceptors):
if self._send_accept(i, proposal_id, value):
accept_count += 1
if accept_count > self.num_acceptors // 2:
return value # 合意成立
return None
def _send_prepare(self, acceptor_id, proposal_id):
"""Prepare 送信(簡易化:80% の確率で成功)"""
import random
return random.random() < 0.8
def _send_accept(self, acceptor_id, proposal_id, value):
"""Accept 送信(簡易化:80% の確率で成功)"""
import random
return random.random() < 0.8
# 例:5 ノードの Paxos
paxos = PaxosSimulator(num_acceptors=5)
result = paxos.propose(proposal_id=1, value="X")
print(f"Paxos 合意結果:{result}")
Raft アルゴリズム
Raft はPaxos より理解しやすいコンセンサスアルゴリズムだ。
状態:
- Leader: 全ての書き込みを処理
- Follower: リーダーに追従
- Candidate: リーダー選出中
ターム(時代):
- 各ノードが現在のタームを保持
- タイムアウトすると Candidate になり、選挙開始
リーダー選出:
1. Candidate が Vote Request を送信
2. Follower が Vote Grant で応答(過半数で当選)
3. 当選した Candidate が Leader になる
ログ複製:
1. Leader が Append Entries を Follower に送信
2. 過半数が ACK すればコミット
class RaftSimulator:
"""Raft アルゴリズムの簡易シミュレーション"""
def __init__(self, num_nodes):
self.num_nodes = num_nodes
self.current_term = 0
self.leader_id = None
self.log = []
def start_election(self, candidate_id):
"""選挙開始"""
self.current_term += 1
votes = 1 # 自分への投票
for node_id in range(self.num_nodes):
if node_id != candidate_id:
if self._request_vote(node_id, candidate_id):
votes += 1
if votes > self.num_nodes // 2:
self.leader_id = candidate_id
return True
return False
def _request_vote(self, voter_id, candidate_id):
"""投票依頼(簡易化:70% の確率で賛成)"""
import random
return random.random() < 0.7
def append_entry(self, entry):
"""ログ追加(リーダーのみ可能)"""
if self.leader_id is None:
return False
# リーダーがログに追加
self.log.append({'term': self.current_term, 'entry': entry})
# Follower に複製
ack_count = 1
for node_id in range(self.num_nodes):
if node_id != self.leader_id:
if self._replicate_to_follower(node_id, entry):
ack_count += 1
if ack_count > self.num_nodes // 2:
return True # コミット成立
return False
def _replicate_to_follower(self, follower_id, entry):
"""複製(簡易化:90% の確率で成功)"""
import random
return random.random() < 0.9
# 例:5 ノードの Raft
raft = RaftSimulator(num_nodes=5)
if raft.start_election(candidate_id=0):
print(f"リーダー選出:ノード 0(ターム {raft.current_term})")
if raft.append_entry("SET x=1"):
print("ログコミット成功")
ビザンチン障害耐性(PBFT)
PBFT(Practical Byzantine Fault Tolerance)は悪意のあるノードが存在しても合意形成可能だ。
必要なノード数:n >= 3f + 1
(f 個のビザンチンノードに耐えるには 3f+1 ノードが必要)
例:
- f=1(1 個の悪意ノード): 4 ノード必要
- f=2(2 個の悪意ノード): 7 ノード必要
def byzantine_fault_tolerance(n, f):
"""ビザンチン障害耐性の判定"""
required = 3 * f + 1
if n >= required:
return True, f"{n}ノードで{f}個のビザンチン故障に耐えられます"
else:
return False, f"{n}ノードでは不十分です(最低{required}ノード必要)"
# 例
for f in range(1, 4):
for n in [3, 4, 5, 6, 7, 10]:
result, msg = byzantine_fault_tolerance(n, f)
if result:
print(f"f={f}, n={n}: {msg}")
break
整合性モデル
強い整合性 vs 最終整合性
| 整合性モデル | 説明 | 使用例 |
|---|---|---|
| 线性化可能性(Linearizability) | 全ての操作がアトミック、即時反映 | 銀行システム |
| 順序整合性(Sequential) | 全ノードで同じ操作順序 | 分散データベース |
| 因果整合性(Causal) | 因果関係のある操作は順序維持 | SNS の投稿とコメント |
| 最終整合性(Eventual) | 十分時間が経てば全て同じ値 | DNS、CDN |
class ConsistencyModel:
"""整合性モデルのシミュレーション"""
def __init__(self, model_type):
self.model_type = model_type
self.data = {}
self.replicas = {}
def write(self, key, value, timestamp):
"""書き込み"""
self.data[key] = {'value': value, 'timestamp': timestamp}
if self.model_type == "linearizable":
# 全レプリカに即時反映
for replica_id in self.replicas:
self.replicas[replica_id][key] = {'value': value, 'timestamp': timestamp}
elif self.model_type == "eventual":
# リーダーのみ更新、非同期で複製
pass # 簡易化
def read(self, key, replica_id=0):
"""読み取り"""
if self.model_type == "linearizable":
# 最新値を返す
return self.data.get(key)
elif self.model_type == "eventual":
# レプリカの値を返す(古い可能性あり)
return self.replicas.get(replica_id, {}).get(key)
分散システムのパターン
リトライパターン
import random
import time
class RetryPattern:
"""リトライパターン(指数バックオフ)"""
def __init__(self, max_retries=3, base_delay=0.1, max_delay=10.0):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
def execute(self, func, *args, **kwargs):
"""リトライ付き実行"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < self.max_retries:
# 指数バックオフ(jitter 付き)
delay = min(self.base_delay * (2 ** attempt), self.max_delay)
jitter = delay * 0.2 * random.random()
time.sleep(delay + jitter)
raise last_exception
# 例:一時的な失敗をリトライ
def flaky_operation():
if random.random() < 0.7:
raise ConnectionError("一時的な接続エラー")
return "成功"
retry = RetryPattern(max_retries=3, base_delay=0.1)
result = retry.execute(flaky_operation)
print(f"結果:{result}")
サーキットブレーカーパターン
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed" # 正常
OPEN = "open" # 遮断
HALF_OPEN = "half_open" # 復旧テスト
class CircuitBreaker:
"""サーキットブレーカーパターン"""
def __init__(self, failure_threshold=5, recovery_timeout=30, success_threshold=2):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
def call(self, func, *args, **kwargs):
"""サーキットブレーカー付き呼び出し"""
if self.state == CircuitState.OPEN:
if datetime.utcnow() > self.last_failure_time + timedelta(seconds=self.recovery_timeout):
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
"""成功時の処理"""
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
def _on_failure(self):
"""失敗時の処理"""
self.failure_count += 1
self.last_failure_time = datetime.utcnow()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
バルヘッドパターン
import threading
import time
class BulkheadPattern:
"""バルヘッドパターン(リソース隔離)"""
def __init__(self, max_concurrent):
self.max_concurrent = max_concurrent
self.semaphore = threading.Semaphore(max_concurrent)
def execute(self, func, *args, **kwargs):
"""バルヘッド付き実行"""
acquired = self.semaphore.acquire(timeout=5.0)
if not acquired:
raise Exception("Resource pool exhausted")
try:
return func(*args, **kwargs)
finally:
self.semaphore.release()
# 例:最大 5 同時実行
bulkhead = BulkheadPattern(max_concurrent=5)
def slow_operation():
time.sleep(1)
return "完了"
分散トレーシング
import uuid
from datetime import datetime
class Span:
"""トレーススパン"""
def __init__(self, trace_id, span_id, operation_name, parent_id=None):
self.trace_id = trace_id
self.span_id = span_id
self.operation_name = operation_name
self.parent_id = parent_id
self.start_time = datetime.utcnow()
self.end_time = None
self.tags = {}
self.logs = []
def set_tag(self, key, value):
self.tags[key] = value
def log(self, message):
self.logs.append({'timestamp': datetime.utcnow(), 'message': message})
def finish(self):
self.end_time = datetime.utcnow()
@property
def duration(self):
if self.end_time:
return (self.end_time - self.start_time).total_seconds()
return None
class DistributedTracer:
"""分散トレーサー"""
def __init__(self, service_name):
self.service_name = service_name
self.spans = []
def start_trace(self, operation_name):
"""新しいトレース開始"""
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
span = Span(trace_id, span_id, operation_name)
return span
def start_span(self, operation_name, parent_span):
"""子スパン開始"""
span_id = str(uuid.uuid4())
span = Span(parent_span.trace_id, span_id, operation_name, parent_span.span_id)
return span
def report(self, span):
"""スパンをレポート"""
span.finish()
self.spans.append(span)
print(f"[{self.service_name}] {span.operation_name}: {span.duration*1000:.2f}ms")
# 例:マイクロサービス間トレーシング
# API Gateway
gateway_tracer = DistributedTracer("api-gateway")
gateway_span = gateway_tracer.start_trace("GET /users")
gateway_span.set_tag("http.method", "GET")
gateway_span.set_tag("http.url", "/users")
# User Service
user_tracer = DistributedTracer("user-service")
user_span = user_tracer.start_span("get_users", gateway_span)
user_span.set_tag("db.query", "SELECT * FROM users")
# Database
db_tracer = DistributedTracer("postgres")
db_span = db_tracer.start_span("SELECT", user_span)
db_tracer.report(db_span)
user_tracer.report(user_span)
gateway_tracer.report(gateway_span)
まとめ
分散システムの核心を整理する:
- 可用性: 並列構成で向上、直列構成で低下
- フォールトトレランス: 冗長化と障害検出・復旧
- CAP 定理: 一貫性、可用性、分割耐性の 2 つまで
- コンセンサス: Paxos、Raft、PBFT(ビザンチン耐性)
- 整合性モデル: 強整合性から最終整合性まで用途で選択
- パターン: リトライ、サーキットブレーカー、バルヘッド
- 可観測性: 分散トレーシングでエンドツーエンドの可視化
分散システムの設計では「故障は避けられないもの」として受け入れ、故障を前提とした設計が求められる。
免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。