Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip: working multivector query
  • Loading branch information
justin-cechmanek committed Sep 24, 2025
commit d0dff0bb0f6472d647e3f066e7bead036f965a7c
193 changes: 126 additions & 67 deletions redisvl/query/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,76 +260,139 @@ class MultiVectorQuery(AggregationQuery):
results = index.query(query)



FT.AGGREGATE multi_vector_test
"@user_embedding:[VECTOR_RANGE 2.0 $vector_0]=>{$YIELD_DISTANCE_AS: distance_0}
| @image_embedding:[VECTOR_RANGE 2.0 $vector_1]=>{$YIELD_DISTANCE_AS: distance_1}"
PARAMS 4
vector_0 "\xcd\xcc\xcc=\xcd\xcc\xcc=\x00\x00\x00?"
vector_1 "\x9a\x99\x99\x99\x99\x99\xb9?\x9a\x99\x99\x99\x99\x99\xb9?\x9a\x99\x99\x99\x99\x99\xb9?\x9a\x99\x99\x99\x99\x99\xb9?\x9a\x99\x99\x99\x99\x99\xb9?"
APPLY "(2 - @distance_0)/2" AS score_0
APPLY "(2 - @distance_1)/2" AS score_1
DIALECT 2
APPLY "(@score_0 + @score_1)" AS combined_score
SORTBY 2 @combined_score
ASC
MAX 10
LOAD 2 score_0 score_1





FT.AGGREGATE 'idx:characters'
"@embedding1:[VECTOR_RANGE .7 $vector1]=>{$YIELD_DISTANCE_AS: vector_distance1} | @embedding2:[VECTOR_RANGE 1.0 $vector2]=>{$YIELD_DISTANCE_AS: vector_distance2} | @embedding3:[VECTOR_RANGE 1.7 $vector3]=>{$YIELD_DISTANCE_AS: vector_distance3} | @name:(James)"
ADDSCORES
SCORER BM25STD.NORM
LOAD 2 created_at @embedding
APPLY 'case(exists(@vector_distance1), @vector_distance1, 0.0)' as v1
APPLY 'case(exists(@vector_distance2), @vector_distance2, 0.0)' as v2
APPLY 'case(exists(@vector_distance3), @vector_distance3, 0.0)' as v3
"@embedding1:[VECTOR_RANGE .7 $vector1]=>{$YIELD_DISTANCE_AS: vector_distance1}
| @embedding2:[VECTOR_RANGE 1.0 $vector2]=>{$YIELD_DISTANCE_AS: vector_distance2}
| @embedding3:[VECTOR_RANGE 1.7 $vector3]=>{$YIELD_DISTANCE_AS: vector_distance3}
| @name:(James)"
### ADDSCORES
### SCORER BM25STD.NORM
### LOAD 2 created_at @embedding
APPLY '(2 - @vector_distance1)/2' as v1
APPLY '(2 - @vector_distance2)/2' as v2
APPLY '(2 - @vector_distance3)/2' as v3
APPLY '(@__score * 0.3 + (@v1 * 0.3) + (@v2 * 1.2) + (@v3 * 0.1))' AS final_score
PARAMS 6 vector1 "\xe4\xd6..." vector2 "\x89\xa0..." vector3 "\x3c\x19..."
SORTBY 2 @final_score DESC
DIALECT 2
LIMIT 0 100


"""

# Base alias for the yielded vector distance; interpolated into query and
# score expressions as "@vector_distance".
DISTANCE_ID: str = "vector_distance"
# Base name of the query parameter that carries the serialized query vector.
VECTOR_PARAM: str = "vector"

def __init__(
    self,
    vectors: Union[bytes, List[bytes], List[float], List[List[float]]],
    vector_field_names: Union[str, List[str]],
    weights: Union[float, List[float]] = 1.0,
    return_fields: Optional[List[str]] = None,
    filter_expression: Optional[Union[str, FilterExpression]] = None,
    dtypes: Union[str, List[str]] = "float32",
    num_results: int = 10,
    return_score: bool = False,
    dialect: int = 2,
):
    """
    Instantiates a MultiVectorQuery object.

    Args:
        vectors (Union[bytes, List[bytes], List[float], List[List[float]]]): The
            vector(s) to perform vector similarity search with. A single vector
            (bytes or a flat list of numbers) is treated as a list of one.
        vector_field_names (Union[str, List[str]]): The vector field names to
            search in, one per vector. A single string is treated as a list of one.
        weights (Union[float, List[float]]): The weights of the vector similarity.
            Documents will be scored as:
            score = (w1) * score1 + (w2) * score2 + (w3) * score3 + ...
            A single value (or one-element list) is applied to every vector.
            Defaults to 1.0, which corresponds to equal weighting.
        return_fields (Optional[List[str]], optional): The fields to return.
            Defaults to None.
        filter_expression (Optional[Union[str, FilterExpression]]): The filter
            expression to use. Defaults to None.
        dtypes (Union[str, List[str]]): The data types of the vectors. A single
            value (or one-element list) is applied to every vector.
            Defaults to "float32" for all vectors.
        num_results (int, optional): The number of results to return. Defaults to 10.
        return_score (bool): Whether to return the combined vector similarity score.
            Defaults to False. NOTE(review): this flag is accepted but not
            referenced anywhere in this constructor.
        dialect (int, optional): The Redis dialect version. Defaults to 2.

    Raises:
        ValueError: If any of vectors, vector field names, weights or dtypes is
            empty, or if their counts do not agree.
    """
    # Accept scalars as well as lists so both calling conventions work.
    weight_list = [weights] if isinstance(weights, (int, float)) else weights
    dtype_list = [dtypes] if isinstance(dtypes, str) else dtypes

    if (
        len(vectors) == 0
        or len(vector_field_names) == 0
        or len(weight_list) == 0
        or len(dtype_list) == 0
    ):
        raise ValueError(
            "vectors, vector_field_names, weights and dtypes must not be empty. "
            f"Got {len(vectors)} vectors, {len(vector_field_names)} vector field "
            f"names, {len(weight_list)} weights and {len(dtype_list)} dtypes."
        )

    # A bytes blob or a flat list of numbers is ONE vector, not a list of vectors.
    if isinstance(vectors, bytes) or isinstance(vectors[0], (int, float)):
        self._vectors = [vectors]
    else:
        self._vectors = vectors

    if isinstance(vector_field_names, str):
        self._vector_field_names = [vector_field_names]
    else:
        self._vector_field_names = vector_field_names

    # Broadcast against the number of vectors, not len(vectors): for a single
    # vector, len(vectors) is its dimension (or byte length).
    num_vectors = len(self._vectors)
    if len(weight_list) == 1:
        self._weights = list(weight_list) * num_vectors
    else:
        self._weights = weight_list
    if len(dtype_list) == 1:
        self._dtypes = list(dtype_list) * num_vectors
    else:
        self._dtypes = dtype_list

    if not (
        num_vectors
        == len(self._vector_field_names)
        == len(self._weights)
        == len(self._dtypes)
    ):
        raise ValueError(
            "The number of vectors and vector field names must be equal. "
            "If weights or dtypes are specified their number must match the "
            "number of vectors and vector field names also. "
            f"Got {num_vectors} vectors, {len(self._vector_field_names)} vector "
            f"field names, {len(self._weights)} weights and "
            f"{len(self._dtypes)} dtypes."
        )

    # Both attributes are read by _build_query_string(), so set them first.
    self._filter_expression = filter_expression
    self._num_results = num_results

    query_string = self._build_query_string()
    super().__init__(query_string)

    # construct the scoring string based on the vector similarity scores and weights
    combined_score_string = " + ".join(
        f"@score_{i} * {w}" for i, w in enumerate(self._weights)
    )
    # NOTE(review): the expression is wrapped in single quotes exactly as before;
    # confirm the server evaluates it as an expression, not a string literal.
    combined_score_string = f"'({combined_score_string})'"

    self.apply(combined_score=combined_score_string)

    self.sort_by(Desc("@combined_score"), max=num_results)  # type: ignore
    self.dialect(dialect)
    if return_fields:
        self.load(*return_fields)  # type: ignore[arg-type]
Expand All @@ -341,49 +404,45 @@ def params(self) -> Dict[str, Any]:
Returns:
Dict[str, Any]: The parameters for the aggregation.
"""
if isinstance(self._vector, list):
vector = array_to_buffer(self._vector, dtype=self._dtype)
else:
vector = self._vector

params = {self.VECTOR_PARAM: vector}

params = {}
for i, (vector, vector_field, dtype) in enumerate(zip(
self._vectors, self._vector_field_names, self._dtypes
)):
if isinstance(vector, list):
vector = array_to_buffer(vector, dtype=dtype)
params[f"vector_{i}"] = vector
return params

def _tokenize_and_escape_query(self, user_query: str) -> str:
    """Turn a raw user query into an OR-joined, escaped full text query.

    The query is split on whitespace. Each piece is stripped of surrounding
    whitespace and commas, has curly double quotes removed, is lower-cased and
    then escaped. Empty tokens and tokens found in ``self._stopwords`` are
    dropped before joining.

    Args:
        user_query (str): The user query to tokenize and escape.

    Returns:
        str: The tokenized and escaped query string, tokens joined by " | ".

    Raises:
        ValueError: If the text string becomes empty after stopwords are removed.
    """
    escaper = TokenEscaper()

    escaped_tokens = []
    for piece in user_query.split():
        cleaned = piece.strip().strip(",")
        cleaned = cleaned.replace("“", "").replace("”", "").lower()
        escaped_tokens.append(escaper.escape(cleaned))

    kept = []
    for token in escaped_tokens:
        if not token:
            continue
        if token in self._stopwords:
            continue
        kept.append(token)

    tokenized = " | ".join(kept)
    if tokenized:
        return tokenized
    raise ValueError("text string cannot be empty after removing stopwords")

def _build_query_string(self) -> str:
    """Build the query string for the multi-vector search with optional filtering.

    One VECTOR_RANGE clause (radius 2.0) is emitted per (vector, field) pair,
    yielding its distance as ``distance_<i>`` and reading the query vector from
    the ``$vector_<i>`` parameter; the clauses are joined with " | ". Each
    yielded distance is then turned into ``score_<i>`` via an APPLY clause
    computing ``(2 - distance) / 2``.

    Returns:
        str: ``"<range clauses> [<filter>] <APPLY clauses>"``. The filter part
        is present only when a non-empty filter expression was supplied.
    """
    # Keep string filters as given; they used to be silently discarded.
    raw_filter = self._filter_expression
    if raw_filter is None:
        filter_expression = ""
    elif isinstance(raw_filter, str):
        filter_expression = raw_filter
    else:
        filter_expression = str(raw_filter)

    # Range queries are used for every vector; the per-vector KNN form
    # ("[KNN n @field $vector_i AS distance_i]") did not work when joined.
    range_query = " | ".join(
        f"@{field}:[VECTOR_RANGE 2.0 $vector_{i}]=>{{$YIELD_DISTANCE_AS: distance_{i}}}"
        for i, (_, field) in enumerate(
            zip(self._vectors, self._vector_field_names)
        )
    )

    # calculate the respective vector similarities
    # (each clause keeps its trailing space, so the result ends with one)
    apply_string = "".join(
        f'APPLY "(2 - @distance_{i})/2" AS score_{i} '
        for i, _ in enumerate(
            zip(self._vectors, self._vector_field_names, self._weights)
        )
    )

    if filter_expression:
        return f"{range_query} {filter_expression} {apply_string}"
    return f"{range_query} {apply_string}"

def __str__(self) -> str:
    """Render the query as its build arguments, stringified and space-separated."""
    rendered_args = map(str, self.build_args())
    return " ".join(rendered_args)