メッセージキュー比較:RabbitMQ vs Redis Streams vs NATS で非同期処理を実装する
オープンソースラボ編集部 ・ 2026年6月14日
メッセージキュー比較:RabbitMQ vs Redis Streams vs NATS で非同期処理を実装する
モノリスからマイクロサービスへの移行や、Webリクエストから重い処理を切り離す非同期アーキテクチャでは**メッセージキュー(MQ)**が中心的な役割を果たします。RabbitMQ(AMQP・エンタープライズ実績)・Redis Streams(Redis統合・軽量)・NATS(Go製・超高速・クラウドネイティブ)の3つがOSS MQの主要選択肢です。
メッセージキューを使う理由
- 非同期化: メール送信・PDF生成・AI推論などの重い処理をHTTPレスポンスから切り離す
- 耐障害性: Consumerがダウンしてもメッセージをキューに保持して復旧後に再処理
- スケーラビリティ: Consumerを増やすだけで処理能力をスケールアウト
- デカップリング: Producerと Consumerが直接通信しないため互いに独立してデプロイ・変更できる
主要MQの概要
RabbitMQ
2007年公開、Erlang製のOSSです。GitHubスター12k+。AMQP 0-9-1プロトコル・Exchange/Queue/Bindingの柔軟なルーティングが特徴のエンタープライズ向けMQです。Direct・Fanout・Topic・Headersの4つのExchangeタイプで複雑なメッセージルーティングを実現し、管理UIで全キューの状態をリアルタイム監視できます。
# docker-compose.yml - RabbitMQ
version: "3.8"
services:
rabbitmq:
image: rabbitmq:3.13-management-alpine
restart: unless-stopped
ports:
- "5672:5672" # AMQP
- "15672:15672" # 管理UI
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
RABBITMQ_DEFAULT_VHOST: /
volumes:
- rabbitmq_data:/var/lib/rabbitmq
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 10s
timeout: 5s
retries: 5
volumes:
rabbitmq_data:
# Python (pika): RabbitMQ Producer / Consumer
import pika
import json
import time
from datetime import datetime
RABBITMQ_URL = 'amqp://admin:password@localhost:5672/'
# Producer: メッセージをキューに送信
def send_email_task(to_email: str, subject: str, body: str):
connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
channel = connection.channel()
# キューを宣言(存在しない場合は作成・durable=Trueで再起動後も保持)
channel.queue_declare(queue='email_tasks', durable=True)
message = json.dumps({
'to': to_email,
'subject': subject,
'body': body,
'created_at': datetime.utcnow().isoformat(),
})
channel.basic_publish(
exchange='',
routing_key='email_tasks',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent, # メッセージをディスクに永続化
content_type='application/json',
),
)
print(f'メール送信タスクをキューに投入: {to_email}')
connection.close()
# Consumer: ワーカープロセスでメッセージを処理
def start_email_worker():
connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
channel = connection.channel()
channel.queue_declare(queue='email_tasks', durable=True)
# 1ワーカーが同時に1メッセージのみ処理(公平ディスパッチ)
channel.basic_qos(prefetch_count=1)
def process_email(ch, method, properties, body):
task = json.loads(body)
print(f'メール送信中: {task["to"]}')
time.sleep(2) # 実際のメール送信処理
print(f'メール送信完了: {task["to"]}')
# ACKを送信(処理完了を通知)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='email_tasks', on_message_callback=process_email)
print('ワーカー起動中...')
channel.start_consuming()
Redis Streams
2018年公開(Redis 5.0)、C製のOSSです。既存のRedisにそのままメッセージキュー機能を追加できます。Consumer Groups・ACK機能・最後に読んだ位置(last ID)の管理をRedis Streamsが担うため、Kafkaのような耐久性のある非同期処理をRedisだけで実現できます。
# Python (redis-py): Redis Streams Producer / Consumer Group
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
STREAM_NAME = 'order_events'
GROUP_NAME = 'order_processors'
CONSUMER_NAME = 'worker-1'
# Streamとグループを初期化
try:
r.xgroup_create(STREAM_NAME, GROUP_NAME, id='0', mkstream=True)
print(f'Consumer group "{GROUP_NAME}" を作成しました')
except redis.exceptions.ResponseError:
print(f'Consumer group "{GROUP_NAME}" は既に存在します')
# Producer: イベントをStreamに追加
def publish_order_event(order_id: str, event_type: str, data: dict):
message_id = r.xadd(
STREAM_NAME,
{
'order_id': order_id,
'event_type': event_type,
'data': json.dumps(data),
'timestamp': str(time.time()),
},
maxlen=10000, # 最大10000メッセージ保持
)
print(f'イベント投入: {message_id} - {event_type}')
return message_id
# Consumer: メッセージを処理してACK
def process_order_events():
while True:
# Consumer Groupから未処理メッセージを取得(ブロッキング2秒)
messages = r.xreadgroup(
GROUP_NAME,
CONSUMER_NAME,
{STREAM_NAME: '>'}, # '>' = 未割り当てのメッセージのみ
count=10,
block=2000,
)
if not messages:
continue
for stream, events in messages:
for message_id, data in events:
order_id = data['order_id']
event_type = data['event_type']
print(f'処理中: {event_type} - 注文 {order_id}')
# 実際の処理ロジック
time.sleep(0.5)
# ACKを送信(処理完了)
r.xack(STREAM_NAME, GROUP_NAME, message_id)
print(f'完了: {message_id}')
# 未ACKのメッセージを再処理(Consumer障害時の回復)
def recover_pending_messages():
pending = r.xpending_range(STREAM_NAME, GROUP_NAME, '-', '+', 100)
for msg in pending:
if msg['time_since_delivered'] > 60000: # 60秒以上経過
r.xclaim(STREAM_NAME, GROUP_NAME, CONSUMER_NAME, 60000, [msg['message_id']])
NATS
2011年公開(Synadia)、Go製のOSSです。GitHubスター16k+。数百万メッセージ/秒を処理できる超高速なクラウドネイティブMQです。JetStream(永続化・ACK付きストリーム)を追加することでKafkaライクな耐久性のある処理も実現できます。マイクロサービスのコアメッセージングバスとして採用例が多く、Kubernetes上でのサービス間通信にも使われます。
# Python (nats-py): NATS Publisher / Subscriber
import asyncio
import nats
NATS_URL = 'nats://localhost:4222'
# Publisher: メッセージを発行
async def publish_events():
nc = await nats.connect(NATS_URL)
js = nc.jetstream()
# JetStreamストリームを作成(永続化)
await js.add_stream(name='ORDERS', subjects=['orders.*'])
# イベントを発行
for i in range(10):
await js.publish(
'orders.created',
f'{{"order_id": "order-{i:03d}", "amount": {i * 1000}}}'.encode()
)
print(f'発行: order-{i:03d}')
await nc.drain()
# Subscriber: メッセージを受信して処理
async def subscribe_and_process():
nc = await nats.connect(NATS_URL)
js = nc.jetstream()
# 耐久サブスクライバー(Consumer Nameで再接続後も続きから処理)
sub = await js.subscribe(
'orders.*',
durable='order_processor',
config=nats.js.api.ConsumerConfig(
ack_wait=30, # 30秒以内にACKしないと再配信
max_deliver=3, # 最大3回再試行
ack_policy=nats.js.api.AckPolicy.EXPLICIT,
),
)
async for msg in sub.messages:
print(f'受信: {msg.subject} - {msg.data.decode()}')
await asyncio.sleep(0.1) # 処理
await msg.ack() # ACK送信
asyncio.run(publish_events())
asyncio.run(subscribe_and_process())
機能比較表
| 比較項目 | RabbitMQ | Redis Streams | NATS JetStream |
|---|---|---|---|
| 耐久性・永続化 | ✅ | ✅ | ✅ |
| スループット | 中 | 高 | 最高 |
| 管理UI | ✅ | △ | ✅ |
| 追加インフラ | 必要 | Redis既存ならゼロ | 必要 |
| AMQP対応 | ✅ | ❌ | ❌ |
| K8sネイティブ | △ | △ | ✅ |
メッセージキューはDevOpsカテゴリ/categories/devopsのCelery(Python分散タスクキュー)・Sidekiq(Ruby)・BullMQ(Node.js)などのWorkerフレームワークのバックエンドとして利用します。LLM Toolsカテゴリ/categories/llm-toolsのAI推論タスク(画像生成・テキスト要約)を非同期キューで処理してAPIのレスポンスタイムを短縮します。
FAQ
Q. RabbitMQとRedis Streamsはどう使い分けますか?
A. 複雑なルーティング・エンタープライズ機能(DLX・Priority Queue・Federation)が必要ならRabbitMQ、既にRedisを使っている・シンプルなPub/Sub・追加インフラを増やしたくないならRedis Streamsが向いています。RabbitMQの優位点: ①Topic Exchange(ワイルドカードルーティング: order.*.created)で柔軟なメッセージ振り分け②Dead Letter Exchange(DLX)で失敗メッセージを別キューに自動転送して分析③Management UIでキューのメッセージ数・消費レート・コネクション状態をリアルタイム監視。Redis Streamsの優位点: ①Redis(キャッシュ・セッション)と同じインフラでMQ機能も実現②Consumer Groupsで複数ワーカーへの並列配信③XRANGEで過去のメッセージを履歴として検索・再生。
Q. NATSはApache Kafkaと比べてどう違いますか?
A. NATSは超低レイテンシ・シンプル・軽量、Kafkaは大規模ログ収集・ストリーム処理・長期保存に向いています。NATS JetStreamの特徴: ①サーバー1バイナリで起動(Zookeeperなし)②P2P・Request/Reply・Queue GroupsをCore NATSだけで実装③JetStreamの永続化はファイルベース(Kafkaよりセットアップが簡単)④メッセージ保持期間(TTL)または最大サイズ(bytes)でストリームをトリミング。Kafkaが勝る場面: ①数TB規模のログを数ヶ月保持してBatchで集計②Kafka StreamsまたはksqlDBでリアルタイムストリーム処理③Kafka ConnectでDBのCDC(変更データキャプチャ)をそのままストリームに流す。スタートアップ・マイクロサービスにはNATS、大規模データエンジニアリングにはKafkaという住み分けが一般的です。
Q. Celery(Python)はRabbitMQとRedisのどちらをBrokerとして使えばいいですか?
A. 本番環境の耐障害性・メッセージの永続化が重要ならRabbitMQ、シンプルさ・開発速度・小〜中規模ならRedisが向いています。Celery + RabbitMQ: ①CELERY_BROKER_URL = 'amqp://user:pass@rabbitmq:5672/'②タスクの永続化(task_acks_late=True)でWorkerクラッシュ時も再処理③task_reject_on_worker_lost=Trueでワーカーロスト時に自動再キュー。Celery + Redis: ①CELERY_BROKER_URL = 'redis://localhost:6379/0'②セットアップが最もシンプル③Redis Clusterモードは追加設定が必要。どちらもtask_serializer='json'・result_backend(タスク結果の保存先)・task_time_limit(タイムアウト)の設定は共通です。
Q. メッセージキューの死活監視アラートはどう設定しますか?
A. ①RabbitMQ: Management HTTP API(/api/queues)をPrometheusでスクレイプ→rabbitmq_queue_messagesが閾値超過でSlackアラート②Redis Streams: XLEN stream_nameでPending件数を監視→ XPENDING stream group - + 1000で未ACKメッセージを確認③NATS: JetStream APIの/jszエンドポイントでConsumerのPending/Ack/Redeliveredを取得。共通の監視指標: ①Queue深度(キューに積み上がったメッセージ数が急増=Consumerが詰まっているサイン)②Dead Letter Queue(DLQ)の件数(失敗タスクの蓄積)③Consumer lag(最新メッセージとConsumerが読んだ位置の差)④メッセージ処理時間(P95/P99のレイテンシ)。
まとめ
| ユースケース | 推奨ツール |
|---|---|
| 複雑なルーティング・エンタープライズMQ | RabbitMQ |
| Redis既存環境・追加インフラなし | Redis Streams |
| マイクロサービスバス・超高速・Kubernetes | NATS JetStream |