Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions langchain_postgres/v2/async_vectorstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ async def __query_collection(
dense_results,
sparse_results,
**hybrid_search_config.fusion_function_parameters,
distance_strategy=self.distance_strategy,
)
return combined_results
return dense_results
Expand Down
62 changes: 38 additions & 24 deletions langchain_postgres/v2/hybrid_search_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

from sqlalchemy import RowMapping

from .indexes import DistanceStrategy


def weighted_sum_ranking(
primary_search_results: Sequence[RowMapping],
secondary_search_results: Sequence[RowMapping],
primary_results_weight: float = 0.5,
secondary_results_weight: float = 0.5,
fetch_top_k: int = 4,
**kwargs: Any,
) -> Sequence[dict[str, Any]]:
"""
Ranks documents using a weighted sum of scores from two sources.
Expand Down Expand Up @@ -69,6 +72,7 @@ def reciprocal_rank_fusion(
secondary_search_results: Sequence[RowMapping],
rrf_k: float = 60,
fetch_top_k: int = 4,
**kwargs: Any,
) -> Sequence[dict[str, Any]]:
"""
Ranks documents using Reciprocal Rank Fusion (RRF) of scores from two sources.
Expand All @@ -87,35 +91,45 @@ def reciprocal_rank_fusion(
A list of result rows (dicts) whose "distance" value holds the fused
RRF score, sorted by that score in descending order.
"""
distance_strategy = kwargs.get(
"distance_strategy", DistanceStrategy.COSINE_DISTANCE
)
rrf_scores: dict[str, dict[str, Any]] = {}

# Process results from primary source
for rank, row in enumerate(
sorted(primary_search_results, key=lambda item: item["distance"], reverse=True)
):
values = list(row.values())
doc_id = str(values[0])
row_values = dict(row)
primary_score = rrf_scores[doc_id]["distance"] if doc_id in rrf_scores else 0.0
primary_score += 1.0 / (rank + rrf_k)
row_values["distance"] = primary_score
rrf_scores[doc_id] = row_values
# Determine the sort order from the vector distance strategy.
# COSINE_DISTANCE and EUCLIDEAN are distances (lower is better): sort ascending (reverse=False).
# INNER_PRODUCT is a similarity (higher is better): sort descending (reverse=True).
is_similarity_metric = distance_strategy == DistanceStrategy.INNER_PRODUCT
sorted_primary = sorted(
primary_search_results,
key=lambda item: item["distance"],
reverse=is_similarity_metric,
)

for rank, row in enumerate(sorted_primary):
doc_id = str(list(row.values())[0])
if doc_id not in rrf_scores:
rrf_scores[doc_id] = dict(row)
rrf_scores[doc_id]["distance"] = 0.0
# Add the "normalized" rank score
rrf_scores[doc_id]["distance"] += 1.0 / (rank + rrf_k)

# Process results from secondary source
for rank, row in enumerate(
sorted(
secondary_search_results, key=lambda item: item["distance"], reverse=True
)
):
values = list(row.values())
doc_id = str(values[0])
row_values = dict(row)
secondary_score = (
rrf_scores[doc_id]["distance"] if doc_id in rrf_scores else 0.0
)
secondary_score += 1.0 / (rank + rrf_k)
row_values["distance"] = secondary_score
rrf_scores[doc_id] = row_values
# Keyword search relevance is always "higher is better" -> sort descending
sorted_secondary = sorted(
secondary_search_results,
key=lambda item: item["distance"],
reverse=True,
)

for rank, row in enumerate(sorted_secondary):
doc_id = str(list(row.values())[0])
if doc_id not in rrf_scores:
rrf_scores[doc_id] = dict(row)
rrf_scores[doc_id]["distance"] = 0.0
# Add the rank score from this list to the existing score
rrf_scores[doc_id]["distance"] += 1.0 / (rank + rrf_k)

# Sort the results by RRF score in descending order
Expand Down
212 changes: 126 additions & 86 deletions tests/unit_tests/v2/test_hybrid_search_config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from typing import cast

import pytest
from sqlalchemy import RowMapping

from langchain_postgres.v2.hybrid_search_config import (
reciprocal_rank_fusion,
weighted_sum_ranking,
)
from langchain_postgres.v2.indexes import DistanceStrategy


# Helper to create mock input items that mimic RowMapping for the fusion functions
def get_row(doc_id: str, score: float, content: str = "content") -> dict:
def get_row(doc_id: str, score: float, content: str = "content") -> RowMapping:
"""
Simulates a RowMapping-like dictionary.
The fusion functions expect to extract doc_id as the first value and
Expand All @@ -17,7 +21,8 @@ def get_row(doc_id: str, score: float, content: str = "content") -> dict:
# Python dicts maintain insertion order (Python 3.7+).
# This structure ensures list(row.values())[0] is doc_id and
# list(row.values())[-1] is score.
return {"id_val": doc_id, "content_field": content, "distance": score}
row_dict = {"id_val": doc_id, "content_field": content, "distance": score}
return cast(RowMapping, row_dict)


class TestWeightedSumRanking:
Expand Down Expand Up @@ -102,30 +107,31 @@ def test_fetch_top_k(self) -> None:

class TestReciprocalRankFusion:
def test_empty_inputs(self) -> None:
    """Fusing two empty result lists yields an empty list."""
    fused = reciprocal_rank_fusion([], [])
    assert fused == []

def test_primary_only(self) -> None:
    """Tests RRF with only primary results using default cosine (lower is better)."""
    primary = [get_row("p1", 0.8), get_row("p2", 0.6)]
    rrf_k = 60
    # --- Calculation (Cosine: lower is better) ---
    # Sorted order: p2 (0.6) -> rank 0; p1 (0.8) -> rank 1
    # p2_score = 1 / (0 + 60)
    # p1_score = 1 / (1 + 60)
    results = reciprocal_rank_fusion(primary, [], rrf_k=rrf_k)  # type: ignore
    assert len(results) == 2
    assert results[0]["id_val"] == "p2"
    assert results[0]["distance"] == pytest.approx(1.0 / (0 + rrf_k))
    assert results[1]["id_val"] == "p1"
    assert results[1]["distance"] == pytest.approx(1.0 / (1 + rrf_k))

def test_secondary_only(self) -> None:
secondary = [
get_row("s1", 0.9),
get_row("s2", 0.7),
] # s1 rank 0, s2 rank 1
"""Tests RRF with only secondary results (higher is better)."""
secondary = [get_row("s1", 0.9), get_row("s2", 0.7)]
rrf_k = 60
# --- Calculation (Keyword: higher is better) ---
# Sorted order: s1 (0.9) -> rank 0; s2 (0.7) -> rank 1
results = reciprocal_rank_fusion([], secondary, rrf_k=rrf_k) # type: ignore
assert len(results) == 2
assert results[0]["id_val"] == "s1"
Expand All @@ -134,96 +140,130 @@ def test_secondary_only(self) -> None:
assert results[1]["distance"] == pytest.approx(1.0 / (1 + rrf_k))

def test_mixed_results_default_k(self) -> None:
    """Tests fusion with default cosine (lower better) and keyword (higher better)."""
    # Primary is ranked ascending by distance -> order: p_only, common
    primary = [
        get_row("common", 0.8),
        get_row("p_only", 0.7),
    ]
    # Secondary is ranked descending by score -> order: common, s_only
    secondary = [
        get_row("common", 0.9),
        get_row("s_only", 0.6),
    ]
    rrf_k = 60
    # --- Calculation ---
    # common: rank 1 in P (1/61) + rank 0 in S (1/60) -> highest score
    # p_only: rank 0 in P (1/60)
    # s_only: rank 1 in S (1/61)
    results = reciprocal_rank_fusion(primary, secondary, rrf_k=rrf_k)  # type: ignore
    assert len(results) == 3
    assert results[0]["id_val"] == "common"
    assert results[0]["distance"] == pytest.approx(
        1.0 / (1 + rrf_k) + 1.0 / (0 + rrf_k)
    )
    assert results[1]["id_val"] == "p_only"
    assert results[1]["distance"] == pytest.approx(1.0 / (0 + rrf_k))
    assert results[2]["id_val"] == "s_only"
    assert results[2]["distance"] == pytest.approx(1.0 / (1 + rrf_k))

def test_fetch_top_k_rrf(self) -> None:
    """Tests that fetch_top_k limits results correctly after fusion."""
    # Using cosine distance (lower is better)
    primary = [get_row(f"p{i}", (10 - i) / 10.0) for i in range(5)]
    # Scores: [1.0, 0.9, 0.8, 0.7, 0.6]
    # Sorted order: p4 (0.6), p3 (0.7), p2 (0.8), ...
    results = reciprocal_rank_fusion(primary, [], fetch_top_k=2)  # type: ignore
    assert len(results) == 2
    assert results[0]["id_val"] == "p4"
    assert results[1]["id_val"] == "p3"

def test_rrf_content_preservation(self) -> None:
    """Tests that the data from the first time a document is seen is kept."""
    primary = [get_row("doc1", 0.9, content="Primary Content")]
    secondary = [get_row("doc1", 0.8, content="Secondary Content")]
    # RRF processes primary first. When "doc1" is seen, its data is stored.
    # It will not be overwritten by the "doc1" from the secondary list.
    results = reciprocal_rank_fusion(primary, secondary)  # type: ignore
    assert len(results) == 1
    assert results[0]["id_val"] == "doc1"
    assert results[0]["content_field"] == "Primary Content"

    # If only in primary
    results_prim_only = reciprocal_rank_fusion(primary, [], rrf_k=60)  # type: ignore
    assert results_prim_only[0]["content_field"] == "Primary Content"

    # If only in secondary
    results_sec_only = reciprocal_rank_fusion([], secondary, rrf_k=60)  # type: ignore
    assert results_sec_only[0]["content_field"] == "Secondary Content"

def test_reordering_from_inputs_rrf(self) -> None:
    """Tests that fused ranks follow each list's sorted order, not its input order."""
    primary = [get_row("docA", 0.9), get_row("docB", 0.8), get_row("docC", 0.1)]
    secondary = [get_row("docC", 0.9), get_row("docB", 0.5), get_row("docA", 0.2)]
    rrf_k = 1.0  # Using 1.0 for k to simplify rank score calculation
    # --- Calculation (Primary sorted ascending, Secondary descending) ---
    # Primary ranks: docC (0), docB (1), docA (2)
    # Secondary ranks: docC (0), docB (1), docA (2)
    # docC_score = 1/(0+1) [P] + 1/(0+1) [S] = 2.0
    # docB_score = 1/(1+1) [P] + 1/(1+1) [S] = 1.0
    # docA_score = 1/(2+1) [P] + 1/(2+1) [S] = 2/3
    results = reciprocal_rank_fusion(primary, secondary, rrf_k=rrf_k)  # type: ignore
    assert len(results) == 3
    assert results[0]["id_val"] == "docC"
    assert results[0]["distance"] == pytest.approx(2.0)
    assert results[1]["id_val"] == "docB"
    assert results[1]["distance"] == pytest.approx(1.0)
    assert results[2]["id_val"] == "docA"
    assert results[2]["distance"] == pytest.approx(2.0 / 3.0)

# NOTE(review): the diff view suggests this weighted-sum test may have been
# removed from this class; it is retained here to avoid dropping coverage.
# Confirm against the merged file.
def test_reordering_from_inputs_weighted_sum(self) -> None:
    """
    Tests that the fused ranking can be different from both primary and secondary
    input rankings.
    Primary Order: A (0.9), B (0.7)
    Secondary Order: B (0.8), A (0.2)
    Fusion (0.5/0.5 weights):
    docA_score = (0.9 * 0.5) + (0.2 * 0.5) = 0.45 + 0.10 = 0.55
    docB_score = (0.7 * 0.5) + (0.8 * 0.5) = 0.35 + 0.40 = 0.75
    Expected Fused Order: docB (0.75), docA (0.55)
    """
    primary = [get_row("docA", 0.9), get_row("docB", 0.7)]
    secondary = [get_row("docB", 0.8), get_row("docA", 0.2)]

    results = weighted_sum_ranking(primary, secondary)  # type: ignore
    assert len(results) == 2
    assert results[0]["id_val"] == "docB"
    assert results[0]["distance"] == pytest.approx(0.75)
    assert results[1]["id_val"] == "docA"
    assert results[1]["distance"] == pytest.approx(0.55)

# --------------------------------------------------------------------------
# Tests for other distance strategies and edge cases

def test_mixed_results_max_inner_product(self) -> None:
    """Tests fusion with INNER_PRODUCT (higher is better) for primary."""
    primary = [get_row("best", 0.9), get_row("worst", 0.1)]  # Order: best, worst
    secondary = [get_row("best", 20.0), get_row("worst", 5.0)]  # Order: best, worst
    rrf_k = 10
    # best: rank 0 in P + rank 0 in S -> 1/10 + 1/10 = 0.2
    # worst: rank 1 in P + rank 1 in S -> 1/11 + 1/11
    results = reciprocal_rank_fusion(
        primary,  # type: ignore
        secondary,  # type: ignore
        rrf_k=rrf_k,
        distance_strategy=DistanceStrategy.INNER_PRODUCT,
    )
    assert len(results) == 2
    assert results[0]["id_val"] == "best"
    assert results[0]["distance"] == pytest.approx(0.2)
    assert results[1]["id_val"] == "worst"
    assert results[1]["distance"] == pytest.approx(2.0 / 11.0)

def test_mixed_results_euclidean(self) -> None:
    """Tests fusion with EUCLIDEAN (lower is better) for primary."""
    primary = [
        get_row("closer", 10.5),
        get_row("farther", 25.5),
    ]  # Order: closer, farther
    secondary = [
        get_row("closer", 100.0),
        get_row("farther", 10.0),
    ]  # Order: closer, farther
    rrf_k = 10
    # closer: rank 0 in P + rank 0 in S -> 1/10 + 1/10 = 0.2
    # farther: rank 1 in P + rank 1 in S -> 1/11 + 1/11
    results = reciprocal_rank_fusion(
        primary,  # type: ignore
        secondary,  # type: ignore
        rrf_k=rrf_k,
        distance_strategy=DistanceStrategy.EUCLIDEAN,
    )
    assert len(results) == 2
    assert results[0]["id_val"] == "closer"
    assert results[0]["distance"] == pytest.approx(0.2)
    assert results[1]["id_val"] == "farther"
    assert results[1]["distance"] == pytest.approx(2.0 / 11.0)

def test_rrf_with_identical_scores(self) -> None:
    """Tests that rows with identical scores keep their input order."""
    # Python's sorted() is stable. p1 appears before p2 in the list.
    primary = [get_row("p1", 0.5), get_row("p2", 0.5)]
    rrf_k = 60
    # Expected order (stable sort): p1 (rank 0), p2 (rank 1)
    # Pass rrf_k explicitly so the expected values do not depend on the default.
    results = reciprocal_rank_fusion(primary, [], rrf_k=rrf_k)  # type: ignore
    assert len(results) == 2
    assert results[0]["id_val"] == "p1"
    assert results[0]["distance"] == pytest.approx(1.0 / (0 + rrf_k))
    assert results[1]["id_val"] == "p2"
    assert results[1]["distance"] == pytest.approx(1.0 / (1 + rrf_k))