Update python interface for fusion operator (infiniflow#1657)
### What problem does this PR solve?

Update python interface for fusion operator
Remove CommonMatchTensorExpr
fix infiniflow#1655

### Type of change

- [x] Documentation Update
- [x] Refactoring
- [x] Test cases
- [x] Python SDK impacted, Need to update PyPI
yangzq50 authored Aug 15, 2024
1 parent c7d932d commit 024f82b
Showing 18 changed files with 164 additions and 224 deletions.
41 changes: 23 additions & 18 deletions docs/references/pysdk_api_reference.md
@@ -1706,14 +1706,14 @@ for question in questions:
## fusion

```python
table_object.fusion(method, options_text, commonMatchTensorExpr = None)
table_object.fusion(method, topn, fusion_params = None)
```

Builds a fusion expression.

### Parameters

#### method: `str`
#### method: `str`, *Required*

Supported reranking methods for multi-way retrieval include:

@@ -1724,23 +1724,29 @@ Supported reranking methods for multi-way retrieval include:
- `"match_tensor"`
Infinity's tensor-based reranking approach. It reranks the results of dense vector, sparse vector, or full-text retrieval paths.

#### options_text: `str`, *Required*
#### topn: `int`, *Required*

Mandatory setting for the fused reranking.
Specifies the number of the most relevant rows to retrieve, e.g., `topn=10` to obtain the ten most relevant rows.

A non-empty, semicolon-separated string specifying the following reranking options:
#### fusion_params: `Optional[dict]`, *Differs across methods*

- **Common options**: `str`, *Required*
Mandatory settings for the fused reranking.
- `"topn"`: Specifies the number of the most relevant rows to retrieve, e.g., `"topn=10"` to obtain the ten most relevant rows.
A non-empty dict specifying the following reranking options:

- **RRF-specific options**: `str`, *Optional*
- **RRF-specific options**: *Optional*
Settings when employing RRF for reranking.
- `"rank_constant"`: The smoothing constant for RRF reranking. Typically set to `60`, e.g., `"topn=10;rank_constant=60"`.
- `"rank_constant"`: The smoothing constant for RRF reranking. Typically set to `60`, e.g., `{"rank_constant": 60}`.

- **weighted_sum-specific options**: `str`, *Optional*
- **weighted_sum-specific options**: *Optional*
Settings when employing Weighted Sum for reranking.
- `"weights"`: Specifies the weight for each retrieval path. For example, `"weights=1,2,0.5"` sets weights of `1`, `2`, and `0.5` for the first, second, and third retrieval paths, respectively. The default weight of each retrieval path is `1.0`. If `"weights"` is not specified, all retrieval paths will be assigned the default weight of `1.0`.
- `"weights"`: Specifies the weight for each retrieval path. For example, `{"weights": "1,2,0.5"}` sets weights of `1`, `2`, and `0.5` for the first, second, and third retrieval paths, respectively. The default weight of each retrieval path is `1.0`. If `"weights"` is not specified, all retrieval paths will be assigned the default weight of `1.0`.

#### commonMatchTensorExpr: `commonMatchTensorExpr()`, *Optional*
- **match_tensor-specific options**: *Required when using match_tensor fusion method*
Settings when employing match_tensor for reranking.
- `"field"`: The name of the tensor column to be used for reranking.
- `"data"`: The tensor data to query against. This should be provided as a list of lists or a two-dimensional NumPy
array of numerical values.
- `"data_type"`: The element data type of the query tensor. Usually `"float"`.
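
To make the role of `"rank_constant"` concrete, here is a minimal sketch of Reciprocal Rank Fusion in its standard textbook form, where each retrieval path contributes `1 / (rank_constant + rank)` for every row it returns. The function names and row IDs are hypothetical, and Infinity's internal implementation may differ in details such as tie-breaking.

```python
from collections import defaultdict


def rrf_scores(rankings, rank_constant=60):
    """Fuse several ranked lists of row IDs with Reciprocal Rank Fusion."""
    scores = defaultdict(float)
    for ranking in rankings:
        # Ranks are 1-based: the best row of a path contributes 1 / (rank_constant + 1).
        for rank, row_id in enumerate(ranking, start=1):
            scores[row_id] += 1.0 / (rank_constant + rank)
    return dict(scores)


def top_n(scores, topn):
    """Return the `topn` highest-scoring row IDs, best first."""
    return sorted(scores, key=scores.get, reverse=True)[:topn]


dense_path = ["r1", "r2", "r3"]     # hypothetical row IDs from a dense vector search
fulltext_path = ["r2", "r4", "r1"]  # hypothetical row IDs from a full-text search
fused = rrf_scores([dense_path, fulltext_path], rank_constant=60)
print(top_n(fused, 2))
```

A larger `rank_constant` flattens the difference between adjacent ranks, so rows that appear in several paths gain relative to rows that rank first in only one.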

### Returns

@@ -1761,7 +1767,7 @@ table_object.output(["num", "body", "vec", "sparse", "year", "tensor", "_score"]
.match_sparse("sparse", {"indices": [0, 20, 80], "values": [1.0, 2.0, 3.0]}, "ip", 3)
.match("body", "blooms", "topn=10")
.filter("year < 2024")
.fusion("rrf", "topn=2")
.fusion("rrf", 2)
.to_pl()
```

@@ -1771,7 +1777,7 @@ table_object.output(["num", "body", "vec", "sparse", "year", "tensor", "_score"]
.match_sparse("sparse", {"indices": [0, 20, 80], "values": [1.0, 2.0, 3.0]}, "ip", 3)
.match("body", "blooms", "topn=10")
.filter("year < 2024")
.fusion("rrf", "topn=2;rank_constant=30")
.fusion("rrf", 2, {"rank_constant": 30})
.to_pl()
```

@@ -1783,21 +1789,20 @@ table_object.output(["num", "body", "vec", "sparse", "year", "tensor", "_score"]
.match_sparse("sparse", {"indices": [0, 20, 80], "values": [1.0, 2.0, 3.0]}, "ip", 3)
.match("body", "blooms", "topn=10")
.filter("year < 2024")
.fusion("weighted_sum", "topn=2;weights=1,2,0.5")
.fusion("weighted_sum", 2, {"weights": "1,2,0.5"})
.to_pl()
```
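
For callers migrating from the old semicolon-separated `options_text` to the new `topn` and `fusion_params` arguments, the conversion can be sketched as below. `split_options_text` is a hypothetical helper written for illustration; it is not part of the Infinity SDK.

```python
def split_options_text(options_text):
    """Split an old-style options string into (topn, fusion_params).

    Hypothetical migration helper, not part of the Infinity SDK.
    """
    topn = None
    fusion_params = {}
    for item in options_text.split(";"):
        if not item.strip():
            continue  # tolerate empty segments such as a trailing semicolon
        key, _, value = item.partition("=")
        key, value = key.strip(), value.strip()
        if key == "topn":
            topn = int(value)
        else:
            fusion_params[key] = value  # values stay strings, e.g. "1,2,0.5"
    if topn is None:
        raise ValueError("options_text must contain topn")
    return topn, (fusion_params or None)


print(split_options_text("topn=2;weights=1,2,0.5"))
```

The helper keeps every value other than `topn` as a string, so `"rank_constant=30"` becomes `{"rank_constant": "30"}`. Convert it to `int` yourself if you prefer the form used in the examples above.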

#### Use tensor reranking

```python {8}
# You must import `CommonMatchTensorExpr` to set tensor reranking parameters
from infinity.common import CommonMatchTensorExpr
table_object.output(["num", "body", "vec", "sparse", "year", "tensor", "_score"])
.knn("vec", [3.0, 2.8, 2.7, 3.1], "float", "cosine", 3)
.match_sparse("sparse", {"indices": [0, 20, 80], "values": [1.0, 2.0, 3.0]}, "ip", 3)
.match("body", "blooms", "topn=10")
.filter("year < 2024")
.fusion("match_tensor", "topn=2", commonMatchTensorExpr("tensor", [[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]], "float", "maxsim"))
.fusion("match_tensor", 2, {"field": "tensor", "data_type": "float",
"data": [[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]]})
.to_pl()
```
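
The `match_tensor` method scores each candidate row against the query tensor with `maxsim`. As a rough illustration, the sketch below implements the ColBERT-style MaxSim definition in plain Python: for every query vector, take the best inner product with any vector of the row's tensor, then sum those maxima. It assumes inner-product similarity and is not taken from Infinity's source, which may differ.

```python
def maxsim(query_tensor, doc_tensor):
    """ColBERT-style MaxSim between two lists of equal-dimension vectors."""
    total = 0.0
    for q in query_tensor:
        # Best inner product between this query vector and any document vector.
        best = max(sum(a * b for a, b in zip(q, d)) for d in doc_tensor)
        total += best
    return total


query = [[1.0, 0.0], [0.0, 1.0]]  # hypothetical two-vector query tensor
doc = [[1.0, 0.0], [0.5, 0.5]]    # hypothetical two-vector row tensor
print(maxsim(query, doc))
```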

10 changes: 4 additions & 6 deletions example/ColBERT_reranker_example/helper.py
@@ -15,9 +15,7 @@
from typing import Union
# NOTICE: please check which infinity you are using, local or remote
# this statement is for local infinity
from infinity.local_infinity.types import make_match_tensor_expr
# enable the following import statement to use remote infinity
# from infinity.remote_thrift.types import make_match_tensor_expr
# from infinity.common import LOCAL_HOST


@@ -142,8 +140,7 @@ def query_match_tensor(self, query_str: str, output_columns: list[str], top_n: i
raise ValueError("Dimension error.")
query_result = self.colbert_test_table.output(output_columns).match_tensor(target_col_name,
query_tensor.numpy(force=True),
'float', 'maxsim',
f'topn={top_n}').to_pl()
'float', top_n).to_pl()
print(query_result)
return query_result

@@ -164,10 +161,11 @@ def query_fusion(self, query_str: str, output_columns: list[str], final_top_n: i
query_tensor = self.ckpt.queryFromText([query_str])[0]
if query_tensor.dim() != 2 or query_tensor.size(1) != 128:
raise ValueError("Dimension error.")
rerank_expr = make_match_tensor_expr(target_col_name, query_tensor.numpy(force=True), 'float', 'maxsim')
query_result = self.colbert_test_table.output(output_columns).match(self.inner_col_txt, query_str,
f'topn={first_stage_top_n}').fusion(
'match_tensor', f'topn={final_top_n}', match_tensor_expr=rerank_expr).to_pl()
method='match_tensor', topn=final_top_n,
fusion_params={"field": target_col_name, "data": query_tensor.numpy(force=True),
"data_type": "float"}).to_pl()
print(query_result)
return query_result

14 changes: 4 additions & 10 deletions example/hybrid_search.py
@@ -17,7 +17,6 @@
"""

import infinity
from infinity.common import CommonMatchTensorExpr

try:
# open a local directory to store the data
@@ -84,7 +83,7 @@
)

# TODO: dense vector + sparse vector + full-text + structured data filter + tensor reranker
# result = table_instance.output(["num", "body"]).knn("vec", [3.0, 2.8, 2.7, 3.1], "float", "ip", 3).match("body", "blooms","topn=1").fusion("rrf").to_pl()
# result = table_instance.output(["num", "body"]).knn("vec", [3.0, 2.8, 2.7, 3.1], "float", "ip", 3).match("body", "blooms","topn=1").fusion(method="rrf").to_pl()

result = (
table_instance.output(
@@ -97,14 +96,9 @@
.match("body", "blooms", "topn=10")
.filter("year < 2024")
.fusion(
"match_tensor",
"topn=2",
CommonMatchTensorExpr(
"tensor",
[[0.9, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]],
"float",
"maxsim",
),
method="match_tensor", topn=2,
fusion_params={"field": "tensor", "data_type": "float",
"data": [[0.9, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]]}
)
.to_pl()
)
6 changes: 4 additions & 2 deletions example/tensor_search.py
@@ -61,8 +61,10 @@
},
]
)

result = table_instance.output(["num", "vec", "_score"]).match_tensor("vec", [[0.9, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]], 'float', 'maxsim', 'topn=2').to_pl()
result = table_instance.output(["num", "vec", "_score"]).match_tensor(column_name="vec", topn=2,
query_data=[[0.9, 0.0, 0.0, 0.0],
[1.1, 0.0, 0.0, 0.0]],
query_data_type='float').to_pl()
print(result)
infinity_instance.disconnect()

12 changes: 6 additions & 6 deletions python/benchmark/mldr_benchmark/get_search_colbert_rerank.py
@@ -24,8 +24,7 @@
from mldr_common_tools import query_yields, apply_funcs
from transformers import HfArgumentParser
import infinity
from infinity.common import LOCAL_HOST, CommonMatchTensorExpr
from infinity.remote_thrift.types import make_match_tensor_expr
from infinity.common import LOCAL_HOST


@dataclass
@@ -40,9 +39,10 @@ class RerankOption:


def apply_colbert_rerank(result_table, rerank_tensor, max_hits: int, rerank_option):
colbert_rerank_column_name = {'colbert': 'colbert_col', 'colbert_bit': 'colbert_bit_col'}
rerank_expr = CommonMatchTensorExpr(colbert_rerank_column_name[rerank_option], rerank_tensor, 'float', 'maxsim')
return result_table.fusion('match_tensor', f'topn={max_hits}', rerank_expr)
colbert_rerank_column_names = {'colbert': 'colbert_col', 'colbert_bit': 'colbert_bit_col'}
query_column_name = colbert_rerank_column_names[rerank_option]
return result_table.fusion(method='match_tensor', topn=max_hits,
fusion_params={"field": query_column_name, "data": rerank_tensor, "data_type": "float"})


def powerset_above_2(s: list):
@@ -80,7 +80,7 @@ def rerank_rff_query_func(self, query_targets_list, apply_funcs_list, rerank_tar
result_table = self.infinity_table.output(["docid_col", "_score"])
for query_target, apply_func in zip(query_targets_list, apply_funcs_list):
result_table = apply_func(result_table, query_target, max_hits)
result_table = result_table.fusion('rrf', options_text=f'topn={max_hits}')
result_table = result_table.fusion(method='rrf', topn=max_hits)
result_table = apply_colbert_rerank(result_table, rerank_target, max_hits, rerank_option)
result = result_table.to_pl()
return result['docid_col'], result['SCORE']
2 changes: 1 addition & 1 deletion python/benchmark/mldr_benchmark/get_search_rrf.py
@@ -120,7 +120,7 @@ def fusion_query(self, query_targets_list: list, apply_funcs_list: list, max_hit
result_table = self.infinity_table.output(["docid_col", "_score"])
for query_target, apply_func in zip(query_targets_list, apply_funcs_list):
result_table = apply_func(result_table, query_target, max_hits)
result_table = result_table.fusion('rrf', options_text=f'topn={max_hits}')
result_table = result_table.fusion(method='rrf', topn=max_hits)
result = result_table.to_pl()
return result['docid_col'], result['SCORE']

2 changes: 1 addition & 1 deletion python/benchmark/mldr_benchmark/get_search_weighted_sum.py
@@ -47,7 +47,7 @@ def weighted_sum_query(self, query_targets_list: list, apply_funcs_list: list, m
result_table = self.infinity_table.output(["docid_col", "_score"])
for query_target, apply_func in zip(query_targets_list, apply_funcs_list):
result_table = apply_func(result_table, query_target, max_hits)
result_table = result_table.fusion('weighted_sum', options_text=f'topn={max_hits};weights={weights_str}')
result_table = result_table.fusion(method='weighted_sum', topn=max_hits, fusion_params={"weights": weights_str})
result = result_table.to_pl()
return result['docid_col'], result['SCORE']

4 changes: 2 additions & 2 deletions python/benchmark/mldr_benchmark/mldr_common_tools.py
@@ -277,8 +277,8 @@ def apply_sparse(table, query_embedding: dict, max_hits: int):


def apply_colbert(table, query_tensor: list[list], max_hits: int):
return table.match_tensor("colbert_col", query_tensor, 'float', 'maxsim',
f'topn={max_hits};emvb_threshold_first=0.3;emvb_threshold_final=0.4')
return table.match_tensor(column_name="colbert_col", query_data=query_tensor, query_data_type='float',
topn=max_hits, extra_option={"emvb_threshold_first": 0.3, "emvb_threshold_final": 0.4})


apply_funcs = {'bm25': apply_bm25, 'dense': apply_dense, 'sparse': apply_sparse, 'colbert': apply_colbert}
9 changes: 0 additions & 9 deletions python/infinity/common.py
@@ -47,14 +47,5 @@ def __init__(self, error_code=0, error_message=None):
self.error_code = error_code
self.error_message = error_message

class CommonMatchTensorExpr:
def __init__(self, vector_column_name: str, embedding_data: VEC, embedding_data_type: str,
method_type: str, extra_option: str = None):
self.vector_column_name = vector_column_name
self.embedding_data = embedding_data
self.embedding_data_type = embedding_data_type
self.method_type = method_type
self.extra_option = extra_option

DEFAULT_MATCH_VECTOR_TOPN = 10
DEFAULT_MATCH_SPARSE_TOPN = 10
52 changes: 28 additions & 24 deletions python/infinity/local_infinity/query_builder.py
@@ -10,7 +10,7 @@
from pyarrow import Table
from sqlglot import condition, maybe_parse

from infinity.common import VEC, SPARSE, InfinityException, DEFAULT_MATCH_VECTOR_TOPN, CommonMatchTensorExpr
from infinity.common import VEC, SPARSE, InfinityException
from infinity.embedded_infinity_ext import *
from infinity.local_infinity.types import logic_type_to_dtype, make_match_tensor_expr
from infinity.local_infinity.utils import traverse_conditions, parse_expr
@@ -251,46 +251,50 @@ def match(

def match_tensor(
self,
vector_column_name: str,
embedding_data: VEC,
embedding_data_type: str,
method_type: str,
extra_option: str,
column_name: str,
query_data: VEC,
query_data_type: str,
topn: int,
extra_option: Optional[dict] = None,
) -> InfinityLocalQueryBuilder:
if self._search is None:
self._search = WrapSearchExpr()
self._search.match_exprs = list()
option_str = f"topn={topn}"
if extra_option is not None:
for k, v in extra_option.items():
option_str += f";{k}={v}"
match_tensor_expr = WrapParsedExpr(ParsedExprType.kMatchTensor)
match_tensor_expr.match_tensor_expr = make_match_tensor_expr(
vector_column_name,
embedding_data,
embedding_data_type,
method_type,
extra_option
vector_column_name=column_name,
embedding_data=query_data,
embedding_data_type=query_data_type,
method_type="maxsim",
extra_option=option_str,
)

self._search.match_exprs += [match_tensor_expr]
return self

def fusion(self, method: str, options_text: str = None, match_tensor_expr: CommonMatchTensorExpr = None) -> InfinityLocalQueryBuilder:
def fusion(self, method: str, topn: int, fusion_params: Optional[dict]) -> InfinityLocalQueryBuilder:
if self._search is None:
self._search = WrapSearchExpr()
fusion_expr = WrapFusionExpr()
fusion_expr.method = method
if (options_text is not None) and (options_text != ""):
fusion_expr.options_text = options_text
if match_tensor_expr is not None:
final_option_text = f"topn={topn}"
if method in ["rrf", "weighted_sum"]:
if isinstance(fusion_params, dict):
for k, v in fusion_params.items():
if k == "topn":
raise InfinityException(ErrorCode.INVALID_EXPRESSION, "topn is not allowed in fusion params")
final_option_text += f";{k}={v}"
elif method in ["match_tensor"]:
fusion_expr.has_match_tensor_expr = True
fusion_expr.match_tensor_expr = make_match_tensor_expr(
match_tensor_expr.vector_column_name,
match_tensor_expr.embedding_data,
match_tensor_expr.embedding_data_type,
match_tensor_expr.method_type,
match_tensor_expr.extra_option
)
vector_column_name=fusion_params["field"], embedding_data=fusion_params["data"],
embedding_data_type=fusion_params["data_type"], method_type="maxsim", extra_option=None)
else:
fusion_expr.has_match_tensor_expr = False

raise InfinityException(ErrorCode.INVALID_EXPRESSION, "Invalid fusion method")
fusion_expr.options_text = final_option_text
self._search.fusion_exprs += [fusion_expr]
# *WARN*: A list in nanobind wrapped object is odd that append() does nothing!
# However `list add list` still works.
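
The option handling in the new `fusion()` can be tried without the native extension. The sketch below mirrors the string-building logic from this hunk in a standalone function. `build_fusion_options` is a hypothetical name, and `ValueError` stands in for `InfinityException` so the snippet has no SDK dependency.

```python
def build_fusion_options(method, topn, fusion_params=None):
    """Standalone sketch of how the new fusion() assembles its options text."""
    options_text = f"topn={topn}"
    if method in ("rrf", "weighted_sum"):
        if isinstance(fusion_params, dict):
            for key, value in fusion_params.items():
                if key == "topn":
                    raise ValueError("topn is not allowed in fusion params")
                options_text += f";{key}={value}"
    elif method == "match_tensor":
        # field, data, and data_type go into a separate match-tensor expression,
        # so only topn ends up in the options text.
        pass
    else:
        raise ValueError("Invalid fusion method")
    return options_text


print(build_fusion_options("rrf", 2, {"rank_constant": 30}))
print(build_fusion_options("weighted_sum", 2, {"weights": "1,2,0.5"}))
```

Both calls produce the same semicolon-separated text that callers previously passed by hand, which suggests the change is confined to the Python interface.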
13 changes: 6 additions & 7 deletions python/infinity/local_infinity/table.py
@@ -21,7 +21,7 @@
from infinity.embedded_infinity_ext import WrapIndexInfo, ImportOptions, CopyFileType, WrapParsedExpr, \
ParsedExprType, WrapUpdateExpr, ExportOptions, WrapOptimizeOptions
from infinity.common import ConflictType, DEFAULT_MATCH_VECTOR_TOPN
from infinity.common import INSERT_DATA, VEC, SPARSE, InfinityException, CommonMatchTensorExpr
from infinity.common import INSERT_DATA, VEC, SPARSE, InfinityException
from infinity.errors import ErrorCode
from infinity.index import IndexInfo
from infinity.local_infinity.query_builder import Query, InfinityLocalQueryBuilder, ExplainQuery
@@ -333,15 +333,14 @@ def match(self, fields: str, matching_text: str, options_text: str = ''):
return self

@params_type_check
def match_tensor(self, vector_column_name: str, embedding_data: VEC, embedding_data_type: str, method_type: str,
extra_option: str):
self.query_builder.match_tensor(vector_column_name, embedding_data, embedding_data_type, method_type,
extra_option)
def match_tensor(self, column_name: str, query_data: VEC, query_data_type: str, topn: int,
extra_option: Optional[dict] = None):
self.query_builder.match_tensor(column_name, query_data, query_data_type, topn, extra_option)
return self

@params_type_check
def fusion(self, method: str, options_text: str = '', match_tensor_expr: CommonMatchTensorExpr=None):
self.query_builder.fusion(method, options_text, match_tensor_expr)
def fusion(self, method: str, topn: int, fusion_params: Optional[dict] = None):
self.query_builder.fusion(method, topn, fusion_params)
return self

def output(self, columns: Optional[List[str]]):