目次
現代のマイクロサービスアーキテクチャにおいて、サービス間通信の管理は複雑さを増している。サービスメッシュ(Service Mesh)は、このサービス間通信をインフラ層として抽象化し、セキュリティ、可観測性、トラフィック制御を一元的に管理するための基盤技術である。
本記事では、サービスメッシュの基本概念から、サイドカープロキシのアーキテクチャ、mTLS によるセキュアな通信、トラフィック制御のメカニズムまでを体系的に解説する。
サービスメッシュの基本概念——なぜ「メッシュ」が必要か
マイクロサービスの通信課題
【マイクロサービス通信の課題】
サービス A → サービス B → サービス C
↓ ↓ ↓
認証? 認証? 認証?
ログ? ログ? ログ?
再試行? 再試行? 再試行?
速率制限? 速率制限? 速率制限?
【課題】
1. 各サービスが通信ロジックを重複実装
2. 認証・暗号化のポリシー統一が困難
3. 障害時の挙動(リトライ、タイムアウト)がバラバラ
4. 通信ログの収集方法がサービスごとに異なる
サービスメッシュの解決策: 通信ロジックをアプリケーションコードから分離し、インフラ層として一元管理する。
サービスメッシュの定義
サービスメッシュ:
「マイクロサービス間のサービス間通信を管理するための
専用インフラ層(dedicated infrastructure layer)」
【主な機能】
1. サービスディスカバリ(サービス発見)
2. トラフィック制御(ルーティング、負荷分散)
3. セキュリティ(mTLS、認証、認可)
4. 可観測性(メトリクス、ログ、トレーシング)
5. 回復性(リトライ、サーキットブレーカー、レート制限)
サービスメッシュのアプローチ比較
# サービスメッシュなし(ライブラリアプローチ)
class ServiceClient:
def __init__(self, service_name):
self.service_name = service_name
# 各サービスが通信ロジックを重複実装
self.retry_policy = RetryPolicy(max_retries=3)
self.auth_handler = AuthHandler()
self.logger = ServiceLogger()
self.rate_limiter = RateLimiter()
def call(self, endpoint, data):
# 認証処理
token = self.auth_handler.get_token()
# 再試行ロジック
for attempt in range(self.retry_policy.max_retries):
try:
# レート制限チェック
self.rate_limiter.check_limit()
# actual call
response = self._make_request(endpoint, data, token)
return response
except Exception as e:
self.logger.log_retry_attempt(attempt, e)
if attempt == self.retry_policy.max_retries - 1:
raise
# 課題:各サービスでコードが重複、ポリシー変更時の一斉更新が必要
# サービスメッシュあり(サイドカーアプローチ)
class ServiceClient:
def __init__(self, service_name):
self.service_name = service_name
# 通信ロジックは不要 - サービスメッシュが管理
def call(self, endpoint, data):
# ローカルホストのサイドカープロキシに送信するだけ
# あとはサイドカーがすべて処理
response = self._make_request(f"http://localhost:15001/{endpoint}", data)
return response
# メリット:通信ポリシーはメッシュ設定で一元管理、アプリコードはビジネスロジックに集中
サイドカープロキシのアーキテクチャ
サイドカーパターンの基本概念
【サイドカーアーキテクチャ】
┌─────────────────────────────────────────────────────┐
│ Pod / VM │
│ ┌───────────────────┐ ┌───────────────────────┐ │
│ │ Main Container │ │ Sidecar Proxy │ │
│ │ (Application) │ │ (Envoy, Linkerd) │ │
│ │ │ │ │ │
│ │ - ビジネスロジック │ │ - トラフィック制御 │ │
│ │ - HTTP サーバー │◄─┤ - mTLS 終端 │ │
│ │ - アプリログ │ │ - メトリクス収集 │ │
│ │ │ │ - 認証・認可 │ │
│ └─────────┬─────────┘ └───────────┬───────────┘ │
│ │ │ │
│ └────────────┬───────────┘ │
│ │ │
│ localhost:15001 │
│ (ループバック) │
└─────────────────────────┼───────────────────────────┘
│
▼
外部サービス・他のサイドカー
サイドカーの利点:
- 言語非依存: アプリケーションの言語に関係なく動作
- 透過的: アプリコード変更なしに機能追加
- 分離: 障害がアプリに直接伝播しない
- 一元管理: ポリシーをメッシュ全体で統一
Envoy プロキシの基本構造
import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional
import hashlib
@dataclass
class ClusterConfig:
"""クラスター(アップストリームサービス)設定"""
name: str
hosts: List[str]
health_check_interval: float = 5.0
circuit_breaker_threshold: int = 5
@dataclass
class ListenerConfig:
"""リスナー設定"""
address: str
port: int
route_config_name: str
@dataclass
class RouteConfig:
"""ルート設定"""
name: str
routes: Dict[str, str] # path_prefix -> cluster
class SimpleSidecarProxy:
"""簡易サイドカープロキシの実装"""
def __init__(self):
self.clusters: Dict[str, ClusterConfig] = {}
self.listeners: Dict[str, ListenerConfig] = {}
self.routes: Dict[str, RouteConfig] = {}
self.circuit_breakers: Dict[str, int] = {} # cluster -> failure_count
self.health_status: Dict[str, bool] = {} # host -> status
def add_cluster(self, config: ClusterConfig):
"""アップストリームクラスターの登録"""
self.clusters[config.name] = config
self.circuit_breakers[config.name] = 0
# 初期ヘルスチェック
for host in config.hosts:
self.health_status[host] = True
# 定期ヘルスチェック開始
threading.Thread(target=self._health_check_loop, args=(config,), daemon=True).start()
def _health_check_loop(self, cluster: ClusterConfig):
"""定期ヘルスチェック実行"""
while True:
time.sleep(cluster.health_check_interval)
for host in cluster.hosts:
# 簡易ヘルスチェック(実際は HTTP/TCP チェック)
is_healthy = self._perform_health_check(host)
self.health_status[host] = is_healthy
if not is_healthy:
print(f"[HEALTH] {host} is unhealthy")
def _perform_health_check(self, host: str) -> bool:
"""ヘルスチェック実行(簡易実装)"""
# 実際は HTTP GET /health や TCP connect を実行
return True
def route_request(self, path: str) -> Optional[str]:
"""リクエストを適切なクラスターにルーティング"""
for route_config in self.routes.values():
for prefix, cluster_name in route_config.routes.items():
if path.startswith(prefix):
cluster = self.clusters.get(cluster_name)
if cluster:
# 正常なホストから選択(ラウンドロビン)
healthy_hosts = [h for h in cluster.hosts if self.health_status.get(h, False)]
if healthy_hosts:
# 簡易ラウンドロビン
host_hash = hashlib.md5(path.encode()).hexdigest()
return healthy_hosts[int(host_hash, 16) % len(healthy_hosts)]
return None
def check_circuit_breaker(self, cluster_name: str) -> bool:
"""サーキットブレーカーの状態を確認"""
failure_count = self.circuit_breakers.get(cluster_name, 0)
threshold = self.clusters[cluster_name].circuit_breaker_threshold
if failure_count >= threshold:
print(f"[CIRCUIT BREAKER] OPEN for {cluster_name} (failures: {failure_count})")
return False # リクエスト拒否
return True # リクエスト許可
def record_success(self, cluster_name: str):
"""成功を記録(サーキットブレーカーリセット)"""
self.circuit_breakers[cluster_name] = 0
def record_failure(self, cluster_name: str):
"""失敗を記録(サーキットブレーカーカウントアップ)"""
self.circuit_breakers[cluster_name] += 1
# 使用例
proxy = SimpleSidecarProxy()
# クラスター設定
proxy.add_cluster(ClusterConfig(
name="user-service",
hosts=["user-1.internal:8080", "user-2.internal:8080"],
health_check_interval=5.0,
circuit_breaker_threshold=3
))
proxy.add_cluster(ClusterConfig(
name="order-service",
hosts=["order-1.internal:8080"],
health_check_interval=10.0,
circuit_breaker_threshold=5
))
# ルート設定
proxy.routes["main-routes"] = RouteConfig(
name="main-routes",
routes={
"/users": "user-service",
"/orders": "order-service"
}
)
# リクエストルーティング
target = proxy.route_request("/users/profile")
print(f"Routing to: {target}") # user-1.internal:8080 or user-2.internal:8080
データプレーンとコントロールプレーン
【サービスメッシュの 2 層アーキテクチャ】
┌─────────────────────────────────────────────────────┐
│ コントロールプレーン │
│ (Istio Pilot, Linkerd Control Plane) │
│ │
│ - 設定管理(ポリシー、ルール) │
│ - サービスディスカバリ │
│ - 証明書発行(mTLS 用) │
│ - 設定をサイドカーに配信(xDS API) │
└─────────────────────┬───────────────────────────────┘
│ xDS API (gRPC)
│ - LDS: Listener Discovery
│ - RDS: Route Discovery
│ - CDS: Cluster Discovery
│ - EDS: Endpoint Discovery
▼
┌─────────────────────────────────────────────────────┐
│ データプレーン(サイドカー) │
│ (Envoy, Linkerd Data Plane) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Sidecar │ │ Sidecar │ │ Sidecar │ │
│ │ Pod 1 │ │ Pod 2 │ │ Pod 3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ - 実際のトラフィック処理 │
│ - mTLS 終端 │
│ - メトリクス収集 │
└─────────────────────────────────────────────────────┘
コントロールプレーンの役割:
- 設定の一元管理と配信
- サービスレジストリとの統合(Kubernetes, Consul)
- 証明書ライフサイクル管理
データプレーンの役割:
- 実際のトラフィック転送
- ポリシーの適用(認証、レート制限)
- メトリクス・ログの収集
mTLS(相互 TLS)によるセキュアな通信
TLS と mTLS の違い
【TLS(一方向認証)】
クライアント サーバー
│ │
│──── ClientHello ──────────►│
│◄──── ServerHello ──────────│
│◄──── 証明書(サーバー) ─────│
│ │
│ [サーバー認証のみ] │
│ 証明書検証 │
│ │
│──── 暗号化通信 ───────────►│
│◄─── 暗号化通信 ────────────│
【mTLS(双方向認証)】
クライアント サーバー
│ │
│──── ClientHello ──────────►│
│◄──── ServerHello ──────────│
│◄──── 証明書(サーバー) ─────│
│ │
│ [サーバー認証] │
│ 証明書検証 │
│ │
│──── 証明書(クライアント) ──►│
│ │
│ [クライアント認証] │
│ 証明書検証 │
│ │
│──── 暗号化通信 ───────────►│
│◄─── 暗号化通信 ────────────│
mTLS のメリット:
- 相互認証: クライアント・サーバー双方が身元を確認
- なりすまし防止: 有効な証明書がないサービスは通信不可
- ゼロトラスト: 「内部ネットワークは安全」という前提を廃止
- 自動ローテーション: 証明書はメッシュが自動更新
mTLS ハンドシェイクの実装
import ssl
import socket
from typing import Tuple, Optional
from dataclasses import dataclass
import datetime
@dataclass
class CertificateConfig:
"""証明書設定"""
ca_cert_path: str # 認証局の証明書
cert_path: str # 自証明書
key_path: str # 秘密鍵
verify_mode: bool = True
class MutualTLSConnection:
"""mTLS 接続の実装"""
def __init__(self, config: CertificateConfig):
self.config = config
self.context = self._create_ssl_context()
def _create_ssl_context(self) -> ssl.SSLContext:
"""mTLS 用の SSL コンテキスト作成"""
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# 証明書検証を有効化
if self.config.verify_mode:
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
# CA 証明書の読み込み
context.load_verify_locations(self.config.ca_cert_path)
# クライアント証明書の読み込み(mTLS 用)
context.load_cert_chain(
certfile=self.config.cert_path,
keyfile=self.config.key_path
)
# モダンな暗号スイートのみ許可
context.set_ciphers('ECDHE+AESGCM:DHE+AESGCM:ECDHE+CHACHA20:DHE+CHACHA20')
# TLS 1.3 を優先
context.minimum_version = ssl.TLSVersion.TLSv1_3
return context
def connect(self, host: str, port: int) -> ssl.SSLSocket:
"""mTLS 接続を確立"""
# TCP 接続
sock = socket.create_connection((host, port))
# mTLS ハンドシェイク
secure_sock = self.context.wrap_socket(
sock,
server_hostname=host,
do_handshake_on_connect=True
)
# 接続情報のログ
print(f"[mTLS] Connected to {host}:{port}")
print(f"[mTLS] Protocol: {secure_sock.version()}")
print(f"[mTLS] Cipher: {secure_sock.cipher()[0]}")
# サーバー証明書の検証
cert = secure_sock.getpeercert()
print(f"[mTLS] Server certificate subject: {cert['subject']}")
return secure_sock
class MTLSAuthServer:
"""mTLS サーバー side 実装"""
def __init__(self, config: CertificateConfig):
self.config = config
self.context = self._create_ssl_context()
def _create_ssl_context(self) -> ssl.SSLContext:
"""mTLS サーバー用 SSL コンテキスト"""
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# クライアント証明書を必須に設定
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = False # サーバー側はホスト名検証しない場合が多い
# CA 証明書(クライアント証明書検証用)
context.load_verify_locations(self.config.ca_cert_path)
# サーバー証明書
context.load_cert_chain(
certfile=self.config.cert_path,
keyfile=self.config.key_path
)
return context
def accept_connection(self, host: str = '0.0.0.0', port: int = 8443) -> ssl.SSLSocket:
"""mTLS 接続を受理"""
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((host, port))
server_sock.listen(5)
print(f"[mTLS Server] Listening on {host}:{port}")
client_sock, addr = server_sock.accept()
# mTLS ハンドシェイク
secure_sock = self.context.wrap_socket(
client_sock,
server_side=True,
do_handshake_on_connect=True
)
# クライアント証明書の検証
cert = secure_sock.getpeercert()
print(f"[mTLS Server] Client connected from {addr}")
print(f"[mTLS Server] Client certificate subject: {cert['subject']}")
return secure_sock
# 使用例(架空の証明書パス)
# client_config = CertificateConfig(
# ca_cert_path='/etc/mesh-ca/ca-cert.pem',
# cert_path='/etc/mesh/client-cert.pem',
# key_path='/etc/mesh/client-key.pem'
# )
#
# client = MutualTLSConnection(client_config)
# secure_conn = client.connect('user-service.default.svc.cluster.local', 8443)
# secure_conn.send(b'GET /api/users HTTP/1.1\r\nHost: user-service\r\n\r\n')
# response = secure_conn.recv(4096)
証明書の自動ローテーション
import os
import subprocess
from datetime import datetime, timedelta
class CertificateRotator:
"""証明書自動ローテーター"""
def __init__(self, ca_cert_path: str, output_dir: str):
self.ca_cert_path = ca_cert_path
self.output_dir = output_dir
self.cert_validity_days = 365
self.renewal_threshold_days = 30
def check_expiration(self, cert_path: str) -> Tuple[bool, int]:
"""証明書の有効期限をチェック"""
# OpenSSL で有効期限を取得
result = subprocess.run(
['openssl', 'x509', '-in', cert_path, '-noout', '-enddate'],
capture_output=True,
text=True
)
if result.returncode != 0:
return False, 0
# 有効期限日付をパース
end_date_str = result.stdout.strip().replace('notAfter=', '')
end_date = datetime.strptime(end_date_str, '%b %d %H:%M:%S %Y %Z')
# 残存日数を計算
days_remaining = (end_date - datetime.now()).days
needs_renewal = days_remaining <= self.renewal_threshold_days
return needs_renewal, days_remaining
def rotate_certificate(self, service_name: str, hosts: list):
"""証明書をローテーション"""
# 秘密鍵生成
key_path = os.path.join(self.output_dir, f'{service_name}-key.pem')
subprocess.run([
'openssl', 'genrsa', '-out', key_path, '2048'
], check=True)
# CSR(Certificate Signing Request)生成
csr_path = os.path.join(self.output_dir, f'{service_name}.csr')
openssl_cnf = self._create_openssl_cnf(hosts)
cnf_path = os.path.join(self.output_dir, f'{service_name}.cnf')
with open(cnf_path, 'w') as f:
f.write(openssl_cnf)
subprocess.run([
'openssl', 'req', '-new', '-key', key_path,
'-out', csr_path, '-config', cnf_path
], check=True)
# CA で署名
cert_path = os.path.join(self.output_dir, f'{service_name}-cert.pem')
subprocess.run([
'openssl', 'x509', '-req', '-in', csr_path,
'-CA', self.ca_cert_path,
'-CAkey', os.path.join(self.output_dir, 'ca-key.pem'),
'-CAcreateserial',
'-out', cert_path,
'-days', str(self.cert_validity_days),
'-sha256'
], check=True)
print(f"[ROTATE] Certificate rotated for {service_name}")
print(f"[ROTATE] Valid for {self.cert_validity_days} days")
def _create_openssl_cnf(self, hosts: list) -> str:
"""OpenSSL 設定ファイル生成(SAN 対応)"""
san_entries = [f'DNS:{host}' for host in hosts]
return f"""
[req]
distinguished_name = req_distinguished_name
req_extensions = v3_req
[req_distinguished_name]
CN = {hosts[0]}
[v3_req]
subjectAltName = @alt_names
[alt_names]
""" + '\n'.join(f'DNS.{i+1} = {host}' for i, host in enumerate(hosts))
# 使用例
# rotator = CertificateRotator(
# ca_cert_path='/etc/mesh-ca/ca-cert.pem',
# output_dir='/etc/mesh/certs'
# )
#
# # 証明書チェック
# needs_renewal, days = rotator.check_expiration('/etc/mesh/certs/service-cert.pem')
# if needs_renewal:
# print(f"Certificate expires in {days} days - rotating...")
# rotator.rotate_certificate(
# service_name='user-service',
# hosts=['user-service', 'user-service.default.svc.cluster.local']
# )
トラフィック制御のメカニズム
カナリアリリース(トラフィック分割)
from typing import Dict, List
import random
class CanaryTrafficController:
"""カナリアリリース用トラフィックコントローラー"""
def __init__(self):
self.routes: Dict[str, List[Dict]] = {}
def configure_canary(self, route_name: str, stable_weight: int, canary_version: str):
"""カナリアリリース設定"""
# stable_weight: ステージングバージョンへのトラフィック割合(0-100)
# canary_weight = 100 - stable_weight
canary_weight = 100 - stable_weight
self.routes[route_name] = [
{
'version': 'stable',
'weight': stable_weight,
'endpoint': 'user-service-stable:8080'
},
{
'version': canary_version,
'weight': canary_weight,
'endpoint': f'user-service-{canary_version}:8080'
}
]
print(f"[CANARY] {route_name}: stable={stable_weight}%, {canary_version}={canary_weight}%")
def route_request(self, route_name: str) -> str:
"""リクエストをルーティング(重み付きランダム)"""
if route_name not in self.routes:
raise ValueError(f"Route {route_name} not found")
route_config = self.routes[route_name]
# 重み付きランダム選択
total_weight = sum(r['weight'] for r in route_config)
rand = random.randint(1, total_weight)
cumulative = 0
for route in route_config:
cumulative += route['weight']
if rand <= cumulative:
return route['endpoint']
return route_config[-1]['endpoint']
def gradual_rollout(self, route_name: str, canary_version: str, steps: List[int]):
"""段階的ロールアウト(カナリア%を徐々に増加)"""
print(f"[ROLLOUT] Starting gradual rollout for {canary_version}")
for canary_weight in steps:
stable_weight = 100 - canary_weight
self.configure_canary(route_name, stable_weight, canary_version)
# 実際にはここでヘルスチェック・メトリクス監視
# 問題があればロールバック
print(f"[ROLLOUT] Waiting for metrics collection...")
# time.sleep(300) # 5 分待機
def rollback(self, route_name: str):
"""ロールバック(100% ステージング)"""
self.configure_canary(route_name, 100, 'unknown')
print(f"[ROLLBACK] {route_name} rolled back to stable")
# 使用例
controller = CanaryTrafficController()
# カナリアリリース設定(95% stable, 5% canary)
controller.configure_canary('user-service', 95, 'v2')
# リクエストルーティング
for _ in range(20):
endpoint = controller.route_request('user-service')
print(f"Request routed to: {endpoint}")
# 段階的ロールアウト
# controller.gradual_rollout('user-service', 'v2', steps=[5, 10, 25, 50, 100])
サーキットブレーカー
from enum import Enum
from datetime import datetime, timedelta
from typing import Optional
class CircuitState(Enum):
CLOSED = "closed" # 通常状態
OPEN = "open" # 遮断状態
HALF_OPEN = "half_open" # 半開き状態
class CircuitBreaker:
"""サーキットブレーカー実装"""
def __init__(self, failure_threshold: int = 5,
success_threshold: int = 2,
timeout_seconds: int = 30):
self.failure_threshold = failure_threshold
self.success_threshold = success_threshold
self.timeout_seconds = timeout_seconds
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time: Optional[datetime] = None
def call(self, func, *args, **kwargs):
"""サーキットブレーカー経由で関数実行"""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
print("[CIRCUIT] State: OPEN → HALF_OPEN")
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise e
def _on_success(self):
"""成功時の処理"""
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
print("[CIRCUIT] State: HALF_OPEN → CLOSED (recovered)")
else:
# CLOSED 状態でも失敗カウントをリセット
self.failure_count = 0
def _on_failure(self):
"""失敗時の処理"""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
print("[CIRCUIT] State: HALF_OPEN → OPEN (recovery failed)")
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"[CIRCUIT] State: CLOSED → OPEN (failures: {self.failure_count})")
def _should_attempt_reset(self) -> bool:
"""タイムアウト経過でリセット試行可能か判定"""
if self.last_failure_time is None:
return True
elapsed = datetime.now() - self.last_failure_time
return elapsed.total_seconds() >= self.timeout_seconds
def get_state(self) -> Dict:
"""現在の状態を取得"""
return {
'state': self.state.value,
'failure_count': self.failure_count,
'success_count': self.success_count,
'last_failure': self.last_failure_time.isoformat() if self.last_failure_time else None
}
# 使用例
def unstable_service_call():
"""失敗しやすいサービスの模擬"""
import random
if random.random() < 0.7: # 70% の確率で失敗
raise Exception("Service unavailable")
return "Success"
breaker = CircuitBreaker(failure_threshold=3, timeout_seconds=5)
# シミュレーション
for i in range(10):
try:
result = breaker.call(unstable_service_call)
print(f"Call {i+1}: {result}")
except Exception as e:
print(f"Call {i+1}: FAILED - {e}")
print(f"State: {breaker.get_state()}")
レート制限(レート制限)
from collections import deque
from datetime import datetime, timedelta
class TokenBucketRateLimiter:
"""トークンバケットアルゴリズムによるレート制限"""
def __init__(self, rate: int, capacity: int):
"""
rate: 1 秒あたりのトークン生成数
capacity: バケットの最大容量
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_refill = datetime.now()
def _refill(self):
"""トークン補充"""
now = datetime.now()
elapsed = (now - self.last_refill).total_seconds()
tokens_to_add = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + tokens_to_add)
self.last_refill = now
def acquire(self, tokens: int = 1) -> bool:
"""トークン取得(レート制限チェック)"""
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def get_wait_time(self, tokens: int = 1) -> float:
"""指定トークン数を取得できるまでの待機時間"""
self._refill()
if self.tokens >= tokens:
return 0.0
tokens_needed = tokens - self.tokens
return tokens_needed / self.rate
class SlidingWindowRateLimiter:
"""スライディングウィンドウアルゴリズム"""
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests: deque = deque()
def _clean_old_requests(self):
"""ウィンドウ外の古いリクエストを削除"""
cutoff = datetime.now() - timedelta(seconds=self.window_seconds)
while self.requests and self.requests[0] < cutoff:
self.requests.popleft()
def acquire(self) -> bool:
"""リクエスト許可チェック"""
self._clean_old_requests()
if len(self.requests) < self.max_requests:
self.requests.append(datetime.now())
return True
return False
def get_remaining(self) -> int:
"""残リクエスト数"""
self._clean_old_requests()
return self.max_requests - len(self.requests)
# 使用例
limiter = TokenBucketRateLimiter(rate=10, capacity=20) # 10 req/s, burst 20
for i in range(25):
if limiter.acquire():
print(f"Request {i+1}: ALLOWED")
else:
wait_time = limiter.get_wait_time()
print(f"Request {i+1}: RATE LIMITED (wait {wait_time:.2f}s)")
サービスメッシュの実装比較
主要サービスメッシュの比較
【主要サービスメッシュ比較】
| 機能 | Istio + Envoy | Linkerd | Consul Connect |
|----------------|--------------------|-------------------|-------------------|
| データプレーン | Envoy | Linkerd2-proxy | Envoy |
| コントロール | Pilot, Citadel | Linkerd controller| Consul control plane |
| mTLS | 自動(デフォルト) | 自動(オプトイン) | 自動 |
| 設定モデル | CRD(Kubernetes) | Kubernetes native | Consul catalog |
| パフォーマンス | 高機能だが重い | 軽量・高速 | 標準 |
| 学習曲線 | 急 | 緩やか | 標準 |
| マルチクラスター| 対応 | 対応 | 対応 |
Istio の基本設定例
# VirtualService: トラフィックルーティング
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: user-service-vs
spec:
hosts:
- user-service
http:
- match:
- headers:
canary:
exact: "true"
route:
- destination:
host: user-service
subset: canary
- route:
- destination:
host: user-service
subset: stable
weight: 95
- destination:
host: user-service
subset: canary
weight: 5
---
# DestinationRule: サブセット定義とサーキットブレーカー
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: user-service-dr
spec:
host: user-service
trafficPolicy:
connectionPool:
tcp:
maxConnections: 100
http:
h2UpgradePolicy: UPGRADE
http1MaxPendingRequests: 100
http2MaxRequests: 1000
outlierDetection: # サーキットブレーカー
consecutive5xxErrors: 5
interval: 30s
baseEjectionTime: 30s
maxEjectionPercent: 50
subsets:
- name: stable
labels:
version: v1
- name: canary
labels:
version: v2
---
# PeerAuthentication: mTLS 設定
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
name: default-mtls
namespace: default
spec:
mtls:
mode: STRICT # mTLS を必須に設定
可観測性——メトリクス・ログ・トレーシング
メトリクス収集(Prometheus 統合)
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List
import json
@dataclass
class MetricPoint:
"""メトリクスポイント"""
timestamp: datetime
value: float
labels: Dict[str, str] = field(default_factory=dict)
class SidecarMetricsCollector:
"""サイドカーメトリクスコレクター"""
def __init__(self):
self.metrics: Dict[str, List[MetricPoint]] = {}
def record(self, metric_name: str, value: float, labels: Dict[str, str] = None):
"""メトリクスを記録"""
if metric_name not in self.metrics:
self.metrics[metric_name] = []
point = MetricPoint(
timestamp=datetime.now(),
value=value,
labels=labels or {}
)
self.metrics[metric_name].append(point)
# 古いメトリクスは削除(簡易実装)
if len(self.metrics[metric_name]) > 1000:
self.metrics[metric_name] = self.metrics[metric_name][-500:]
def record_request(self, destination: str, method: str,
status_code: int, duration_ms: float):
"""リクエストメトリクスを記録"""
self.record('http_requests_total', 1, labels={
'destination': destination,
'method': method,
'status_code': str(status_code)
})
self.record('http_request_duration_ms', duration_ms, labels={
'destination': destination,
'method': method
})
def record_connection(self, destination: str, active: bool):
"""接続メトリクスを記録"""
gauge_value = 1 if active else -1
self.record('tcp_connections', gauge_value, labels={
'destination': destination
})
def export_prometheus(self) -> str:
"""Prometheus 形式でエクスポート"""
output = []
for metric_name, points in self.metrics.items():
if not points:
continue
# 最後の値のみエクスポート(ガウジ/カウンター)
latest = points[-1]
labels_str = ','.join(f'{k}="{v}"' for k, v in latest.labels.items())
if labels_str:
output.append(f'{metric_name}{{{labels_str}}} {latest.value}')
else:
output.append(f'{metric_name} {latest.value}')
return '\n'.join(output)
# 使用例
collector = SidecarMetricsCollector()
# メトリクス記録
collector.record_request(
destination='user-service',
method='GET',
status_code=200,
duration_ms=45.2
)
collector.record_request(
destination='user-service',
method='POST',
status_code=500,
duration_ms=120.5
)
# Prometheus エクスポート
print(collector.export_prometheus())
# 出力例:
# http_requests_total{destination="user-service",method="POST",status_code="500"} 1
# http_request_duration_ms{destination="user-service",method="POST"} 120.5
分散トレーシング(Jaeger/Zipkin 統合)
import uuid
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime
@dataclass
class Span:
"""トレーススパン"""
trace_id: str
span_id: str
parent_span_id: Optional[str]
operation_name: str
service_name: str
start_time: datetime
end_time: Optional[datetime] = None
tags: Dict[str, str] = field(default_factory=dict)
logs: List[Dict] = field(default_factory=list)
def finish(self):
"""スパンを終了"""
self.end_time = datetime.now()
def duration_ms(self) -> float:
"""スパンの継続時間(ミリ秒)"""
if self.end_time is None:
return 0.0
return (self.end_time - self.start_time).total_seconds() * 1000
class TracingContext:
"""トレーシングコンテキスト(スレッドローカル)"""
def __init__(self):
self.current_span: Optional[Span] = None
class DistributedTracer:
"""分散トレーサー"""
def __init__(self, service_name: str):
self.service_name = service_name
self.context = TracingContext()
self.finished_spans: List[Span] = []
def start_span(self, operation_name: str,
parent_span_id: Optional[str] = None) -> Span:
"""新しいスパンを開始"""
trace_id = self.context.current_span.trace_id if self.context.current_span else str(uuid.uuid4())
span_id = str(uuid.uuid4())
span = Span(
trace_id=trace_id,
span_id=span_id,
parent_span_id=parent_span_id or (self.context.current_span.span_id if self.context.current_span else None),
operation_name=operation_name,
service_name=self.service_name,
start_time=datetime.now()
)
self.context.current_span = span
return span
def finish_span(self, span: Span):
"""スパンを終了"""
span.finish()
self.finished_spans.append(span)
# コンテキストを親スパンに戻す
if span.parent_span_id:
# 実際にはスパンスタックを管理する必要がある
pass
else:
self.context.current_span = None
def inject_context(self, headers: Dict[str, str]):
"""トレースコンテキストを HTTP ヘッダーに注入"""
if self.context.current_span:
headers['x-trace-id'] = self.context.current_span.trace_id
headers['x-span-id'] = self.context.current_span.span_id
def extract_context(self, headers: Dict[str, str]) -> Optional[Span]:
"""HTTP ヘッダーからトレースコンテキストを抽出"""
trace_id = headers.get('x-trace-id')
parent_span_id = headers.get('x-span-id')
if trace_id and parent_span_id:
return self.start_span(
operation_name='incoming-request',
parent_span_id=parent_span_id
)
return None
def export_jaeger(self) -> List[Dict]:
"""Jaeger 形式でエクスポート"""
return [
{
'trace_id': span.trace_id,
'span_id': span.span_id,
'parent_span_id': span.parent_span_id,
'operation_name': span.operation_name,
'service_name': span.service_name,
'start_time': span.start_time.isoformat(),
'duration_ms': span.duration_ms(),
'tags': span.tags,
'logs': span.logs
}
for span in self.finished_spans
]
# 使用例
# サーバー側
server_tracer = DistributedTracer('user-service')
# クライアントからトレースヘッダーを受け取り
incoming_headers = {'x-trace-id': 'abc123', 'x-span-id': 'span-456'}
span = server_tracer.extract_context(incoming_headers)
if span:
# ビジネスロジック実行
# ...
server_tracer.finish_span(span)
# クライアント側(呼び出し元)
client_tracer = DistributedTracer('api-gateway')
outgoing_span = client_tracer.start_span('call-user-service')
headers = {}
client_tracer.inject_context(headers)
# headers に x-trace-id, x-span-id が設定される
# HTTP リクエスト実行(headers を渡す)
# response = http.get('http://user-service/...', headers=headers)
client_tracer.finish_span(outgoing_span)
まとめ
サービスメッシュの核心:
- 基本概念: マイクロサービス間通信をインフラ層として抽象化
- サイドカーアーキテクチャ: アプリコードから通信ロジックを分離
- mTLS: 証明書による相互認証でゼロトラストを実現
- トラフィック制御: カナリアリリース、サーキットブレーカー、レート制限
- 可観測性: メトリクス・ログ・トレーシングの自動収集
- 実装比較: Istio(高機能)、Linkerd(軽量)、Consul(統合)
サービスメッシュは、マイクロサービスの複雑さを管理するための不可欠なインフラとなりつつある。ただし、サービスメッシュ自体も複雑さを導入するため、小規模なシステムではオーバーエンジニアリングとなる場合もある。
重要なのは、「サービス数が 10 を超え、通信ポリシーの統一が必要になった時点で導入を検討する」という段階的なアプローチだ。サービスメッシュは銀の弾丸ではないが、適切に導入されれば、マイクロサービスの運用負劇を大幅に軽減する。
参考資料
- “Service Mesh Patterns” Randy Shoup, Chad Knaus 著
- “Istio: Up and Running” Lee Calcote, Zack Butcher 著
- Envoy Proxy 公式ドキュメント: https://www.envoyproxy.io/
- Istio 公式ドキュメント: https://istio.io/
- Linkerd 公式ドキュメント: https://linkerd.io/
- “The Service Mesh Interface (SMI) Specification”: https://smi-spec.io/
- Buoyant Blog: https://blog.buoyant.io/(Linkerd 開発チーム)
- Solo.io Blog: https://www.solo.io/blog/(Istio 関連)
免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。