AI

メッセージキュー比較:RabbitMQ vs Redis Streams vs NATS で非同期処理を実装する

オープンソースラボ編集部2026年6月14日

メッセージキュー比較:RabbitMQ vs Redis Streams vs NATS で非同期処理を実装する

モノリスからマイクロサービスへの移行や、Webリクエストから重い処理を切り離す非同期アーキテクチャでは**メッセージキュー(MQ)**が中心的な役割を果たします。RabbitMQ(AMQP・エンタープライズ実績)・Redis Streams(Redis統合・軽量)・NATS(Go製・超高速・クラウドネイティブ)の3つがOSS MQの主要選択肢です。

メッセージキューを使う理由

  • 非同期化: メール送信・PDF生成・AI推論などの重い処理をHTTPレスポンスから切り離す
  • 耐障害性: Consumerがダウンしてもメッセージをキューに保持して復旧後に再処理
  • スケーラビリティ: Consumerを増やすだけで処理能力をスケールアウト
  • デカップリング: Producerと Consumerが直接通信しないため互いに独立してデプロイ・変更できる

主要MQの概要

RabbitMQ

2007年公開、Erlang製のOSSです。GitHubスター12k+。AMQP 0-9-1プロトコル・Exchange/Queue/Bindingの柔軟なルーティングが特徴のエンタープライズ向けMQです。Direct・Fanout・Topic・Headersの4つのExchangeタイプで複雑なメッセージルーティングを実現し、管理UIで全キューの状態をリアルタイム監視できます。

# docker-compose.yml - RabbitMQ
version: "3.8"
services:
  rabbitmq:
    image: rabbitmq:3.13-management-alpine
    restart: unless-stopped
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # 管理UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      RABBITMQ_DEFAULT_VHOST: /
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  rabbitmq_data:
# Python (pika): RabbitMQ Producer / Consumer
import pika
import json
import time
from datetime import datetime

RABBITMQ_URL = 'amqp://admin:password@localhost:5672/'

# Producer: メッセージをキューに送信
def send_email_task(to_email: str, subject: str, body: str):
    connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
    channel = connection.channel()

    # キューを宣言(存在しない場合は作成・durable=Trueで再起動後も保持)
    channel.queue_declare(queue='email_tasks', durable=True)

    message = json.dumps({
        'to': to_email,
        'subject': subject,
        'body': body,
        'created_at': datetime.utcnow().isoformat(),
    })

    channel.basic_publish(
        exchange='',
        routing_key='email_tasks',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=pika.DeliveryMode.Persistent,  # メッセージをディスクに永続化
            content_type='application/json',
        ),
    )
    print(f'メール送信タスクをキューに投入: {to_email}')
    connection.close()

# Consumer: ワーカープロセスでメッセージを処理
def start_email_worker():
    connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
    channel = connection.channel()
    channel.queue_declare(queue='email_tasks', durable=True)

    # 1ワーカーが同時に1メッセージのみ処理(公平ディスパッチ)
    channel.basic_qos(prefetch_count=1)

    def process_email(ch, method, properties, body):
        task = json.loads(body)
        print(f'メール送信中: {task["to"]}')
        time.sleep(2)  # 実際のメール送信処理
        print(f'メール送信完了: {task["to"]}')
        # ACKを送信(処理完了を通知)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue='email_tasks', on_message_callback=process_email)
    print('ワーカー起動中...')
    channel.start_consuming()

Redis Streams

2018年公開(Redis 5.0)、C製のOSSです。既存のRedisにそのままメッセージキュー機能を追加できます。Consumer Groups・ACK機能・最後に読んだ位置(last ID)の管理をRedis Streamsが担うため、Kafkaのような耐久性のある非同期処理をRedisだけで実現できます。

# Python (redis-py): Redis Streams Producer / Consumer Group
import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM_NAME = 'order_events'
GROUP_NAME = 'order_processors'
CONSUMER_NAME = 'worker-1'

# Streamとグループを初期化
try:
    r.xgroup_create(STREAM_NAME, GROUP_NAME, id='0', mkstream=True)
    print(f'Consumer group "{GROUP_NAME}" を作成しました')
except redis.exceptions.ResponseError:
    print(f'Consumer group "{GROUP_NAME}" は既に存在します')

# Producer: イベントをStreamに追加
def publish_order_event(order_id: str, event_type: str, data: dict):
    message_id = r.xadd(
        STREAM_NAME,
        {
            'order_id': order_id,
            'event_type': event_type,
            'data': json.dumps(data),
            'timestamp': str(time.time()),
        },
        maxlen=10000,  # 最大10000メッセージ保持
    )
    print(f'イベント投入: {message_id} - {event_type}')
    return message_id

# Consumer: メッセージを処理してACK
def process_order_events():
    while True:
        # Consumer Groupから未処理メッセージを取得(ブロッキング2秒)
        messages = r.xreadgroup(
            GROUP_NAME,
            CONSUMER_NAME,
            {STREAM_NAME: '>'},  # '>' = 未割り当てのメッセージのみ
            count=10,
            block=2000,
        )

        if not messages:
            continue

        for stream, events in messages:
            for message_id, data in events:
                order_id = data['order_id']
                event_type = data['event_type']
                print(f'処理中: {event_type} - 注文 {order_id}')

                # 実際の処理ロジック
                time.sleep(0.5)

                # ACKを送信(処理完了)
                r.xack(STREAM_NAME, GROUP_NAME, message_id)
                print(f'完了: {message_id}')

# 未ACKのメッセージを再処理(Consumer障害時の回復)
def recover_pending_messages():
    pending = r.xpending_range(STREAM_NAME, GROUP_NAME, '-', '+', 100)
    for msg in pending:
        if msg['time_since_delivered'] > 60000:  # 60秒以上経過
            r.xclaim(STREAM_NAME, GROUP_NAME, CONSUMER_NAME, 60000, [msg['message_id']])

NATS

2011年公開(Synadia)、Go製のOSSです。GitHubスター16k+。数百万メッセージ/秒を処理できる超高速なクラウドネイティブMQです。JetStream(永続化・ACK付きストリーム)を追加することでKafkaライクな耐久性のある処理も実現できます。マイクロサービスのコアメッセージングバスとして採用例が多く、Kubernetes上でのサービス間通信にも使われます。

# Python (nats-py): NATS Publisher / Subscriber
import asyncio
import nats

NATS_URL = 'nats://localhost:4222'

# Publisher: メッセージを発行
async def publish_events():
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()

    # JetStreamストリームを作成(永続化)
    await js.add_stream(name='ORDERS', subjects=['orders.*'])

    # イベントを発行
    for i in range(10):
        await js.publish(
            'orders.created',
            f'{{"order_id": "order-{i:03d}", "amount": {i * 1000}}}'.encode()
        )
        print(f'発行: order-{i:03d}')

    await nc.drain()

# Subscriber: メッセージを受信して処理
async def subscribe_and_process():
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()

    # 耐久サブスクライバー(Consumer Nameで再接続後も続きから処理)
    sub = await js.subscribe(
        'orders.*',
        durable='order_processor',
        config=nats.js.api.ConsumerConfig(
            ack_wait=30,         # 30秒以内にACKしないと再配信
            max_deliver=3,       # 最大3回再試行
            ack_policy=nats.js.api.AckPolicy.EXPLICIT,
        ),
    )

    async for msg in sub.messages:
        print(f'受信: {msg.subject} - {msg.data.decode()}')
        await asyncio.sleep(0.1)  # 処理
        await msg.ack()           # ACK送信

asyncio.run(publish_events())
asyncio.run(subscribe_and_process())

機能比較表

比較項目RabbitMQRedis StreamsNATS JetStream
耐久性・永続化
スループット最高
管理UI
追加インフラ必要Redis既存ならゼロ必要
AMQP対応
K8sネイティブ

メッセージキューはDevOpsカテゴリ/categories/devopsのCelery(Python分散タスクキュー)・Sidekiq(Ruby)・BullMQ(Node.js)などのWorkerフレームワークのバックエンドとして利用します。LLM Toolsカテゴリ/categories/llm-toolsのAI推論タスク(画像生成・テキスト要約)を非同期キューで処理してAPIのレスポンスタイムを短縮します。

FAQ

Q. RabbitMQとRedis Streamsはどう使い分けますか?

A. 複雑なルーティング・エンタープライズ機能(DLX・Priority Queue・Federation)が必要ならRabbitMQ既にRedisを使っている・シンプルなPub/Sub・追加インフラを増やしたくないならRedis Streamsが向いています。RabbitMQの優位点: ①Topic Exchange(ワイルドカードルーティング: order.*.created)で柔軟なメッセージ振り分け②Dead Letter Exchange(DLX)で失敗メッセージを別キューに自動転送して分析③Management UIでキューのメッセージ数・消費レート・コネクション状態をリアルタイム監視。Redis Streamsの優位点: ①Redis(キャッシュ・セッション)と同じインフラでMQ機能も実現②Consumer Groupsで複数ワーカーへの並列配信③XRANGEで過去のメッセージを履歴として検索・再生。

Q. NATSはApache Kafkaと比べてどう違いますか?

A. NATSは超低レイテンシ・シンプル・軽量、Kafkaは大規模ログ収集・ストリーム処理・長期保存に向いています。NATS JetStreamの特徴: ①サーバー1バイナリで起動(Zookeeperなし)②P2P・Request/Reply・Queue GroupsをCore NATSだけで実装③JetStreamの永続化はファイルベース(Kafkaよりセットアップが簡単)④メッセージ保持期間(TTL)または最大サイズ(bytes)でストリームをトリミング。Kafkaが勝る場面: ①数TB規模のログを数ヶ月保持してBatchで集計②Kafka StreamsまたはksqlDBでリアルタイムストリーム処理③Kafka ConnectでDBのCDC(変更データキャプチャ)をそのままストリームに流す。スタートアップ・マイクロサービスにはNATS、大規模データエンジニアリングにはKafkaという住み分けが一般的です。

Q. Celery(Python)はRabbitMQとRedisのどちらをBrokerとして使えばいいですか?

A. 本番環境の耐障害性・メッセージの永続化が重要ならRabbitMQ、シンプルさ・開発速度・小〜中規模ならRedisが向いています。Celery + RabbitMQ: ①CELERY_BROKER_URL = 'amqp://user:pass@rabbitmq:5672/'②タスクの永続化(task_acks_late=True)でWorkerクラッシュ時も再処理③task_reject_on_worker_lost=Trueでワーカーロスト時に自動再キュー。Celery + Redis: ①CELERY_BROKER_URL = 'redis://localhost:6379/0'②セットアップが最もシンプル③Redis Clusterモードは追加設定が必要。どちらもtask_serializer='json'result_backend(タスク結果の保存先)・task_time_limit(タイムアウト)の設定は共通です。

Q. メッセージキューの死活監視アラートはどう設定しますか?

A. ①RabbitMQ: Management HTTP API(/api/queues)をPrometheusでスクレイプ→rabbitmq_queue_messagesが閾値超過でSlackアラート②Redis Streams: XLEN stream_nameでPending件数を監視→ XPENDING stream group - + 1000で未ACKメッセージを確認③NATS: JetStream APIの/jszエンドポイントでConsumerのPending/Ack/Redeliveredを取得。共通の監視指標: ①Queue深度(キューに積み上がったメッセージ数が急増=Consumerが詰まっているサイン)②Dead Letter Queue(DLQ)の件数(失敗タスクの蓄積)③Consumer lag(最新メッセージとConsumerが読んだ位置の差)④メッセージ処理時間(P95/P99のレイテンシ)。

まとめ

ユースケース推奨ツール
複雑なルーティング・エンタープライズMQRabbitMQ
Redis既存環境・追加インフラなしRedis Streams
マイクロサービスバス・超高速・KubernetesNATS JetStream

関連外部リソース

他の記事も読む

Let's Build Together

OSS導入、自社だけで悩まない。

ツール選定から構築・運用・AI活用まで、オープンソースラボ運営元のClasslessが伴走します。初回のご相談は無料です。