目次
「非同期処理は現代システムのコモンインフラ」——これは分散システムを扱うエンジニアなら誰もが認める事実だ。メール送信、バッチ処理、外部 API 連携、すべてメッセージキューの上に成り立っている。本稿では主要なメッセージキューの仕組みと実装パターンを体系的に解説する。
メッセージキューの 3 つの役割
| 役割 | 説明 | 使用技術 |
|---|---|---|
| デカップリング | 送信者と受信者の結合を緩和 | キュー、トピック |
| バッファリング | 処理能力の差を吸収 | 耐久性のあるキュー |
| 非同期化 | 応答性の向上とピーク処理 | 非同期ワークキュー |
1. メッセージキューの基本アーキテクチャ
1.1 キューの仕組み
メッセージキューは送信者(Producer)がメッセージをキューに送り、受信者(Consumer)が順番に処理する非同期通信の基盤だ。
送信者 A ─────→ [キュー] ─────→ 受信者 B
│
↓
耐久性
(ディスク保存)
1.2 同期処理 vs 非同期処理
| 特性 | 同期処理 | 非同期処理(メッセージキュー) |
|---|---|---|
| 応答時間 | 即時(処理が完了するまで待つ) | 即時(キューに送るだけ) |
| 結合度 | 高い(送信者が受信者の状態を知る) | 低い(お互いの存在を知らない) |
| 信頼性 | 低い(受信者がダウンすると失敗) | 高い(キューがバッファする) |
| ピーク処理 | 弱い | 強い(キューが吸収) |
1.3 基本的な使用例(Python)
import boto3
import json
# AWS SQS のクライアント
sqs = boto3.client('sqs')
# キュー URL の取得
queue_url = sqs.get_queue_url(QueueName='my-queue')['QueueUrl']
# メッセージ送信
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({
"type": "email_send",
"to": "user@example.com",
"subject": "Welcome!",
"body": "Thank you for signing up."
}),
MessageAttributes={
"MessageType": {
"DataType": "String",
"StringValue": "Notification"
}
}
)
print(f"メッセージ ID: {response['MessageId']}")
2. メッセージ配信パターン
2.1 ポイント・ツー・ポイント(キュー)
1 対 1 の配信。1 つのメッセージがちょうど 1 つのコンシューマーで処理される。
┌─→ Consumer A
Producer ──┼─→ Consumer B (1 つのみが受信)
└─→ Consumer C
用途:
- タスクの分散処理(ワークキュー)
- ロードバランシング
- ジョブ処理
# 受信側(複数ワーカーで競合処理)
def worker():
while True:
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
WaitTimeSeconds=10 # ロングポーリング
)
if 'Messages' not in response:
continue
message = response['Messages'][0]
body = json.loads(message['Body'])
try:
process_task(body)
# 処理成功:キューから削除
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
except Exception as e:
# 処理失敗:可視化タイマーが切れてキューに戻る
print(f"処理失敗:{e}")
2.2 パブ・サブ(トピック)
1 対多の配信。1 つのメッセージがすべての購読者に配信される。
┌─→ Subscriber A → キュー A
Producer ─→ トピック ─┼─→ Subscriber B → キュー B
└─→ Subscriber C → キュー C
用途:
- イベント通知(ユーザー作成、注文確定等)
- 複数システムへのブロードキャスト
- イベントソーシング
# SNS(パブ・サブ)での送信
sns = boto3.client('sns')
topic_arn = "arn:aws:sns:us-east-1:123456789012:my-topic"
# トピックに送信(すべての購読者に配信される)
response = sns.publish(
TopicArn=topic_arn,
Message=json.dumps({
"event": "user.created",
"user_id": "usr_123",
"email": "user@example.com"
}),
Subject="新規ユーザー登録"
)
print(f"メッセージ ID: {response['MessageId']}")
2.3 パターンの選択基準
| 要件 | 推奨パターン |
|---|---|
| 1 つのワーカーで処理したい | ポイント・ツー・ポイント |
| 複数ワーカーで負荷分散 | ポイント・ツー・ポイント(競合グループ) |
| 複数システムに通知したい | パブ・サブ |
| イベントの履歴を残したい | イベントストリーム(Kafka) |
3. 主要メッセージキューの比較
3.1 主要サービスの特性
| サービス | 型 | 耐久性 | スループット | 遅延 |
|---|---|---|---|---|
| RabbitMQ | キュー/トピック | ディスク可 | 中(〜1 万 msg/s) | 低(ms 級) |
| Apache Kafka | ストリーム | ディスク | 高(〜100 万 msg/s) | 中 |
| AWS SQS | キュー | 高(S3 バックアップ) | 高 | 低(〜10ms) |
| AWS SNS+SQS | パブ・サブ | 高 | 高 | 低 |
| Google Pub/Sub | パブ・サブ | 高 | 高 | 低 |
3.2 RabbitMQ の特徴
Producer → Exchange → Queue → Consumer
Exchange の種類:
| 種類 | 説明 | 使用例 |
|---|---|---|
| Direct | ルーティングキー完全一致 | 特定キューへの配信 |
| Fanout | 全キューにブロードキャスト | 通知の一元配信 |
| Topic | パターンマッチ(*、#) | ログ配信(app.*.error) |
| Headers | ヘッダー属性でマッチ | 複雑なルーティング |
import pika
# RabbitMQ 接続
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Exchange 宣言
channel.exchange_declare(exchange='logs', exchange_type='topic')
# パターンマッチでルーティング
# app.email.error → error キューと email キュー両方に
channel.queue_bind(exchange='logs', queue='error', routing_key='*.error')
channel.queue_bind(exchange='logs', queue='email', routing_key='email.*')
# 送信
channel.basic_publish(
exchange='logs',
routing_key='app.email.info',
body=json.dumps({"msg": "Email sent"})
)
3.3 Kafka の特徴
Kafka はイベントストリームを扱う分散ログシステムだ。
Producer → Topic(パーティション分割) → Consumer Group
| 特性 | 説明 |
|---|---|
| パーティション | トピックを複数パーティションに分割、並列処理可能 |
| オフセット | コンシューマーの処理位置を管理 |
| リテンション | 設定期間(例:7 日間)メッセージを保持 |
| Consumer Group | 同一グループ内では 1 メッセージを 1 コンシューマーのみ処理 |
from kafka import KafkaProducer, KafkaConsumer
import json
# Producer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# メッセージ送信
producer.send('user-events', {
"event": "user.created",
"user_id": "usr_123"
})
producer.flush()
# Consumer
consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='analytics-service',
auto_offset_reset='earliest'
)
for message in consumer:
print(f"受信:{message.value}")
3.4 选型基準
| 要件 | 推奨サービス |
|---|---|
| 単純なタスクキュー | SQS、RabbitMQ |
| リアルタイム通知 | RabbitMQ、SNS |
| イベントソーシング | Kafka |
| 大量ログ処理 | Kafka |
| クラウドネイティブ | 各クラウドのマネージドサービス |
4. 信頼性のある配信——リトライと DLQ
4.1 リトライ処理の基本
メッセージ処理が一時的なエラーで失敗する場合、指数バックオフによるリトライが効果的だ。
import time
import random
def process_with_retry(message, max_retries=3):
"""指数バックオフ付きリトライ"""
for attempt in range(max_retries):
try:
return process_message(message)
except TemporaryError as e:
if attempt == max_retries - 1:
raise # 最大リトライ回数超え
# 指数バックオフ(jitter 付き)
base_delay = 2 ** attempt # 2, 4, 8 秒
jitter = random.uniform(0, base_delay * 0.1)
delay = base_delay + jitter
print(f"リトライ {attempt + 1}/{max_retries}、{delay:.1f}秒後に再試行")
time.sleep(delay)
def process_message(message):
"""実際の処理(一時的なエラーが発生する可能性)"""
if random.random() < 0.3: # 30% の確率で失敗
raise TemporaryError("一時的なエラー")
print(f"処理成功:{message}")
4.2 デッドレターキュー(DLQ)
最大リトライ回数を超えたメッセージは**デッドレターキュー(Dead Letter Queue)**に送られ、後で分析・再処理できる。
# AWS SQS の DLQ 設定
import boto3
sqs = boto3.client('sqs')
# DLQ 用キューを作成
dlq_response = sqs.create_queue(QueueName='my-queue-dlq')
dlq_url = dlq_response['QueueUrl']
# DLQ の ARN を取得
dlq_arn = sqs.get_queue_attributes(
QueueUrl=dlq_url,
AttributeNames=['QueueArn']
)['Attributes']['QueueArn']
# メインキューに DLQ 設定を紐付ける
redrive_policy = {
"deadLetterTargetArn": dlq_arn,
"maxReceiveCount": "3" # 3 回失敗すると DLQ 行き
}
main_response = sqs.create_queue(
QueueName='my-queue',
Attributes={
'RedrivePolicy': json.dumps(redrive_policy),
'VisibilityTimeout': '300' # 5 分
}
)
4.3 DLQ の活用パターン
| 活用方法 | 説明 |
|---|---|
| エラー分析 | 失敗したメッセージを集計、根本原因を特定 |
| 手動再処理 | 修正後、DLQ からメッセージを再度キューに戻す |
| アラート発報 | DLQ にメッセージが溜まったら Slack 等に通知 |
# DLQ のメッセージを確認
dlq_messages = sqs.receive_message(
QueueUrl=dlq_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=5
)
if 'Messages' in dlq_messages:
print(f"DLQ に {len(dlq_messages['Messages'])} 件の失敗メッセージ")
for msg in dlq_messages['Messages']:
body = json.loads(msg['Body'])
print(f"失敗内容:{body}")
# 必要に応じて再処理
# sqs.send_message(QueueUrl=main_queue_url, MessageBody=msg['Body'])
5. 順序保証の仕組み
5.1 順序保証が必要なケース
| ユースケース | 順序の重要性 |
|---|---|
| 残高更新 | 入金→出金の順序が逆になると残高が合わない |
| ステータス遷移 | 注文確定→発送通知の順序 |
| 設定変更 | 設定→適用の順序 |
5.2 順序保証の方法
方法 1: メッセージグループ(FIFO キュー)
AWS SQS FIFO キューではメッセージグループ IDを指定すると、同一グループ内で順序が保証される。
# FIFO キューへの送信(末尾に .fifo が必要)
fifo_queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue.fifo"
# ユーザーごとの順序保証
sqs.send_message(
QueueUrl=fifo_queue_url,
MessageBody=json.dumps({"action": "deposit", "amount": 10000}),
MessageGroupId="user_123", # 同一ユーザーは順番に処理
MessageDeduplicationId="msg_001"
)
sqs.send_message(
QueueUrl=fifo_queue_url,
MessageBody=json.dumps({"action": "withdraw", "amount": 5000}),
MessageGroupId="user_123", # 入金後に処理
MessageDeduplicationId="msg_002"
)
方法 2: Kafka のパーティション
Kafka では同一キーのメッセージが同一パーティションに送られることを利用して順序を保証する。
# ユーザー ID をキーとして使用(同一ユーザーは常に同一パーティションへ)
producer.send(
'order-events',
key=b"user_123", # 同一キーは同一パーティション
value={
"order_id": "ord_001",
"status": "created"
}
)
5.3 順序保証のコスト
| 特性 | 順序保証あり | 順序保証なし |
|---|---|---|
| スループット | 低い(直列処理) | 高い(並列処理) |
| 遅延 | やや高い | 低い |
| 複雑性 | 高い | 低い |
設計指針:
- 順序が必要なメッセージと不要なメッセージを分離
- 可能なら{idempotent な処理}にして順序依存をなくす
6. イベント駆動アーキテクチャのパターン
6.1 イベントソーシング
状態の変化をイベントのリストとして保存するパターン。
# イベントストア
events = [
{"type": "OrderCreated", "order_id": "ord_001", "timestamp": "2025-01-01T10:00:00Z"},
{"type": "OrderPaid", "order_id": "ord_001", "timestamp": "2025-01-01T10:05:00Z"},
{"type": "OrderShipped", "order_id": "ord_001", "timestamp": "2025-01-01T15:00:00Z"},
]
# 現在の状態をイベントから再構築
def reconstruct_order(events):
order = None
for event in events:
if event["type"] == "OrderCreated":
order = {"id": event["order_id"], "status": "created"}
elif event["type"] == "OrderPaid":
order["status"] = "paid"
elif event["type"] == "OrderShipped":
order["status"] = "shipped"
return order
order = reconstruct_order(events)
# {"id": "ord_001", "status": "shipped"}
6.2 CQRS(Command Query Responsibility Segregation)
書き込みコマンドと読み込みクエリを分離し、それぞれ異なるモデルを持つパターン。
┌─→ 読み込み DB(正規化、キャッシュ)
コマンド → 書き込み DB ─┤
└─→ イベント → 投影 → 読み込み DB 更新
# コマンド側(書き込み)
def create_order(command):
# 書き込み DB に保存
order = db.orders.insert({
"id": generate_id(),
"user_id": command["user_id"],
"status": "created"
})
# イベント発行
event_bus.publish({
"type": "OrderCreated",
"order_id": order["id"]
})
# クエリ側(読み込み)
def get_order_summary(user_id):
# 読み込み専用ビュー(キャッシュ済み)
return redis.get(f"order_summary:{user_id}")
6.3 Saga パターン
複数サービスにまたがるトランザクションをイベントの連鎖で管理するパターン。
[注文サービス] 注文作成 → [在庫サービス] 在庫確保 → [決済サービス] 決済実行
↓ ↓
失敗時:在庫解放 ←────────────────────────────── 失敗時:決済キャンセル
def place_order_saga(order_data):
"""Saga による分散トランザクション"""
saga_id = generate_saga_id()
compensations = [] # 補償トランザクション
try:
# ステップ 1: 在庫確保
inventory_result = inventory_service.reserve(order_data)
compensations.append(lambda: inventory_service.release(inventory_result))
# ステップ 2: 決済実行
payment_result = payment_service.charge(order_data)
compensations.append(lambda: payment_service.refund(payment_result))
# ステップ 3: 注文確定
order_service.confirm(order_data)
return {"status": "success"}
except Exception as e:
# 失敗:補償トランザクションを逆順で実行
print(f"Saga 失敗、補償を実行:{e}")
for compensation in reversed(compensations):
try:
compensation()
except Exception as comp_error:
print(f"補償失敗:{comp_error}")
# 手動介入が必要な場合も
return {"status": "failed", "reason": str(e)}
7. 実装上の落とし穴と対策
7.1 メッセージの重複
ネットワーク障害やタイムアウトでメッセージが重複配信される可能性がある。
対策: 冪等性(Idempotency)の確保
# 重複チェック付き処理
def process_message_idempotent(message_id, data):
# Redis で既処理チェック
if redis.exists(f"processed:{message_id}"):
print(f"重複メッセージ:{message_id}")
return
# 処理実行
process_data(data)
# 処理済みマーク(TTL 付き)
redis.setex(f"processed:{message_id}", 86400, "1") # 24 時間
7.2 ポイズンメッセージ
処理するたびにエラーになるメッセージがキューをブロックする問題。
対策: DLQ とサーキットブレーカー
def process_with_circuit_breaker(message):
"""サーキットブレーカー付き処理"""
error_count = redis.get(f"error_count:{message['type']}") or 0
if int(error_count) > 10:
# 10 回連続失敗:サーキットオープン、DLQ 行き
send_to_dlq(message, reason="circuit_open")
return
try:
process_message(message)
redis.delete(f"error_count:{message['type']}") # カウントリセット
except Exception as e:
redis.incr(f"error_count:{message['type']}")
redis.expire(f"error_count:{message['type']}", 300) # 5 分でリセット
raise
7.3 背圧(Backpressure)
コンシューマーの処理能力を超えてメッセージが溜まる問題。
対策: キュー深度の監視とスケーリング
# キュー深度の監視
def check_queue_depth(queue_url):
attrs = sqs.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=['ApproximateNumberOfMessages']
)
depth = int(attrs['Attributes']['ApproximateNumberOfMessages'])
if depth > 10000:
# 閾値超え:アラート + 自動スケーリング
alert_slack(f"キュー深度が閾値超え:{depth}")
trigger_auto_scaling()
return depth
まとめ
メッセージキューは現代の分散システムに不可欠なインフラだ。主要なポイントを整理する:
- 配信パターン: ポイント・ツー・ポイント(1 対 1)とパブ・サブ(1 対多)を使い分ける
- サービス選定: RabbitMQ(低遅延)、Kafka(大量ストリーム)、SQS(マネージド)
- 信頼性: リトライ、DLQ、冪等性で「失わない・止まらない」システムを
- 順序保証: FIFO キューやパーティションキーで必要な場面のみ保証
- イベント駆動: イベントソーシング、CQRS、Saga で複雑なビジネスロジックを管理
メッセージキューは「銀の弾丸」ではない。重複、順序、背圧といった問題に適切に対処しつつ、システムの規模と要件に合わせて適切なパターンを選択することが重要だ。
免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。