AI

オープンソースメッセージブローカー比較:Apache Kafka vs RabbitMQ vs NATS でイベント駆動アーキテクチャを構築する

オープンソースラボ編集部2026年6月13日

オープンソースメッセージブローカー比較:Apache Kafka vs RabbitMQ vs NATS でイベント駆動アーキテクチャを構築する

マイクロサービス間の非同期通信・イベントストリーミング・ジョブキューを構築するメッセージブローカーをオープンソースでセルフホストしましょう。Apache Kafka・RabbitMQ・NATSはAmazon SQS/SNS・Google Pub/Subの代替として使えます。

メッセージブローカーが必要な場面

  • マイクロサービス間の非同期通信: APIコールではなくイベントで連携してサービスを疎結合に
  • イベントストリーミング: ユーザー行動ログ・IoTセンサーデータのリアルタイム処理
  • ジョブキュー: バックグラウンドタスク(メール送信・画像変換・通知)の非同期実行
  • イベントソーシング: システムの状態変化を全てイベントとして記録
  • CDCデータパイプライン: データベースの変更をリアルタイムで下流システムに反映

主要ツールの概要

Apache Kafka

LinkedInが開発した分散イベントストリーミングプラットフォームです。数百万メッセージ/秒のスループット・ログの永続保持・コンシューマーグループによる並列処理が特徴です。

# Kafka + ZooKeeperをdocker-composeで起動
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
# KafkaプロデューサーとコンシューマーをPythonで実装
from kafka import KafkaProducer, KafkaConsumer
import json

# プロデューサー(メッセージ送信側)
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8"),
)

# ユーザー行動イベントを送信
event = {
    "user_id": "user-12345",
    "action": "purchase",
    "product_id": "prod-99",
    "amount": 4980,
    "timestamp": "2026-06-14T10:30:00Z",
}
producer.send("user-events", key="user-12345", value=event)
producer.flush()

# コンシューマー(メッセージ受信側)
consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers=["localhost:9092"],
    group_id="analytics-group",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
    enable_auto_commit=True,
)

for message in consumer:
    event = message.value
    print(f"User {event['user_id']} purchased {event['product_id']}")
    # 分析・集計処理
    update_analytics_db(event)
# Kafka Streams APIでリアルタイム集計(confluent-kafka使用)
from confluent_kafka import Consumer, Producer
from collections import defaultdict
import json, time

class SalesAggregator:
    def __init__(self):
        self.hourly_sales = defaultdict(float)

    def process(self, event: dict):
        hour = event["timestamp"][:13]  # "2026-06-14T10"
        self.hourly_sales[hour] += event["amount"]

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "sales-aggregator",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["user-events"])

aggregator = SalesAggregator()
while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        event = json.loads(msg.value())
        if event["action"] == "purchase":
            aggregator.process(event)

RabbitMQ

Erlangで実装された成熟したメッセージブローカーです。AMQPプロトコル・Exchange-Queueパターン・デッドレターキュー・プライオリティキューをサポートします。ジョブキュー・タスクキューに最適です。

# RabbitMQをDockerで起動
docker run -d   --name rabbitmq   -p 5672:5672   -p 15672:15672   -e RABBITMQ_DEFAULT_USER=admin   -e RABBITMQ_DEFAULT_PASS=password   rabbitmq:3.13-management
# UIは http://localhost:15672 でアクセス
# RabbitMQでジョブキューを実装(pika使用)
import pika
import json
import time

# 接続設定
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=5672,
        credentials=pika.PlainCredentials("admin", "password"))
)
channel = connection.channel()

# キューの宣言(存在しなければ作成)
channel.queue_declare(
    queue="email_jobs",
    durable=True,  # サーバー再起動後も残る
    arguments={
        "x-message-ttl": 86400000,  # 24時間でメッセージ期限切れ
        "x-dead-letter-exchange": "dead_letter",  # 失敗時の送信先
    },
)

# プロデューサー: メールジョブを追加
def enqueue_email(to: str, subject: str, body: str):
    job = {"to": to, "subject": subject, "body": body}
    channel.basic_publish(
        exchange="",
        routing_key="email_jobs",
        body=json.dumps(job),
        properties=pika.BasicProperties(
            delivery_mode=pika.DeliveryMode.Persistent,  # 永続化
        ),
    )
    print(f"Enqueued email to {to}")

enqueue_email("user@example.com", "ご注文確認", "ご注文ありがとうございます。")

# コンシューマー(ワーカー)
def process_email(ch, method, properties, body):
    job = json.loads(body)
    print(f"Sending email to {job['to']}: {job['subject']}")
    # 実際のメール送信処理
    time.sleep(0.5)  # 処理時間シミュレーション
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 処理完了を通知

channel.basic_qos(prefetch_count=1)  # 1ワーカーに1件ずつ
channel.basic_consume(queue="email_jobs", on_message_callback=process_email)
channel.start_consuming()

NATS

Goで実装された軽量・超高速メッセージングシステムです。パブ/サブ・リクエスト/レスポンス・キューグループをシンプルなAPIで提供します。JetStream拡張でメッセージの永続化とKafkaライクなストリーミングが使えます。

# NATSをDockerで起動(JetStream有効)
docker run -d   -p 4222:4222   -p 8222:8222   nats:2.10   --jetstream   --http_port 8222
import asyncio
import nats
import json

async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # JetStreamストリームの作成
    await js.add_stream(name="orders", subjects=["orders.>"])

    # パブリッシャー: 注文イベントを送信
    order = {"order_id": "ord-001", "user_id": "user-123", "total": 5980}
    ack = await js.publish("orders.created", json.dumps(order).encode())
    print(f"Published: seq={ack.seq}")

    # サブスクライバー: 注文イベントを処理
    async def process_order(msg):
        order = json.loads(msg.data)
        print(f"Processing order {order['order_id']}")
        await msg.ack()

    sub = await js.subscribe("orders.created", cb=process_order, durable="order-processor")

    # 30秒間メッセージを受信
    await asyncio.sleep(30)
    await nc.close()

asyncio.run(main())

機能比較表

比較項目Apache KafkaRabbitMQNATS
スループット最高(数百万/秒)高(数万/秒)最高
レイテンシ低(数ms)低(数ms)超低(<1ms)
メッセージ永続化✅ デフォルト✅ オプション✅ JetStream
メッセージ保持期間✅ 設定可⚠️ 消費後削除✅ JetStream
コンシューマーグループ✅ キューグループ
ブロードキャスト
リクエスト/レスポンス⚠️⚠️✅ ネイティブ
スキーマレジストリ⚠️
管理UI✅ Control Center✅ Management UI
Kubernetes対応✅ Strimzi
学習コスト
ライセンスApache 2.0MPL 2.0Apache 2.0
GitHub Stars29k+12k+16k+

DevOps・データパイプラインツールはDevOpsカテゴリ(/categories/devops)で一覧でき、ローコード・自動化ツールはローコードカテゴリ(/categories/low-code)でも探せます。

FAQ

Q. KafkaはAmazon SQS/SNSの代替になりますか?

A. ユースケースによります。SQS(シンプルキュー)の代替としてはRabbitMQやNATS、SNS(パブ/サブ通知)の代替はNATSが近いです。Kafkaが最も適するのは「高スループットのイベントストリーミング・ログ集約・CDCパイプライン」です。SQSのような「シンプルなバックグラウンドジョブキュー」ならRabbitMQの方がシンプルです。AWSのKinesisはKafkaと同様の使い方ができます。

Q. RabbitMQとKafkaはどちらを選べばいいですか?

A. 目的によって明確に分かれます。RabbitMQが適するケース: 複雑なルーティング(タイプ別・条件別にメッセージを振り分け)・ジョブキューとしての信頼性(処理確認ACK・デッドレターキュー)・消費後にメッセージ削除でOK。Kafkaが適するケース: 高スループット(100万件/秒以上)・メッセージの長期保持と再処理・複数コンシューマーが同じデータを独立して読む・イベントソーシング。

Q. Kafkaのメッセージはどのくらいの期間保持できますか?

A. デフォルトは7日間ですが、retention.msの設定で変更できます。ディスク容量が許す限り無期限に保持することも可能です(コンプライアンス・監査ログ目的では数年保持する企業もあります)。これがRabbitMQとの大きな違いで、消費済みメッセージを遡って再処理できるため「何かバグがあった場合にメッセージを再処理する」「新しいサービスが過去データを学習する」ユースケースに対応できます。

Q. NATSのJetStreamはどういう用途に使いますか?

A. JetStream以前のNATSはAt-Most-Once(最大1回)の配送保証でした。JetStreamはAt-Least-Once(最低1回)保証・ストリーム永続化・コンシューマーの任意ポイントからの再読み込みを追加します。IoT・エッジコンピューティング(ネットワークが不安定な環境)・マイクロサービス間の軽量イベント通信に特に向いています。

まとめ

ユースケース推奨ツール
高スループットイベントストリーミングApache Kafka
ジョブキュー・バックグラウンドタスクRabbitMQ
軽量・超低レイテンシメッセージングNATS
Amazon SQS/SNS代替RabbitMQ / NATS

関連外部リソース

他の記事も読む

Let's Build Together

OSS導入、自社だけで悩まない。

ツール選定から構築・運用・AI活用まで、オープンソースラボ運営元のClasslessが伴走します。初回のご相談は無料です。