ワークフローエンジン比較:Temporal vs Airflow vs Prefect で分散ワークフローを管理する
オープンソースラボ編集部 ・ 2026年6月14日
ワークフローエンジン比較:Temporal vs Airflow vs Prefect で分散ワークフローを管理する
マイクロサービス・バッチETL・MLパイプラインでは、失敗時の自動リトライ・状態の永続化・タイムアウト・並列実行を信頼性高く実装する必要があります。Temporal(Go製・コードファーストワークフロー)・Apache Airflow(Python・DAG・データエンジニアリング標準)・Prefect(Python・モダンUI・ハイブリッドクラウド)の3つがOSSワークフローエンジンの主要選択肢です。
ワークフローエンジンを選ぶ理由
- 信頼性: ステップの失敗・サービス再起動後も状態を保存して自動リトライ・再開
- 可視性: 各ワークフロー実行の状態・ログ・タイムラインをUIで可視化
- スケジューリング: cron式・イベントトリガー・手動実行を統一管理
- 並列化: ファンアウト(並列タスク実行)・依存関係グラフを宣言的に定義
主要ツールの概要
Temporal
2019年公開(Cadence後継)、Go製のOSSです。GitHubスター12k+。**コードがそのままワークフロー定義になる(コードファースト)**エンジンで、Workflow・Activity・Signalの3概念でほぼすべての分散処理パターンを実装できます。Uberの決済・Airbnbの予約・Stripe・Hashicorpで本番利用されています。
# docker-compose.yml - Temporal サーバー(開発環境)
version: "3.8"
services:
temporal:
image: temporalio/auto-setup:1.24.2
restart: unless-stopped
ports:
- "7233:7233" # gRPC フロントエンド
environment:
- DB=postgresql
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=${POSTGRES_PASSWORD}
- POSTGRES_SEEDS=postgresql
depends_on:
- postgresql
temporal-ui:
image: temporalio/ui:2.28.0
restart: unless-stopped
ports:
- "8080:8080"
environment:
- TEMPORAL_ADDRESS=temporal:7233
postgresql:
image: postgres:16-alpine
environment:
POSTGRES_USER: temporal
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- temporal_pg_data:/var/lib/postgresql/data
volumes:
temporal_pg_data:
# Temporal Python SDK: 注文処理ワークフロー
import asyncio
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
# Activity(実際の処理ロジック)を定義
@activity.defn
async def validate_order(order_id: str) -> bool:
# 在庫確認・支払い認証など
print(f'注文バリデーション: {order_id}')
return True
@activity.defn
async def charge_payment(order_id: str, amount: float) -> str:
print(f'決済処理: {order_id} = {amount}円')
return f'charge_{order_id}'
@activity.defn
async def ship_order(order_id: str, charge_id: str) -> None:
print(f'出荷処理: {order_id} (charge: {charge_id})')
@activity.defn
async def send_notification(order_id: str, status: str) -> None:
print(f'通知送信: {order_id} -> {status}')
# Workflow(オーケストレーション)を定義
@workflow.defn
class OrderFulfillmentWorkflow:
@workflow.run
async def run(self, order_id: str, amount: float) -> str:
# バリデーション(5秒タイムアウト・3回リトライ)
is_valid = await workflow.execute_activity(
validate_order,
order_id,
start_to_close_timeout=timedelta(seconds=5),
retry_policy=workflow.RetryPolicy(maximum_attempts=3),
)
if not is_valid:
raise ValueError(f'注文 {order_id} はバリデーション失敗')
# 決済処理(30秒タイムアウト・自動リトライ)
charge_id = await workflow.execute_activity(
charge_payment,
args=[order_id, amount],
start_to_close_timeout=timedelta(seconds=30),
)
# 出荷と通知を並列実行
await asyncio.gather(
workflow.execute_activity(
ship_order,
args=[order_id, charge_id],
start_to_close_timeout=timedelta(minutes=10),
),
workflow.execute_activity(
send_notification,
args=[order_id, 'shipped'],
start_to_close_timeout=timedelta(seconds=10),
),
)
return f'注文 {order_id} 完了 (charge: {charge_id})'
async def main():
client = await Client.connect('localhost:7233')
# Workerを起動(バックグラウンドで実行)
async with Worker(
client,
task_queue='order-processing',
workflows=[OrderFulfillmentWorkflow],
activities=[validate_order, charge_payment, ship_order, send_notification],
):
# ワークフローを実行
result = await client.execute_workflow(
OrderFulfillmentWorkflow.run,
args=['order-001', 9800.0],
id='order-001-workflow',
task_queue='order-processing',
)
print(f'結果: {result}')
if __name__ == '__main__':
asyncio.run(main())
Apache Airflow
2015年公開(Airbnb起源)、Python製のOSSです。GitHubスター37k+。PythonコードでDAG(有向非巡回グラフ)を定義するデータパイプライン標準のワークフローエンジンです。豊富なOperator(S3・BigQuery・Snowflake・Spark・dbt・Slack等)で外部サービスとの統合が即座に実装できます。
# Airflow DAG: データパイプライン(ETL + dbt + 通知)
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'email_on_failure': True,
'email': ['data-alerts@yourcompany.com'],
}
def check_data_freshness(**kwargs) -> str:
'''データ鮮度チェック: 正常ならtransformへ、異常ならalertへ'''
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id='analytics_db')
count = hook.get_first('''
SELECT COUNT(*) FROM raw_orders
WHERE created_at >= NOW() - INTERVAL '2 hours'
''')[0]
return 'transform_data' if count > 100 else 'alert_low_data'
with DAG(
dag_id='daily_analytics_pipeline',
schedule_interval='0 6 * * *',
start_date=datetime(2024, 1, 1),
default_args=default_args,
catchup=False,
tags=['analytics', 'production'],
) as dag:
freshness_check = BranchPythonOperator(
task_id='check_data_freshness',
python_callable=check_data_freshness,
)
transform = BashOperator(
task_id='transform_data',
bash_command='dbt run --project-dir /opt/dbt/analytics --select tag:daily',
)
test = BashOperator(
task_id='dbt_test',
bash_command='dbt test --project-dir /opt/dbt/analytics --select tag:daily',
)
alert_low = SlackAPIPostOperator(
task_id='alert_low_data',
slack_conn_id='slack_default',
channel='#data-alerts',
text=':warning: データ件数が少なすぎます。パイプラインを確認してください。',
)
notify_success = SlackAPIPostOperator(
task_id='notify_success',
slack_conn_id='slack_default',
channel='#data-team',
text=':white_check_mark: 日次分析パイプライン完了',
)
freshness_check >> [transform, alert_low]
transform >> test >> notify_success
Prefect
2018年公開、Python製のOSSです。GitHubスター16k+。Pythonデコレーターで既存コードをそのままワークフロー化できるモダンなアプローチと、Prefect Cloud・セルフホストサーバーのハイブリッド対応が特徴です。
# Prefect: ML推論パイプライン
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def load_data(date: str) -> list:
'''データ取得(1時間キャッシュ付き)'''
return [{'id': i, 'value': i * 2} for i in range(1000)]
@task(retries=3, retry_delay_seconds=30)
def preprocess(data: list) -> list:
'''前処理(3回リトライ)'''
return [{'id': d['id'], 'features': [d['value']]} for d in data]
@task
def run_inference(features: list) -> list:
'''モデル推論'''
return [{'id': f['id'], 'prediction': sum(f['features'])} for f in features]
@task
def save_results(predictions: list, date: str) -> None:
'''結果を保存'''
print(f'{date}: {len(predictions)}件の推論結果を保存')
@flow(name='daily-ml-inference', log_prints=True)
def daily_inference_pipeline(date: str = '2026-06-14'):
data = load_data(date)
preprocessed = preprocess(data)
predictions = run_inference(preprocessed)
save_results(predictions, date)
return len(predictions)
if __name__ == '__main__':
daily_inference_pipeline()
機能比較表
| 比較項目 | Temporal | Apache Airflow | Prefect |
|---|---|---|---|
| 失敗耐性・状態永続化 | ✅ 最強 | ✅ | ✅ |
| DataOps/ETL向き | △ | ✅ 最適 | ✅ |
| 学習コスト | 中〜高 | 中 | 低 |
| マイクロサービス向き | ✅ | △ | △ |
| UI/可視化 | ✅ | ✅ | ✅ |
| GitHub Stars | 12k+ | 37k+ | 16k+ |
ワークフローエンジンはDevOpsカテゴリ/categories/devopsのデータレイクハウス(Iceberg/Delta Lake)・dbt・Sparkyと統合してETLパイプライン全体をオーケストレーションします。LLM Toolsカテゴリ/categories/llm-toolsのRAGパイプライン(ドキュメント取得→チャンク分割→ベクトル化→Upsert)のスケジューリングと失敗リトライにも活用されています。
FAQ
Q. TemporalとAirflowの使い分けは?
A. マイクロサービスの長時間ビジネスロジック(注文処理・決済・予約)ならTemporal、データエンジニアリングのETLパイプライン・バッチジョブならAirflowが向いています。決め手となる違い: ①Temporalは「Signal」でワークフロー実行中に外部からステータス変更(注文キャンセル信号)を受け付けられる②Airflowは100+のProvider(Snowflake・dbt・Spark・BigQuery・Slack等)で外部ツール統合が即座にできる③Temporalの状態永続化はJVMベースのCassandra/PostgreSQLで大規模でも高耐久④Airflowのスケジューラーはcronで十分で、データ更新が来たらETLを実行するトリガー型に向いている。
Q. PrefectはAirflowのどこを改善しましたか?
A. Airflowの複雑さとデプロイのしんどさをモダンPythonで解消したのがPrefectです。具体的な改善点: ①DAGファイルを別途定義するAirflowに対してPrefectは通常のPython関数に@flow/@taskデコレーターを付けるだけで既存コードをワークフロー化②デプロイはPrefect Cloudまたはセルフホストサーバーにworkspacでコードを送信するだけ(Kubernetes依存なし)③Airflowのdag_idとrun_idに対してPrefectはflowとdeploymentの概念で環境ごとの設定を分離④TaskのCacheキー(task_input_hash)で同一入力の再計算を自動スキップ⑤Prefect Cloudは月10,000回のフロー実行まで無料。
Q. k8sでAirflowを本番運用するのに必要なリソースは?
A. Airflow on KubernetesExecutorの最小構成: ①Webserver: 2 CPU・2GB RAM(HA構成では2レプリカ)②Scheduler: 2 CPU・2GB RAM③PostgreSQL(または CloudSQL): 2CPU・4GB RAM④Redis(CeleryExecutorの場合): 1CPU・1GB RAM⑤各Workerは起動時にDynamic Podとして生成(1タスク=1Pod)。Helmチャート(apache-airflow/airflow)でデプロイ: helm install airflow apache-airflow/airflow --set executor=KubernetesExecutor --set postgresql.enabled=true。Astronomer(商用マネージドAirflow)・Google Cloud Composer・Amazon MWAA(Managed Workflows for Apache Airflow)のマネージドサービスも運用コストを考慮すると選択肢になります。
Q. TemporalのSignalとQueryの違いは何ですか?
A. Signal: ワークフロー実行中に外部から状態変更を注入(副作用あり)、Query: ワークフローの現在の状態を読み取り専用で取得(副作用なし)。Signalの使用例: 注文処理ワークフローが「承認待ち」状態でブロックしている時にclient.get_workflow_handle('order-001').signal('approve')で承認シグナルを送信してワークフローを先に進める。Queryの使用例: client.get_workflow_handle('order-001').query('get_status')で現在のステータス(pending/processing/shipped)をUIから取得。Signal受信でブロックする例: approval = workflow.wait_for_signal('approve', timeout=timedelta(hours=24))で24時間タイムアウト付きの人間承認フローを実装。
まとめ
| ユースケース | 推奨ツール |
|---|---|
| マイクロサービス・ビジネスプロセス・決済 | Temporal |
| データエンジニアリング・ETL・dbt統合 | Apache Airflow |
| MLパイプライン・シンプルなバッチ・Python既存コード | Prefect |