目次

現代のマイクロサービスアーキテクチャにおいて、サービス間通信の管理は複雑さを増している。サービスメッシュ(Service Mesh)は、このサービス間通信をインフラ層として抽象化し、セキュリティ、可観測性、トラフィック制御を一元的に管理するための基盤技術である。

本記事では、サービスメッシュの基本概念から、サイドカープロキシのアーキテクチャ、mTLS によるセキュアな通信、トラフィック制御のメカニズムまでを体系的に解説する。

サービスメッシュの基本概念——なぜ「メッシュ」が必要か

マイクロサービスの通信課題

【マイクロサービス通信の課題】

サービス A → サービス B → サービス C
    ↓           ↓           ↓
  認証?     認証?      認証?
  ログ?     ログ?      ログ?
  再試行?   再試行?    再試行?
  速率制限?  速率制限?   速率制限?

【課題】
1. 各サービスが通信ロジックを重複実装
2. 認証・暗号化のポリシー統一が困難
3. 障害時の挙動(リトライ、タイムアウト)がバラバラ
4. 通信ログの収集方法がサービスごとに異なる

サービスメッシュの解決策: 通信ロジックをアプリケーションコードから分離し、インフラ層として一元管理する。

サービスメッシュの定義

サービスメッシュ:
「マイクロサービス間のサービス間通信を管理するための
  専用インフラ層(dedicated infrastructure layer)」

【主な機能】
1. サービスディスカバリ(サービス発見)
2. トラフィック制御(ルーティング、負荷分散)
3. セキュリティ(mTLS、認証、認可)
4. 可観測性(メトリクス、ログ、トレーシング)
5. 回復性(リトライ、サーキットブレーカー、レート制限)

サービスメッシュのアプローチ比較

# サービスメッシュなし(ライブラリアプローチ)
class ServiceClient:
    def __init__(self, service_name):
        self.service_name = service_name
        # 各サービスが通信ロジックを重複実装
        self.retry_policy = RetryPolicy(max_retries=3)
        self.auth_handler = AuthHandler()
        self.logger = ServiceLogger()
        self.rate_limiter = RateLimiter()

    def call(self, endpoint, data):
        # 認証処理
        token = self.auth_handler.get_token()
        # 再試行ロジック
        for attempt in range(self.retry_policy.max_retries):
            try:
                # レート制限チェック
                self.rate_limiter.check_limit()
                #  actual call
                response = self._make_request(endpoint, data, token)
                return response
            except Exception as e:
                self.logger.log_retry_attempt(attempt, e)
                if attempt == self.retry_policy.max_retries - 1:
                    raise

# 課題:各サービスでコードが重複、ポリシー変更時の一斉更新が必要
# サービスメッシュあり(サイドカーアプローチ)
class ServiceClient:
    def __init__(self, service_name):
        self.service_name = service_name
        # 通信ロジックは不要 - サービスメッシュが管理

    def call(self, endpoint, data):
        # ローカルホストのサイドカープロキシに送信するだけ
        # あとはサイドカーがすべて処理
        response = self._make_request(f"http://localhost:15001/{endpoint}", data)
        return response

# メリット:通信ポリシーはメッシュ設定で一元管理、アプリコードはビジネスロジックに集中

サイドカープロキシのアーキテクチャ

サイドカーパターンの基本概念

【サイドカーアーキテクチャ】

┌─────────────────────────────────────────────────────┐
│                    Pod / VM                         │
│  ┌───────────────────┐  ┌───────────────────────┐   │
│  │  Main Container   │  │   Sidecar Proxy       │   │
│  │  (Application)    │  │   (Envoy, Linkerd)    │   │
│  │                   │  │                       │   │
│  │  - ビジネスロジック  │  │  - トラフィック制御      │   │
│  │  - HTTP サーバー     │◄─┤  - mTLS 終端          │   │
│  │  - アプリログ       │  │  - メトリクス収集       │   │
│  │                   │  │  - 認証・認可          │   │
│  └─────────┬─────────┘  └───────────┬───────────┘   │
│            │                        │               │
│            └────────────┬───────────┘               │
│                         │                           │
│                  localhost:15001                    │
│                   (ループバック)                   │
└─────────────────────────┼───────────────────────────┘
                          │
                          ▼
              外部サービス・他のサイドカー

サイドカーの利点:

  1. 言語非依存: アプリケーションの言語に関係なく動作
  2. 透過的: アプリコード変更なしに機能追加
  3. 分離: 障害がアプリに直接伝播しない
  4. 一元管理: ポリシーをメッシュ全体で統一

Envoy プロキシの基本構造

import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional
import hashlib

@dataclass
class ClusterConfig:
    """クラスター(アップストリームサービス)設定"""
    name: str
    hosts: List[str]
    health_check_interval: float = 5.0
    circuit_breaker_threshold: int = 5

@dataclass
class ListenerConfig:
    """リスナー設定"""
    address: str
    port: int
    route_config_name: str

@dataclass
class RouteConfig:
    """ルート設定"""
    name: str
    routes: Dict[str, str]  # path_prefix -> cluster

class SimpleSidecarProxy:
    """簡易サイドカープロキシの実装"""

    def __init__(self):
        self.clusters: Dict[str, ClusterConfig] = {}
        self.listeners: Dict[str, ListenerConfig] = {}
        self.routes: Dict[str, RouteConfig] = {}
        self.circuit_breakers: Dict[str, int] = {}  # cluster -> failure_count
        self.health_status: Dict[str, bool] = {}  # host -> status

    def add_cluster(self, config: ClusterConfig):
        """アップストリームクラスターの登録"""
        self.clusters[config.name] = config
        self.circuit_breakers[config.name] = 0
        # 初期ヘルスチェック
        for host in config.hosts:
            self.health_status[host] = True

        # 定期ヘルスチェック開始
        threading.Thread(target=self._health_check_loop, args=(config,), daemon=True).start()

    def _health_check_loop(self, cluster: ClusterConfig):
        """定期ヘルスチェック実行"""
        while True:
            time.sleep(cluster.health_check_interval)
            for host in cluster.hosts:
                # 簡易ヘルスチェック(実際は HTTP/TCP チェック)
                is_healthy = self._perform_health_check(host)
                self.health_status[host] = is_healthy

                if not is_healthy:
                    print(f"[HEALTH] {host} is unhealthy")

    def _perform_health_check(self, host: str) -> bool:
        """ヘルスチェック実行(簡易実装)"""
        # 実際は HTTP GET /health や TCP connect を実行
        return True

    def route_request(self, path: str) -> Optional[str]:
        """リクエストを適切なクラスターにルーティング"""
        for route_config in self.routes.values():
            for prefix, cluster_name in route_config.routes.items():
                if path.startswith(prefix):
                    cluster = self.clusters.get(cluster_name)
                    if cluster:
                        # 正常なホストから選択(ラウンドロビン)
                        healthy_hosts = [h for h in cluster.hosts if self.health_status.get(h, False)]
                        if healthy_hosts:
                            # 簡易ラウンドロビン
                            host_hash = hashlib.md5(path.encode()).hexdigest()
                            return healthy_hosts[int(host_hash, 16) % len(healthy_hosts)]
        return None

    def check_circuit_breaker(self, cluster_name: str) -> bool:
        """サーキットブレーカーの状態を確認"""
        failure_count = self.circuit_breakers.get(cluster_name, 0)
        threshold = self.clusters[cluster_name].circuit_breaker_threshold

        if failure_count >= threshold:
            print(f"[CIRCUIT BREAKER] OPEN for {cluster_name} (failures: {failure_count})")
            return False  # リクエスト拒否

        return True  # リクエスト許可

    def record_success(self, cluster_name: str):
        """成功を記録(サーキットブレーカーリセット)"""
        self.circuit_breakers[cluster_name] = 0

    def record_failure(self, cluster_name: str):
        """失敗を記録(サーキットブレーカーカウントアップ)"""
        self.circuit_breakers[cluster_name] += 1

# 使用例
proxy = SimpleSidecarProxy()

# クラスター設定
proxy.add_cluster(ClusterConfig(
    name="user-service",
    hosts=["user-1.internal:8080", "user-2.internal:8080"],
    health_check_interval=5.0,
    circuit_breaker_threshold=3
))

proxy.add_cluster(ClusterConfig(
    name="order-service",
    hosts=["order-1.internal:8080"],
    health_check_interval=10.0,
    circuit_breaker_threshold=5
))

# ルート設定
proxy.routes["main-routes"] = RouteConfig(
    name="main-routes",
    routes={
        "/users": "user-service",
        "/orders": "order-service"
    }
)

# リクエストルーティング
target = proxy.route_request("/users/profile")
print(f"Routing to: {target}")  # user-1.internal:8080 or user-2.internal:8080

データプレーンとコントロールプレーン

【サービスメッシュの 2 層アーキテクチャ】

┌─────────────────────────────────────────────────────┐
│              コントロールプレーン                    │
│         (Istio Pilot, Linkerd Control Plane)       │
│                                                      │
│  - 設定管理(ポリシー、ルール)                       │
│  - サービスディスカバリ                              │
│  - 証明書発行(mTLS 用)                              │
│  - 設定をサイドカーに配信(xDS API)                 │
└─────────────────────┬───────────────────────────────┘
                      │ xDS API (gRPC)
                      │ - LDS: Listener Discovery
                      │ - RDS: Route Discovery
                      │ - CDS: Cluster Discovery
                      │ - EDS: Endpoint Discovery
                      ▼
┌─────────────────────────────────────────────────────┐
│              データプレーン(サイドカー)             │
│         (Envoy, Linkerd Data Plane)                │
│                                                      │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐             │
│  │ Sidecar │  │ Sidecar │  │ Sidecar │             │
│  │  Pod 1  │  │  Pod 2  │  │  Pod 3  │             │
│  └─────────┘  └─────────┘  └─────────┘             │
│                                                      │
│  - 実際のトラフィック処理                            │
│  - mTLS 終端                                          │
│  - メトリクス収集                                    │
└─────────────────────────────────────────────────────┘

コントロールプレーンの役割:

  • 設定の一元管理と配信
  • サービスレジストリとの統合(Kubernetes, Consul)
  • 証明書ライフサイクル管理

データプレーンの役割:

  • 実際のトラフィック転送
  • ポリシーの適用(認証、レート制限)
  • メトリクス・ログの収集

mTLS(相互 TLS)によるセキュアな通信

TLS と mTLS の違い

【TLS(一方向認証)】

クライアント                    サーバー
    │                            │
    │──── ClientHello ──────────►│
    │◄──── ServerHello ──────────│
    │◄──── 証明書(サーバー) ─────│
    │                            │
    │  [サーバー認証のみ]          │
    │  証明書検証                │
    │                            │
    │──── 暗号化通信 ───────────►│
    │◄─── 暗号化通信 ────────────│

【mTLS(双方向認証)】

クライアント                    サーバー
    │                            │
    │──── ClientHello ──────────►│
    │◄──── ServerHello ──────────│
    │◄──── 証明書(サーバー) ─────│
    │                            │
    │  [サーバー認証]              │
    │  証明書検証                │
    │                            │
    │──── 証明書(クライアント) ──►│
    │                            │
    │  [クライアント認証]          │
    │  証明書検証                │
    │                            │
    │──── 暗号化通信 ───────────►│
    │◄─── 暗号化通信 ────────────│

mTLS のメリット:

  1. 相互認証: クライアント・サーバー双方が身元を確認
  2. なりすまし防止: 有効な証明書がないサービスは通信不可
  3. ゼロトラスト: 「内部ネットワークは安全」という前提を廃止
  4. 自動ローテーション: 証明書はメッシュが自動更新

mTLS ハンドシェイクの実装

import ssl
import socket
from typing import Tuple, Optional
from dataclasses import dataclass
import datetime

@dataclass
class CertificateConfig:
    """証明書設定"""
    ca_cert_path: str  # 認証局の証明書
    cert_path: str     # 自証明書
    key_path: str      # 秘密鍵
    verify_mode: bool = True

class MutualTLSConnection:
    """mTLS 接続の実装"""

    def __init__(self, config: CertificateConfig):
        self.config = config
        self.context = self._create_ssl_context()

    def _create_ssl_context(self) -> ssl.SSLContext:
        """mTLS 用の SSL コンテキスト作成"""
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

        # 証明書検証を有効化
        if self.config.verify_mode:
            context.verify_mode = ssl.CERT_REQUIRED
            context.check_hostname = True

        # CA 証明書の読み込み
        context.load_verify_locations(self.config.ca_cert_path)

        # クライアント証明書の読み込み(mTLS 用)
        context.load_cert_chain(
            certfile=self.config.cert_path,
            keyfile=self.config.key_path
        )

        # モダンな暗号スイートのみ許可
        context.set_ciphers('ECDHE+AESGCM:DHE+AESGCM:ECDHE+CHACHA20:DHE+CHACHA20')

        # TLS 1.3 を優先
        context.minimum_version = ssl.TLSVersion.TLSv1_3

        return context

    def connect(self, host: str, port: int) -> ssl.SSLSocket:
        """mTLS 接続を確立"""
        # TCP 接続
        sock = socket.create_connection((host, port))

        # mTLS ハンドシェイク
        secure_sock = self.context.wrap_socket(
            sock,
            server_hostname=host,
            do_handshake_on_connect=True
        )

        # 接続情報のログ
        print(f"[mTLS] Connected to {host}:{port}")
        print(f"[mTLS] Protocol: {secure_sock.version()}")
        print(f"[mTLS] Cipher: {secure_sock.cipher()[0]}")

        # サーバー証明書の検証
        cert = secure_sock.getpeercert()
        print(f"[mTLS] Server certificate subject: {cert['subject']}")

        return secure_sock

class MTLSAuthServer:
    """mTLS サーバー side 実装"""

    def __init__(self, config: CertificateConfig):
        self.config = config
        self.context = self._create_ssl_context()

    def _create_ssl_context(self) -> ssl.SSLContext:
        """mTLS サーバー用 SSL コンテキスト"""
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

        # クライアント証明書を必須に設定
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = False  # サーバー側はホスト名検証しない場合が多い

        # CA 証明書(クライアント証明書検証用)
        context.load_verify_locations(self.config.ca_cert_path)

        # サーバー証明書
        context.load_cert_chain(
            certfile=self.config.cert_path,
            keyfile=self.config.key_path
        )

        return context

    def accept_connection(self, host: str = '0.0.0.0', port: int = 8443) -> ssl.SSLSocket:
        """mTLS 接続を受理"""
        server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_sock.bind((host, port))
        server_sock.listen(5)

        print(f"[mTLS Server] Listening on {host}:{port}")

        client_sock, addr = server_sock.accept()

        # mTLS ハンドシェイク
        secure_sock = self.context.wrap_socket(
            client_sock,
            server_side=True,
            do_handshake_on_connect=True
        )

        # クライアント証明書の検証
        cert = secure_sock.getpeercert()
        print(f"[mTLS Server] Client connected from {addr}")
        print(f"[mTLS Server] Client certificate subject: {cert['subject']}")

        return secure_sock

# 使用例(架空の証明書パス)
# client_config = CertificateConfig(
#     ca_cert_path='/etc/mesh-ca/ca-cert.pem',
#     cert_path='/etc/mesh/client-cert.pem',
#     key_path='/etc/mesh/client-key.pem'
# )
#
# client = MutualTLSConnection(client_config)
# secure_conn = client.connect('user-service.default.svc.cluster.local', 8443)
# secure_conn.send(b'GET /api/users HTTP/1.1\r\nHost: user-service\r\n\r\n')
# response = secure_conn.recv(4096)

証明書の自動ローテーション

import os
import subprocess
from datetime import datetime, timedelta

class CertificateRotator:
    """証明書自動ローテーター"""

    def __init__(self, ca_cert_path: str, output_dir: str):
        self.ca_cert_path = ca_cert_path
        self.output_dir = output_dir
        self.cert_validity_days = 365
        self.renewal_threshold_days = 30

    def check_expiration(self, cert_path: str) -> Tuple[bool, int]:
        """証明書の有効期限をチェック"""
        # OpenSSL で有効期限を取得
        result = subprocess.run(
            ['openssl', 'x509', '-in', cert_path, '-noout', '-enddate'],
            capture_output=True,
            text=True
        )

        if result.returncode != 0:
            return False, 0

        # 有効期限日付をパース
        end_date_str = result.stdout.strip().replace('notAfter=', '')
        end_date = datetime.strptime(end_date_str, '%b %d %H:%M:%S %Y %Z')

        # 残存日数を計算
        days_remaining = (end_date - datetime.now()).days

        needs_renewal = days_remaining <= self.renewal_threshold_days

        return needs_renewal, days_remaining

    def rotate_certificate(self, service_name: str, hosts: list):
        """証明書をローテーション"""
        # 秘密鍵生成
        key_path = os.path.join(self.output_dir, f'{service_name}-key.pem')
        subprocess.run([
            'openssl', 'genrsa', '-out', key_path, '2048'
        ], check=True)

        # CSR(Certificate Signing Request)生成
        csr_path = os.path.join(self.output_dir, f'{service_name}.csr')
        openssl_cnf = self._create_openssl_cnf(hosts)
        cnf_path = os.path.join(self.output_dir, f'{service_name}.cnf')

        with open(cnf_path, 'w') as f:
            f.write(openssl_cnf)

        subprocess.run([
            'openssl', 'req', '-new', '-key', key_path,
            '-out', csr_path, '-config', cnf_path
        ], check=True)

        # CA で署名
        cert_path = os.path.join(self.output_dir, f'{service_name}-cert.pem')
        subprocess.run([
            'openssl', 'x509', '-req', '-in', csr_path,
            '-CA', self.ca_cert_path,
            '-CAkey', os.path.join(self.output_dir, 'ca-key.pem'),
            '-CAcreateserial',
            '-out', cert_path,
            '-days', str(self.cert_validity_days),
            '-sha256'
        ], check=True)

        print(f"[ROTATE] Certificate rotated for {service_name}")
        print(f"[ROTATE] Valid for {self.cert_validity_days} days")

    def _create_openssl_cnf(self, hosts: list) -> str:
        """OpenSSL 設定ファイル生成(SAN 対応)"""
        san_entries = [f'DNS:{host}' for host in hosts]
        return f"""
[req]
distinguished_name = req_distinguished_name
req_extensions = v3_req

[req_distinguished_name]
CN = {hosts[0]}

[v3_req]
subjectAltName = @alt_names

[alt_names]
""" + '\n'.join(f'DNS.{i+1} = {host}' for i, host in enumerate(hosts))

# 使用例
# rotator = CertificateRotator(
#     ca_cert_path='/etc/mesh-ca/ca-cert.pem',
#     output_dir='/etc/mesh/certs'
# )
#
# # 証明書チェック
# needs_renewal, days = rotator.check_expiration('/etc/mesh/certs/service-cert.pem')
# if needs_renewal:
#     print(f"Certificate expires in {days} days - rotating...")
#     rotator.rotate_certificate(
#         service_name='user-service',
#         hosts=['user-service', 'user-service.default.svc.cluster.local']
#     )

トラフィック制御のメカニズム

カナリアリリース(トラフィック分割)

from typing import Dict, List
import random

class CanaryTrafficController:
    """カナリアリリース用トラフィックコントローラー"""

    def __init__(self):
        self.routes: Dict[str, List[Dict]] = {}

    def configure_canary(self, route_name: str, stable_weight: int, canary_version: str):
        """カナリアリリース設定"""
        # stable_weight: ステージングバージョンへのトラフィック割合(0-100)
        # canary_weight = 100 - stable_weight
        canary_weight = 100 - stable_weight

        self.routes[route_name] = [
            {
                'version': 'stable',
                'weight': stable_weight,
                'endpoint': 'user-service-stable:8080'
            },
            {
                'version': canary_version,
                'weight': canary_weight,
                'endpoint': f'user-service-{canary_version}:8080'
            }
        ]

        print(f"[CANARY] {route_name}: stable={stable_weight}%, {canary_version}={canary_weight}%")

    def route_request(self, route_name: str) -> str:
        """リクエストをルーティング(重み付きランダム)"""
        if route_name not in self.routes:
            raise ValueError(f"Route {route_name} not found")

        route_config = self.routes[route_name]

        # 重み付きランダム選択
        total_weight = sum(r['weight'] for r in route_config)
        rand = random.randint(1, total_weight)

        cumulative = 0
        for route in route_config:
            cumulative += route['weight']
            if rand <= cumulative:
                return route['endpoint']

        return route_config[-1]['endpoint']

    def gradual_rollout(self, route_name: str, canary_version: str, steps: List[int]):
        """段階的ロールアウト(カナリア%を徐々に増加)"""
        print(f"[ROLLOUT] Starting gradual rollout for {canary_version}")

        for canary_weight in steps:
            stable_weight = 100 - canary_weight
            self.configure_canary(route_name, stable_weight, canary_version)

            # 実際にはここでヘルスチェック・メトリクス監視
            # 問題があればロールバック

            print(f"[ROLLOUT] Waiting for metrics collection...")
            # time.sleep(300)  # 5 分待機

    def rollback(self, route_name: str):
        """ロールバック(100% ステージング)"""
        self.configure_canary(route_name, 100, 'unknown')
        print(f"[ROLLBACK] {route_name} rolled back to stable")

# 使用例
controller = CanaryTrafficController()

# カナリアリリース設定(95% stable, 5% canary)
controller.configure_canary('user-service', 95, 'v2')

# リクエストルーティング
for _ in range(20):
    endpoint = controller.route_request('user-service')
    print(f"Request routed to: {endpoint}")

# 段階的ロールアウト
# controller.gradual_rollout('user-service', 'v2', steps=[5, 10, 25, 50, 100])

サーキットブレーカー

from enum import Enum
from datetime import datetime, timedelta
from typing import Optional

class CircuitState(Enum):
    CLOSED = "closed"      # 通常状態
    OPEN = "open"          # 遮断状態
    HALF_OPEN = "half_open"  # 半開き状態

class CircuitBreaker:
    """サーキットブレーカー実装"""

    def __init__(self, failure_threshold: int = 5,
                 success_threshold: int = 2,
                 timeout_seconds: int = 30):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout_seconds = timeout_seconds

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[datetime] = None

    def call(self, func, *args, **kwargs):
        """サーキットブレーカー経由で関数実行"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                print("[CIRCUIT] State: OPEN → HALF_OPEN")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise e

    def _on_success(self):
        """成功時の処理"""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
                print("[CIRCUIT] State: HALF_OPEN → CLOSED (recovered)")
        else:
            # CLOSED 状態でも失敗カウントをリセット
            self.failure_count = 0

    def _on_failure(self):
        """失敗時の処理"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            print("[CIRCUIT] State: HALF_OPEN → OPEN (recovery failed)")
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"[CIRCUIT] State: CLOSED → OPEN (failures: {self.failure_count})")

    def _should_attempt_reset(self) -> bool:
        """タイムアウト経過でリセット試行可能か判定"""
        if self.last_failure_time is None:
            return True

        elapsed = datetime.now() - self.last_failure_time
        return elapsed.total_seconds() >= self.timeout_seconds

    def get_state(self) -> Dict:
        """現在の状態を取得"""
        return {
            'state': self.state.value,
            'failure_count': self.failure_count,
            'success_count': self.success_count,
            'last_failure': self.last_failure_time.isoformat() if self.last_failure_time else None
        }

# 使用例
def unstable_service_call():
    """失敗しやすいサービスの模擬"""
    import random
    if random.random() < 0.7:  # 70% の確率で失敗
        raise Exception("Service unavailable")
    return "Success"

breaker = CircuitBreaker(failure_threshold=3, timeout_seconds=5)

# シミュレーション
for i in range(10):
    try:
        result = breaker.call(unstable_service_call)
        print(f"Call {i+1}: {result}")
    except Exception as e:
        print(f"Call {i+1}: FAILED - {e}")

    print(f"State: {breaker.get_state()}")

レート制限(レート制限)

from collections import deque
from datetime import datetime, timedelta

class TokenBucketRateLimiter:
    """トークンバケットアルゴリズムによるレート制限"""

    def __init__(self, rate: int, capacity: int):
        """
        rate: 1 秒あたりのトークン生成数
        capacity: バケットの最大容量
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = datetime.now()

    def _refill(self):
        """トークン補充"""
        now = datetime.now()
        elapsed = (now - self.last_refill).total_seconds()

        tokens_to_add = elapsed * self.rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_refill = now

    def acquire(self, tokens: int = 1) -> bool:
        """トークン取得(レート制限チェック)"""
        self._refill()

        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def get_wait_time(self, tokens: int = 1) -> float:
        """指定トークン数を取得できるまでの待機時間"""
        self._refill()

        if self.tokens >= tokens:
            return 0.0

        tokens_needed = tokens - self.tokens
        return tokens_needed / self.rate

class SlidingWindowRateLimiter:
    """スライディングウィンドウアルゴリズム"""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: deque = deque()

    def _clean_old_requests(self):
        """ウィンドウ外の古いリクエストを削除"""
        cutoff = datetime.now() - timedelta(seconds=self.window_seconds)
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

    def acquire(self) -> bool:
        """リクエスト許可チェック"""
        self._clean_old_requests()

        if len(self.requests) < self.max_requests:
            self.requests.append(datetime.now())
            return True
        return False

    def get_remaining(self) -> int:
        """残リクエスト数"""
        self._clean_old_requests()
        return self.max_requests - len(self.requests)

# 使用例
limiter = TokenBucketRateLimiter(rate=10, capacity=20)  # 10 req/s, burst 20

for i in range(25):
    if limiter.acquire():
        print(f"Request {i+1}: ALLOWED")
    else:
        wait_time = limiter.get_wait_time()
        print(f"Request {i+1}: RATE LIMITED (wait {wait_time:.2f}s)")

サービスメッシュの実装比較

主要サービスメッシュの比較

【主要サービスメッシュ比較】

| 機能           | Istio + Envoy      | Linkerd           | Consul Connect    |
|----------------|--------------------|-------------------|-------------------|
| データプレーン | Envoy              | Linkerd2-proxy    | Envoy             |
| コントロール   | Pilot, Citadel     | Linkerd controller| Consul control plane |
| mTLS           | 自動(デフォルト)  | 自動(オプトイン) | 自動             |
| 設定モデル     | CRD(Kubernetes)  | Kubernetes native | Consul catalog    |
| パフォーマンス | 高機能だが重い      | 軽量・高速         | 標準             |
| 学習曲線       | 急                 | 緩やか            | 標準             |
| マルチクラスター| 対応               | 対応              | 対応             |

Istio の基本設定例

# VirtualService: トラフィックルーティング
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service-vs
spec:
  hosts:
  - user-service
  http:
  - match:
    - headers:
        canary:
          exact: "true"
    route:
    - destination:
        host: user-service
        subset: canary
  - route:
    - destination:
        host: user-service
        subset: stable
      weight: 95
    - destination:
        host: user-service
        subset: canary
      weight: 5

---
# DestinationRule: サブセット定義とサーキットブレーカー
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: user-service-dr
spec:
  host: user-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        h2UpgradePolicy: UPGRADE
        http1MaxPendingRequests: 100
        http2MaxRequests: 1000
    outlierDetection:  # サーキットブレーカー
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
  subsets:
  - name: stable
    labels:
      version: v1
  - name: canary
    labels:
      version: v2

---
# PeerAuthentication: mTLS 設定
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default-mtls
  namespace: default
spec:
  mtls:
    mode: STRICT  # mTLS を必須に設定

可観測性——メトリクス・ログ・トレーシング

メトリクス収集(Prometheus 統合)

from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List
import json

@dataclass
class MetricPoint:
    """メトリクスポイント"""
    timestamp: datetime
    value: float
    labels: Dict[str, str] = field(default_factory=dict)

class SidecarMetricsCollector:
    """サイドカーメトリクスコレクター"""

    def __init__(self):
        self.metrics: Dict[str, List[MetricPoint]] = {}

    def record(self, metric_name: str, value: float, labels: Dict[str, str] = None):
        """メトリクスを記録"""
        if metric_name not in self.metrics:
            self.metrics[metric_name] = []

        point = MetricPoint(
            timestamp=datetime.now(),
            value=value,
            labels=labels or {}
        )
        self.metrics[metric_name].append(point)

        # 古いメトリクスは削除(簡易実装)
        if len(self.metrics[metric_name]) > 1000:
            self.metrics[metric_name] = self.metrics[metric_name][-500:]

    def record_request(self, destination: str, method: str,
                       status_code: int, duration_ms: float):
        """リクエストメトリクスを記録"""
        self.record('http_requests_total', 1, labels={
            'destination': destination,
            'method': method,
            'status_code': str(status_code)
        })

        self.record('http_request_duration_ms', duration_ms, labels={
            'destination': destination,
            'method': method
        })

    def record_connection(self, destination: str, active: bool):
        """接続メトリクスを記録"""
        gauge_value = 1 if active else -1
        self.record('tcp_connections', gauge_value, labels={
            'destination': destination
        })

    def export_prometheus(self) -> str:
        """Prometheus 形式でエクスポート"""
        output = []

        for metric_name, points in self.metrics.items():
            if not points:
                continue

            # 最後の値のみエクスポート(ガウジ/カウンター)
            latest = points[-1]
            labels_str = ','.join(f'{k}="{v}"' for k, v in latest.labels.items())

            if labels_str:
                output.append(f'{metric_name}{{{labels_str}}} {latest.value}')
            else:
                output.append(f'{metric_name} {latest.value}')

        return '\n'.join(output)

# 使用例
collector = SidecarMetricsCollector()

# メトリクス記録
collector.record_request(
    destination='user-service',
    method='GET',
    status_code=200,
    duration_ms=45.2
)

collector.record_request(
    destination='user-service',
    method='POST',
    status_code=500,
    duration_ms=120.5
)

# Prometheus エクスポート
print(collector.export_prometheus())
# 出力例:
# http_requests_total{destination="user-service",method="POST",status_code="500"} 1
# http_request_duration_ms{destination="user-service",method="POST"} 120.5

分散トレーシング(Jaeger/Zipkin 統合)

import uuid
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime

@dataclass
class Span:
    """トレーススパン"""
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    operation_name: str
    service_name: str
    start_time: datetime
    end_time: Optional[datetime] = None
    tags: Dict[str, str] = field(default_factory=dict)
    logs: List[Dict] = field(default_factory=list)

    def finish(self):
        """スパンを終了"""
        self.end_time = datetime.now()

    def duration_ms(self) -> float:
        """スパンの継続時間(ミリ秒)"""
        if self.end_time is None:
            return 0.0
        return (self.end_time - self.start_time).total_seconds() * 1000

class TracingContext:
    """トレーシングコンテキスト(スレッドローカル)"""

    def __init__(self):
        self.current_span: Optional[Span] = None

class DistributedTracer:
    """分散トレーサー"""

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.context = TracingContext()
        self.finished_spans: List[Span] = []

    def start_span(self, operation_name: str,
                   parent_span_id: Optional[str] = None) -> Span:
        """新しいスパンを開始"""
        trace_id = self.context.current_span.trace_id if self.context.current_span else str(uuid.uuid4())
        span_id = str(uuid.uuid4())

        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_span_id=parent_span_id or (self.context.current_span.span_id if self.context.current_span else None),
            operation_name=operation_name,
            service_name=self.service_name,
            start_time=datetime.now()
        )

        self.context.current_span = span
        return span

    def finish_span(self, span: Span):
        """スパンを終了"""
        span.finish()
        self.finished_spans.append(span)

        # コンテキストを親スパンに戻す
        if span.parent_span_id:
            # 実際にはスパンスタックを管理する必要がある
            pass
        else:
            self.context.current_span = None

    def inject_context(self, headers: Dict[str, str]):
        """トレースコンテキストを HTTP ヘッダーに注入"""
        if self.context.current_span:
            headers['x-trace-id'] = self.context.current_span.trace_id
            headers['x-span-id'] = self.context.current_span.span_id

    def extract_context(self, headers: Dict[str, str]) -> Optional[Span]:
        """HTTP ヘッダーからトレースコンテキストを抽出"""
        trace_id = headers.get('x-trace-id')
        parent_span_id = headers.get('x-span-id')

        if trace_id and parent_span_id:
            return self.start_span(
                operation_name='incoming-request',
                parent_span_id=parent_span_id
            )
        return None

    def export_jaeger(self) -> List[Dict]:
        """Jaeger 形式でエクスポート"""
        return [
            {
                'trace_id': span.trace_id,
                'span_id': span.span_id,
                'parent_span_id': span.parent_span_id,
                'operation_name': span.operation_name,
                'service_name': span.service_name,
                'start_time': span.start_time.isoformat(),
                'duration_ms': span.duration_ms(),
                'tags': span.tags,
                'logs': span.logs
            }
            for span in self.finished_spans
        ]

# 使用例
# サーバー側
server_tracer = DistributedTracer('user-service')

# クライアントからトレースヘッダーを受け取り
incoming_headers = {'x-trace-id': 'abc123', 'x-span-id': 'span-456'}
span = server_tracer.extract_context(incoming_headers)

if span:
    # ビジネスロジック実行
    # ...
    server_tracer.finish_span(span)

# クライアント側(呼び出し元)
client_tracer = DistributedTracer('api-gateway')
outgoing_span = client_tracer.start_span('call-user-service')

headers = {}
client_tracer.inject_context(headers)
# headers に x-trace-id, x-span-id が設定される

# HTTP リクエスト実行(headers を渡す)
# response = http.get('http://user-service/...', headers=headers)

client_tracer.finish_span(outgoing_span)

まとめ

サービスメッシュの核心:

  1. 基本概念: マイクロサービス間通信をインフラ層として抽象化
  2. サイドカーアーキテクチャ: アプリコードから通信ロジックを分離
  3. mTLS: 証明書による相互認証でゼロトラストを実現
  4. トラフィック制御: カナリアリリース、サーキットブレーカー、レート制限
  5. 可観測性: メトリクス・ログ・トレーシングの自動収集
  6. 実装比較: Istio(高機能)、Linkerd(軽量)、Consul(統合)

サービスメッシュは、マイクロサービスの複雑さを管理するための不可欠なインフラとなりつつある。ただし、サービスメッシュ自体も複雑さを導入するため、小規模なシステムではオーバーエンジニアリングとなる場合もある。

重要なのは、「サービス数が 10 を超え、通信ポリシーの統一が必要になった時点で導入を検討する」という段階的なアプローチだ。サービスメッシュは銀の弾丸ではないが、適切に導入されれば、マイクロサービスの運用負劇を大幅に軽減する。


参考資料

免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。