オープンソースメッセージブローカー比較:Apache Kafka vs RabbitMQ vs NATS でイベント駆動アーキテクチャを構築する
オープンソースラボ編集部 ・ 2026年6月13日
オープンソースメッセージブローカー比較:Apache Kafka vs RabbitMQ vs NATS でイベント駆動アーキテクチャを構築する
マイクロサービス間の非同期通信・イベントストリーミング・ジョブキューを構築するメッセージブローカーをオープンソースでセルフホストしましょう。Apache Kafka・RabbitMQ・NATSはAmazon SQS/SNS・Google Pub/Subの代替として使えます。
メッセージブローカーが必要な場面
- マイクロサービス間の非同期通信: APIコールではなくイベントで連携してサービスを疎結合に
- イベントストリーミング: ユーザー行動ログ・IoTセンサーデータのリアルタイム処理
- ジョブキュー: バックグラウンドタスク(メール送信・画像変換・通知)の非同期実行
- イベントソーシング: システムの状態変化を全てイベントとして記録
- CDCデータパイプライン: データベースの変更をリアルタイムで下流システムに反映
主要ツールの概要
Apache Kafka
LinkedInが開発した分散イベントストリーミングプラットフォームです。数百万メッセージ/秒のスループット・ログの永続保持・コンシューマーグループによる並列処理が特徴です。
# Kafka + ZooKeeperをdocker-composeで起動
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
# KafkaプロデューサーとコンシューマーをPythonで実装
from kafka import KafkaProducer, KafkaConsumer
import json
# プロデューサー(メッセージ送信側)
producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8"),
)
# ユーザー行動イベントを送信
event = {
"user_id": "user-12345",
"action": "purchase",
"product_id": "prod-99",
"amount": 4980,
"timestamp": "2026-06-14T10:30:00Z",
}
producer.send("user-events", key="user-12345", value=event)
producer.flush()
# コンシューマー(メッセージ受信側)
consumer = KafkaConsumer(
"user-events",
bootstrap_servers=["localhost:9092"],
group_id="analytics-group",
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
auto_offset_reset="earliest",
enable_auto_commit=True,
)
for message in consumer:
event = message.value
print(f"User {event['user_id']} purchased {event['product_id']}")
# 分析・集計処理
update_analytics_db(event)
# Kafka Streams APIでリアルタイム集計(confluent-kafka使用)
from confluent_kafka import Consumer, Producer
from collections import defaultdict
import json, time
class SalesAggregator:
def __init__(self):
self.hourly_sales = defaultdict(float)
def process(self, event: dict):
hour = event["timestamp"][:13] # "2026-06-14T10"
self.hourly_sales[hour] += event["amount"]
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "sales-aggregator",
"auto.offset.reset": "earliest",
})
consumer.subscribe(["user-events"])
aggregator = SalesAggregator()
while True:
msg = consumer.poll(1.0)
if msg and not msg.error():
event = json.loads(msg.value())
if event["action"] == "purchase":
aggregator.process(event)
RabbitMQ
Erlangで実装された成熟したメッセージブローカーです。AMQPプロトコル・Exchange-Queueパターン・デッドレターキュー・プライオリティキューをサポートします。ジョブキュー・タスクキューに最適です。
# RabbitMQをDockerで起動
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3.13-management
# UIは http://localhost:15672 でアクセス
# RabbitMQでジョブキューを実装(pika使用)
import pika
import json
import time
# 接続設定
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost", port=5672,
credentials=pika.PlainCredentials("admin", "password"))
)
channel = connection.channel()
# キューの宣言(存在しなければ作成)
channel.queue_declare(
queue="email_jobs",
durable=True, # サーバー再起動後も残る
arguments={
"x-message-ttl": 86400000, # 24時間でメッセージ期限切れ
"x-dead-letter-exchange": "dead_letter", # 失敗時の送信先
},
)
# プロデューサー: メールジョブを追加
def enqueue_email(to: str, subject: str, body: str):
job = {"to": to, "subject": subject, "body": body}
channel.basic_publish(
exchange="",
routing_key="email_jobs",
body=json.dumps(job),
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent, # 永続化
),
)
print(f"Enqueued email to {to}")
enqueue_email("user@example.com", "ご注文確認", "ご注文ありがとうございます。")
# コンシューマー(ワーカー)
def process_email(ch, method, properties, body):
job = json.loads(body)
print(f"Sending email to {job['to']}: {job['subject']}")
# 実際のメール送信処理
time.sleep(0.5) # 処理時間シミュレーション
ch.basic_ack(delivery_tag=method.delivery_tag) # 処理完了を通知
channel.basic_qos(prefetch_count=1) # 1ワーカーに1件ずつ
channel.basic_consume(queue="email_jobs", on_message_callback=process_email)
channel.start_consuming()
NATS
Goで実装された軽量・超高速メッセージングシステムです。パブ/サブ・リクエスト/レスポンス・キューグループをシンプルなAPIで提供します。JetStream拡張でメッセージの永続化とKafkaライクなストリーミングが使えます。
# NATSをDockerで起動(JetStream有効)
docker run -d -p 4222:4222 -p 8222:8222 nats:2.10 --jetstream --http_port 8222
import asyncio
import nats
import json
async def main():
nc = await nats.connect("nats://localhost:4222")
js = nc.jetstream()
# JetStreamストリームの作成
await js.add_stream(name="orders", subjects=["orders.>"])
# パブリッシャー: 注文イベントを送信
order = {"order_id": "ord-001", "user_id": "user-123", "total": 5980}
ack = await js.publish("orders.created", json.dumps(order).encode())
print(f"Published: seq={ack.seq}")
# サブスクライバー: 注文イベントを処理
async def process_order(msg):
order = json.loads(msg.data)
print(f"Processing order {order['order_id']}")
await msg.ack()
sub = await js.subscribe("orders.created", cb=process_order, durable="order-processor")
# 30秒間メッセージを受信
await asyncio.sleep(30)
await nc.close()
asyncio.run(main())
機能比較表
| 比較項目 | Apache Kafka | RabbitMQ | NATS |
|---|---|---|---|
| スループット | 最高(数百万/秒) | 高(数万/秒) | 最高 |
| レイテンシ | 低(数ms) | 低(数ms) | 超低(<1ms) |
| メッセージ永続化 | ✅ デフォルト | ✅ オプション | ✅ JetStream |
| メッセージ保持期間 | ✅ 設定可 | ⚠️ 消費後削除 | ✅ JetStream |
| コンシューマーグループ | ✅ | ✅ キューグループ | ✅ |
| ブロードキャスト | ✅ | ✅ | ✅ |
| リクエスト/レスポンス | ⚠️ | ⚠️ | ✅ ネイティブ |
| スキーマレジストリ | ✅ | ⚠️ | ❌ |
| 管理UI | ✅ Control Center | ✅ Management UI | ✅ |
| Kubernetes対応 | ✅ Strimzi | ✅ | ✅ |
| 学習コスト | 高 | 中 | 低 |
| ライセンス | Apache 2.0 | MPL 2.0 | Apache 2.0 |
| GitHub Stars | 29k+ | 12k+ | 16k+ |
DevOps・データパイプラインツールはDevOpsカテゴリ(/categories/devops)で一覧でき、ローコード・自動化ツールはローコードカテゴリ(/categories/low-code)でも探せます。
FAQ
Q. KafkaはAmazon SQS/SNSの代替になりますか?
A. ユースケースによります。SQS(シンプルキュー)の代替としてはRabbitMQやNATS、SNS(パブ/サブ通知)の代替はNATSが近いです。Kafkaが最も適するのは「高スループットのイベントストリーミング・ログ集約・CDCパイプライン」です。SQSのような「シンプルなバックグラウンドジョブキュー」ならRabbitMQの方がシンプルです。AWSのKinesisはKafkaと同様の使い方ができます。
Q. RabbitMQとKafkaはどちらを選べばいいですか?
A. 目的によって明確に分かれます。RabbitMQが適するケース: 複雑なルーティング(タイプ別・条件別にメッセージを振り分け)・ジョブキューとしての信頼性(処理確認ACK・デッドレターキュー)・消費後にメッセージ削除でOK。Kafkaが適するケース: 高スループット(100万件/秒以上)・メッセージの長期保持と再処理・複数コンシューマーが同じデータを独立して読む・イベントソーシング。
Q. Kafkaのメッセージはどのくらいの期間保持できますか?
A. デフォルトは7日間ですが、retention.msの設定で変更できます。ディスク容量が許す限り無期限に保持することも可能です(コンプライアンス・監査ログ目的では数年保持する企業もあります)。これがRabbitMQとの大きな違いで、消費済みメッセージを遡って再処理できるため「何かバグがあった場合にメッセージを再処理する」「新しいサービスが過去データを学習する」ユースケースに対応できます。
Q. NATSのJetStreamはどういう用途に使いますか?
A. JetStream以前のNATSはAt-Most-Once(最大1回)の配送保証でした。JetStreamはAt-Least-Once(最低1回)保証・ストリーム永続化・コンシューマーの任意ポイントからの再読み込みを追加します。IoT・エッジコンピューティング(ネットワークが不安定な環境)・マイクロサービス間の軽量イベント通信に特に向いています。
まとめ
| ユースケース | 推奨ツール |
|---|---|
| 高スループットイベントストリーミング | Apache Kafka |
| ジョブキュー・バックグラウンドタスク | RabbitMQ |
| 軽量・超低レイテンシメッセージング | NATS |
| Amazon SQS/SNS代替 | RabbitMQ / NATS |