OSSのタスクキュー比較:Celery vs BullMQ vs Temporal で非同期処理とワークフローを実装する
オープンソースラボ編集部 ・ 2026年6月14日
OSSのタスクキュー比較:Celery vs BullMQ vs Temporal で非同期処理とワークフローを実装する
AWS SQS+Lambda(月$0.20/100万リクエスト〜)・Temporal Cloud(月$25〜)に対して、Celery(最もPythonで使われるタスクキュー)・BullMQ(Node.js/RedisベースのジョブキューUI付き)・Temporal(マイクロサービスのワークフロー長期実行)はOSSのタスクキュー・ワークフローエンジンです。
タスクキューが解決する問題
Webリクエストで直接処理すると遅い・不安定・失敗時に再試行できない処理を非同期化します:
- メール送信: APIレスポンスをすぐ返してバックグラウンドでSendGridにメール送信
- AI処理: GPT-4に画像分析を依頼して完了を通知(30秒以上かかる処理)
- データETL: 夜間に大量データを変換・集計・DB保存するバッチ処理
- Saga/長期トランザクション: 注文→決済→在庫確保→配送手配の一連フローを管理
主要ツールの概要
Celery
2009年に公開されたPython製のOSS分散タスクキューです。GitHubスター24k+。PythonのデファクトタスクキューLibraryで、RabbitMQ・Redis・Amazon SQS等のブローカーを使ってタスクを非同期実行します。Djangoとの統合が最も充実しており、スケジューリング(celery-beat)・監視(Flower)・失敗時の再試行・バッチ処理・チェーン・コードをグループ化したコールバックが可能です。
# celery.py - Celeryアプリの設定
from celery import Celery
app = Celery("myapp")
app.config_from_object("celeryconfig")
# celeryconfig.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "Asia/Tokyo"
enable_utc = True
# ビートスケジュール(定期実行)
beat_schedule = {
"daily-report": {
"task": "tasks.generate_daily_report",
"schedule": 3600 * 24, # 毎日
"options": {"expires": 7200},
},
"cleanup-old-sessions": {
"task": "tasks.cleanup_sessions",
"schedule": 3600, # 毎時
},
}
# tasks.py - タスク定義
from celery import current_app as app
from celery.utils.log import get_task_logger
import time
logger = get_task_logger(__name__)
# シンプルなタスク
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: str, email: str):
try:
logger.info(f"Sending welcome email to {email}")
# SendGrid APIを呼び出す
sg_client.send_mail(
to=email,
subject="ようこそ!",
template="welcome",
)
return {"status": "sent", "user_id": user_id}
except Exception as exc:
logger.error(f"Email send failed: {exc}")
raise self.retry(exc=exc)
# AI処理タスク(時間のかかる処理)
@app.task(time_limit=300) # 5分タイムアウト
def analyze_document(document_id: str, file_url: str):
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{
"role": "user",
"content": [
{"type": "text", "text": "この文書を要約してください"},
{"type": "image_url", "image_url": {"url": file_url}},
],
}],
)
summary = response.choices[0].message.content
# DB に保存
db.update("documents", {"id": document_id, "summary": summary})
return {"document_id": document_id, "summary_length": len(summary)}
# チェーン(タスクを順番に実行)
from celery import chain
workflow = chain(
validate_order.s(order_id),
charge_payment.s(),
reserve_inventory.s(),
send_confirmation_email.s(),
)
result = workflow.apply_async()
# グループ(並列実行して結果をまとめる)
from celery import group
parallel_jobs = group(
analyze_document.s(doc_id, url)
for doc_id, url in documents
)
results = parallel_jobs.apply_async().get(timeout=120)
# FastAPIからCeleryタスクを呼び出す
from fastapi import FastAPI
from celery.result import AsyncResult
app = FastAPI()
@app.post("/documents/{document_id}/analyze")
async def trigger_analysis(document_id: str):
task = analyze_document.apply_async(
args=[document_id, f"https://storage.example.com/{document_id}"],
countdown=0, # すぐ実行
expires=3600, # 1時間以内に実行されなければ破棄
)
return {"task_id": task.id, "status": "queued"}
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
result = AsyncResult(task_id)
return {
"task_id": task_id,
"status": result.status, # PENDING / STARTED / SUCCESS / FAILURE
"result": result.result if result.ready() else None,
}
BullMQ
2021年に公開されたTypeScript/Node.js製のジョブキューライブラリです。GitHubスター6k+。RedisベースのジョブキューにBull Board(UI)が使いやすいツールで、優先度付きキュー・遅延ジョブ・繰り返しジョブ・ジョブ依存関係・Rate Limit・ワーカーのコンカレンシー制御が可能です。Next.js・Hono等のNode.js WebフレームワークとシームレスにAPIルートから統合できます。
// src/queue/index.ts - BullMQのキュー設定
import { Queue, Worker, QueueEvents } from "bullmq";
import { Redis } from "ioredis";
const connection = new Redis({ host: "localhost", port: 6379, maxRetriesPerRequest: null });
// ジョブキューの定義
export const emailQueue = new Queue("email", {
connection,
defaultJobOptions: {
attempts: 3, // 失敗時に3回再試行
backoff: { type: "exponential", delay: 5000 }, // 5秒・10秒・20秒と待機
removeOnComplete: { count: 100 }, // 完了ジョブを100件残して削除
removeOnFail: { count: 50 },
},
});
export const aiQueue = new Queue("ai-processing", {
connection,
defaultJobOptions: {
attempts: 2,
timeout: 300_000, // 5分タイムアウト
},
});
// ジョブを追加する関数
export async function queueWelcomeEmail(userId: string, email: string) {
return emailQueue.add(
"welcome-email",
{ userId, email },
{
delay: 5000, // 5秒後に実行(メール送信の遅延)
priority: 1, // 優先度(低い数字=高優先)
}
);
}
export async function queueDocumentAnalysis(documentId: string, fileUrl: string) {
return aiQueue.add("analyze-document", { documentId, fileUrl });
}
// src/queue/workers.ts - ワーカーの定義
import { Worker } from "bullmq";
// メール送信ワーカー
const emailWorker = new Worker(
"email",
async (job) => {
const { userId, email } = job.data;
console.log(`Processing job ${job.name} for ${email}`);
if (job.name === "welcome-email") {
await sendgrid.send({
to: email,
subject: "ようこそ!",
templateId: "d-welcome-template-id",
dynamicTemplateData: { userId },
});
}
return { sent: true, email };
},
{
connection,
concurrency: 5, // 同時に5件並列処理
}
);
// AIワーカー(コンカレンシーを低く設定してAPI制限に対応)
const aiWorker = new Worker(
"ai-processing",
async (job) => {
const { documentId, fileUrl } = job.data;
await job.updateProgress(10); // 進捗更新
const analysis = await openai.chat.completions.create({...});
await job.updateProgress(80);
await db.documents.update({ id: documentId, analysis: analysis.choices[0].message.content });
await job.updateProgress(100);
return { documentId, done: true };
},
{ connection, concurrency: 2 } // OpenAI APIのレート制限対応
);
emailWorker.on("completed", (job) => console.log(`${job.id} completed`));
emailWorker.on("failed", (job, err) => console.error(`${job?.id} failed: ${err.message}`));
// Next.js API Route からBullMQを使う
// src/app/api/analyze/route.ts
import { queueDocumentAnalysis } from "@/queue";
export async function POST(req: Request) {
const { documentId, fileUrl } = await req.json();
const job = await queueDocumentAnalysis(documentId, fileUrl);
return Response.json({ jobId: job.id, status: "queued" });
}
export async function GET(req: Request) {
const jobId = new URL(req.url).searchParams.get("jobId");
const job = await aiQueue.getJob(jobId!);
if (!job) return Response.json({ error: "Not found" }, { status: 404 });
return Response.json({
jobId,
state: await job.getState(), // waiting / active / completed / failed
progress: job.progress,
result: await job.returnvalue,
});
}
機能比較表
| 比較項目 | Celery | BullMQ | Temporal |
|---|---|---|---|
| 言語 | Python | TypeScript/JS | Go/Java/Python/TS |
| ブローカー | Redis/RabbitMQ/SQS | Redis専用 | Temporal Server |
| ワークフロー | チェーン/グループ | フロー | コード定義(最強) |
| 長期実行 | 限定的 | 限定的 | ✅(数日〜数週間) |
| UI | Flower | Bull Board | WebUI標準 |
| GitHub Stars | 24k+ | 6k+ | 12k+ |
タスクキューのメトリクスをGrafanaで可視化するにはDevOpsカテゴリ/categories/devopsのGrafana+Prometheusと組み合わせてください。タスク完了・エラーをチームに通知するにはCommunicationカテゴリ/categories/communicationのSlack Webhookと連携できます。
FAQ
Q. Celery・BullMQ・Temporalのどれを選ぶべきですか?
A. スタック・ユースケースで決まります。Celery: PythonのDjango/FastAPIアプリで、メール送信・バッチ処理・AIタスクの非同期化が必要な場合。Pythonエコシステムでは一択。BullMQ: Node.js/TypeScriptのアプリで、RedisがすでにあってジョブキューUIが欲しい場合。Next.jsとの統合が最もシンプル。Temporal: 注文処理・支払い・在庫確保のような複数ステップが連鎖する長期ワークフロー(数分〜数日かかる処理)を確実に実行したい場合。障害後の自動再開(Resume)が保証される点がCelery/BullMQと決定的に違います。
Q. BullMQのジョブ状況をWebUIで確認するには?
A. Bull Board(@bull-board/api)をExpressまたはNext.jsに組み込みます。
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ExpressAdapter } from "@bull-board/express";
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/admin/queues");
createBullBoard({
queues: [new BullMQAdapter(emailQueue), new BullMQAdapter(aiQueue)],
serverAdapter,
});
app.use("/admin/queues", serverAdapter.getRouter());
// http://localhost:3000/admin/queues でジョブ状況・失敗ジョブ・再実行ができるUI表示
Q. CeleryのワーカーをKubernetesにデプロイするベストプラクティスは?
A. CeleryワーカーはKEDA(Kubernetes Event-Driven Autoscaler)でキューの深さに応じて自動スケールするのが最もコスト効率がいい構成です。KedaのRedisScalerを使いキューに積まれたジョブ数に基づいてDeploymentのレプリカ数を0〜Nに自動調整します。terminationGracePeriodSecondsを長めに設定(300秒)して実行中タスクが強制終了されないようにするのも重要です。
Q. タスクキューを使わずにREST APIで非同期処理(バックグラウンドジョブ)を実装するリスクは?
A. よくあるアンチパターン: setTimeout(() => sendEmail(), 0)でAPIレスポンス後にバックグラウンド送信する実装は、①プロセスがクラッシュしたらジョブが消える②Vercel/Lambda等のサーバーレス環境ではレスポンス送信後にNode.jsプロセスが停止してバックグラウンド処理が完了しない③失敗時の再試行がない、という問題があります。タスクキューを使えばジョブはRedisに永続化され、サーバー再起動後も実行待ちのまま保持されます。
まとめ
| ユースケース | 推奨ツール |
|---|---|
| Python(Django/FastAPI)の非同期タスク | Celery |
| Node.js/TypeScriptのジョブキュー+UI | BullMQ |
| 長期ワークフロー・確実性・自動リカバリー | Temporal |