93 changes: 71 additions & 22 deletions langchain_postgres/v2/hybrid_search_config.py
@@ -7,6 +7,38 @@
from .indexes import DistanceStrategy


def _normalize_scores(
    results: Sequence[dict[str, Any]], is_distance_metric: bool
) -> Sequence[dict[str, Any]]:
    """Normalizes scores to a 0-1 scale, where 1 is best."""
    if not results:
        return []

    # Get scores from the last column of each result
    scores = [float(list(item.values())[-1]) for item in results]
    min_score, max_score = min(scores), max(scores)
    score_range = max_score - min_score

    if score_range == 0:
        # All scores are identical, so every document ties at the best value (1.0)
        for item in results:
            item["normalized_score"] = 1.0
        return list(results)

    for item in results:
        # Access the score again from the last column for calculation
        score = list(item.values())[-1]
        normalized = (score - min_score) / score_range
        if is_distance_metric:
            # For distance, a lower score is better, so we invert the result.
            item["normalized_score"] = 1.0 - normalized
        else:
            # For similarity (like keyword search), a higher score is better.
            item["normalized_score"] = normalized

    return list(results)
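As a rough illustration of what this helper does (a minimal sketch: plain dicts stand in for the real row mappings, and the field names are hypothetical):

    # Hypothetical rows: the last value of each dict is the raw cosine distance.
    rows = [
        {"id": "a", "content": "close match", "score": 0.2},
        {"id": "b", "content": "far match", "score": 0.8},
    ]
    out = _normalize_scores(rows, is_distance_metric=True)
    # min = 0.2, max = 0.8, range = 0.6
    # "a": 1.0 - (0.2 - 0.2) / 0.6 = 1.0  (lowest distance -> best)
    # "b": 1.0 - (0.8 - 0.2) / 0.6 = 0.0  (highest distance -> worst)
    assert [r["normalized_score"] for r in out] == [1.0, 0.0]

With is_distance_metric=False the same min-max scaling is applied without the inversion, so higher raw scores map to higher normalized scores.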


def weighted_sum_ranking(
    primary_search_results: Sequence[RowMapping],
    secondary_search_results: Sequence[RowMapping],
@@ -35,35 +67,52 @@ def weighted_sum_ranking(
        descending order.
    """

    distance_strategy = kwargs.get(
        "distance_strategy", DistanceStrategy.COSINE_DISTANCE
    )
    is_primary_distance = distance_strategy != DistanceStrategy.INNER_PRODUCT

    # Normalize both sets of results onto a 0-1 scale
    normalized_primary = _normalize_scores(
        [dict(row) for row in primary_search_results],
        is_distance_metric=is_primary_distance,
    )

    # Keyword search relevance is a similarity score (higher is better)
    normalized_secondary = _normalize_scores(
        [dict(row) for row in secondary_search_results], is_distance_metric=False
    )

    # Stores each document's combined score, computed from the provided
    # distance strategy and weights
    weighted_scores: dict[str, dict[str, Any]] = {}

    # Process results from primary source
    for row in primary_search_results:
        values = list(row.values())
        doc_id = str(values[0])  # first value is doc_id
        distance = float(values[-1])  # type: ignore # last value is distance
        row_values = dict(row)
        row_values["distance"] = primary_results_weight * distance
        weighted_scores[doc_id] = row_values

    # Process results from secondary source,
    # adding to existing scores or creating new ones
    for row in secondary_search_results:
        values = list(row.values())
        doc_id = str(values[0])  # first value is doc_id
        distance = float(values[-1])  # type: ignore # last value is distance
        primary_score = (
            weighted_scores[doc_id]["distance"] if doc_id in weighted_scores else 0.0
        )
        row_values = dict(row)
        row_values["distance"] = distance * secondary_results_weight + primary_score
        weighted_scores[doc_id] = row_values
    # Process primary results
    for item in normalized_primary:
        doc_id = str(list(item.values())[0])
        # Set the 'distance' key to the weighted primary score
        item["distance"] = item["normalized_score"] * primary_results_weight
        weighted_scores[doc_id] = item

    # Process secondary results
    for item in normalized_secondary:
        doc_id = str(list(item.values())[0])
        secondary_weighted_score = item["normalized_score"] * secondary_results_weight

        if doc_id in weighted_scores:
            # Add to the existing 'distance' score
            weighted_scores[doc_id]["distance"] += secondary_weighted_score
        else:
            # Set the 'distance' key for the new item
            item["distance"] = secondary_weighted_score
            weighted_scores[doc_id] = item

    # Sort the results by weighted score in descending order
    ranked_results = sorted(
        weighted_scores.values(), key=lambda item: item["distance"], reverse=True
    )

    for result in ranked_results:
        result.pop("normalized_score", None)

    return ranked_results[:fetch_top_k]


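Taken together, the reworked function normalizes each result set to 0-1 before applying the weights, so vector distances and keyword ranks become comparable, and the combined score is returned under the existing "distance" key. A minimal usage sketch, again assuming plain dicts with hypothetical field names in place of SQLAlchemy RowMapping objects:

    # Primary: vector search with cosine distance (lower raw value is better).
    primary = [
        {"id": "doc1", "content": "...", "distance": 0.1},
        {"id": "doc2", "content": "...", "distance": 0.9},
    ]
    # Secondary: keyword relevance (higher raw value is better).
    secondary = [
        {"id": "doc2", "content": "...", "rank": 8.0},
        {"id": "doc3", "content": "...", "rank": 2.0},
    ]
    ranked = weighted_sum_ranking(
        primary,
        secondary,
        primary_results_weight=0.5,
        secondary_results_weight=0.5,
    )
    # Normalized primary (inverted): doc1 -> 1.0, doc2 -> 0.0
    # Normalized secondary: doc2 -> 1.0, doc3 -> 0.0
    # Combined "distance": doc1 = 0.5, doc2 = 0.0 + 0.5 = 0.5, doc3 = 0.0
    # doc1 and doc2 tie at 0.5; doc3 ranks last.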
6 changes: 3 additions & 3 deletions tests/unit_tests/v2/test_async_pg_vectorstore_search.py
@@ -478,7 +478,7 @@ async def test_hybrid_search_weighted_sum_vector_bias(
        result_ids = [doc.metadata["doc_id_key"] for doc in results]

        assert len(result_ids) > 0
        assert result_ids[0] == "hs_doc_orange_fruit"
        assert result_ids[0] == "hs_doc_generic_tech"

    async def test_hybrid_search_weighted_sum_fts_bias(
        self,
@@ -611,7 +611,7 @@ async def test_hybrid_search_fts_empty_results(
        assert len(result_ids) > 0
        assert "hs_doc_apple_fruit" in result_ids or "hs_doc_apple_tech" in result_ids
        # The top result should be one of the apple documents based on vector search
        assert results[0].metadata["doc_id_key"].startswith("hs_doc_unrelated_cat")
        assert results[0].metadata["doc_id_key"].startswith("hs_doc_apple_fruit")

    async def test_hybrid_search_vector_empty_results_effectively(
        self,
@@ -639,7 +639,7 @@

        # Expect results based purely on FTS search for "orange fruit"
        assert len(result_ids) == 1
        assert result_ids[0] == "hs_doc_generic_tech"
        assert result_ids[0] == "hs_doc_orange_fruit"

    async def test_hybrid_search_without_tsv_column(
        self,
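The updated expectations above fall out of the normalization: when one search returns a single row (or all rows tie), _normalize_scores takes its zero-range branch and assigns every document 1.0, so the lone FTS hit for "orange fruit" comes back as the top and only result. A small sketch of that edge case, again with hypothetical dict rows:

    # A single row means min == max, so score_range == 0.
    only_hit = [{"id": "hs_doc_orange_fruit", "content": "orange fruit", "rank": 3.7}]
    out = _normalize_scores(only_hit, is_distance_metric=False)
    assert out[0]["normalized_score"] == 1.0
    # weighted_sum_ranking then returns it as the sole result,
    # scored 1.0 * secondary_results_weight.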
135 changes: 90 additions & 45 deletions tests/unit_tests/v2/test_hybrid_search_config.py
@@ -27,82 +27,127 @@ def get_row(doc_id: str, score: float, content: str = "content") -> RowMapping:

class TestWeightedSumRanking:
    def test_empty_inputs(self) -> None:
        """Tests that the function handles empty inputs gracefully."""
        results = weighted_sum_ranking([], [])
        assert results == []

    def test_primary_only(self) -> None:
    def test_primary_only_cosine_default(self) -> None:
        """Tests ranking with only primary results using default cosine distance."""
        primary = [get_row("p1", 0.8), get_row("p2", 0.6)]
        # Expected scores: p1 = 0.8 * 0.5 = 0.4, p2 = 0.6 * 0.5 = 0.3
        results = weighted_sum_ranking(  # type: ignore
        # --- Calculation (Cosine = lower is better) ---
        # Scores: [0.8, 0.6]. Range: 0.2. Min: 0.6.
        # p1 norm: 1.0 - ((0.8 - 0.6) / 0.2) = 0.0
        # p2 norm: 1.0 - ((0.6 - 0.6) / 0.2) = 1.0
        # Weighted (0.5): p1 = 0.0, p2 = 0.5
        # Order: p2, p1
        results = weighted_sum_ranking(
            primary,  # type: ignore
            [],
            primary_results_weight=0.5,
            secondary_results_weight=0.5,
        )
        assert len(results) == 2
        assert results[0]["id_val"] == "p1"
        assert results[0]["distance"] == pytest.approx(0.4)
        assert results[1]["id_val"] == "p2"
        assert results[1]["distance"] == pytest.approx(0.3)
        assert results[0]["id_val"] == "p2"
        assert results[0]["distance"] == pytest.approx(0.5)
        assert results[1]["id_val"] == "p1"
        assert results[1]["distance"] == pytest.approx(0.0)

    def test_secondary_only(self) -> None:
        secondary = [get_row("s1", 0.9), get_row("s2", 0.7)]
        # Expected scores: s1 = 0.9 * 0.5 = 0.45, s2 = 0.7 * 0.5 = 0.35
        """Tests ranking with only secondary (keyword) results."""
        secondary = [get_row("s1", 15.0), get_row("s2", 5.0)]
        # --- Calculation (Keyword = higher is better) ---
        # Scores: [15.0, 5.0]. Range: 10.0. Min: 5.0.
        # s1 norm: (15.0 - 5.0) / 10.0 = 1.0
        # s2 norm: (5.0 - 5.0) / 10.0 = 0.0
        # Weighted (0.5): s1 = 0.5, s2 = 0.0
        # Order: s1, s2
        results = weighted_sum_ranking(
            [],
            secondary,  # type: ignore
            primary_results_weight=0.5,
            secondary_results_weight=0.5,
        )
        assert len(results) == 2
        assert results[0]["id_val"] == "s1"
        assert results[0]["distance"] == pytest.approx(0.45)
        assert results[0]["distance"] == pytest.approx(0.5)
        assert results[1]["id_val"] == "s2"
        assert results[1]["distance"] == pytest.approx(0.35)
        assert results[1]["distance"] == pytest.approx(0.0)

    def test_mixed_results_default_weights(self) -> None:
    def test_mixed_results_cosine(self) -> None:
        """Tests combining cosine (lower is better) and keyword (higher is better) scores."""
        primary = [get_row("common", 0.8), get_row("p_only", 0.7)]
        secondary = [get_row("common", 0.9), get_row("s_only", 0.6)]
        # Weights are 0.5, 0.5
        # common_score = (0.8 * 0.5) + (0.9 * 0.5) = 0.4 + 0.45 = 0.85
        # p_only_score = (0.7 * 0.5) = 0.35
        # s_only_score = (0.6 * 0.5) = 0.30
        # Order: common (0.85), p_only (0.35), s_only (0.30)

        results = weighted_sum_ranking(primary, secondary)  # type: ignore
        secondary = [get_row("common", 9.0), get_row("s_only", 6.0)]
        # --- Calculation ---
        # Primary norm (inverted): common=0.0, p_only=1.0
        # Secondary norm: common=1.0, s_only=0.0
        # Weighted (0.5):
        # common = (0.0 * 0.5) + (1.0 * 0.5) = 0.5
        # p_only = (1.0 * 0.5) + 0 = 0.5
        # s_only = 0 + (0.0 * 0.5) = 0.0
        results = weighted_sum_ranking(
            primary,  # type: ignore
            secondary,  # type: ignore
        )
        assert len(results) == 3
        assert results[0]["id_val"] == "common"
        assert results[0]["distance"] == pytest.approx(0.85)
        assert results[1]["id_val"] == "p_only"
        assert results[1]["distance"] == pytest.approx(0.35)
        # Check that the top two results have the correct score and IDs (order may vary)
        top_ids = {res["id_val"] for res in results[:2]}
        assert top_ids == {"common", "p_only"}
        assert results[0]["distance"] == pytest.approx(0.5)
        assert results[1]["distance"] == pytest.approx(0.5)
        assert results[2]["id_val"] == "s_only"
        assert results[2]["distance"] == pytest.approx(0.30)
        assert results[2]["distance"] == pytest.approx(0.0)

    def test_mixed_results_custom_weights(self) -> None:
        primary = [get_row("d1", 1.0)]  # p_w=0.2 -> 0.2
        secondary = [get_row("d1", 0.5)]  # s_w=0.8 -> 0.4
        # Expected: d1_score = (1.0 * 0.2) + (0.5 * 0.8) = 0.2 + 0.4 = 0.6
    def test_primary_max_inner_product(self) -> None:
        """Tests using MAX_INNER_PRODUCT (higher is better) for primary search."""
        primary = [get_row("best", 0.9), get_row("worst", 0.1)]
        secondary = [get_row("best", 20.0), get_row("worst", 5.0)]
        # --- Calculation ---
        # Primary norm (NOT inverted): best=1.0, worst=0.0
        # Secondary norm: best=1.0, worst=0.0
        # Weighted (0.5):
        # best = (1.0 * 0.5) + (1.0 * 0.5) = 1.0
        # worst = (0.0 * 0.5) + (0.0 * 0.5) = 0.0
        results = weighted_sum_ranking(
            primary,  # type: ignore
            secondary,  # type: ignore
            distance_strategy=DistanceStrategy.INNER_PRODUCT,
        )
        assert len(results) == 2
        assert results[0]["id_val"] == "best"
        assert results[0]["distance"] == pytest.approx(1.0)
        assert results[1]["id_val"] == "worst"
        assert results[1]["distance"] == pytest.approx(0.0)

    def test_primary_euclidean(self) -> None:
        """Tests using EUCLIDEAN (lower is better) for primary search."""
        primary = [get_row("closer", 10.5), get_row("farther", 25.5)]
        secondary = [get_row("closer", 100.0), get_row("farther", 10.0)]
        # --- Calculation ---
        # Primary norm (inverted): closer=1.0, farther=0.0
        # Secondary norm: closer=1.0, farther=0.0
        # Weighted (0.5):
        # closer = (1.0 * 0.5) + (1.0 * 0.5) = 1.0
        # farther = (0.0 * 0.5) + (0.0 * 0.5) = 0.0
        results = weighted_sum_ranking(
            primary,  # type: ignore
            secondary,  # type: ignore
            primary_results_weight=0.2,
            secondary_results_weight=0.8,
            distance_strategy=DistanceStrategy.EUCLIDEAN,
        )
        assert len(results) == 1
        assert results[0]["id_val"] == "d1"
        assert results[0]["distance"] == pytest.approx(0.6)
        assert len(results) == 2
        assert results[0]["id_val"] == "closer"
        assert results[0]["distance"] == pytest.approx(1.0)
        assert results[1]["id_val"] == "farther"
        assert results[1]["distance"] == pytest.approx(0.0)

    def test_fetch_top_k(self) -> None:
        """Tests that fetch_top_k correctly limits the number of results."""
        primary = [get_row(f"p{i}", (10 - i) / 10.0) for i in range(5)]
        # Scores: 1.0, 0.9, 0.8, 0.7, 0.6
        # Weighted (0.5): 0.5, 0.45, 0.4, 0.35, 0.3
        results = weighted_sum_ranking(primary, [], fetch_top_k=2)  # type: ignore
        # p0=1.0, p1=0.9, p2=0.8, p3=0.7, p4=0.6
        # The best scores (lowest raw distance) are p4 and p3
        results = weighted_sum_ranking(
            primary,  # type: ignore
            [],
            fetch_top_k=2,
        )
        assert len(results) == 2
        assert results[0]["id_val"] == "p0"
        assert results[0]["distance"] == pytest.approx(0.5)
        assert results[1]["id_val"] == "p1"
        assert results[1]["distance"] == pytest.approx(0.45)
        assert results[0]["id_val"] == "p4"  # Has the best normalized score
        assert results[1]["id_val"] == "p3"


class TestReciprocalRankFusion: