AI

データパイプライン比較:dbt vs Airbyte vs Singer でETLを構築する

オープンソースラボ編集部2026年6月14日

データパイプライン比較:dbt vs Airbyte vs Singer でETLを構築する

BIツール・データウェアハウス・機械学習基盤を支える**データパイプライン(ELT/ETL)**は、分散したデータソース(Salesforce・PostgreSQL・Google Analytics)からデータを抽出・ロード・変換して分析可能な形式に整えるインフラです。dbt(SQLベース変換・データモデリング)・Airbyte(OSSデータ統合プラットフォーム・300+コネクタ)・Singer(Meltanoベース・タップ/ターゲットプロトコル)の3つが2026年のOSSデータパイプライン主要ツールです。

OSSデータパイプラインを使う理由

  • コスト削減: Fivetran($500/月〜)・Stitch($100/月〜)→Airbyte OSS+dBTで月$50のVPS費用のみ
  • データ制御: 顧客データ・業務データを自社インフラでパイプライン処理
  • カスタムコネクタ: 社内システム・独自APIへのコネクタを開発・OSS化
  • 変換の透明性: dbtのDAG(有向非巡回グラフ)で変換ロジックの依存関係を可視化

主要ツールの概要

dbt(data build tool)

2016年公開(dbt Labs)、Python製のOSSです。GitHubスター10k+。SQL+Jinjaテンプレートでデータウェアハウスの変換ロジックを記述・テスト・ドキュメント化するELTの「T(Transform)」専門ツールです。BigQuery・Snowflake・Redshift・PostgreSQL・DuckDB等30+のウェアハウスに対応し、lineageグラフでデータの流れを可視化します。

# dbt プロジェクト構造(dbt init my_project)
# my_project/
# ├── dbt_project.yml
# ├── profiles.yml
# ├── models/
# │   ├── staging/         # ソーステーブルをそのままSQL化
# │   │   ├── stg_orders.sql
# │   │   └── stg_customers.sql
# │   ├── intermediate/    # 中間変換テーブル
# │   └── marts/           # BI用最終テーブル
# │       └── fct_revenue.sql
# └── tests/               # データ品質テスト

# profiles.yml: DBWへの接続設定(秘密情報は.envに)
my_project:
  target: dev
  outputs:
    dev:
      type: postgres
      host: "{{ env_var('DB_HOST') }}"
      port: 5432
      dbname: analytics
      user: "{{ env_var('DB_USER') }}"
      password: "{{ env_var('DB_PASS') }}"
      schema: dbt_dev
      threads: 4
    prod:
      type: bigquery
      method: service-account
      project: my-gcp-project
      dataset: dbt_prod
      keyfile: /secrets/gcp-key.json
      threads: 8
-- models/staging/stg_orders.sql: ソーステーブルの標準化
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}  -- ソース定義からテーブルを参照
),
renamed AS (
    SELECT
        id                      AS order_id,
        customer_id,
        created_at::timestamp   AS ordered_at,
        status,
        ROUND(total_amount, 2)  AS total_amount_jpy,
        shipping_address->>'prefecture' AS prefecture,  -- JSONフィールドを展開
        {{ dbt_utils.surrogate_key(['id', 'customer_id']) }} AS order_sk  -- dbt-utilsマクロ
    FROM source
    WHERE deleted_at IS NULL  -- ソフトデリートを除外
)
SELECT * FROM renamed
-- models/marts/fct_revenue.sql: 日別売上ファクトテーブル
{{ config(materialized='incremental', unique_key='revenue_date') }}
-- incremental: 前回実行以降の新データのみを処理(フルリフレッシュより高速)

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}  -- staging参照(依存関係を自動解決)
    {% if is_incremental() %}
        WHERE ordered_at >= (SELECT MAX(revenue_date) FROM {{ this }})
    {% endif %}
),
daily_revenue AS (
    SELECT
        DATE_TRUNC('day', ordered_at) AS revenue_date,
        prefecture,
        COUNT(*) AS order_count,
        SUM(total_amount_jpy) AS revenue_jpy,
        AVG(total_amount_jpy) AS avg_order_value
    FROM orders
    WHERE status NOT IN ('cancelled', 'refunded')
    GROUP BY 1, 2
)
SELECT * FROM daily_revenue
# Python: dbt CLIをPythonから自動実行(Airflow/Prefectタスク)
import subprocess
from pathlib import Path

def run_dbt(command: str, project_dir: str, env: dict = None) -> dict:
    '''dbt CLIコマンドをPythonから実行'''
    import os
    proc_env = {**os.environ, **(env or {})}
    result = subprocess.run(
        ['dbt', *command.split(), '--profiles-dir', project_dir],
        capture_output=True, text=True, cwd=project_dir, env=proc_env,
    )
    return {'returncode': result.returncode, 'stdout': result.stdout[-2000:], 'stderr': result.stderr[-500:]}

def daily_pipeline(project_dir: str = '/opt/dbt/my_project') -> None:
    '''日次dbtパイプラインを実行(staging→intermediate→marts)'''
    steps = [
        'dbt deps',          # パッケージインストール
        'dbt source freshness',  # ソースデータの鮮度チェック
        'dbt run --select staging',     # stagingモデル実行
        'dbt test --select staging',    # stagingデータテスト
        'dbt run --select marts',       # martsモデル実行
        'dbt test --select marts',      # martsデータテスト
    ]
    for step in steps:
        result = run_dbt(step, project_dir)
        if result['returncode'] != 0:
            raise RuntimeError(f'{step} 失敗:
{result["stderr"]}')
    print('パイプライン完了')

Airbyte

2020年公開、Java/Python製のOSSです。GitHubスター17k+。300+コネクタを持つOSSデータ統合プラットフォームで、Salesforce・Google Ads・GitHub・PostgreSQL・MongoDB等からSnowflake・BigQuery・Redshiftへのデータ同期をノーコードUIまたはPython SDK(PyAirbyte)で設定できます。ELTの「EL(Extract・Load)」を担当します。

# docker-compose.yml: Airbyte(Community Edition)
version: "3.8"
services:
  airbyte-server:
    image: airbyte/server:latest
    restart: unless-stopped
    ports:
      - "8001:8001"
    environment:
      AIRBYTE_VERSION: latest
      DATABASE_URL: postgresql://airbyte:${DB_PASS}@airbyte-db:5432/airbyte
      CONFIG_ROOT: /data
    depends_on: [airbyte-db]
    volumes:
      - airbyte_data:/data

  airbyte-webapp:
    image: airbyte/webapp:latest
    restart: unless-stopped
    ports:
      - "8000:8000"

  airbyte-db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: airbyte
      POSTGRES_USER: airbyte
      POSTGRES_PASSWORD: ${DB_PASS}
    volumes:
      - airbyte_db:/var/lib/postgresql/data

volumes:
  airbyte_data:
  airbyte_db:
# Python: PyAirbyte でProgrammatic Data Sync(Airbyteをコードで操作)
import airbyte as ab

# Airbyte Community Cloudまたはセルフホストに接続
source = ab.get_source(
    'source-github',
    config={
        'credentials': {'personal_access_token': 'ghp_xxxxx'},
        'repositories': ['my-org/my-repo'],
    },
)
source.check()  # 接続テスト

# DuckDB(ローカル)またはPostgreSQLにロード
cache = ab.get_default_cache()  # DuckDBキャッシュ(ローカル分析)
result = source.read(cache=cache)

# Pandasデータフレームとして取得(dbtの代わりに直接分析)
import pandas as pd
df = result['issues'].to_pandas()
print(df[['number', 'title', 'state', 'created_at']].head(20))

Singer(Meltano)

2016年公開(Stitch)、Python製のOSSです。GitHubスター5k+(singer-python)。タップ(Tap:データ抽出)とターゲット(Target:データロード)のプロトコルを定義したOSS標準で、400+のコミュニティTap/Targetが公開されています。MeltanoはSingerをラップしてGit管理・CI/CD統合・スケジューリングを追加したELTオーケストレーターです。

# meltano.yml: Meltanoプロジェクト設定
version: 1
default_environment: dev

plugins:
  extractors:
    - name: tap-github          # GitHubからデータ抽出
      variant: MeltanoLabs
      pip_url: git+https://github.com/MeltanoLabs/tap-github.git
      config:
        repositories:
          - my-org/my-repo
        metrics:
          - issues
          - pull_requests
  loaders:
    - name: target-postgres     # PostgreSQLにロード
      variant: meltanolabs
      pip_url: meltanolabs-target-postgres
      config:
        host: "{{ env_var('PG_HOST') }}"
        port: 5432
        database: analytics
        user: "{{ env_var('PG_USER') }}"
        password: "{{ env_var('PG_PASS') }}"
        default_target_schema: raw_github
  utilities:
    - name: dbt-postgres        # 変換ステップ(ELTのT)
      variant: dbt-labs
      pip_url: dbt-core dbt-postgres
# Meltano: ELT実行コマンド
meltano install                               # プラグインインストール
meltano run tap-github target-postgres        # EL(抽出・ロード)
meltano run dbt-postgres:run dbt-postgres:test  # T(変換・テスト)
meltano schedule add daily-elt --extractor tap-github --loader target-postgres --interval @daily

機能比較表

比較項目dbtAirbyteSinger/Meltano
役割変換(T)抽出・ロード(EL)EL統合
コネクタ数N/A300+400+
GUI✅ dbt Cloud✅ Web UI△ CLI
SQL変換✅ 最強
GitHub Stars10k+17k+5k+

データパイプラインはDevOpsカテゴリ/categories/devopsのデータウェアハウス(ClickHouse/DuckDB)と組み合わせてモダンデータスタック(Airbyte→ClickHouse→dbt→Metabase)を構築します。LLMツールカテゴリ/categories/llm-toolsのRAGフレームワーク(LlamaIndex/LangChain)のデータ準備パイプラインとして、dbtで前処理したデータをベクトルストアに投入するAI対応データパイプラインを実現します。

FAQ

Q. dbtモデルのマテリアライズ(materialization)の種類とベストプラクティスは?

A. dbtのマテリアライズは4種類あり、用途によって使い分けますview(デフォルト): SQLビューとして定義→常に最新データ→ストレージ使用なし→クエリごとに再計算(stagingに最適)。table:毎回テーブルを作り直す→高速クエリ→大量データに使うとリフレッシュに時間がかかる(小中規模のmartsに最適)。incremental:前回実行以降の新規・更新データのみ処理→大規模ファクトテーブルに必須(unique_key設定でupsert)。ephemeral:一時的なCTE(共通テーブル式)→テーブル/ビューを作成しない→他のモデルのCTEとして展開(再利用のみが目的の共通ロジックに)。設定: {{ config(materialized='incremental', unique_key='id', on_schema_change='append_new_columns') }}

Q. Airbyteのカスタムコネクタを作成して社内APIからデータを抽出するには?

A. Airbyteのコネクタ開発フレームワーク(CDK: Connector Development Kit)を使ってPython製のカスタムソースコネクタを10分で作成できます。手順: ①airbyte-cdkインストール: pip install airbyte-cdk②コネクタクラスを継承: class SourceMyAPI(AbstractSource)check_connection(接続テスト)・streams(ストリーム一覧)メソッドを実装④各ストリームクラス: HttpStream継承→url_basepathrequest_headersparse_responseを実装⑤Dockerコンテナ化: airbyte-integrations/connectors/source-my-api/Dockerfileを作成⑥AirbyteのSettingsでカスタムDockerイメージを登録→UIから接続設定可能。テスト: python main.py spec(設定スキーマ)・python main.py check --config config.jsonpython main.py discover --config config.jsonで動作確認。

Q. dbtとAirbyteを組み合わせたモダンデータスタックはどう構成しますか?

A. Airbyte(EL)→データウェアハウス(Snowflake/BigQuery)→dbt(T)→BIツール(Metabase/Grafana)の4層構成がモダンデータスタックのスタンダードです。構成: ①Airbyte: PostgreSQL・Salesforce・Google AdsのデータをBigQueryのrawスキーマにロード(毎時間・差分同期)②dbt: models/staging/でrawデータを標準化→models/marts/でBIが使いやすいwide tableを作成→dbt run --select tag:dailyで日次実行→dbt testでデータ品質チェック③BigQuery: dbtが作成したマートテーブルを保持④Metabase/Superset: martsテーブルをデータソースとして接続→BI担当者がSQLなしでチャートを作成。オーケストレーション: Prefect・Airflow・MeltanoのスケジューラーでAirbyte→dbtの順序実行を管理。

Q. SingerのTapプロトコルを使ってカスタムデータ抽出を実装するには?

A. Singer仕様(JSON Schema・Stateによる差分管理・Catalogによるストリーム選択)に従ったPython製TapをCLIとして実装します。最小実装: tap-myapp.pyで①--configでAPIキー等を受け取り②discoverモードで対応ストリームのCatalogをJSON出力③syncモードでデータをJSON Lines(SCHEMARECORDSTATEメッセージ)として標準出力に書き出す。Singer pipe: tap-myapp --config config.json | target-postgres --config pg_config.jsonでパイプライン実行。Meltano統合: meltano add extractor --from-ref tap-myappでMeltanoに登録→meltano.ymlで管理→CI/CDでmeltano run tap-myapp target-postgresを自動実行。差分管理: STATEメッセージのbookmarkフィールドで最後に取得したレコードのIDや日時を記録→次回実行でconfig.start_dateとして使用→増分抽出を実現。

まとめ

ユースケース推奨ツール
SQL変換・データモデリング・テストdbt
300+ソースからのデータ統合・GUIAirbyte
CLI自動化・カスタムコネクタ・Git管理Singer/Meltano

関連外部リソース

他の記事も読む

Let's Build Together

OSS導入、自社だけで悩まない。

ツール選定から構築・運用・AI活用まで、オープンソースラボ運営元のClasslessが伴走します。初回のご相談は無料です。