Commit 7b4f3ce

bin pack write

1 parent 44948cd
2 files changed: +44, -43 lines

2 files changed

+44
-43
lines changed

pyiceberg/io/pyarrow.py (41 additions, 40 deletions)
@@ -1717,54 +1717,55 @@ def fill_parquet_file_metadata(
 
 
 def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[Schema] = None) -> Iterator[DataFile]:
-    task = next(tasks)
-
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-        raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
-
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
-
-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
     file_schema = file_schema or table.schema()
     arrow_file_schema = schema_to_pyarrow(file_schema)
-
-    fo = table.io.new_output(file_path)
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
     row_group_size = PropertyUtil.property_as_int(
         properties=table.properties,
         property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
         default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
     )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
+    data_files = []
+    for task in tasks:
+        file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+        fo = table.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
+                for batch in task.record_batches:
+                    writer.write_batch(batch, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table.spec().spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(file_schema, table.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
+        )
+        data_files.append(data_file)
+    return iter(data_files)
 
-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(file_schema, table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
-    )
-    return iter([data_file])
+def bin_pack_arrow_table(tbl: pa.Table) -> Iterator[List[pa.RecordBatch]]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
+
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB
+    bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True)
+    return bin_packed
 
 
 ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
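
For intuition on the new bin_pack_arrow_table helper: 2 << 27 is 2^28 = 268,435,456 bytes, i.e. a 256 MB target per bin, and PackingIterator groups the table's record batches using their nbytes as the weight, so each bin (and hence each data file) lands near that target. The sketch below imitates the grouping with a plain greedy loop; it is illustrative only, not the actual PackingIterator (which also applies lookback and largest-bin-first ordering), and the tiny table and 1 KB target are made up for the example.

from typing import Iterator, List

import pyarrow as pa


def greedy_bin_pack(batches: List[pa.RecordBatch], target_bytes: int) -> Iterator[List[pa.RecordBatch]]:
    # Illustrative greedy packing: start a new bin whenever the next batch
    # would push the current bin past the target size.
    current: List[pa.RecordBatch] = []
    current_bytes = 0
    for batch in batches:
        if current and current_bytes + batch.nbytes > target_bytes:
            yield current
            current, current_bytes = [], 0
        current.append(batch)
        current_bytes += batch.nbytes
    if current:
        yield current


# Toy example: 1024 int64 values split into 64-row batches (~512 bytes each),
# packed into ~1 KB bins, i.e. roughly two batches per bin.
tbl = pa.table({"ids": list(range(1024))})
bins = list(greedy_bin_pack(tbl.to_batches(max_chunksize=64), target_bytes=1024))
print(len(bins), [sum(b.nbytes for b in group) for group in bins])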

pyiceberg/table/__init__.py (3 additions, 3 deletions)
@@ -2330,7 +2330,7 @@ def _generate_snapshot_id() -> int:
 class WriteTask:
     write_uuid: uuid.UUID
     task_id: int
-    df: pa.Table
+    record_batches: List[pa.RecordBatch]
     sort_order_id: Optional[int] = None
 
     # Later to be extended with partition information
@@ -2359,7 +2359,7 @@ def _dataframe_to_data_files(
     Returns:
         An iterable that supplies datafiles that represent the table.
     """
-    from pyiceberg.io.pyarrow import write_file
+    from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
     if len(table.spec().fields) > 0:
         raise ValueError("Cannot write to partitioned tables")
@@ -2369,7 +2369,7 @@
 
     # This is an iter, so we don't have to materialize everything every time
     # This will be more relevant when we start doing partitioned writes
-    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)]), file_schema=file_schema)
+    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]), file_schema=file_schema)
 
 
 class _MergingSnapshotProducer:
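
Taken together, _dataframe_to_data_files now fans one Arrow table out into one WriteTask per bin, and write_file emits one Parquet data file per task instead of exactly one file per write. A minimal sketch of that fan-out, assuming a pyiceberg build that contains this commit; the in-memory table is made up, and WriteTask and bin_pack_arrow_table are internal helpers rather than public API, shown here only to illustrate the shape of the tasks.

import itertools
import uuid

import pyarrow as pa

from pyiceberg.io.pyarrow import bin_pack_arrow_table
from pyiceberg.table import WriteTask

# Tiny unpartitioned table; a realistic input would span several 256 MB bins.
df = pa.table({"city": ["Amsterdam", "Drachten"], "population": [921402, 45019]})

write_uuid = uuid.uuid4()
counter = itertools.count(0)

# One WriteTask per bin of record batches; write_file() later writes one Parquet file per task.
tasks = [WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]
print(len(tasks))  # a table this small packs into a single bin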
