Merged
Changes from 7 commits
1 change: 1 addition & 0 deletions langchain_postgres/v2/async_vectorstore.py
@@ -670,6 +670,7 @@ async def __query_collection(
dense_results,
sparse_results,
**hybrid_search_config.fusion_function_parameters,
distance_strategy=self.distance_strategy,
)
return combined_results
return dense_results
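Because the store now always forwards `distance_strategy` as a keyword argument, any custom fusion function plugged in through `HybridSearchConfig` has to tolerate it — which is also why `reciprocal_rank_fusion` gains `**kwargs` below. A minimal sketch of a tolerant signature (the name `my_fusion` and its naive merge body are hypothetical, not part of this PR):

from typing import Any, Sequence

from sqlalchemy import RowMapping


def my_fusion(
    primary_search_results: Sequence[RowMapping],
    secondary_search_results: Sequence[RowMapping],
    fetch_top_k: int = 4,
    **kwargs: Any,  # absorbs distance_strategy, which the store now always passes
) -> Sequence[dict[str, Any]]:
    # Naive merge: primary hits first, then secondary, truncated to fetch_top_k.
    merged = [dict(row) for row in primary_search_results]
    merged.extend(dict(row) for row in secondary_search_results)
    return merged[:fetch_top_k]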
97 changes: 75 additions & 22 deletions langchain_postgres/v2/hybrid_search_config.py
@@ -4,13 +4,48 @@

from sqlalchemy import RowMapping

from .indexes import DistanceStrategy


def _normalize_scores(
results: Sequence[dict[str, Any]], is_distance_metric: bool
) -> Sequence[dict[str, Any]]:
"""Normalizes scores to a 0-1 scale, where 1 is best."""
if not results:
return []

# Get scores from the last column of each result
scores = [float(list(item.values())[-1]) for item in results]
min_score, max_score = min(scores), max(scores)
score_range = max_score - min_score

if score_range == 0:
# All raw scores are identical, so every document is equally best (1.0)
for item in results:
item["normalized_score"] = 1.0
return list(results)

for item in results:
# Read the raw score from the last column for the calculation
score = list(item.values())[-1]
normalized = (score - min_score) / score_range
if is_distance_metric:
# For distance, a lower score is better, so we invert the result.
item["normalized_score"] = 1.0 - normalized
else:
# For similarity (like keyword search), a higher score is better.
item["normalized_score"] = normalized

return list(results)


def weighted_sum_ranking(
primary_search_results: Sequence[RowMapping],
secondary_search_results: Sequence[RowMapping],
primary_results_weight: float = 0.5,
secondary_results_weight: float = 0.5,
fetch_top_k: int = 4,
**kwargs: Any,
) -> Sequence[dict[str, Any]]:
"""
Ranks documents using a weighted sum of scores from two sources.
@@ -32,35 +67,52 @@ def weighted_sum_ranking(
descending order.
"""

distance_strategy = kwargs.get(
"distance_strategy", DistanceStrategy.COSINE_DISTANCE
)
is_primary_distance = distance_strategy != DistanceStrategy.INNER_PRODUCT

# Normalize both sets of results onto a 0-1 scale
normalized_primary = _normalize_scores(
[dict(row) for row in primary_search_results],
is_distance_metric=is_primary_distance,
)

# Keyword search relevance is a similarity score (higher is better)
normalized_secondary = _normalize_scores(
[dict(row) for row in secondary_search_results], is_distance_metric=False
)

# Stores each document's combined weighted score, keyed by doc id
weighted_scores: dict[str, dict[str, Any]] = {}

# Process results from primary source
for row in primary_search_results:
values = list(row.values())
doc_id = str(values[0]) # first value is doc_id
distance = float(values[-1]) # type: ignore # last value is distance
row_values = dict(row)
row_values["distance"] = primary_results_weight * distance
weighted_scores[doc_id] = row_values

# Process results from secondary source,
# adding to existing scores or creating new ones
for row in secondary_search_results:
values = list(row.values())
doc_id = str(values[0]) # first value is doc_id
distance = float(values[-1]) # type: ignore # last value is distance
primary_score = (
weighted_scores[doc_id]["distance"] if doc_id in weighted_scores else 0.0
)
row_values = dict(row)
row_values["distance"] = distance * secondary_results_weight + primary_score
weighted_scores[doc_id] = row_values
# Process primary results
for item in normalized_primary:
doc_id = str(list(item.values())[0])
# Set the 'distance' key with the weighted primary score
item["distance"] = item["normalized_score"] * primary_results_weight
weighted_scores[doc_id] = item

# Process secondary results
for item in normalized_secondary:
doc_id = str(list(item.values())[0])
secondary_weighted_score = item["normalized_score"] * secondary_results_weight

if doc_id in weighted_scores:
# Add to the existing 'distance' score
weighted_scores[doc_id]["distance"] += secondary_weighted_score
else:
# Set the 'distance' key for the new item
item["distance"] = secondary_weighted_score
weighted_scores[doc_id] = item

# Sort the results by weighted score in descending order
ranked_results = sorted(
weighted_scores.values(), key=lambda item: item["distance"], reverse=True
)

for result in ranked_results:
result.pop("normalized_score", None)

return ranked_results[:fetch_top_k]


@@ -69,6 +121,7 @@ def reciprocal_rank_fusion(
secondary_search_results: Sequence[RowMapping],
rrf_k: float = 60,
fetch_top_k: int = 4,
**kwargs: Any,
) -> Sequence[dict[str, Any]]:
"""
Ranks documents using Reciprocal Rank Fusion (RRF) of scores from two sources.
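For reference, a small end-to-end sketch of the new behavior. The row dicts stand in for the RowMapping rows the store produces (plain dicts work at runtime, exactly as the unit tests below do with `# type: ignore`); the ids and scores are invented:

from langchain_postgres.v2.hybrid_search_config import weighted_sum_ranking
from langchain_postgres.v2.indexes import DistanceStrategy

# First value of each row is the doc id, last value is the raw score.
dense = [
    {"id": "a", "content": "...", "distance": 0.2},
    {"id": "b", "content": "...", "distance": 0.8},
]
sparse = [
    {"id": "b", "content": "...", "rank": 12.0},
    {"id": "c", "content": "...", "rank": 3.0},
]

ranked = weighted_sum_ranking(
    dense,
    sparse,
    primary_results_weight=0.6,
    secondary_results_weight=0.4,
    distance_strategy=DistanceStrategy.COSINE_DISTANCE,  # lower raw score is better
)
# "a" has the best cosine distance (normalizes to 1.0 -> 0.6 weighted);
# "b" pairs the worst dense score (~0.0) with the best sparse score (0.4);
# "c" is worst on its only source, so it lands at 0.0.
print([(r["id"], r["distance"]) for r in ranked])  # ~[("a", 0.6), ("b", 0.4), ("c", 0.0)]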
6 changes: 3 additions & 3 deletions tests/unit_tests/v2/test_async_pg_vectorstore_search.py
@@ -478,7 +478,7 @@ async def test_hybrid_search_weighted_sum_vector_bias(
result_ids = [doc.metadata["doc_id_key"] for doc in results]

assert len(result_ids) > 0
assert result_ids[0] == "hs_doc_orange_fruit"
assert result_ids[0] == "hs_doc_generic_tech"

async def test_hybrid_search_weighted_sum_fts_bias(
self,
@@ -611,7 +611,7 @@ async def test_hybrid_search_fts_empty_results(
assert len(result_ids) > 0
assert "hs_doc_apple_fruit" in result_ids or "hs_doc_apple_tech" in result_ids
# The top result should be one of the apple documents based on vector search
assert results[0].metadata["doc_id_key"].startswith("hs_doc_unrelated_cat")
assert results[0].metadata["doc_id_key"].startswith("hs_doc_apple_fruit")

async def test_hybrid_search_vector_empty_results_effectively(
self,
@@ -639,7 +639,7 @@ async def test_hybrid_search_vector_empty_results_effectively(

# Expect results based purely on FTS search for "orange fruit"
assert len(result_ids) == 1
assert result_ids[0] == "hs_doc_generic_tech"
assert result_ids[0] == "hs_doc_orange_fruit"

async def test_hybrid_search_without_tsv_column(
self,
140 changes: 93 additions & 47 deletions tests/unit_tests/v2/test_hybrid_search_config.py
@@ -4,6 +4,7 @@
reciprocal_rank_fusion,
weighted_sum_ranking,
)
from langchain_postgres.v2.indexes import DistanceStrategy


# Helper to create mock input items that mimic RowMapping for the fusion functions
@@ -22,82 +23,127 @@ def get_row(doc_id: str, score: float, content: str = "content") -> dict:

class TestWeightedSumRanking:
def test_empty_inputs(self) -> None:
"""Tests that the function handles empty inputs gracefully."""
results = weighted_sum_ranking([], [])
assert results == []

def test_primary_only(self) -> None:
def test_primary_only_cosine_default(self) -> None:
"""Tests ranking with only primary results using default cosine distance."""
primary = [get_row("p1", 0.8), get_row("p2", 0.6)]
# Expected scores: p1 = 0.8 * 0.5 = 0.4, p2 = 0.6 * 0.5 = 0.3
results = weighted_sum_ranking( # type: ignore
# --- Calculation (Cosine = lower is better) ---
# Scores: [0.8, 0.6]. Range: 0.2. Min: 0.6.
# p1 norm: 1.0 - ((0.8 - 0.6) / 0.2) = 0.0
# p2 norm: 1.0 - ((0.6 - 0.6) / 0.2) = 1.0
# Weighted (0.5): p1 = 0.0, p2 = 0.5
# Order: p2, p1
results = weighted_sum_ranking(
primary, # type: ignore
[],
primary_results_weight=0.5,
secondary_results_weight=0.5,
)
assert len(results) == 2
assert results[0]["id_val"] == "p1"
assert results[0]["distance"] == pytest.approx(0.4)
assert results[1]["id_val"] == "p2"
assert results[1]["distance"] == pytest.approx(0.3)
assert results[0]["id_val"] == "p2"
assert results[0]["distance"] == pytest.approx(0.5)
assert results[1]["id_val"] == "p1"
assert results[1]["distance"] == pytest.approx(0.0)

def test_secondary_only(self) -> None:
secondary = [get_row("s1", 0.9), get_row("s2", 0.7)]
# Expected scores: s1 = 0.9 * 0.5 = 0.45, s2 = 0.7 * 0.5 = 0.35
"""Tests ranking with only secondary (keyword) results."""
secondary = [get_row("s1", 15.0), get_row("s2", 5.0)]
# --- Calculation (Keyword = higher is better) ---
# Scores: [15.0, 5.0]. Range: 10.0. Min: 5.0.
# s1 norm: (15.0 - 5.0) / 10.0 = 1.0
# s2 norm: (5.0 - 5.0) / 10.0 = 0.0
# Weighted (0.5): s1 = 0.5, s2 = 0.0
# Order: s1, s2
results = weighted_sum_ranking(
[],
secondary, # type: ignore
primary_results_weight=0.5,
secondary_results_weight=0.5,
)
assert len(results) == 2
assert results[0]["id_val"] == "s1"
assert results[0]["distance"] == pytest.approx(0.45)
assert results[0]["distance"] == pytest.approx(0.5)
assert results[1]["id_val"] == "s2"
assert results[1]["distance"] == pytest.approx(0.35)
assert results[1]["distance"] == pytest.approx(0.0)

def test_mixed_results_default_weights(self) -> None:
def test_mixed_results_cosine(self) -> None:
"""Tests combining cosine (lower is better) and keyword (higher is better) scores."""
primary = [get_row("common", 0.8), get_row("p_only", 0.7)]
secondary = [get_row("common", 0.9), get_row("s_only", 0.6)]
# Weights are 0.5, 0.5
# common_score = (0.8 * 0.5) + (0.9 * 0.5) = 0.4 + 0.45 = 0.85
# p_only_score = (0.7 * 0.5) = 0.35
# s_only_score = (0.6 * 0.5) = 0.30
# Order: common (0.85), p_only (0.35), s_only (0.30)

results = weighted_sum_ranking(primary, secondary) # type: ignore
secondary = [get_row("common", 9.0), get_row("s_only", 6.0)]
# --- Calculation ---
# Primary norm (inverted): common=0.0, p_only=1.0
# Secondary norm: common=1.0, s_only=0.0
# Weighted (0.5):
# common = (0.0 * 0.5) + (1.0 * 0.5) = 0.5
# p_only = (1.0 * 0.5) + 0 = 0.5
# s_only = 0 + (0.0 * 0.5) = 0.0
results = weighted_sum_ranking(
primary, # type: ignore
secondary, # type: ignore
)
assert len(results) == 3
assert results[0]["id_val"] == "common"
assert results[0]["distance"] == pytest.approx(0.85)
assert results[1]["id_val"] == "p_only"
assert results[1]["distance"] == pytest.approx(0.35)
# Check that the top two results have the correct score and IDs (order may vary)
top_ids = {res["id_val"] for res in results[:2]}
assert top_ids == {"common", "p_only"}
assert results[0]["distance"] == pytest.approx(0.5)
assert results[1]["distance"] == pytest.approx(0.5)
assert results[2]["id_val"] == "s_only"
assert results[2]["distance"] == pytest.approx(0.30)
assert results[2]["distance"] == pytest.approx(0.0)

def test_mixed_results_custom_weights(self) -> None:
primary = [get_row("d1", 1.0)] # p_w=0.2 -> 0.2
secondary = [get_row("d1", 0.5)] # s_w=0.8 -> 0.4
# Expected: d1_score = (1.0 * 0.2) + (0.5 * 0.8) = 0.2 + 0.4 = 0.6
def test_primary_max_inner_product(self) -> None:
"""Tests using MAX_INNER_PRODUCT (higher is better) for primary search."""
primary = [get_row("best", 0.9), get_row("worst", 0.1)]
secondary = [get_row("best", 20.0), get_row("worst", 5.0)]
# --- Calculation ---
# Primary norm (NOT inverted): best=1.0, worst=0.0
# Secondary norm: best=1.0, worst=0.0
# Weighted (0.5):
# best = (1.0 * 0.5) + (1.0 * 0.5) = 1.0
# worst = (0.0 * 0.5) + (0.0 * 0.5) = 0.0
results = weighted_sum_ranking(
primary, # type: ignore
secondary, # type: ignore
distance_strategy=DistanceStrategy.INNER_PRODUCT,
)
assert len(results) == 2
assert results[0]["id_val"] == "best"
assert results[0]["distance"] == pytest.approx(1.0)
assert results[1]["id_val"] == "worst"
assert results[1]["distance"] == pytest.approx(0.0)

def test_primary_euclidean(self) -> None:
"""Tests using EUCLIDEAN (lower is better) for primary search."""
primary = [get_row("closer", 10.5), get_row("farther", 25.5)]
secondary = [get_row("closer", 100.0), get_row("farther", 10.0)]
# --- Calculation ---
# Primary norm (inverted): closer=1.0, farther=0.0
# Secondary norm: closer=1.0, farther=0.0
# Weighted (0.5):
# closer = (1.0 * 0.5) + (1.0 * 0.5) = 1.0
# farther = (0.0 * 0.5) + (0.0 * 0.5) = 0.0
results = weighted_sum_ranking(
primary, # type: ignore
secondary, # type: ignore
primary_results_weight=0.2,
secondary_results_weight=0.8,
distance_strategy=DistanceStrategy.EUCLIDEAN,
)
assert len(results) == 1
assert results[0]["id_val"] == "d1"
assert results[0]["distance"] == pytest.approx(0.6)
assert len(results) == 2
assert results[0]["id_val"] == "closer"
assert results[0]["distance"] == pytest.approx(1.0)
assert results[1]["id_val"] == "farther"
assert results[1]["distance"] == pytest.approx(0.0)

def test_fetch_top_k(self) -> None:
"""Tests that fetch_top_k correctly limits the number of results."""
primary = [get_row(f"p{i}", (10 - i) / 10.0) for i in range(5)]
# Scores: 1.0, 0.9, 0.8, 0.7, 0.6
# Weighted (0.5): 0.5, 0.45, 0.4, 0.35, 0.3
results = weighted_sum_ranking(primary, [], fetch_top_k=2) # type: ignore
# p0=1.0, p1=0.9, p2=0.8, p3=0.7, p4=0.6
# The best scores (lowest distance) are p4 and p3
results = weighted_sum_ranking(
primary, # type: ignore
[],
fetch_top_k=2,
)
assert len(results) == 2
assert results[0]["id_val"] == "p0"
assert results[0]["distance"] == pytest.approx(0.5)
assert results[1]["id_val"] == "p1"
assert results[1]["distance"] == pytest.approx(0.45)
assert results[0]["id_val"] == "p4" # Has the best normalized score
assert results[1]["id_val"] == "p3"


class TestReciprocalRankFusion:
@@ -224,6 +270,6 @@ def test_reordering_from_inputs_weighted_sum(self) -> None:
results = weighted_sum_ranking(primary, secondary) # type: ignore
assert len(results) == 2
assert results[0]["id_val"] == "docB"
assert results[0]["distance"] == pytest.approx(0.75)
assert results[0]["distance"] == pytest.approx(1.0)
assert results[1]["id_val"] == "docA"
assert results[1]["distance"] == pytest.approx(0.55)
assert results[1]["distance"] == pytest.approx(0.0)
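One edge case worth calling out from `_normalize_scores`: when every raw score is identical, the score range is zero and the helper short-circuits, marking every row as 1.0 regardless of metric. A quick sketch, assuming the private helper stays importable from the module:

from langchain_postgres.v2.hybrid_search_config import _normalize_scores

rows = [{"id": "x", "distance": 0.5}, {"id": "y", "distance": 0.5}]
out = _normalize_scores(rows, is_distance_metric=True)
# Zero range: no inversion is applied; both rows are treated as equally best.
assert all(r["normalized_score"] == 1.0 for r in out)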