19 changes: 11 additions & 8 deletions sdk/python/feast/infra/compute_engines/local/nodes.py
@@ -192,16 +192,19 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue:
             # Dedup strategy: sort and drop_duplicates
             dedup_keys = self.column_info.join_keys
             if dedup_keys:
-                sort_keys = [self.column_info.timestamp_column]
+                sort_keys = []
+                if self.column_info.timestamp_column in df.columns:
+                    sort_keys.append(self.column_info.timestamp_column)
                 if (
                     self.column_info.created_timestamp_column
                     and self.column_info.created_timestamp_column in df.columns
                 ):
                     sort_keys.append(self.column_info.created_timestamp_column)
 
-                df = self.backend.drop_duplicates(
-                    df, keys=dedup_keys, sort_by=sort_keys, ascending=False
-                )
+                if sort_keys:
+                    df = self.backend.drop_duplicates(
+                        df, keys=dedup_keys, sort_by=sort_keys, ascending=False
+                    )
Comment on lines +204 to +207
🔴 Deduplication silently skipped when no timestamp columns are present in DataFrame

When neither timestamp_column nor created_timestamp_column is present in the DataFrame's columns, sort_keys ends up empty, and the if sort_keys: guard at line 204 skips the entire drop_duplicates call. Duplicate rows (by join key) then pass through undetected.

Root Cause and Impact

The PR description states the intent is to "add a fallback to deduplicate by key only when no sort columns survive (rather than crashing)." However, the implementation at lines 204-207 simply skips deduplication entirely when sort_keys is empty:

if sort_keys:
    df = self.backend.drop_duplicates(
        df, keys=dedup_keys, sort_by=sort_keys, ascending=False
    )

When sort_keys is empty (falsy), no deduplication happens at all. The correct behavior should be to still deduplicate by dedup_keys alone — just without a deterministic sort order. For example, using pandas' df.drop_duplicates(subset=dedup_keys) or equivalent.

Impact: Any feature view where timestamp_field is an internal bookkeeping column not exposed in the feature schema will have its timestamp column projected away before the dedup node runs. In this case, duplicate entity rows will silently remain in the output, leading to incorrect feature values (e.g., duplicated rows in training datasets or multiple values written to the online store for the same entity key).
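The failure mode is easy to reproduce with plain pandas (a sketch, not the Feast backend API; the frame and column names are hypothetical):

```python
import pandas as pd

# Hypothetical frame after the timestamp column was projected away:
df = pd.DataFrame({
    "driver_id": [1, 1, 2],        # join key, with one duplicate
    "conv_rate": [0.1, 0.2, 0.3],
})

dedup_keys = ["driver_id"]
# Neither timestamp column survived projection, so no sort columns remain:
sort_keys = [c for c in ["event_timestamp", "created"] if c in df.columns]

# With the guard as written, sort_keys == [] and this branch never runs,
# so the duplicate driver_id=1 row survives:
if sort_keys:
    df = df.sort_values(sort_keys, ascending=False).drop_duplicates(subset=dedup_keys)
assert len(df) == 3  # duplicates silently pass through

# Keys-only fallback: no deterministic winner, but duplicates are removed.
deduped = df.drop_duplicates(subset=dedup_keys)
assert len(deduped) == 2
```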

Suggested change
 if sort_keys:
     df = self.backend.drop_duplicates(
         df, keys=dedup_keys, sort_by=sort_keys, ascending=False
     )
+else:
+    df = self.backend.drop_duplicates(
+        df, keys=dedup_keys, sort_by=dedup_keys, ascending=True
+    )
         result = self.backend.to_arrow(df)
         output = ArrowTableValue(result)
         context.node_outputs[self.name] = output