Commit bfbc6c4

add write target file size config
1 parent 7b4f3ce commit bfbc6c4

2 files changed: +20, -6 lines


pyiceberg/io/pyarrow.py

Lines changed: 15 additions & 5 deletions

@@ -1758,13 +1758,23 @@ def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[S
         data_files.append(data_file)
     return iter(data_files)
 
-def bin_pack_arrow_table(tbl: pa.Table) -> Iterator[List[pa.RecordBatch]]:
-    # bin-pack the table into 256 MB chunks
+
+def bin_pack_arrow_table(tbl: pa.Table, table_properties: Properties) -> Iterator[List[pa.RecordBatch]]:
     from pyiceberg.utils.bin_packing import PackingIterator
 
-    splits = tbl.to_batches()
-    target_weight = 2 << 27  # 256 MB
-    bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True)
+    target_file_size = PropertyUtil.property_as_int(
+        properties=table_properties,
+        property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+    )
+    assert isinstance(target_file_size, int)
+    bin_packed = PackingIterator(
+        items=tbl.to_batches(),
+        target_weight=target_file_size,
+        lookback=2,
+        weight_func=lambda x: x.nbytes,
+        largest_bin_first=True,
+    )
     return bin_packed
 
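For context, here is a minimal standalone sketch (not part of the commit) of what the bin packing above does: Arrow record batches are grouped so that each bin's total nbytes approaches the target weight, and each bin later becomes one WriteTask and one data file. The toy table and the hard-coded 512 MB constant are illustrative only; the committed code reads the target from the table properties as shown in the diff.

import pyarrow as pa

from pyiceberg.utils.bin_packing import PackingIterator

# Illustrative input; any pyarrow Table works.
tbl = pa.table({"id": list(range(1_000_000))})

# 512 MB, mirroring WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT from the second file below.
target_file_size = 512 * 1024 * 1024

bins = PackingIterator(
    items=tbl.to_batches(),
    target_weight=target_file_size,
    lookback=2,
    weight_func=lambda batch: batch.nbytes,
    largest_bin_first=True,
)

for record_batches in bins:
    # Each yielded list of RecordBatches is intended to become one output data file.
    print(len(record_batches), sum(batch.nbytes for batch in record_batches))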

pyiceberg/table/__init__.py

Lines changed: 5 additions & 1 deletion

@@ -156,6 +156,9 @@ class TableProperties:
 
     PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column"
 
+    WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
+    WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024  # 512 MB
+
     DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
     DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
 
@@ -2367,9 +2370,10 @@ def _dataframe_to_data_files(
     counter = itertools.count(0)
     write_uuid = write_uuid or uuid.uuid4()
 
+    write_tasks = [WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, table.properties)]
     # This is an iter, so we don't have to materialize everything every time
     # This will be more relevant when we start doing partitioned writes
-    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]), file_schema=file_schema)
+    yield from write_file(table, iter(write_tasks), file_schema=file_schema)
 
 
 class _MergingSnapshotProducer:
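With write.target-file-size-bytes defined as a table property, a writer can opt into a different target size per table. The snippet below is a hedged sketch, assuming a catalog configured under the name "default", an existing PyArrow table df, and the create_table/append write API from the same release line; the identifier db.events and the 128 MB value are made up for illustration and are not part of the commit.

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # assumes a catalog configured under this name

table = catalog.create_table(
    "db.events",  # hypothetical identifier
    schema=df.schema,  # assumes create_table accepts the PyArrow schema of df
    properties={
        # Iceberg table property values are strings; bin_pack_arrow_table
        # parses this with PropertyUtil.property_as_int at write time.
        "write.target-file-size-bytes": str(128 * 1024 * 1024),  # aim for ~128 MB files
    },
)

table.append(df)  # batches are bin-packed toward the configured target size

Note that the target is measured on the in-memory Arrow batches (the weight_func in the first file uses nbytes), so the resulting Parquet files can come out smaller once encoding and compression are applied.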
