Description
Environment
Delta-rs version: 0.25.2
Binding: Python
Environment:
- Cloud provider: Google/Azure
- OS: Mac
- Other:
Bug
What happened:
When I upgraded to 0.25, a lot of my Delta tables began to fail with pod OOM errors despite being given more and more memory. Disabling streamed_exec (i.e. passing streamed_exec=False) fixed this for me.
It seems like partition pruning is not happening: tables with hundreds or thousands of partitions are being fully scanned even when the merge predicate is specific and points directly at a single partition. For example, one of my sources is just a single date of data (76k rows) which is merged into a table partitioned by month_id and date_id. This should be (and used to be) a quick merge since the predicate is explicit, but with streamed_exec=True it fails, and debugging shows all 2600+ files are scanned.
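For reference, a minimal sketch of the workaround (the table path and source data below are placeholders; the only part that matters is passing streamed_exec=False to merge):

import pyarrow as pa
from deltalake import DeltaTable

dt = DeltaTable("path/to/partitioned_table")  # placeholder path

source = pa.table({
    "month_id": pa.array([202502], type=pa.int32()),
    "date_id": pa.array([20250226], type=pa.int32()),
    "unique_row_hash": pa.array(["new_hash"]),
    "col1": pa.array(["new_value"]),
})

(
    dt.merge(
        source=source,
        predicate=(
            "s.unique_row_hash = t.unique_row_hash "
            "AND s.month_id = t.month_id AND t.month_id = 202502 "
            "AND s.date_id = t.date_id AND t.date_id = 20250226"
        ),
        source_alias="s",
        target_alias="t",
        streamed_exec=False,  # workaround: avoids the full scan / OOM for me
    )
    .when_not_matched_insert_all()
    .execute()
)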
What you expected to happen:
Predicates which filter out partitions should be respected.
How to reproduce it:
import os
import shutil

import pyarrow as pa
from deltalake import DeltaTable

table_path1 = "test_delta_not_streamed"
table_path2 = "test_delta_streamed"

# Start from empty directories for both test tables.
for path in [table_path1, table_path2]:
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)

schema = pa.schema([
    pa.field("month_id", pa.int32()),
    pa.field("date_id", pa.int32()),
    pa.field("unique_row_hash", pa.string()),
    pa.field("col1", pa.string()),
])

# Two identical tables partitioned by (month_id, date_id); one will be merged
# with streamed_exec=False and the other with streamed_exec=True.
dt1 = DeltaTable.create(
    table_uri=table_path1,
    schema=schema,
    partition_by=["month_id", "date_id"],
)
dt2 = DeltaTable.create(
    table_uri=table_path2,
    schema=schema,
    partition_by=["month_id", "date_id"],
)

partitions = [
    (202501, 20250101),
    (202502, 20250201),
    (202502, 20250226),
    (202503, 20250301),
]

rows = []
for month_id, date_id in partitions:
    rows.append([month_id, date_id, f"hash_{month_id}_{date_id}", "value"])

source_table = pa.Table.from_arrays(
    [
        pa.array([row[0] for row in rows], type=pa.int32()),
        pa.array([row[1] for row in rows], type=pa.int32()),
        pa.array([row[2] for row in rows], type=pa.string()),
        pa.array([row[3] for row in rows], type=pa.string()),
    ],
    names=["month_id", "date_id", "unique_row_hash", "col1"],
)

# Seed both tables with one row per partition.
for dt in [dt1, dt2]:
    dt.merge(
        source=source_table,
        predicate="s.unique_row_hash = t.unique_row_hash",
        source_alias="s",
        target_alias="t",
    ).when_not_matched_insert_all().execute()

print(f"Table 1 has {len(dt1.files())} files")
print(f"Table 2 has {len(dt2.files())} files")

# New source row targeting a single (month_id, date_id) partition.
source_table_new = pa.Table.from_arrays(
    [
        pa.array([202502], type=pa.int32()),
        pa.array([20250226], type=pa.int32()),
        pa.array(["new_hash"], type=pa.string()),
        pa.array(["new_value"], type=pa.string()),
    ],
    names=["month_id", "date_id", "unique_row_hash", "col1"],
)

merge_predicate = "(s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = 202502 AND s.date_id = t.date_id AND t.date_id = 20250226)"
print(f"Using merge predicate: {merge_predicate}")

print("\nTesting with streamed_exec=False:")
result1 = dt1.merge(
    source=source_table_new,
    predicate=merge_predicate,
    source_alias="s",
    target_alias="t",
    streamed_exec=False,
).when_not_matched_insert_all().execute()
print(f"streamed_exec=False results: {result1}")

print("\nTesting with streamed_exec=True:")
result2 = dt2.merge(
    source=source_table_new,
    predicate=merge_predicate,
    source_alias="s",
    target_alias="t",
    streamed_exec=True,
).when_not_matched_insert_all().execute()
print(f"streamed_exec=True results: {result2}\n")

print(f"Files scanned (not streamed): {result1['num_target_files_scanned']}")
print(f"Files skipped (not streamed): {result1['num_target_files_skipped_during_scan']}")
print(f"Files scanned (streamed): {result2['num_target_files_scanned']}")
print(f"Files skipped (streamed): {result2['num_target_files_skipped_during_scan']}")
More details:
The results show that with streamed_exec=True the merge does not skip all of the files.
Results:
Table 1 has 4 files
Table 2 has 4 files
Using merge predicate: (s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = 202502 AND s.date_id = t.date_id AND t.date_id = 20250226)
Testing with streamed_exec=False:
streamed_exec=False results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 0, 'num_target_files_skipped_during_scan': 4, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 10, 'scan_time_ms': 0, 'rewrite_time_ms': 0}
Testing with streamed_exec=True:
streamed_exec=True results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 1, 'num_target_files_skipped_during_scan': 3, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 10, 'scan_time_ms': 0, 'rewrite_time_ms': 1}
Files scanned (not streamed): 0
Files skipped (not streamed): 4
Files scanned (streamed): 1
Files skipped (streamed): 3
FYI - I noticed that quoting the partition values in the predicate made the situation even worse. month_id and date_id are integers in the data, but my custom merge function adds partition clauses to the predicate and quotes all of the values:
predicates = [
    f"s.{key} = t.{key} AND t.{key} = '{value}'"
    for key, _, value in partition_filters
]
So my real-world merges are impacted by this. I was under the impression that we always need to use strings with delta-rs. Maybe that only applies to partition filters, like [("month_id", "=", "202502")], and not to predicates?
Note that the date/month values are now quoted as strings in the predicate:
Using merge predicate: (s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = '202502' AND s.date_id = t.date_id AND t.date_id = '20250226')
Testing with streamed_exec=False:
streamed_exec=False results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 0, 'num_target_files_skipped_during_scan': 4, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 11, 'scan_time_ms': 0, 'rewrite_time_ms': 0}
Testing with streamed_exec=True:
streamed_exec=True results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 4, 'num_target_files_skipped_during_scan': 0, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 10, 'scan_time_ms': 0, 'rewrite_time_ms': 1}
Files scanned (not streamed): 0
Files skipped (not streamed): 4
Files scanned (streamed): 4
Files skipped (streamed): 0
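In case it is useful, here is the kind of type-aware predicate builder I could use instead of quoting everything; a minimal sketch assuming partition_filters carries native Python values (the example values below are just for illustration):

partition_filters = [("month_id", "=", 202502), ("date_id", "=", 20250226)]  # example values

def quote(value):
    # Quote only strings; integers stay unquoted so they match the int32 partition columns.
    return f"'{value}'" if isinstance(value, str) else str(value)

predicates = [
    f"s.{key} = t.{key} AND t.{key} = {quote(value)}"
    for key, _, value in partition_filters
]
merge_predicate = "(s.unique_row_hash = t.unique_row_hash) AND (" + " AND ".join(predicates) + ")"
print(merge_predicate)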