目次

「モノリスは悪ではない、マイクロサービスは銀の弾丸ではない」——これは現代のシステム設計において最も理解されるべき真実だ。マイクロサービスは適切に設計されれば驚異的な敏捷性をもたらすが、誤れば地獄のような分散システムの複雑性をもたらす。本稿では、マイクロサービスアーキテクチャの実際の設計パターンを体系的に解説する。

マイクロサービスの 3 つの基本原理

原理説明重要性
単一責任1 サービス = 1 ビジネス能力
自律性独立した開発・デプロイ・スケール
弱い結合サービス間依存を最小化

1. サービス分割戦略

1.1 ドメイン駆動設計(DDD)による分割

バウンデッドコンテキストを基準にサービスを分割する。

EC サイトの例:

┌─────────────────┐
│  カートコンテキスト  │ ← カートサービス
├─────────────────┤
│  注文コンテキスト   │ ← 注文サービス
├─────────────────┤
│  決済コンテキスト   │ ← 決済サービス
├─────────────────┤
│  在庫コンテキスト   │ ← 在庫サービス
└─────────────────┘
分割基準説明
ビジネス能力企業機能ごとの分割注文、顧客、商品
サブドメインDDD のサブドメインで分割コア、サポーティング、ジェネリック
変更の速度変更頻度で分割頻繁に変更 vs 安定
データ特性データの性質で分割参照最適化 vs 更新最適化

1.2 適切なサービス粒度

# ❌ 細かすぎる:機能ごとに分割しすぎ
class UserService:
    def create_user(self, ...): pass

class UserAuthService:  # 独立させる必要ある?
    def login(self, ...): pass

class UserProfileService:  # 独立させる必要ある?
    def update_profile(self, ...): pass

# ✅ 適切:ドメインで分割
class UserContextService:
    """ユーザー関連の全機能を担当"""
    def create_user(self, ...): pass
    def login(self, ...): pass
    def update_profile(self, ...): pass

class OrderService:
    """注文関連の全機能を担当"""
    def create_order(self, ...): pass
    def cancel_order(self, ...): pass

粒度の判断基準:

  • チーム規模(2 ピザチームが担当できる範囲)
  • デプロイ頻度(独立してデプロイできる単位)
  • データの凝集度(同じデータにアクセスする機能は一緒に)

1.3 サービス設計の原則

from dataclasses import dataclass
from typing import List, Optional
from enum import Enum

# ✅ 良い設計:明確な責任範囲
class OrderStatus(Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

@dataclass
class Order:
    id: str
    user_id: str
    items: List[dict]
    total_amount: int
    status: OrderStatus
    shipping_address: dict

class OrderService:
    """注文サービスの責務:注文のライフサイクル管理"""

    def create_order(self, user_id: str, items: List[dict]) -> Order:
        """注文作成"""
        # 在庫チェック(在庫サービスと連携)
        # 価格計算(価格サービスと連携)
        # 注文保存
        pass

    def confirm_order(self, order_id: str) -> None:
        """注文確定"""
        pass

    def cancel_order(self, order_id: str, reason: str) -> None:
        """注文キャンセル"""
        pass

# 責務外:ユーザー認証、決済処理、在庫管理
# これらは別サービスの責務

2. サービス間通信

2.1 同期通信 vs 非同期通信

方式プロトコル用途メリットデメリット
同期HTTP/REST, gRPC即時レスポンス必須実装簡単、レスポンス即時カプリング高、可用性低下
非同期メッセージキュー結果不即時でも良い疎結合、可用性高実装複雑、結果遅延

2.2 API Gateway パターン

クライアント
    │
    ↓
┌─────────────────┐
│   API Gateway   │ ← 単一エントリーポイント
└────────┬────────┘
         │
    ┌────┼────┬────────┐
    ↓    ↓    ↓        ↓
┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
│User │ │Order│ │Product│ │Cart │
│Svc  │ │Svc  │ │Svc   │ │Svc  │
└─────┘ └─────┘ └─────┘ └─────┘
# API Gateway の実装例(FastAPI)
from fastapi import FastAPI, HTTPException
import httpx

app = FastAPI()

# サービス URL 設定
SERVICE_URLS = {
    "user": "http://user-service:8001",
    "order": "http://order-service:8002",
    "product": "http://product-service:8003",
}

@app.get("/api/v1/users/{user_id}/orders")
async def get_user_orders(user_id: str):
    """ユーザーの注文一覧(複数サービスにまたがるクエリ)"""

    async with httpx.AsyncClient() as client:
        # 1. ユーザーサービスからユーザー情報取得
        user_response = await client.get(f"{SERVICE_URLS['user']}/users/{user_id}")
        if user_response.status_code == 404:
            raise HTTPException(status_code=404, detail="User not found")
        user_data = user_response.json()

        # 2. 注文サービスから注文一覧取得
        orders_response = await client.get(f"{SERVICE_URLS['order']}/orders?user_id={user_id}")
        orders_data = orders_response.json()

        # 3. 各注文の商品詳細を商品サービスから取得(並列)
        import asyncio
        async def fetch_product(product_id):
            resp = await client.get(f"{SERVICE_URLS['product']}/products/{product_id}")
            return resp.json() if resp.status_code == 200 else None

        enriched_orders = []
        for order in orders_data:
            product_tasks = [fetch_product(item["product_id"]) for item in order["items"]]
            products = await asyncio.gather(*product_tasks)
            order["product_details"] = products
            enriched_orders.append(order)

        return {
            "user": user_data,
            "orders": enriched_orders
        }

2.3 サービスメッシュ

┌─────────────────────────────────────────┐
│           サービスメッシュ               │
│  ┌─────────┐     ┌─────────┐           │
│  │Sidecar  │     │Sidecar  │           │
│  │Proxy    │     │Proxy    │           │
│  └────┬────┘     └────┬────┘           │
│       │               │                 │
│  ┌────┴────┐     ┌────┴────┐           │
│  │ Service │     │ Service │           │
│  │   A     │     │   B     │           │
│  └─────────┘     └─────────┘           │
└─────────────────────────────────────────┘

機能:
- トラフィック制御(カナリア、ブルーグリーン)
- mTLS(サービス間暗号化)
- 可観測性(メトリクス、トレーシング)
- リトライ、サーキットブレーカー

2.4 非同期通信パターン

# イベント駆動アーキテクチャ
from abc import ABC, abstractmethod
import json

# イベント定義
class Event(ABC):
    def __init__(self, event_id: str, timestamp: str):
        self.event_id = event_id
        self.timestamp = timestamp

class OrderCreatedEvent(Event):
    def __init__(self, order_id: str, user_id: str, items: list, total: int):
        super().__init__(event_id=f"order_{order_id}", timestamp="...")
        self.order_id = order_id
        self.user_id = user_id
        self.items = items
        self.total = total

# イベントパブリッシャー
class EventPublisher:
    def __init__(self, message_broker):
        self.broker = message_broker

    async def publish(self, event: Event, topic: str):
        await self.broker.publish(
            topic=topic,
            message=json.dumps(event.__dict__)
        )

# イベントサブスクライバー
class EventSubscriber:
    def __init__(self, message_broker):
        self.broker = message_broker
        self.handlers = {}

    def subscribe(self, topic: str, handler):
        self.handlers[topic] = handler
        self.broker.subscribe(topic, self._handle)

    async def _handle(self, message):
        event_data = json.loads(message)
        # ハンドラー実行
        if "order" in message.get("topic", ""):
            await self.handlers["order"](event_data)

# 使用例:注文作成時にイベント発行
class OrderService:
    def __init__(self, event_publisher):
        self.publisher = event_publisher

    async def create_order(self, user_id, items):
        # 注文作成処理
        order = await self._create_order_in_db(user_id, items)

        # イベント発行
        event = OrderCreatedEvent(
            order_id=order.id,
            user_id=user_id,
            items=items,
            total=order.total
        )
        await self.publisher.publish(event, "orders")

# 購読側:在庫サービス、通知サービスなど
class InventoryService:
    def __init__(self, subscriber):
        subscriber.subscribe("orders", self.on_order_created)

    async def on_order_created(self, event):
        # 在庫確保処理
        await self._reserve_inventory(event["items"])

class NotificationService:
    def __init__(self, subscriber):
        subscriber.subscribe("orders", self.on_order_created)

    async def on_order_created(self, event):
        # 注文確認メール送信
        await self._send_order_confirmation(event["user_id"])

3. データ管理

3.1 データベース・パー・サービス

各サービスが独立したデータベースを持つ。

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ User Svc    │    │ Order Svc   │    │ Product Svc │
│ ┌─────────┐ │    │ ┌─────────┐ │    │ ┌─────────┐ │
│ │  User   │ │    │ │  Order  │ │    │ │ Product │ │
│ │   DB    │ │    │ │   DB    │ │    │ │   DB    │ │
│ └─────────┘ │    │ └─────────┘ │    │ └─────────┘ │
└─────────────┘    └─────────────┘    └─────────────┘
     PostgreSQL         PostgreSQL         MySQL

メリット:

  • サービス間のデータ結合排除
  • 技術スタックの自由選択
  • 独立したスケール

デメリット:

  • トランザクション管理が複雑
  • データ整合性確保が困難
  • 結合クエリが不可能

3.2 Saga パターン

複数サービスにまたがる分散トランザクションを管理する。

# オーケストレーション方式
class OrderSaga:
    """注文処理の Saga オーケストレーター"""

    def __init__(self, services):
        self.services = services
        self compensations = []

    async def execute(self, order_data):
        """Saga 実行"""
        try:
            # ステップ 1: 在庫確保
            inventory_result = await self.services["inventory"].reserve(order_data["items"])
            self.compensations.append(lambda: self.services["inventory"].release(inventory_result))

            # ステップ 2: 決済実行
            payment_result = await self.services["payment"].charge(order_data["payment"])
            self.compensations.append(lambda: self.services["payment"].refund(payment_result))

            # ステップ 3: 注文作成
            order = await self.services["order"].create(order_data)

            # ステップ 4: 通知送信
            await self.services["notification"].send_order_confirmation(order)

            return {"status": "success", "order": order}

        except Exception as e:
            # 補償トランザクションを逆順で実行
            print(f"Saga 失敗:{e}、補償を実行")
            for compensation in reversed(self.compensations):
                try:
                    await compensation()
                except Exception as comp_error:
                    print(f"補償失敗:{comp_error}")
                    # 手動介入が必要な場合も

            return {"status": "failed", "reason": str(e)}

# 使用例
saga = OrderSaga({
    "inventory": InventoryService(),
    "payment": PaymentService(),
    "order": OrderService(),
    "notification": NotificationService()
})

result = await saga.execute({
    "items": [{"product_id": "p1", "qty": 2}],
    "payment": {"card": "xxx", "amount": 5000}
})

3.3 CQRS(Command Query Responsibility Segregation)

書き込み(Command)と読み取り(Query)を分離する。

from dataclasses import dataclass
from typing import List, Optional

# コマンド(書き込み)
@dataclass
class CreateUserCommand:
    email: str
    password: str
    name: str

@dataclass
class UpdateUserCommand:
    user_id: str
    email: Optional[str] = None
    name: Optional[str] = None

# クエリ(読み取り)
@dataclass
class GetUserQuery:
    user_id: str

@dataclass
class ListUsersQuery:
    page: int = 1
    per_page: int = 20
    role: Optional[str] = None

# コマンドハンドラー(書き込みモデル)
class CreateUserHandler:
    def __init__(self, write_db, event_publisher):
        self.write_db = write_db
        self.publisher = event_publisher

    async def handle(self, command: CreateUserCommand):
        # 書き込み専用モデルで処理
        user = await self.write_db.users.insert({
            "email": command.email,
            "password_hash": hash_password(command.password),
            "name": command.name
        })

        # イベント発行(読み取りモデル更新用)
        await self.publisher.publish("user_created", {
            "user_id": user.id,
            "email": user.email,
            "name": user.name
        })

        return user

# クエリハンドラー(読み取りモデル)
class GetUserHandler:
    def __init__(self, read_db):
        self.read_db = read_db  # 読み取り最適化 DB(Elasticsearch 等)

    async def handle(self, query: GetUserQuery):
        # 読み取り専用モデルから高速取得
        return await self.read_db.users.get_by_id(query.user_id)

class ListUsersHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: ListUsersQuery):
        # 複雑な検索・集計も可能
        users = await self.read_db.users.search(
            filters={"role": query.role} if query.role else {},
            pagination={"page": query.page, "per_page": query.per_page}
        )
        return users

3.4 イベントソーシング

状態をイベントのリストとして保存する。

from datetime import datetime
from typing import List, Dict

# イベント定義
@dataclass
class Event:
    event_id: str
    event_type: str
    aggregate_id: str
    timestamp: str
    data: Dict

# 口座のイベント例
class AccountEvent:
    @staticmethod
    def account_created(account_id: str, initial_balance: int) -> Event:
        return Event(
            event_id=f"evt_{uuid.uuid4()}",
            event_type="AccountCreated",
            aggregate_id=account_id,
            timestamp=datetime.utcnow().isoformat(),
            data={"initial_balance": initial_balance}
        )

    @staticmethod
    def money_deposited(account_id: str, amount: int) -> Event:
        return Event(
            event_id=f"evt_{uuid.uuid4()}",
            event_type="MoneyDeposited",
            aggregate_id=account_id,
            timestamp=datetime.utcnow().isoformat(),
            data={"amount": amount}
        )

    @staticmethod
    def money_withdrawn(account_id: str, amount: int) -> Event:
        return Event(
            event_id=f"evt_{uuid.uuid4()}",
            event_type="MoneyWithdrawn",
            aggregate_id=account_id,
            timestamp=datetime.utcnow().isoformat(),
            data={"amount": amount}
        )

# イベントストア
class EventStore:
    def __init__(self, db):
        self.db = db

    async def append(self, event: Event):
        await self.db.events.insert(event.__dict__)

    async def get_history(self, aggregate_id: str) -> List[Event]:
        events = await self.db.events.find({"aggregate_id": aggregate_id})
        return [Event(**e) for e in events]

# アグリゲート:イベントから状態を再構築
class Account:
    def __init__(self, account_id: str):
        self.account_id = account_id
        self.balance = 0
        self.version = 0

    def apply(self, event: Event):
        """イベントを適用して状態を更新"""
        if event.event_type == "AccountCreated":
            self.balance = event.data["initial_balance"]
        elif event.event_type == "MoneyDeposited":
            self.balance += event.data["amount"]
        elif event.event_type == "MoneyWithdrawn":
            self.balance -= event.data["amount"]
        self.version += 1

    @classmethod
    def from_history(cls, account_id: str, history: List[Event]):
        """イベント履歴からアグリゲートを再構築"""
        account = cls(account_id)
        for event in history:
            account.apply(event)
        return account

# 使用例
async def transfer_money(from_account: str, to_account: str, amount: int):
    event_store = EventStore(db)

    # 送金元口座の履歴を読み込み
    from_history = await event_store.get_history(from_account)
    from_account_obj = Account.from_history(from_account, from_history)

    # 残高チェック
    if from_account_obj.balance < amount:
        raise ValueError("残高不足")

    # 新規イベントを保存
    await event_store.append(AccountEvent.money_withdrawn(from_account, amount))
    await event_store.append(AccountEvent.money_deposited(to_account, amount))

4. 耐障害性パターン

4.1 サーキットブレーカー

from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"      # 通常動作
    OPEN = "open"         # 遮断中
    HALF_OPEN = "half_open"  # 試験動作

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """サーキットブレーカー経由で関数を実行"""

        if self.state == CircuitState.OPEN:
            # 復旧時間経過チェック
            if datetime.utcnow() > self.last_failure_time + timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.utcnow()

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# 使用例
circuit_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)

async def call_external_service():
    return await circuit_breaker.call(
        external_service.request,
        "GET", "/api/data"
    )

4.2 リトライパターン

import asyncio
import random
from functools import wraps

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
):
    """指数バックオフ付きリトライデコレーター"""

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e

                    if attempt == max_retries:
                        break

                    # バックオフ時間計算
                    delay = min(base_delay * (exponential_base ** attempt), max_delay)
                    if jitter:
                        delay *= random.uniform(0.5, 1.5)

                    print(f"リトライ {attempt + 1}/{max_retries}{delay:.2f}秒後に再試行")
                    await asyncio.sleep(delay)

            raise last_exception

        return wrapper
    return decorator

# 使用例
@retry_with_backoff(max_retries=3, base_delay=1.0)
async def call_unreliable_service():
    # 一時的なエラーが発生する可能性のあるサービス
    async with httpx.AsyncClient() as client:
        response = await client.get("http://unreliable-service/api")
        response.raise_for_status()
        return response.json()

4.3 バルクヘッドパターン

import asyncio
from asyncio import Semaphore

class Bulkhead:
    """バルクヘッド(区画)パターン:リソース隔離"""

    def __init__(self, max_concurrent: int):
        self.semaphore = Semaphore(max_concurrent)

    async def execute(self, func, *args, **kwargs):
        async with self.semaphore:
            return await func(*args, **kwargs)

# 使用例:サービスごとにリソースを隔離
user_service_bulkhead = Bulkhead(max_concurrent=10)
order_service_bulkhead = Bulkhead(max_concurrent=20)
payment_service_bulkhead = Bulkhead(max_concurrent=5)

async def handle_user_request():
    return await user_service_bulkhead.execute(
        user_service.process_request
    )

async def handle_order_request():
    return await order_service_bulkhead.execute(
        order_service.process_request
    )

# 決済サービスが輻輳しても、他のサービスは影響を受けない

5. 可観測性

5.1 分散トレーシング

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# 設定
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Jaeger エクスポーター
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# トレース使用例
async def process_order(order_id: str):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        # 在庫サービス呼び出し
        with tracer.start_as_current_span("check_inventory") as child_span:
            child_span.set_attribute("order.id", order_id)
            await inventory_service.check(order_id)

        # 決済サービス呼び出し
        with tracer.start_as_current_span("process_payment") as child_span:
            child_span.set_attribute("order.id", order_id)
            await payment_service.charge(order_id)

5.2 メトリクス収集

from prometheus_client import Counter, Histogram, generate_latest
import time
from functools import wraps

# メトリクス定義
REQUEST_COUNT = Counter(
    'service_requests_total',
    'Total service requests',
    ['service', 'method', 'status']
)

REQUEST_LATENCY = Histogram(
    'service_request_duration_seconds',
    'Service request latency',
    ['service', 'method']
)

def measure_metrics(service_name: str):
    """メトリクス収集デコレーター"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            status = "success"

            try:
                result = await func(*args, **kwargs)
                return result
            except Exception as e:
                status = "error"
                raise
            finally:
                duration = time.time() - start_time
                REQUEST_COUNT.labels(
                    service=service_name,
                    method=func.__name__,
                    status=status
                ).inc()
                REQUEST_LATENCY.labels(
                    service=service_name,
                    method=func.__name__
                ).observe(duration)

        return wrapper
    return decorator

# 使用例
@measure_metrics("order_service")
async def create_order(order_data):
    return await order_service.create(order_data)

6. マイクロサービスのアンチパターン

6.1 分散モノリス

❌ 分散モノリス:
- サービス数が多すぎる(50+)
- サービス間の依存関係が複雑
- デプロイが全て同時必要
- 呼び出しチェーンが深すぎる(A→B→C→D→E)

✅ 適切なアーキテクチャ:
- サービス数は team 数程度(5-15)
- 依存関係は DAG(有向非循環グラフ)
- 独立デプロイ可能
- 呼び出しは最大 2-3 階層

6.2 premature_optimization

# ❌ 过早最適化:いきなりマイクロサービス
class Monolith:
    pass

class MicroserviceA:  # オーバーエンジニアリング
    pass

class MicroserviceB:  # 複雑性だけ増加
    pass

# ✅ 段階的移行
# 1. モノリスで始める(迅速な開発)
# 2. 成長に応じて抽出
#    - 負荷の高い部分
#    - 独立してスケールが必要な部分
#    - 異なる技術スタックが必要な部分

まとめ

マイクロサービスアーキテクチャの要点を整理する:

  1. サービス分割: DDD のバウンデッドコンテキスト、単一責任原則
  2. 通信: API Gateway、サービスメッシュ、非同期イベント駆動
  3. データ管理: データベース・パー・サービス、Saga、CQRS、イベントソーシング
  4. 耐障害性: サーキットブレーカー、リトライ、バルクヘッド
  5. 可観測性: 分散トレーシング、メトリクス、構造化ログ
  6. アンチパターン: 分散モノリス、 premature_optimization

マイクロサービスは「目的」ではなく「手段」だ。ビジネスの成長段階と組織構造に合わせて、適切なアーキテクチャを選択することが重要である。

免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。