データレイクハウス比較:Apache Iceberg vs Delta Lake vs Hudi でS3をデータ基盤に変える
オープンソースラボ編集部 ・ 2026年6月14日
データレイクハウス比較:Apache Iceberg vs Delta Lake vs Hudi でS3をデータ基盤に変える
S3・GCS・ADLS上のParquetファイルを直接SQLで更新・タイムトラベルできるオープンテーブルフォーマットが急速に普及しています。Apache Iceberg(CNCF・最も普及)・Delta Lake(Databricks製・Delta Sharing)・Apache Hudi(Uber製・ストリーミング特化)の3つが次世代データレイクハウスのデファクトスタンダードです。
オープンテーブルフォーマットを選ぶ理由
- S3を直接変更可能に: S3上のParquetはImmutableだが、IcebergはACID対応でUPDATE/DELETE/MERGE INTO(upsert)が可能
- タイムトラベル: 過去の特定時点のデータを
AS OFクエリで参照(誤削除の復元・監査対応) - スキーマ進化: カラムの追加・型変更・リネームを既存データを壊さずに実行
- エンジン非依存: Spark・Trino・Flink・DuckDB・StarRocksなど複数クエリエンジンから同一データを参照
主要フォーマットの概要
Apache Iceberg
2018年公開(Netflix起源)、Java製のOSSです。GitHubスター7k+。最もエンジン互換性が高く、Snowflake・BigQuery・AWS Athena・Apache Spark・Trinoが公式サポートしています。REST Catalog・Hive Metastore・Nessie(Git-like カタログ)でメタデータを管理します。
# PySpark + Apache Icebergのセットアップとデータ操作
from pyspark.sql import SparkSession
spark = SparkSession.builder .appName("iceberg-demo") .config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0,"
"org.apache.iceberg:iceberg-aws-bundle:1.6.0") .config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.my_catalog.type", "rest") .config("spark.sql.catalog.my_catalog.uri", "http://localhost:8181") .config("spark.sql.catalog.my_catalog.warehouse", "s3://my-datalake/warehouse/") .getOrCreate()
# Icebergテーブルを作成
spark.sql('''
CREATE TABLE IF NOT EXISTS my_catalog.analytics.orders (
order_id BIGINT,
customer_id BIGINT,
total DECIMAL(12, 2),
status STRING,
created_at TIMESTAMP
)
USING ICEBERG
PARTITIONED BY (days(created_at))
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd',
'history.expire.max-snapshot-age-ms' = '604800000'
)
''')
# MERGE INTO(upsert操作)
spark.sql('''
MERGE INTO my_catalog.analytics.orders t
USING (
SELECT 1001 AS order_id, 'shipped' AS status, current_timestamp() AS updated_at
) s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET t.status = s.status
WHEN NOT MATCHED THEN INSERT *
''')
# タイムトラベルクエリ(1日前の状態を参照)
spark.sql('''
SELECT * FROM my_catalog.analytics.orders
TIMESTAMP AS OF date_sub(current_timestamp(), 1)
WHERE status = 'pending'
''').show()
# スキーマ進化(新しいカラムを追加)
spark.sql('''
ALTER TABLE my_catalog.analytics.orders
ADD COLUMN discount_amount DECIMAL(10, 2) AFTER total
''')
# スナップショット履歴を確認
spark.sql("SELECT * FROM my_catalog.analytics.orders.history").show()
# PyIceberg: Sparkなしで直接Icebergを操作(Python)
from pyiceberg.catalog import load_catalog
import pyarrow as pa
catalog = load_catalog("rest", **{
"uri": "http://localhost:8181",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "minioadmin",
"s3.secret-access-key": "minioadmin",
})
# テーブルを読み込む(DuckDBでクエリ可能)
table = catalog.load_table("analytics.orders")
# PyArrowで読み込み
arrow_table = table.scan(
row_filter="status = 'shipped'",
selected_fields=("order_id", "customer_id", "total"),
limit=1000,
).to_arrow()
print(f"件数: {len(arrow_table)}")
print(f"合計売上: {arrow_table['total'].sum()}")
# DuckDBでIcebergを直接クエリ
import duckdb
conn = duckdb.connect()
conn.execute("INSTALL iceberg; LOAD iceberg;")
result = conn.execute('''
SELECT status, COUNT(*) as cnt, SUM(total) as revenue
FROM iceberg_scan('s3://my-datalake/warehouse/analytics/orders')
GROUP BY status
''').fetchall()
Delta Lake
2019年公開(Databricks起源)、Scala製のOSSです。GitHubスター8k+。Databricks上での利用が最も最適化されており、Delta Sharing(オープンプロトコルによるデータ共有)・Delta Live Tables(DLT・宣言型ETLパイプライン)が特徴です。
# PySpark + Delta Lake
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
spark = SparkSession.builder .appName("delta-demo") .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate()
# Delta テーブル作成
spark.sql('''
CREATE TABLE IF NOT EXISTS analytics.events
USING DELTA
LOCATION 's3://my-datalake/delta/events/'
AS SELECT * FROM parquet.`s3://my-datalake/raw/events/`
''')
# MERGE INTO(upsert)
delta_table = DeltaTable.forPath(spark, "s3://my-datalake/delta/events/")
delta_table.alias("target").merge(
source=spark.table("updates").alias("source"),
condition="target.event_id = source.event_id"
).whenMatchedUpdateAll() .whenNotMatchedInsertAll() .execute()
# タイムトラベル
spark.read.format("delta") .option("versionAsOf", 5) .load("s3://my-datalake/delta/events/") .show()
# Z-Ordering(クエリの高速化)
spark.sql('''
OPTIMIZE analytics.events
ZORDER BY (user_id, event_type)
''')
# Vacuum(古いファイルを削除)
spark.sql("VACUUM analytics.events RETAIN 168 HOURS")
Apache Hudi
2016年公開(Uber起源)、Java製のOSSです。GitHubスター5k+。ストリーミングUpsertに最も最適化されており、Kafka→Flink→Hudiのリアルタイムデータレイクパイプラインに強みがあります。COW(Copy on Write)とMOR(Merge on Read)の2つのテーブルタイプで書き込み速度と読み取り速度のトレードオフを選択できます。
機能比較表
| 比較項目 | Apache Iceberg | Delta Lake | Apache Hudi |
|---|---|---|---|
| ACID UPSERT | ✅ | ✅ | ✅ |
| タイムトラベル | ✅ | ✅ | ✅ |
| エンジン互換性 | ✅ 最多 | ✅ Databricks最適 | △ Spark中心 |
| ストリーミングUpsert | ✅ | ✅ | ✅ 最強 |
| DuckDB連携 | ✅ | ✅ | △ |
| Snowflakeネイティブ | ✅ | ✅ | ❌ |
データレイクハウスはDevOpsカテゴリ/categories/devopsのApache Airflow・Prefectでパイプラインをオーケストレーションして毎日のバッチETLを自動化します。LLM Toolsカテゴリ/categories/llm-toolsのベクターデータベースと組み合わせて、Icebergの構造化データとRAGの非構造化データを統合した分析基盤を構築できます。
FAQ
Q. IcebergとDelta Lakeのどちらを選べばいいですか?
A. Databricksを使っているまたは使う予定ならDelta Lake、クラウドネイティブでSnowflake・Athena・Trinoを使うならIcebergが向いています。実際のユースケース: Iceberg選択→AWS Glue Catalog+Athena+Trinoの組み合わせ・Snowflake External Tables・Apache Flink→S3のストリーミングパイプライン。Delta Lake選択→Databricks上のDelta Live Tables(宣言型ETL)・Azure Synapse Analytics・Delta Sharing(外部組織へのデータ共有)。エンジン互換性の観点ではIcebergの方がより多くのクエリエンジン(DuckDB・ClickHouse・StarRocks)をサポートしています。
Q. 既存のParquetファイルをIcebergに移行する方法は?
A. Iceberg Migration procedureを使います。①CALL my_catalog.system.snapshot('hive_catalog.db.table', 'my_catalog.db.table')でParquetファイルをコピーせずにIceberg snapshot付きカタログエントリを作成②検証後にCALL my_catalog.system.migrate('my_catalog.db.table')でin-place移行③大規模テーブル(TB単位)はSparkで新しいIcebergテーブルにCTAS(Create Table As Select)で書き直す。PyIceberg: catalog.register_table(identifier, metadata_location)で既存のparquetメタデータからIcebergテーブルを登録。移行中も既存パイプラインが停止しないようにBlue-Green的にテーブルを切り替えます。
Q. DuckDBでIcebergをローカルクエリする最短手順は?
A. ①pip install duckdb pyiceberg②DuckDBにINSTALL iceberg; LOAD iceberg;③ローカルファイルまたはS3: SELECT * FROM iceberg_scan('path/to/metadata/00001-xxxxx.metadata.json')。S3の場合はSET s3_access_key_id='xxx'; SET s3_secret_access_key='xxx'; SET s3_region='ap-northeast-1';を先に実行。PyIceberg+DuckDB: table.scan().to_duckdb(table_name='orders')でPyIcebergのスキャン結果をDuckDBのビューとして登録してSQLクエリを実行できます。ローカル開発・データ品質チェック・アドホック分析に最適なパターンです。
Q. ストリーミングデータをリアルタイムでレイクハウスに書き込む方法は?
A. Flink+Iceberg(または Hudi)の組み合わせが最も実績があります。Flink+Icebergの例: ①Kafka TopicからFlinkでデータを読み込み②IcebergSink(Flink Connector for Iceberg)でS3へcheckpointごとにcommit③小さなファイルが蓄積するため定期的にCALL system.rewrite_data_files()でファイルをコンパクション。Flink+Hudi(MORモード): 書き込みはlog fileに追記(低レイテンシ)→定期的にbase fileにマージ(COW変換)→読み取りはRealtime viewでリアルタイム最新データを参照。レイテンシ: チェックポイント間隔(5〜30秒)でS3にコミットするため準リアルタイム(秒〜分オーダー)のデータ鮮度を実現できます。
まとめ
| ユースケース | 推奨フォーマット |
|---|---|
| エンジン互換性最優先・Snowflake/Athena/Trino | Apache Iceberg |
| Databricks・Delta Live Tables・Delta Sharing | Delta Lake |
| Kafka→リアルタイムUpsert・Spark中心 | Apache Hudi |