# iceberg-loader

A convenience wrapper around PyIceberg that simplifies data loading into Apache Iceberg tables. PyArrow-first, it handles messy JSON, schema evolution, idempotent replace, upsert, batching, and streaming out of the box.

Status: actively developed and under testing. PRs are welcome! Currently tested against Hive Metastore; REST Catalog support is planned.
## Features

- **Arrow-first**: `pa.Table`, `RecordBatch`, IPC.
- **Messy JSON friendly**: dict/list/mixed values → JSON strings.
- **Schema evolution** (opt-in).
- **Idempotent replace** (`replace_filter`) and **upsert**.
- **Commit interval** for long streams.
- **Maintenance helpers** (expire snapshots).
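The "messy JSON" feature can be pictured as serializing any nested dict/list values to JSON strings before writing, so they fit a string column. A minimal stand-alone sketch of that idea (illustrative only, not the library's actual implementation):

```python
import json

def normalize_messy(rows):
    """Serialize nested dict/list values to JSON strings; leave scalars as-is."""
    out = []
    for row in rows:
        out.append({
            k: json.dumps(v, ensure_ascii=False) if isinstance(v, (dict, list)) else v
            for k, v in row.items()
        })
    return out

rows = [{"id": 1, "payload": {"a": [1, 2]}}, {"id": 2, "payload": "plain"}]
print(normalize_messy(rows))
# [{'id': 1, 'payload': '{"a": [1, 2]}'}, {'id': 2, 'payload': 'plain'}]
```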
## Install

```shell
pip install "iceberg-loader[all]"
```

Or with uv:

```shell
uv pip install "iceberg-loader[all]"
```
### Extras

| Extra | Description |
|---|---|
| `hive` | Hive Metastore support |
| `s3fs` | S3 filesystem support |
| `pyiceberg-core` | PyIceberg core |
| `all` | All extras |
## Compatibility

- Python: 3.10, 3.11, 3.12, 3.13, 3.14
- PyArrow: >= 18.0.0
- PyIceberg: >= 0.7.1
## Quickstart

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

from iceberg_loader import LoaderConfig, load_data_to_iceberg

catalog = load_catalog("default")
data = pa.Table.from_pydict({"id": [1, 2], "signup_date": ["2023-01-01", "2023-01-02"]})
config = LoaderConfig(write_mode="append", partition_col="day(signup_date)", schema_evolution=True)
load_data_to_iceberg(data, ("db", "users"), catalog, config=config)
```
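Iceberg's `day(...)` partition transform buckets rows by calendar date (per the Iceberg spec, the transform value is days since the Unix epoch), which is why the two rows above land in different partitions. A quick stdlib illustration of the math:

```python
from datetime import date

def day_transform(d: date) -> int:
    # Iceberg day transform: ordinal days since 1970-01-01
    return (d - date(1970, 1, 1)).days

print(day_transform(date(2023, 1, 1)))  # 19358
print(day_transform(date(2023, 1, 2)))  # 19359
```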
## Usage

### Basic Example

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

from iceberg_loader import LoaderConfig, load_data_to_iceberg

catalog = load_catalog("default")

data = pa.Table.from_pydict({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "created_at": [1672531200000, 1672617600000, 1672704000000],
    "signup_date": ["2023-01-01", "2023-01-01", "2023-01-02"],
})

config = LoaderConfig(write_mode="append", partition_col="day(signup_date)", schema_evolution=True)

result = load_data_to_iceberg(
    table_data=data,
    table_identifier=("my_db", "my_table"),
    catalog=catalog,
    config=config,
)
print(result)
# {'rows_loaded': 3, 'write_mode': 'append', 'partition_col': 'day(signup_date)', ...}
```
### Idempotent Load (Replace Partition)

Safely re-load data for a specific day (avoiding duplicates):

```python
config = LoaderConfig(
    write_mode="append",
    replace_filter="signup_date == '2023-01-01'",
    partition_col="day(signup_date)",
)
load_data_to_iceberg(table_data=data, table_identifier=("my_db", "my_table"), catalog=catalog, config=config)
```
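Conceptually, `replace_filter` makes a load idempotent by deleting rows that match the filter and then appending the new batch, so re-running the same load does not duplicate data. A toy model of that semantics in plain Python (not the library's code):

```python
def replace_load(table_rows, new_rows, matches):
    """Delete rows matching the filter predicate, then append the new batch."""
    kept = [r for r in table_rows if not matches(r)]
    return kept + list(new_rows)

table = [{"signup_date": "2023-01-01", "id": 1}]
batch = [{"signup_date": "2023-01-01", "id": 1}, {"signup_date": "2023-01-01", "id": 2}]
pred = lambda r: r["signup_date"] == "2023-01-01"

once = replace_load(table, batch, pred)
twice = replace_load(once, batch, pred)  # re-running yields the same table
print(len(once), len(twice))  # 2 2
```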
### Upsert (Merge Into)

A merge operation (update existing rows, insert new ones) based on key columns. Requires PyIceberg >= 0.7.0.

```python
config = LoaderConfig(write_mode="upsert", join_cols=["id"])
load_data_to_iceberg(table_data=data, table_identifier=("my_db", "my_table"), catalog=catalog, config=config)
```
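The merge-into semantics of `join_cols` can be sketched with a dict keyed on the join columns: matching keys are updated, new keys are inserted. An illustrative stand-in, not the library's implementation:

```python
def upsert(table_rows, new_rows, join_cols):
    """Update rows whose key matches, insert the rest (merge-into semantics)."""
    key = lambda r: tuple(r[c] for c in join_cols)
    merged = {key(r): r for r in table_rows}
    for r in new_rows:
        merged[key(r)] = r  # overwrite on match, insert otherwise
    return list(merged.values())

existing = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
incoming = [{"id": 2, "name": "Bobby"}, {"id": 3, "name": "Carol"}]
print(upsert(existing, incoming, ["id"]))
# [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bobby'}, {'id': 3, 'name': 'Carol'}]
```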
### Batch Loading

For large datasets, use `load_batches_to_iceberg` with an iterator of `RecordBatch`es:

```python
from iceberg_loader import load_batches_to_iceberg

def batch_generator():
    for i in range(10):
        yield some_record_batch  # a pa.RecordBatch produced elsewhere

result = load_batches_to_iceberg(
    batch_iterator=batch_generator(),
    table_identifier=("my_db", "large_table"),
    catalog=catalog,
    config=LoaderConfig(write_mode="append", commit_interval=100),
)
```
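`commit_interval=100` means the loader commits after every 100 batches instead of holding everything in one giant transaction. The grouping is roughly equivalent to chunking the iterator, which a short sketch makes concrete (illustration only):

```python
from itertools import islice

def chunked(iterator, size):
    """Yield lists of up to `size` items; each list maps to one commit."""
    it = iter(iterator)
    while chunk := list(islice(it, size)):
        yield chunk

commits = list(chunked(range(10), 4))
print(commits)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```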
### Stream Loading (Arrow IPC)

Load data directly from an Apache Arrow IPC stream:

```python
from iceberg_loader import load_ipc_stream_to_iceberg

result = load_ipc_stream_to_iceberg(
    stream_source="data.arrow",
    table_identifier=("my_db", "stream_table"),
    catalog=catalog,
    config=LoaderConfig(write_mode="append"),
)
```
### Custom Settings

Override default table properties:

```python
custom_props = {
    'write.parquet.compression-codec': 'snappy',
    'history.expire.min-snapshots-to-keep': 5,
}
config = LoaderConfig(table_properties=custom_props, write_mode="append")
load_data_to_iceberg(..., config=config)
```
### Override Default Table Properties Globally

Built-in defaults live in `iceberg_loader.core.config.TABLE_PROPERTIES` (format version, Parquet compression, commit retries). To tweak them, copy the dictionary, override keys, and pass the result into `LoaderConfig`:

```python
from pyiceberg.catalog import load_catalog

from iceberg_loader import LoaderConfig, load_data_to_iceberg
from iceberg_loader.core.config import TABLE_PROPERTIES

catalog = load_catalog("default")

custom_properties = {**TABLE_PROPERTIES, "write.parquet.compression-codec": "gzip"}
config = LoaderConfig(
    write_mode="append",
    table_properties=custom_properties,
)
load_data_to_iceberg(table_data, ("default", "events"), catalog, config=config)
```
### Maintenance Helper

```python
from iceberg_loader import expire_snapshots

table = catalog.load_table(("db", "users"))
expire_snapshots(table, keep_last=2)
```
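The `keep_last=2` semantics retains only the newest snapshots; everything older becomes eligible for expiry. The selection logic, shown in isolation as a hypothetical sketch over a list of snapshot ids:

```python
def snapshots_to_expire(snapshot_ids, keep_last):
    """Given snapshot ids ordered oldest -> newest, return the ids to expire."""
    if keep_last <= 0:
        raise ValueError("keep_last must be positive")
    return snapshot_ids[:-keep_last] if keep_last < len(snapshot_ids) else []

print(snapshots_to_expire([101, 102, 103, 104], keep_last=2))  # [101, 102]
```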
### Adding a Load Timestamp

You can automatically add a timestamp column (e.g. `_load_dttm`) to every row to track when it was loaded. This is useful for ETL audit trails or for partitioning by load time.

```python
from datetime import datetime

# Adds column '_load_dttm' with the current time and partitions by hour
config = LoaderConfig(
    write_mode="append",
    load_timestamp=datetime.now(),
    partition_col="hour(_load_dttm)",
)

# You can also customize the column name and use a day transform
config_custom = LoaderConfig(
    write_mode="append",
    load_timestamp=datetime(2025, 1, 1),
    load_ts_col="etl_ts",
    partition_col="day(etl_ts)",
)
```
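Adding a load timestamp amounts to appending one constant-valued column, sized to the batch. In columnar (dict-of-lists) form, the idea looks like this (a sketch, not the library's code):

```python
from datetime import datetime

def add_load_ts(columns, ts, col_name="_load_dttm"):
    """Append a constant timestamp column with one value per existing row."""
    n = len(next(iter(columns.values())))
    return {**columns, col_name: [ts] * n}

out = add_load_ts({"id": [1, 2, 3]}, datetime(2025, 1, 1), col_name="etl_ts")
print(sorted(out), len(out["etl_ts"]))  # ['etl_ts', 'id'] 3
```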
## LoaderConfig Reference

| Parameter | Type | Default | Description |
|---|---|---|---|
| `write_mode` | `'append' \| 'overwrite' \| 'upsert'` | `'overwrite'` | Data write mode |
| `partition_col` | `str \| None` | `None` | Partition column or transform (e.g., `month(ts)`, `bucket(16, id)`); prefer `day`/`hour` on timestamps |
| `replace_filter` | `str \| None` | `None` | SQL-style filter for idempotent loads |
| `schema_evolution` | `bool` | `False` | Auto-add new columns |
| `commit_interval` | `int` | `0` | Commit every N batches (0 = single transaction) |
| `join_cols` | `list[str] \| None` | `None` | Merge keys for upsert |
| `table_properties` | `dict \| None` | `None` | Custom Iceberg table properties |
| `load_timestamp` | `datetime \| None` | `None` | If set, adds `_load_dttm` column with this value |
| `load_ts_col` | `str` | `'_load_dttm'` | Name of the load timestamp column |
## API Reference

### load_batches_to_iceberg()

Main function for loading a stream of batches into an Iceberg table.

```python
def load_batches_to_iceberg(
    batch_iterator: Iterator[pa.RecordBatch] | pa.RecordBatchReader,
    table_identifier: tuple[str, str],
    catalog: Catalog,
    config: LoaderConfig | None = None,
) -> dict[str, Any]
```

All load parameters (`write_mode`, `partition_col`, `replace_filter`, `join_cols`, `schema_evolution`, `commit_interval`, `table_properties`) are passed only via `LoaderConfig`. See the table in the LoaderConfig Reference above.

### Return Value

A dictionary with loading results:

```python
{
    'rows_loaded': int,
    'batches_processed': int,
    'write_mode': str,
    'partition_col': str,
    'table_location': str,
    'snapshot_id': int | str,
    'new_table_created': bool,
}
```
### Usage Examples

Basic stream loading:

```python
def generate_batches():
    for i in range(100):
        data = {"id": [i], "value": [f"row_{i}"]}
        yield pa.RecordBatch.from_pydict(data)

result = load_batches_to_iceberg(
    batch_iterator=generate_batches(),
    table_identifier=("db", "table"),
    catalog=catalog,
    config=LoaderConfig(write_mode="append"),
)
```

With periodic commits for memory management:

```python
result = load_batches_to_iceberg(
    batch_iterator=large_batch_stream,
    table_identifier=("db", "large_table"),
    catalog=catalog,
    config=LoaderConfig(
        commit_interval=50,
        schema_evolution=True,
    ),
)
```

Idempotent partition loading:

```python
result = load_batches_to_iceberg(
    batch_iterator=daily_batches,
    table_identifier=("db", "events"),
    catalog=catalog,
    config=LoaderConfig(
        write_mode="append",
        replace_filter="event_date == '2023-12-09'",
        partition_col="day(event_date)",
    ),
)
```
## Examples

See the Examples page for runnable demos covering streaming, upsert, schema evolution, messy JSON, and more.
## How we version

- Semantic Versioning from `0.1.x`: MINOR = new compatible features, PATCH = fixes, MAJOR = breaking changes.
- Public API: `LoaderConfig`, `load_data_to_iceberg`, `load_batches_to_iceberg`, `load_ipc_stream_to_iceberg`; other modules are internal.
- Prefer partition transforms for timestamps (`day(ts)`, `hour(ts)`, `day(_load_dttm)`) to avoid unbounded partition counts.
- `LoaderConfig` validates partition expressions and forbids unsafe mixes (e.g., `replace_filter` with `upsert`, or an identity partition on `_load_dttm`).
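The validation described in the last bullet can be imagined as simple cross-field checks at construction time. A hypothetical sketch of two such checks (the names and messages here are illustrative, not the library's actual validator):

```python
def validate(write_mode, replace_filter=None, partition_col=None, load_ts_col="_load_dttm"):
    """Reject unsafe option combinations before any data is written."""
    if write_mode == "upsert" and replace_filter is not None:
        raise ValueError("replace_filter cannot be combined with upsert")
    if partition_col == load_ts_col:
        raise ValueError("identity partition on the load timestamp column is unbounded")
    return True

assert validate("append", replace_filter="signup_date == '2023-01-01'")
try:
    validate("upsert", replace_filter="signup_date == '2023-01-01'")
except ValueError as e:
    print(e)  # replace_filter cannot be combined with upsert
```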
## Release checklist

- Align versions in `pyproject.toml` and `src/iceberg_loader/__about__.py`.
- Update `RELEASE.md` with highlights/breaking changes.
- Run `uv lock --locked` and commit `uv.lock` if it changes.
- Run lint (`uv run ruff check .`), types (`uv run mypy src/iceberg_loader tests`), and tests (`uv run python -m pytest`).
- Tag and push (`git tag -a vX.Y.Z -m "Release X.Y.Z"`), then let CI publish.
## Development

This project uses Hatch as the build backend. For local workflows, prefer uv.

### Run Tests

```shell
uv run python -m pytest
```

### Linting

```shell
uv run ruff check
uv run ruff format --check
uv run mypy
```

### Release

```shell
hatch build
twine check dist/*
twine upload --repository testpypi dist/*
twine upload dist/*
```
## License

`iceberg-loader` is distributed under the terms of the MIT license.