AI

ワークフローエンジン比較:Temporal vs Airflow vs Prefect で分散ワークフローを管理する

オープンソースラボ編集部2026年6月14日

ワークフローエンジン比較:Temporal vs Airflow vs Prefect で分散ワークフローを管理する

マイクロサービス・バッチETL・MLパイプラインでは、失敗時の自動リトライ・状態の永続化・タイムアウト・並列実行を信頼性高く実装する必要があります。Temporal(Go製・コードファーストワークフロー)・Apache Airflow(Python・DAG・データエンジニアリング標準)・Prefect(Python・モダンUI・ハイブリッドクラウド)の3つがOSSワークフローエンジンの主要選択肢です。

ワークフローエンジンを選ぶ理由

  • 信頼性: ステップの失敗・サービス再起動後も状態を保存して自動リトライ・再開
  • 可視性: 各ワークフロー実行の状態・ログ・タイムラインをUIで可視化
  • スケジューリング: cron式・イベントトリガー・手動実行を統一管理
  • 並列化: ファンアウト(並列タスク実行)・依存関係グラフを宣言的に定義

主要ツールの概要

Temporal

2019年公開(Cadence後継)、Go製のOSSです。GitHubスター12k+。**コードがそのままワークフロー定義になる(コードファースト)**エンジンで、Workflow・Activity・Signalの3概念でほぼすべての分散処理パターンを実装できます。Uberの決済・Airbnbの予約・Stripe・Hashicorpで本番利用されています。

# docker-compose.yml - Temporal サーバー(開発環境)
version: "3.8"
services:
  temporal:
    image: temporalio/auto-setup:1.24.2
    restart: unless-stopped
    ports:
      - "7233:7233"   # gRPC フロントエンド
    environment:
      - DB=postgresql
      - DB_PORT=5432
      - POSTGRES_USER=temporal
      - POSTGRES_PWD=${POSTGRES_PASSWORD}
      - POSTGRES_SEEDS=postgresql
    depends_on:
      - postgresql

  temporal-ui:
    image: temporalio/ui:2.28.0
    restart: unless-stopped
    ports:
      - "8080:8080"
    environment:
      - TEMPORAL_ADDRESS=temporal:7233

  postgresql:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: temporal
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - temporal_pg_data:/var/lib/postgresql/data

volumes:
  temporal_pg_data:
# Temporal Python SDK: 注文処理ワークフロー
import asyncio
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

# Activity(実際の処理ロジック)を定義
@activity.defn
async def validate_order(order_id: str) -> bool:
    # 在庫確認・支払い認証など
    print(f'注文バリデーション: {order_id}')
    return True

@activity.defn
async def charge_payment(order_id: str, amount: float) -> str:
    print(f'決済処理: {order_id} = {amount}円')
    return f'charge_{order_id}'

@activity.defn
async def ship_order(order_id: str, charge_id: str) -> None:
    print(f'出荷処理: {order_id} (charge: {charge_id})')

@activity.defn
async def send_notification(order_id: str, status: str) -> None:
    print(f'通知送信: {order_id} -> {status}')

# Workflow(オーケストレーション)を定義
@workflow.defn
class OrderFulfillmentWorkflow:
    @workflow.run
    async def run(self, order_id: str, amount: float) -> str:
        # バリデーション(5秒タイムアウト・3回リトライ)
        is_valid = await workflow.execute_activity(
            validate_order,
            order_id,
            start_to_close_timeout=timedelta(seconds=5),
            retry_policy=workflow.RetryPolicy(maximum_attempts=3),
        )

        if not is_valid:
            raise ValueError(f'注文 {order_id} はバリデーション失敗')

        # 決済処理(30秒タイムアウト・自動リトライ)
        charge_id = await workflow.execute_activity(
            charge_payment,
            args=[order_id, amount],
            start_to_close_timeout=timedelta(seconds=30),
        )

        # 出荷と通知を並列実行
        await asyncio.gather(
            workflow.execute_activity(
                ship_order,
                args=[order_id, charge_id],
                start_to_close_timeout=timedelta(minutes=10),
            ),
            workflow.execute_activity(
                send_notification,
                args=[order_id, 'shipped'],
                start_to_close_timeout=timedelta(seconds=10),
            ),
        )

        return f'注文 {order_id} 完了 (charge: {charge_id})'

async def main():
    client = await Client.connect('localhost:7233')

    # Workerを起動(バックグラウンドで実行)
    async with Worker(
        client,
        task_queue='order-processing',
        workflows=[OrderFulfillmentWorkflow],
        activities=[validate_order, charge_payment, ship_order, send_notification],
    ):
        # ワークフローを実行
        result = await client.execute_workflow(
            OrderFulfillmentWorkflow.run,
            args=['order-001', 9800.0],
            id='order-001-workflow',
            task_queue='order-processing',
        )
        print(f'結果: {result}')

if __name__ == '__main__':
    asyncio.run(main())

Apache Airflow

2015年公開(Airbnb起源)、Python製のOSSです。GitHubスター37k+。PythonコードでDAG(有向非巡回グラフ)を定義するデータパイプライン標準のワークフローエンジンです。豊富なOperator(S3・BigQuery・Snowflake・Spark・dbt・Slack等)で外部サービスとの統合が即座に実装できます。

# Airflow DAG: データパイプライン(ETL + dbt + 通知)
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'email_on_failure': True,
    'email': ['data-alerts@yourcompany.com'],
}

def check_data_freshness(**kwargs) -> str:
    '''データ鮮度チェック: 正常ならtransformへ、異常ならalertへ'''
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    hook = PostgresHook(postgres_conn_id='analytics_db')
    count = hook.get_first('''
        SELECT COUNT(*) FROM raw_orders
        WHERE created_at >= NOW() - INTERVAL '2 hours'
    ''')[0]
    return 'transform_data' if count > 100 else 'alert_low_data'

with DAG(
    dag_id='daily_analytics_pipeline',
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=False,
    tags=['analytics', 'production'],
) as dag:

    freshness_check = BranchPythonOperator(
        task_id='check_data_freshness',
        python_callable=check_data_freshness,
    )

    transform = BashOperator(
        task_id='transform_data',
        bash_command='dbt run --project-dir /opt/dbt/analytics --select tag:daily',
    )

    test = BashOperator(
        task_id='dbt_test',
        bash_command='dbt test --project-dir /opt/dbt/analytics --select tag:daily',
    )

    alert_low = SlackAPIPostOperator(
        task_id='alert_low_data',
        slack_conn_id='slack_default',
        channel='#data-alerts',
        text=':warning: データ件数が少なすぎます。パイプラインを確認してください。',
    )

    notify_success = SlackAPIPostOperator(
        task_id='notify_success',
        slack_conn_id='slack_default',
        channel='#data-team',
        text=':white_check_mark: 日次分析パイプライン完了',
    )

    freshness_check >> [transform, alert_low]
    transform >> test >> notify_success

Prefect

2018年公開、Python製のOSSです。GitHubスター16k+。Pythonデコレーターで既存コードをそのままワークフロー化できるモダンなアプローチと、Prefect Cloud・セルフホストサーバーのハイブリッド対応が特徴です。

# Prefect: ML推論パイプライン
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def load_data(date: str) -> list:
    '''データ取得(1時間キャッシュ付き)'''
    return [{'id': i, 'value': i * 2} for i in range(1000)]

@task(retries=3, retry_delay_seconds=30)
def preprocess(data: list) -> list:
    '''前処理(3回リトライ)'''
    return [{'id': d['id'], 'features': [d['value']]} for d in data]

@task
def run_inference(features: list) -> list:
    '''モデル推論'''
    return [{'id': f['id'], 'prediction': sum(f['features'])} for f in features]

@task
def save_results(predictions: list, date: str) -> None:
    '''結果を保存'''
    print(f'{date}: {len(predictions)}件の推論結果を保存')

@flow(name='daily-ml-inference', log_prints=True)
def daily_inference_pipeline(date: str = '2026-06-14'):
    data = load_data(date)
    preprocessed = preprocess(data)
    predictions = run_inference(preprocessed)
    save_results(predictions, date)
    return len(predictions)

if __name__ == '__main__':
    daily_inference_pipeline()

機能比較表

比較項目TemporalApache AirflowPrefect
失敗耐性・状態永続化✅ 最強
DataOps/ETL向き✅ 最適
学習コスト中〜高
マイクロサービス向き
UI/可視化
GitHub Stars12k+37k+16k+

ワークフローエンジンはDevOpsカテゴリ/categories/devopsのデータレイクハウス(Iceberg/Delta Lake)・dbt・Sparkyと統合してETLパイプライン全体をオーケストレーションします。LLM Toolsカテゴリ/categories/llm-toolsのRAGパイプライン(ドキュメント取得→チャンク分割→ベクトル化→Upsert)のスケジューリングと失敗リトライにも活用されています。

FAQ

Q. TemporalとAirflowの使い分けは?

A. マイクロサービスの長時間ビジネスロジック(注文処理・決済・予約)ならTemporalデータエンジニアリングのETLパイプライン・バッチジョブならAirflowが向いています。決め手となる違い: ①Temporalは「Signal」でワークフロー実行中に外部からステータス変更(注文キャンセル信号)を受け付けられる②Airflowは100+のProvider(Snowflake・dbt・Spark・BigQuery・Slack等)で外部ツール統合が即座にできる③Temporalの状態永続化はJVMベースのCassandra/PostgreSQLで大規模でも高耐久④Airflowのスケジューラーはcronで十分で、データ更新が来たらETLを実行するトリガー型に向いている。

Q. PrefectはAirflowのどこを改善しましたか?

A. Airflowの複雑さとデプロイのしんどさをモダンPythonで解消したのがPrefectです。具体的な改善点: ①DAGファイルを別途定義するAirflowに対してPrefectは通常のPython関数に@flow/@taskデコレーターを付けるだけで既存コードをワークフロー化②デプロイはPrefect Cloudまたはセルフホストサーバーにworkspacでコードを送信するだけ(Kubernetes依存なし)③Airflowのdag_idとrun_idに対してPrefectはflowとdeploymentの概念で環境ごとの設定を分離④TaskのCacheキー(task_input_hash)で同一入力の再計算を自動スキップ⑤Prefect Cloudは月10,000回のフロー実行まで無料。

Q. k8sでAirflowを本番運用するのに必要なリソースは?

A. Airflow on KubernetesExecutorの最小構成: ①Webserver: 2 CPU・2GB RAM(HA構成では2レプリカ)②Scheduler: 2 CPU・2GB RAM③PostgreSQL(または CloudSQL): 2CPU・4GB RAM④Redis(CeleryExecutorの場合): 1CPU・1GB RAM⑤各Workerは起動時にDynamic Podとして生成(1タスク=1Pod)。Helmチャート(apache-airflow/airflow)でデプロイ: helm install airflow apache-airflow/airflow --set executor=KubernetesExecutor --set postgresql.enabled=true。Astronomer(商用マネージドAirflow)・Google Cloud Composer・Amazon MWAA(Managed Workflows for Apache Airflow)のマネージドサービスも運用コストを考慮すると選択肢になります。

Q. TemporalのSignalとQueryの違いは何ですか?

A. Signal: ワークフロー実行中に外部から状態変更を注入(副作用あり)、Query: ワークフローの現在の状態を読み取り専用で取得(副作用なし)。Signalの使用例: 注文処理ワークフローが「承認待ち」状態でブロックしている時にclient.get_workflow_handle('order-001').signal('approve')で承認シグナルを送信してワークフローを先に進める。Queryの使用例: client.get_workflow_handle('order-001').query('get_status')で現在のステータス(pending/processing/shipped)をUIから取得。Signal受信でブロックする例: approval = workflow.wait_for_signal('approve', timeout=timedelta(hours=24))で24時間タイムアウト付きの人間承認フローを実装。

まとめ

ユースケース推奨ツール
マイクロサービス・ビジネスプロセス・決済Temporal
データエンジニアリング・ETL・dbt統合Apache Airflow
MLパイプライン・シンプルなバッチ・Python既存コードPrefect

関連外部リソース

他の記事も読む

Let's Build Together

OSS導入、自社だけで悩まない。

ツール選定から構築・運用・AI活用まで、オープンソースラボ運営元のClasslessが伴走します。初回のご相談は無料です。