Partition pruning does not work when values in predicate do not match column datatype #3278

Open
@ldacey

Description

Environment

Delta-rs version:
0.25.2

Binding:

Environment:

  • Cloud provider: Google/Azure
  • OS: Mac
  • Other:

Bug

What happened:
When I upgraded to 0.25, many of my delta tables began to fail with pod OOM errors despite being given more and more memory. Setting streamed_exec=False fixed this for me.

It seems like partition pruning is not happening - tables with hundreds or thousands of partitions are being fully scanned even when the merge predicates are specific and point directly to a single partition. For example, one of my sources is a single date of data (76k rows) which is merged into a table partitioned by month_id and date_id. This should be (and used to be) a quick merge since the predicate we use is explicit. With streamed_exec=True it fails, though, and debugging shows all 2600+ files are scanned.

What you expected to happen:
Predicates which filter out partitions should be respected.

How to reproduce it:

import os
import shutil
import pyarrow as pa
from deltalake import DeltaTable

table_path1 = "test_delta_not_streamed"
table_path2 = "test_delta_streamed"

for path in [table_path1, table_path2]:
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)

schema = pa.schema([
    pa.field("month_id", pa.int32()),
    pa.field("date_id", pa.int32()),
    pa.field("unique_row_hash", pa.string()),
    pa.field("col1", pa.string())
])

dt1 = DeltaTable.create(
    table_uri=table_path1,
    schema=schema,
    partition_by=["month_id", "date_id"]
)

dt2 = DeltaTable.create(
    table_uri=table_path2,
    schema=schema,
    partition_by=["month_id", "date_id"]
)

partitions = [
    (202501, 20250101),
    (202502, 20250201),
    (202502, 20250226),
    (202503, 20250301),
]

rows = []
for month_id, date_id in partitions:
    rows.append([month_id, date_id, f"hash_{month_id}_{date_id}", "value"])

source_table = pa.Table.from_arrays(
    [
        pa.array([row[0] for row in rows], type=pa.int32()),
        pa.array([row[1] for row in rows], type=pa.int32()),
        pa.array([row[2] for row in rows], type=pa.string()),
        pa.array([row[3] for row in rows], type=pa.string()),
    ],
    names=["month_id", "date_id", "unique_row_hash", "col1"]
)

for dt in [dt1, dt2]:
    dt.merge(
        source=source_table,
        predicate="s.unique_row_hash = t.unique_row_hash",
        source_alias="s",
        target_alias="t"
    ).when_not_matched_insert_all().execute()

print(f"Table 1 has {len(dt1.files())} files")
print(f"Table 2 has {len(dt2.files())} files")


source_table_new = pa.Table.from_arrays(
    [
        pa.array([202502], type=pa.int32()),
        pa.array([20250226], type=pa.int32()),
        pa.array(["new_hash"], type=pa.string()),
        pa.array(["new_value"], type=pa.string()),
    ],
    names=["month_id", "date_id", "unique_row_hash", "col1"]
)

merge_predicate = "(s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = 202502 AND s.date_id = t.date_id AND t.date_id = 20250226)"
print(f"Using merge predicate: {merge_predicate}")

print("\nTesting with streamed_exec=False:")
result1 = dt1.merge(
    source=source_table_new,
    predicate=merge_predicate,
    source_alias="s",
    target_alias="t",
    streamed_exec=False
).when_not_matched_insert_all().execute()

print(f"streamed_exec=False results: {result1}")

print("\nTesting with streamed_exec=True:")
result2 = dt2.merge(
    source=source_table_new,
    predicate=merge_predicate,
    source_alias="s",
    target_alias="t",
    streamed_exec=True
).when_not_matched_insert_all().execute()
print(f"streamed_exec=True results: {result2}\n")


print(f"Files scanned (not streamed): {result1['num_target_files_scanned']}")
print(f"Files skipped (not streamed): {result1['num_target_files_skipped_during_scan']}")
print(f"Files scanned (streamed): {result2['num_target_files_scanned']}")
print(f"Files skipped (streamed): {result2['num_target_files_skipped_during_scan']}")

More details:
The results show that when streamed_exec=True the merge does not skip all of the non-matching files.

Results:

Table 1 has 4 files
Table 2 has 4 files
Using merge predicate: (s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = 202502 AND s.date_id = t.date_id AND t.date_id = 20250226)

Testing with streamed_exec=False:
streamed_exec=False results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 0, 'num_target_files_skipped_during_scan': 4, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 10, 'scan_time_ms': 0, 'rewrite_time_ms': 0}

Testing with streamed_exec=True:
streamed_exec=True results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 1, 'num_target_files_skipped_during_scan': 3, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 10, 'scan_time_ms': 0, 'rewrite_time_ms': 1}

Files scanned (not streamed): 0
Files skipped (not streamed): 4
Files scanned (streamed): 1
Files skipped (streamed): 3

FYI - I noticed that quoting the partition values in the predicate made the situation worse.

month_id and date_id are integers in the data, but my custom merge function adds partition filters to the predicate, and I quote all values:

    predicates = [
        f"s.{key} = t.{key} AND t.{key} = '{value}'"
        for key, _, value in partition_filters
    ]

So my real-world example is impacted by this. I was under the impression that we always need to use strings with delta-rs. Maybe that is only for partition filters, like [("month_id", "=", "202502")], and not for predicates?

# note that the date/month are now quoted as strings
Using merge predicate: (s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = '202502' AND s.date_id = t.date_id AND t.date_id = '20250226')

Testing with streamed_exec=False:
streamed_exec=False results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 0, 'num_target_files_skipped_during_scan': 4, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 11, 'scan_time_ms': 0, 'rewrite_time_ms': 0}

Testing with streamed_exec=True:
streamed_exec=True results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 4, 'num_target_files_skipped_during_scan': 0, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 10, 'scan_time_ms': 0, 'rewrite_time_ms': 1}

Files scanned (not streamed): 0
Files skipped (not streamed): 4
Files scanned (streamed): 4
Files skipped (streamed): 0
