目次
「モノリスは悪ではない、マイクロサービスは銀の弾丸ではない」——これは現代のシステム設計において最も理解されるべき真実だ。マイクロサービスは適切に設計されれば驚異的な敏捷性をもたらすが、誤れば地獄のような分散システムの複雑性をもたらす。本稿では、マイクロサービスアーキテクチャの実際の設計パターンを体系的に解説する。
マイクロサービスの 3 つの基本原理
| 原理 | 説明 | 重要性 |
|---|---|---|
| 単一責任 | 1 サービス = 1 ビジネス能力 | 高 |
| 自律性 | 独立した開発・デプロイ・スケール | 高 |
| 弱い結合 | サービス間依存を最小化 | 高 |
1. サービス分割戦略
1.1 ドメイン駆動設計(DDD)による分割
バウンデッドコンテキストを基準にサービスを分割する。
EC サイトの例:
┌─────────────────┐
│ カートコンテキスト │ ← カートサービス
├─────────────────┤
│ 注文コンテキスト │ ← 注文サービス
├─────────────────┤
│ 決済コンテキスト │ ← 決済サービス
├─────────────────┤
│ 在庫コンテキスト │ ← 在庫サービス
└─────────────────┘
| 分割基準 | 説明 | 例 |
|---|---|---|
| ビジネス能力 | 企業機能ごとの分割 | 注文、顧客、商品 |
| サブドメイン | DDD のサブドメインで分割 | コア、サポーティング、ジェネリック |
| 変更の速度 | 変更頻度で分割 | 頻繁に変更 vs 安定 |
| データ特性 | データの性質で分割 | 参照最適化 vs 更新最適化 |
1.2 適切なサービス粒度
# ❌ 細かすぎる:機能ごとに分割しすぎ
class UserService:
def create_user(self, ...): pass
class UserAuthService: # 独立させる必要ある?
def login(self, ...): pass
class UserProfileService: # 独立させる必要ある?
def update_profile(self, ...): pass
# ✅ 適切:ドメインで分割
class UserContextService:
"""ユーザー関連の全機能を担当"""
def create_user(self, ...): pass
def login(self, ...): pass
def update_profile(self, ...): pass
class OrderService:
"""注文関連の全機能を担当"""
def create_order(self, ...): pass
def cancel_order(self, ...): pass
粒度の判断基準:
- チーム規模(2 ピザチームが担当できる範囲)
- デプロイ頻度(独立してデプロイできる単位)
- データの凝集度(同じデータにアクセスする機能は一緒に)
1.3 サービス設計の原則
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum
# ✅ 良い設計:明確な責任範囲
class OrderStatus(Enum):
PENDING = "pending"
CONFIRMED = "confirmed"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
@dataclass
class Order:
id: str
user_id: str
items: List[dict]
total_amount: int
status: OrderStatus
shipping_address: dict
class OrderService:
"""注文サービスの責務:注文のライフサイクル管理"""
def create_order(self, user_id: str, items: List[dict]) -> Order:
"""注文作成"""
# 在庫チェック(在庫サービスと連携)
# 価格計算(価格サービスと連携)
# 注文保存
pass
def confirm_order(self, order_id: str) -> None:
"""注文確定"""
pass
def cancel_order(self, order_id: str, reason: str) -> None:
"""注文キャンセル"""
pass
# 責務外:ユーザー認証、決済処理、在庫管理
# これらは別サービスの責務
2. サービス間通信
2.1 同期通信 vs 非同期通信
| 方式 | プロトコル | 用途 | メリット | デメリット |
|---|---|---|---|---|
| 同期 | HTTP/REST, gRPC | 即時レスポンス必須 | 実装簡単、レスポンス即時 | カプリング高、可用性低下 |
| 非同期 | メッセージキュー | 結果不即時でも良い | 疎結合、可用性高 | 実装複雑、結果遅延 |
2.2 API Gateway パターン
クライアント
│
↓
┌─────────────────┐
│ API Gateway │ ← 単一エントリーポイント
└────────┬────────┘
│
┌────┼────┬────────┐
↓ ↓ ↓ ↓
┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
│User │ │Order│ │Product│ │Cart │
│Svc │ │Svc │ │Svc │ │Svc │
└─────┘ └─────┘ └─────┘ └─────┘
# API Gateway の実装例(FastAPI)
from fastapi import FastAPI, HTTPException
import httpx
app = FastAPI()
# サービス URL 設定
SERVICE_URLS = {
"user": "http://user-service:8001",
"order": "http://order-service:8002",
"product": "http://product-service:8003",
}
@app.get("/api/v1/users/{user_id}/orders")
async def get_user_orders(user_id: str):
"""ユーザーの注文一覧(複数サービスにまたがるクエリ)"""
async with httpx.AsyncClient() as client:
# 1. ユーザーサービスからユーザー情報取得
user_response = await client.get(f"{SERVICE_URLS['user']}/users/{user_id}")
if user_response.status_code == 404:
raise HTTPException(status_code=404, detail="User not found")
user_data = user_response.json()
# 2. 注文サービスから注文一覧取得
orders_response = await client.get(f"{SERVICE_URLS['order']}/orders?user_id={user_id}")
orders_data = orders_response.json()
# 3. 各注文の商品詳細を商品サービスから取得(並列)
import asyncio
async def fetch_product(product_id):
resp = await client.get(f"{SERVICE_URLS['product']}/products/{product_id}")
return resp.json() if resp.status_code == 200 else None
enriched_orders = []
for order in orders_data:
product_tasks = [fetch_product(item["product_id"]) for item in order["items"]]
products = await asyncio.gather(*product_tasks)
order["product_details"] = products
enriched_orders.append(order)
return {
"user": user_data,
"orders": enriched_orders
}
2.3 サービスメッシュ
┌─────────────────────────────────────────┐
│ サービスメッシュ │
│ ┌─────────┐ ┌─────────┐ │
│ │Sidecar │ │Sidecar │ │
│ │Proxy │ │Proxy │ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ ┌────┴────┐ ┌────┴────┐ │
│ │ Service │ │ Service │ │
│ │ A │ │ B │ │
│ └─────────┘ └─────────┘ │
└─────────────────────────────────────────┘
機能:
- トラフィック制御(カナリア、ブルーグリーン)
- mTLS(サービス間暗号化)
- 可観測性(メトリクス、トレーシング)
- リトライ、サーキットブレーカー
2.4 非同期通信パターン
# イベント駆動アーキテクチャ
from abc import ABC, abstractmethod
import json
# イベント定義
class Event(ABC):
def __init__(self, event_id: str, timestamp: str):
self.event_id = event_id
self.timestamp = timestamp
class OrderCreatedEvent(Event):
def __init__(self, order_id: str, user_id: str, items: list, total: int):
super().__init__(event_id=f"order_{order_id}", timestamp="...")
self.order_id = order_id
self.user_id = user_id
self.items = items
self.total = total
# イベントパブリッシャー
class EventPublisher:
def __init__(self, message_broker):
self.broker = message_broker
async def publish(self, event: Event, topic: str):
await self.broker.publish(
topic=topic,
message=json.dumps(event.__dict__)
)
# イベントサブスクライバー
class EventSubscriber:
def __init__(self, message_broker):
self.broker = message_broker
self.handlers = {}
def subscribe(self, topic: str, handler):
self.handlers[topic] = handler
self.broker.subscribe(topic, self._handle)
async def _handle(self, message):
event_data = json.loads(message)
# ハンドラー実行
if "order" in message.get("topic", ""):
await self.handlers["order"](event_data)
# 使用例:注文作成時にイベント発行
class OrderService:
def __init__(self, event_publisher):
self.publisher = event_publisher
async def create_order(self, user_id, items):
# 注文作成処理
order = await self._create_order_in_db(user_id, items)
# イベント発行
event = OrderCreatedEvent(
order_id=order.id,
user_id=user_id,
items=items,
total=order.total
)
await self.publisher.publish(event, "orders")
# 購読側:在庫サービス、通知サービスなど
class InventoryService:
def __init__(self, subscriber):
subscriber.subscribe("orders", self.on_order_created)
async def on_order_created(self, event):
# 在庫確保処理
await self._reserve_inventory(event["items"])
class NotificationService:
def __init__(self, subscriber):
subscriber.subscribe("orders", self.on_order_created)
async def on_order_created(self, event):
# 注文確認メール送信
await self._send_order_confirmation(event["user_id"])
3. データ管理
3.1 データベース・パー・サービス
各サービスが独立したデータベースを持つ。
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ User Svc │ │ Order Svc │ │ Product Svc │
│ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │
│ │ User │ │ │ │ Order │ │ │ │ Product │ │
│ │ DB │ │ │ │ DB │ │ │ │ DB │ │
│ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │
└─────────────┘ └─────────────┘ └─────────────┘
PostgreSQL PostgreSQL MySQL
メリット:
- サービス間のデータ結合排除
- 技術スタックの自由選択
- 独立したスケール
デメリット:
- トランザクション管理が複雑
- データ整合性確保が困難
- 結合クエリが不可能
3.2 Saga パターン
複数サービスにまたがる分散トランザクションを管理する。
# オーケストレーション方式
class OrderSaga:
"""注文処理の Saga オーケストレーター"""
def __init__(self, services):
self.services = services
self compensations = []
async def execute(self, order_data):
"""Saga 実行"""
try:
# ステップ 1: 在庫確保
inventory_result = await self.services["inventory"].reserve(order_data["items"])
self.compensations.append(lambda: self.services["inventory"].release(inventory_result))
# ステップ 2: 決済実行
payment_result = await self.services["payment"].charge(order_data["payment"])
self.compensations.append(lambda: self.services["payment"].refund(payment_result))
# ステップ 3: 注文作成
order = await self.services["order"].create(order_data)
# ステップ 4: 通知送信
await self.services["notification"].send_order_confirmation(order)
return {"status": "success", "order": order}
except Exception as e:
# 補償トランザクションを逆順で実行
print(f"Saga 失敗:{e}、補償を実行")
for compensation in reversed(self.compensations):
try:
await compensation()
except Exception as comp_error:
print(f"補償失敗:{comp_error}")
# 手動介入が必要な場合も
return {"status": "failed", "reason": str(e)}
# 使用例
saga = OrderSaga({
"inventory": InventoryService(),
"payment": PaymentService(),
"order": OrderService(),
"notification": NotificationService()
})
result = await saga.execute({
"items": [{"product_id": "p1", "qty": 2}],
"payment": {"card": "xxx", "amount": 5000}
})
3.3 CQRS(Command Query Responsibility Segregation)
書き込み(Command)と読み取り(Query)を分離する。
from dataclasses import dataclass
from typing import List, Optional
# コマンド(書き込み)
@dataclass
class CreateUserCommand:
email: str
password: str
name: str
@dataclass
class UpdateUserCommand:
user_id: str
email: Optional[str] = None
name: Optional[str] = None
# クエリ(読み取り)
@dataclass
class GetUserQuery:
user_id: str
@dataclass
class ListUsersQuery:
page: int = 1
per_page: int = 20
role: Optional[str] = None
# コマンドハンドラー(書き込みモデル)
class CreateUserHandler:
def __init__(self, write_db, event_publisher):
self.write_db = write_db
self.publisher = event_publisher
async def handle(self, command: CreateUserCommand):
# 書き込み専用モデルで処理
user = await self.write_db.users.insert({
"email": command.email,
"password_hash": hash_password(command.password),
"name": command.name
})
# イベント発行(読み取りモデル更新用)
await self.publisher.publish("user_created", {
"user_id": user.id,
"email": user.email,
"name": user.name
})
return user
# クエリハンドラー(読み取りモデル)
class GetUserHandler:
def __init__(self, read_db):
self.read_db = read_db # 読み取り最適化 DB(Elasticsearch 等)
async def handle(self, query: GetUserQuery):
# 読み取り専用モデルから高速取得
return await self.read_db.users.get_by_id(query.user_id)
class ListUsersHandler:
def __init__(self, read_db):
self.read_db = read_db
async def handle(self, query: ListUsersQuery):
# 複雑な検索・集計も可能
users = await self.read_db.users.search(
filters={"role": query.role} if query.role else {},
pagination={"page": query.page, "per_page": query.per_page}
)
return users
3.4 イベントソーシング
状態をイベントのリストとして保存する。
from datetime import datetime
from typing import List, Dict
# イベント定義
@dataclass
class Event:
event_id: str
event_type: str
aggregate_id: str
timestamp: str
data: Dict
# 口座のイベント例
class AccountEvent:
@staticmethod
def account_created(account_id: str, initial_balance: int) -> Event:
return Event(
event_id=f"evt_{uuid.uuid4()}",
event_type="AccountCreated",
aggregate_id=account_id,
timestamp=datetime.utcnow().isoformat(),
data={"initial_balance": initial_balance}
)
@staticmethod
def money_deposited(account_id: str, amount: int) -> Event:
return Event(
event_id=f"evt_{uuid.uuid4()}",
event_type="MoneyDeposited",
aggregate_id=account_id,
timestamp=datetime.utcnow().isoformat(),
data={"amount": amount}
)
@staticmethod
def money_withdrawn(account_id: str, amount: int) -> Event:
return Event(
event_id=f"evt_{uuid.uuid4()}",
event_type="MoneyWithdrawn",
aggregate_id=account_id,
timestamp=datetime.utcnow().isoformat(),
data={"amount": amount}
)
# イベントストア
class EventStore:
def __init__(self, db):
self.db = db
async def append(self, event: Event):
await self.db.events.insert(event.__dict__)
async def get_history(self, aggregate_id: str) -> List[Event]:
events = await self.db.events.find({"aggregate_id": aggregate_id})
return [Event(**e) for e in events]
# アグリゲート:イベントから状態を再構築
class Account:
def __init__(self, account_id: str):
self.account_id = account_id
self.balance = 0
self.version = 0
def apply(self, event: Event):
"""イベントを適用して状態を更新"""
if event.event_type == "AccountCreated":
self.balance = event.data["initial_balance"]
elif event.event_type == "MoneyDeposited":
self.balance += event.data["amount"]
elif event.event_type == "MoneyWithdrawn":
self.balance -= event.data["amount"]
self.version += 1
@classmethod
def from_history(cls, account_id: str, history: List[Event]):
"""イベント履歴からアグリゲートを再構築"""
account = cls(account_id)
for event in history:
account.apply(event)
return account
# 使用例
async def transfer_money(from_account: str, to_account: str, amount: int):
event_store = EventStore(db)
# 送金元口座の履歴を読み込み
from_history = await event_store.get_history(from_account)
from_account_obj = Account.from_history(from_account, from_history)
# 残高チェック
if from_account_obj.balance < amount:
raise ValueError("残高不足")
# 新規イベントを保存
await event_store.append(AccountEvent.money_withdrawn(from_account, amount))
await event_store.append(AccountEvent.money_deposited(to_account, amount))
4. 耐障害性パターン
4.1 サーキットブレーカー
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any
class CircuitState(Enum):
CLOSED = "closed" # 通常動作
OPEN = "open" # 遮断中
HALF_OPEN = "half_open" # 試験動作
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 30,
half_open_max_calls: int = 3
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
self.half_open_calls = 0
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""サーキットブレーカー経由で関数を実行"""
if self.state == CircuitState.OPEN:
# 復旧時間経過チェック
if datetime.utcnow() > self.last_failure_time + timedelta(seconds=self.recovery_timeout):
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.half_open_calls += 1
if self.half_open_calls >= self.half_open_max_calls:
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = datetime.utcnow()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# 使用例
circuit_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
async def call_external_service():
return await circuit_breaker.call(
external_service.request,
"GET", "/api/data"
)
4.2 リトライパターン
import asyncio
import random
from functools import wraps
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True
):
"""指数バックオフ付きリトライデコレーター"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt == max_retries:
break
# バックオフ時間計算
delay = min(base_delay * (exponential_base ** attempt), max_delay)
if jitter:
delay *= random.uniform(0.5, 1.5)
print(f"リトライ {attempt + 1}/{max_retries}、{delay:.2f}秒後に再試行")
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
# 使用例
@retry_with_backoff(max_retries=3, base_delay=1.0)
async def call_unreliable_service():
# 一時的なエラーが発生する可能性のあるサービス
async with httpx.AsyncClient() as client:
response = await client.get("http://unreliable-service/api")
response.raise_for_status()
return response.json()
4.3 バルクヘッドパターン
import asyncio
from asyncio import Semaphore
class Bulkhead:
"""バルクヘッド(区画)パターン:リソース隔離"""
def __init__(self, max_concurrent: int):
self.semaphore = Semaphore(max_concurrent)
async def execute(self, func, *args, **kwargs):
async with self.semaphore:
return await func(*args, **kwargs)
# 使用例:サービスごとにリソースを隔離
user_service_bulkhead = Bulkhead(max_concurrent=10)
order_service_bulkhead = Bulkhead(max_concurrent=20)
payment_service_bulkhead = Bulkhead(max_concurrent=5)
async def handle_user_request():
return await user_service_bulkhead.execute(
user_service.process_request
)
async def handle_order_request():
return await order_service_bulkhead.execute(
order_service.process_request
)
# 決済サービスが輻輳しても、他のサービスは影響を受けない
5. 可観測性
5.1 分散トレーシング
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# 設定
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Jaeger エクスポーター
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# トレース使用例
async def process_order(order_id: str):
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
# 在庫サービス呼び出し
with tracer.start_as_current_span("check_inventory") as child_span:
child_span.set_attribute("order.id", order_id)
await inventory_service.check(order_id)
# 決済サービス呼び出し
with tracer.start_as_current_span("process_payment") as child_span:
child_span.set_attribute("order.id", order_id)
await payment_service.charge(order_id)
5.2 メトリクス収集
from prometheus_client import Counter, Histogram, generate_latest
import time
from functools import wraps
# メトリクス定義
REQUEST_COUNT = Counter(
'service_requests_total',
'Total service requests',
['service', 'method', 'status']
)
REQUEST_LATENCY = Histogram(
'service_request_duration_seconds',
'Service request latency',
['service', 'method']
)
def measure_metrics(service_name: str):
"""メトリクス収集デコレーター"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
status = "success"
try:
result = await func(*args, **kwargs)
return result
except Exception as e:
status = "error"
raise
finally:
duration = time.time() - start_time
REQUEST_COUNT.labels(
service=service_name,
method=func.__name__,
status=status
).inc()
REQUEST_LATENCY.labels(
service=service_name,
method=func.__name__
).observe(duration)
return wrapper
return decorator
# 使用例
@measure_metrics("order_service")
async def create_order(order_data):
return await order_service.create(order_data)
6. マイクロサービスのアンチパターン
6.1 分散モノリス
❌ 分散モノリス:
- サービス数が多すぎる(50+)
- サービス間の依存関係が複雑
- デプロイが全て同時必要
- 呼び出しチェーンが深すぎる(A→B→C→D→E)
✅ 適切なアーキテクチャ:
- サービス数は team 数程度(5-15)
- 依存関係は DAG(有向非循環グラフ)
- 独立デプロイ可能
- 呼び出しは最大 2-3 階層
6.2 premature_optimization
# ❌ 过早最適化:いきなりマイクロサービス
class Monolith:
pass
class MicroserviceA: # オーバーエンジニアリング
pass
class MicroserviceB: # 複雑性だけ増加
pass
# ✅ 段階的移行
# 1. モノリスで始める(迅速な開発)
# 2. 成長に応じて抽出
# - 負荷の高い部分
# - 独立してスケールが必要な部分
# - 異なる技術スタックが必要な部分
まとめ
マイクロサービスアーキテクチャの要点を整理する:
- サービス分割: DDD のバウンデッドコンテキスト、単一責任原則
- 通信: API Gateway、サービスメッシュ、非同期イベント駆動
- データ管理: データベース・パー・サービス、Saga、CQRS、イベントソーシング
- 耐障害性: サーキットブレーカー、リトライ、バルクヘッド
- 可観測性: 分散トレーシング、メトリクス、構造化ログ
- アンチパターン: 分散モノリス、 premature_optimization
マイクロサービスは「目的」ではなく「手段」だ。ビジネスの成長段階と組織構造に合わせて、適切なアーキテクチャを選択することが重要である。
免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。