AI

OSSのタスクキュー比較:Celery vs BullMQ vs Temporal で非同期処理とワークフローを実装する

オープンソースラボ編集部2026年6月14日

OSSのタスクキュー比較:Celery vs BullMQ vs Temporal で非同期処理とワークフローを実装する

AWS SQS+Lambda(月$0.20/100万リクエスト〜)・Temporal Cloud(月$25〜)に対して、Celery(最もPythonで使われるタスクキュー)・BullMQ(Node.js/RedisベースのジョブキューUI付き)・Temporal(マイクロサービスのワークフロー長期実行)はOSSのタスクキュー・ワークフローエンジンです。

タスクキューが解決する問題

Webリクエストで直接処理すると遅い・不安定・失敗時に再試行できない処理を非同期化します:

  • メール送信: APIレスポンスをすぐ返してバックグラウンドでSendGridにメール送信
  • AI処理: GPT-4に画像分析を依頼して完了を通知(30秒以上かかる処理)
  • データETL: 夜間に大量データを変換・集計・DB保存するバッチ処理
  • Saga/長期トランザクション: 注文→決済→在庫確保→配送手配の一連フローを管理

主要ツールの概要

Celery

2009年に公開されたPython製のOSS分散タスクキューです。GitHubスター24k+。PythonのデファクトタスクキューLibraryで、RabbitMQ・Redis・Amazon SQS等のブローカーを使ってタスクを非同期実行します。Djangoとの統合が最も充実しており、スケジューリング(celery-beat)・監視(Flower)・失敗時の再試行・バッチ処理・チェーン・コードをグループ化したコールバックが可能です。

# celery.py - Celeryアプリの設定
from celery import Celery

app = Celery("myapp")
app.config_from_object("celeryconfig")

# celeryconfig.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "Asia/Tokyo"
enable_utc = True

# ビートスケジュール(定期実行)
beat_schedule = {
    "daily-report": {
        "task": "tasks.generate_daily_report",
        "schedule": 3600 * 24,     # 毎日
        "options": {"expires": 7200},
    },
    "cleanup-old-sessions": {
        "task": "tasks.cleanup_sessions",
        "schedule": 3600,          # 毎時
    },
}
# tasks.py - タスク定義
from celery import current_app as app
from celery.utils.log import get_task_logger
import time

logger = get_task_logger(__name__)

# シンプルなタスク
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: str, email: str):
    try:
        logger.info(f"Sending welcome email to {email}")
        # SendGrid APIを呼び出す
        sg_client.send_mail(
            to=email,
            subject="ようこそ!",
            template="welcome",
        )
        return {"status": "sent", "user_id": user_id}
    except Exception as exc:
        logger.error(f"Email send failed: {exc}")
        raise self.retry(exc=exc)

# AI処理タスク(時間のかかる処理)
@app.task(time_limit=300)  # 5分タイムアウト
def analyze_document(document_id: str, file_url: str):
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": "この文書を要約してください"},
                {"type": "image_url", "image_url": {"url": file_url}},
            ],
        }],
    )
    summary = response.choices[0].message.content
    # DB に保存
    db.update("documents", {"id": document_id, "summary": summary})
    return {"document_id": document_id, "summary_length": len(summary)}

# チェーン(タスクを順番に実行)
from celery import chain
workflow = chain(
    validate_order.s(order_id),
    charge_payment.s(),
    reserve_inventory.s(),
    send_confirmation_email.s(),
)
result = workflow.apply_async()

# グループ(並列実行して結果をまとめる)
from celery import group
parallel_jobs = group(
    analyze_document.s(doc_id, url)
    for doc_id, url in documents
)
results = parallel_jobs.apply_async().get(timeout=120)
# FastAPIからCeleryタスクを呼び出す
from fastapi import FastAPI
from celery.result import AsyncResult

app = FastAPI()

@app.post("/documents/{document_id}/analyze")
async def trigger_analysis(document_id: str):
    task = analyze_document.apply_async(
        args=[document_id, f"https://storage.example.com/{document_id}"],
        countdown=0,          # すぐ実行
        expires=3600,         # 1時間以内に実行されなければ破棄
    )
    return {"task_id": task.id, "status": "queued"}

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,     # PENDING / STARTED / SUCCESS / FAILURE
        "result": result.result if result.ready() else None,
    }

BullMQ

2021年に公開されたTypeScript/Node.js製のジョブキューライブラリです。GitHubスター6k+。RedisベースのジョブキューにBull Board(UI)が使いやすいツールで、優先度付きキュー・遅延ジョブ・繰り返しジョブ・ジョブ依存関係・Rate Limit・ワーカーのコンカレンシー制御が可能です。Next.js・Hono等のNode.js WebフレームワークとシームレスにAPIルートから統合できます。

// src/queue/index.ts - BullMQのキュー設定
import { Queue, Worker, QueueEvents } from "bullmq";
import { Redis } from "ioredis";

const connection = new Redis({ host: "localhost", port: 6379, maxRetriesPerRequest: null });

// ジョブキューの定義
export const emailQueue = new Queue("email", {
  connection,
  defaultJobOptions: {
    attempts: 3,                 // 失敗時に3回再試行
    backoff: { type: "exponential", delay: 5000 },  // 5秒・10秒・20秒と待機
    removeOnComplete: { count: 100 },  // 完了ジョブを100件残して削除
    removeOnFail: { count: 50 },
  },
});

export const aiQueue = new Queue("ai-processing", {
  connection,
  defaultJobOptions: {
    attempts: 2,
    timeout: 300_000,           // 5分タイムアウト
  },
});

// ジョブを追加する関数
export async function queueWelcomeEmail(userId: string, email: string) {
  return emailQueue.add(
    "welcome-email",
    { userId, email },
    {
      delay: 5000,              // 5秒後に実行(メール送信の遅延)
      priority: 1,              // 優先度(低い数字=高優先)
    }
  );
}

export async function queueDocumentAnalysis(documentId: string, fileUrl: string) {
  return aiQueue.add("analyze-document", { documentId, fileUrl });
}
// src/queue/workers.ts - ワーカーの定義
import { Worker } from "bullmq";

// メール送信ワーカー
const emailWorker = new Worker(
  "email",
  async (job) => {
    const { userId, email } = job.data;
    console.log(`Processing job ${job.name} for ${email}`);

    if (job.name === "welcome-email") {
      await sendgrid.send({
        to: email,
        subject: "ようこそ!",
        templateId: "d-welcome-template-id",
        dynamicTemplateData: { userId },
      });
    }
    return { sent: true, email };
  },
  {
    connection,
    concurrency: 5,             // 同時に5件並列処理
  }
);

// AIワーカー(コンカレンシーを低く設定してAPI制限に対応)
const aiWorker = new Worker(
  "ai-processing",
  async (job) => {
    const { documentId, fileUrl } = job.data;
    await job.updateProgress(10);  // 進捗更新

    const analysis = await openai.chat.completions.create({...});
    await job.updateProgress(80);

    await db.documents.update({ id: documentId, analysis: analysis.choices[0].message.content });
    await job.updateProgress(100);

    return { documentId, done: true };
  },
  { connection, concurrency: 2 }  // OpenAI APIのレート制限対応
);

emailWorker.on("completed", (job) => console.log(`${job.id} completed`));
emailWorker.on("failed", (job, err) => console.error(`${job?.id} failed: ${err.message}`));
// Next.js API Route からBullMQを使う
// src/app/api/analyze/route.ts
import { queueDocumentAnalysis } from "@/queue";

export async function POST(req: Request) {
  const { documentId, fileUrl } = await req.json();
  const job = await queueDocumentAnalysis(documentId, fileUrl);
  return Response.json({ jobId: job.id, status: "queued" });
}

export async function GET(req: Request) {
  const jobId = new URL(req.url).searchParams.get("jobId");
  const job = await aiQueue.getJob(jobId!);
  if (!job) return Response.json({ error: "Not found" }, { status: 404 });

  return Response.json({
    jobId,
    state: await job.getState(),      // waiting / active / completed / failed
    progress: job.progress,
    result: await job.returnvalue,
  });
}

機能比較表

比較項目CeleryBullMQTemporal
言語PythonTypeScript/JSGo/Java/Python/TS
ブローカーRedis/RabbitMQ/SQSRedis専用Temporal Server
ワークフローチェーン/グループフローコード定義(最強)
長期実行限定的限定的✅(数日〜数週間)
UIFlowerBull BoardWebUI標準
GitHub Stars24k+6k+12k+

タスクキューのメトリクスをGrafanaで可視化するにはDevOpsカテゴリ/categories/devopsのGrafana+Prometheusと組み合わせてください。タスク完了・エラーをチームに通知するにはCommunicationカテゴリ/categories/communicationのSlack Webhookと連携できます。

FAQ

Q. Celery・BullMQ・Temporalのどれを選ぶべきですか?

A. スタック・ユースケースで決まります。Celery: PythonのDjango/FastAPIアプリで、メール送信・バッチ処理・AIタスクの非同期化が必要な場合。Pythonエコシステムでは一択。BullMQ: Node.js/TypeScriptのアプリで、RedisがすでにあってジョブキューUIが欲しい場合。Next.jsとの統合が最もシンプル。Temporal: 注文処理・支払い・在庫確保のような複数ステップが連鎖する長期ワークフロー(数分〜数日かかる処理)を確実に実行したい場合。障害後の自動再開(Resume)が保証される点がCelery/BullMQと決定的に違います。

Q. BullMQのジョブ状況をWebUIで確認するには?

A. Bull Board(@bull-board/api)をExpressまたはNext.jsに組み込みます。

import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ExpressAdapter } from "@bull-board/express";

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/admin/queues");

createBullBoard({
  queues: [new BullMQAdapter(emailQueue), new BullMQAdapter(aiQueue)],
  serverAdapter,
});

app.use("/admin/queues", serverAdapter.getRouter());
// http://localhost:3000/admin/queues でジョブ状況・失敗ジョブ・再実行ができるUI表示

Q. CeleryのワーカーをKubernetesにデプロイするベストプラクティスは?

A. CeleryワーカーはKEDA(Kubernetes Event-Driven Autoscaler)でキューの深さに応じて自動スケールするのが最もコスト効率がいい構成です。KedaのRedisScalerを使いキューに積まれたジョブ数に基づいてDeploymentのレプリカ数を0〜Nに自動調整します。terminationGracePeriodSecondsを長めに設定(300秒)して実行中タスクが強制終了されないようにするのも重要です。

Q. タスクキューを使わずにREST APIで非同期処理(バックグラウンドジョブ)を実装するリスクは?

A. よくあるアンチパターン: setTimeout(() => sendEmail(), 0)でAPIレスポンス後にバックグラウンド送信する実装は、①プロセスがクラッシュしたらジョブが消える②Vercel/Lambda等のサーバーレス環境ではレスポンス送信後にNode.jsプロセスが停止してバックグラウンド処理が完了しない③失敗時の再試行がない、という問題があります。タスクキューを使えばジョブはRedisに永続化され、サーバー再起動後も実行待ちのまま保持されます。

まとめ

ユースケース推奨ツール
Python(Django/FastAPI)の非同期タスクCelery
Node.js/TypeScriptのジョブキュー+UIBullMQ
長期ワークフロー・確実性・自動リカバリーTemporal

関連外部リソース

他の記事も読む

Let's Build Together

OSS導入、自社だけで悩まない。

ツール選定から構築・運用・AI活用まで、オープンソースラボ運営元のClasslessが伴走します。初回のご相談は無料です。