
Commit c30e43a

Bug Fix: Position Deletes + row_filter yields less data when the DataFile is large (apache#1141)
* add more tests for position deletes
* multiple append and deletes
* test
* fix
* adopt nit
1 parent: e5a58b3

File tree

2 files changed: +65 -4 lines changed

* pyiceberg/io/pyarrow.py
* tests/integration/test_deletes.py

pyiceberg/io/pyarrow.py (+12 -4)
@@ -1238,10 +1238,13 @@ def _task_to_record_batches(
         for batch in batches:
             next_index = next_index + len(batch)
             current_index = next_index - len(batch)
+            output_batches = iter([batch])
             if positional_deletes:
                 # Create the mask of indices that we're interested in
                 indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
                 batch = batch.take(indices)
+                output_batches = iter([batch])
+
             # Apply the user filter
             if pyarrow_filter is not None:
                 # we need to switch back and forth between RecordBatch and Table
@@ -1251,10 +1254,15 @@ def _task_to_record_batches(
                 arrow_table = arrow_table.filter(pyarrow_filter)
                 if len(arrow_table) == 0:
                     continue
-                batch = arrow_table.to_batches()[0]
-            yield _to_requested_schema(
-                projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
-            )
+                output_batches = arrow_table.to_batches()
+            for output_batch in output_batches:
+                yield _to_requested_schema(
+                    projected_schema,
+                    file_project_schema,
+                    output_batch,
+                    downcast_ns_timestamp_to_us=True,
+                    use_large_types=use_large_types,
+                )
 
 
 def _task_to_table(
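
What this change guards against: `pyarrow.Table.to_batches()` returns one `RecordBatch` per chunk, and on a large data file the table produced by `arrow_table.filter(pyarrow_filter)` can be backed by more than one chunk. The old code yielded only `to_batches()[0]`, silently dropping every batch after the first; the fix collects all of them in `output_batches` and yields each one. Below is a minimal standalone sketch of the failure mode (not pyiceberg code; it only assumes `pyarrow` is installed), using a deliberately multi-chunk table:

```python
import pyarrow as pa

# Build a table whose single column is backed by three chunks;
# Table.to_batches() returns one RecordBatch per chunk.
batch = pa.RecordBatch.from_pydict({"number": list(range(1, 1001))})
table = pa.Table.from_batches([batch] * 3)

batches = table.to_batches()
assert len(batches) == 3          # more than one batch for the same table
assert len(batches[0]) == 1000    # the old `to_batches()[0]` sees only this much...
assert len(table) == 3000         # ...out of 3000 rows, so 2000 rows would be dropped

# Iterating over every batch, as the fix does, loses nothing.
assert sum(len(b) for b in batches) == len(table)
```

Keeping `output_batches` as an iterator over every surviving batch preserves the generator semantics of `_task_to_record_batches` while guaranteeing that no rows are lost after positional deletes and filtering.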

tests/integration/test_deletes.py (+53 -0)
@@ -292,6 +292,59 @@ def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes
     assert len(reader.read_all()) == 0
 
 
+@pytest.mark.integration
+@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
+def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = "default.test_read_multiple_batches_in_task_with_position_deletes"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number int
+            )
+            USING iceberg
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+            """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array(list(range(1, 1001)) * 100),
+        ],
+        schema=pa.schema([pa.field("number", pa.int32())]),
+    )
+
+    tbl.append(arrow_table)
+
+    run_spark_commands(
+        spark,
+        [
+            f"""
+            DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
+            """,
+        ],
+    )
+
+    tbl.refresh()
+
+    reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
+    assert isinstance(reader, pa.RecordBatchReader)
+    pyiceberg_count = len(reader.read_all())
+    expected_count = 46 * 100
+    assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}"
+
+
 @pytest.mark.integration
 @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
 def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestCatalog) -> None:
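
About the new integration test: the 100,000-row append (values 1..1000, each written 100 times) is there so that a single scan task produces more than one Arrow batch, which is exactly the situation the fix addresses, and the expected count is 46 distinct values * 100 copies = 4,600 rows once values 1-4 are deleted and the `number <= 50` row filter is applied. That arithmetic can be sanity-checked with plain pyarrow, without Spark or a REST catalog; the following is a standalone sketch (not part of the committed test, and it assumes a reasonably recent pyarrow where `Table.filter` accepts compute expressions):

```python
import pyarrow as pa
import pyarrow.compute as pc

# Same data shape as the integration test: values 1..1000, each written 100 times.
table = pa.table({"number": pa.array(list(range(1, 1001)) * 100, type=pa.int32())})

# Emulate the merge-on-read DELETE (values 1-4) and the scan's row filter (number <= 50).
remaining = table.filter(~pc.field("number").isin([1, 2, 3, 4]))
filtered = remaining.filter(pc.field("number") <= 50)

assert len(filtered) == 46 * 100  # 4600 rows, matching expected_count in the test
```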
