Skip to content

Commit

Permalink
Keep order of match expression (infiniflow#1452)
Browse files Browse the repository at this point in the history
Fusion `weighted_sum` requires that all search expressions be ordered in
an anticipated way.

Issue link:infiniflow#1428

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
  • Loading branch information
yuzhichang authored Jul 10, 2024
1 parent 11c55c7 commit af9a64a
Show file tree
Hide file tree
Showing 27 changed files with 2,294 additions and 1,951 deletions.
36 changes: 19 additions & 17 deletions benchmark/remote_infinity/remote_query_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,37 +206,39 @@ int main() {
}
req.select_list.push_back(std::move(expr));
req.__isset.select_list = true;
KnnExpr knn_expr;
auto knn_expr = std::make_shared<KnnExpr>();
{
knn_expr.column_expr.column_name.emplace_back("col1");
knn_expr.column_expr.__isset.column_name = true;
knn_expr.__isset.column_expr = true;
auto &q = knn_expr.embedding_data.f32_array_value;
knn_expr->column_expr.column_name.emplace_back("col1");
knn_expr->column_expr.__isset.column_name = true;
knn_expr->__isset.column_expr = true;
auto &q = knn_expr->embedding_data.f32_array_value;
q.reserve(dimension);
auto src_ptr = queries + query_idx * dimension;
for (int64_t i = 0; i < dimension; ++i) {
q.push_back(src_ptr[i]);
}
knn_expr.embedding_data.__isset.f32_array_value = true;
knn_expr.__isset.embedding_data = true;
knn_expr.embedding_data_type = ElementType::type::ElementFloat32;
knn_expr.__isset.embedding_data_type = true;
knn_expr.distance_type = KnnDistanceType::type::L2;
knn_expr.__isset.distance_type = true;
knn_expr.topn = topk;
knn_expr.__isset.topn = true;
knn_expr->embedding_data.__isset.f32_array_value = true;
knn_expr->__isset.embedding_data = true;
knn_expr->embedding_data_type = ElementType::type::ElementFloat32;
knn_expr->__isset.embedding_data_type = true;
knn_expr->distance_type = KnnDistanceType::type::L2;
knn_expr->__isset.distance_type = true;
knn_expr->topn = topk;
knn_expr->__isset.topn = true;
InitParameter init_param;
{
init_param.param_name = "ef";
init_param.__isset.param_name = true;
init_param.param_value = std::to_string(ef);
init_param.__isset.param_value = true;
}
knn_expr.opt_params.push_back(std::move(init_param));
knn_expr.__isset.opt_params = true;
knn_expr->opt_params.push_back(std::move(init_param));
knn_expr->__isset.opt_params = true;
}
req.search_expr.knn_exprs.push_back(std::move(knn_expr));
req.search_expr.__isset.knn_exprs = true;
GenericMatchExpr generic_match_expr;
generic_match_expr.__set_match_vector_expr(knn_expr);
req.search_expr.match_exprs.push_back(generic_match_expr);
req.search_expr.__isset.match_exprs = true;
req.__isset.search_expr = true;
}
client.client->Select(ret, req);
Expand Down
154 changes: 97 additions & 57 deletions python/infinity/local_infinity/query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@
from infinity.local_infinity.utils import traverse_conditions, parse_expr
from infinity.table import ExplainType as BaseExplainType


class Query(ABC):
def __init__(self, columns: Optional[List[WrapParsedExpr]], search: Optional[WrapSearchExpr], filter: Optional[WrapParsedExpr],
limit: Optional[WrapParsedExpr], offset: Optional[WrapParsedExpr]):
def __init__(
self,
columns: Optional[List[WrapParsedExpr]],
search: Optional[WrapSearchExpr],
filter: Optional[WrapParsedExpr],
limit: Optional[WrapParsedExpr],
offset: Optional[WrapParsedExpr],
):
self.columns = columns
self.search = search
self.filter = filter
Expand All @@ -27,8 +34,15 @@ def __init__(self, columns: Optional[List[WrapParsedExpr]], search: Optional[Wra


class ExplainQuery(Query):
def __init__(self, columns: Optional[List[WrapParsedExpr]], search: Optional[WrapSearchExpr], filter: Optional[WrapParsedExpr],
limit: Optional[WrapParsedExpr], offset: Optional[WrapParsedExpr], explain_type: Optional[BaseExplainType]):
def __init__(
self,
columns: Optional[List[WrapParsedExpr]],
search: Optional[WrapSearchExpr],
filter: Optional[WrapParsedExpr],
limit: Optional[WrapParsedExpr],
offset: Optional[WrapParsedExpr],
explain_type: Optional[BaseExplainType],
):
super().__init__(columns, search, filter, limit, offset)
self.explain_type = explain_type

Expand All @@ -49,19 +63,27 @@ def reset(self):
self._limit = None
self._offset = None

def knn(self, vector_column_name: str, embedding_data: VEC, embedding_data_type: str, distance_type: str,
topn: int = DEFAULT_MATCH_VECTOR_TOPN, knn_params: {} = None) -> InfinityLocalQueryBuilder:
def knn(
self,
vector_column_name: str,
embedding_data: VEC,
embedding_data_type: str,
distance_type: str,
topn: int = DEFAULT_MATCH_VECTOR_TOPN,
knn_params: {} = None,
) -> InfinityLocalQueryBuilder:
if self._search is None:
self._search = WrapSearchExpr()
if self._search.knn_exprs is None or len(self._search.knn_exprs) == 0:
self._search.knn_exprs = []
self._search.match_exprs = []

column_expr = WrapColumnExpr()
column_expr.names = [vector_column_name]
column_expr.star = False

if not isinstance(topn, int):
raise InfinityException(3073, f"Invalid topn, type should be embedded, but get {type(topn)}")
raise InfinityException(
3073, f"Invalid topn, type should be embedded, but get {type(topn)}"
)

# type casting
if isinstance(embedding_data, list):
Expand All @@ -71,48 +93,53 @@ def knn(self, vector_column_name: str, embedding_data: VEC, embedding_data_type:
elif isinstance(embedding_data, np.ndarray):
embedding_data = embedding_data.tolist()
else:
raise InfinityException(3051, f"Invalid embedding data, type should be embedded, but get {type(embedding_data)}")

if (embedding_data_type == 'tinyint' or
embedding_data_type == 'smallint' or
embedding_data_type == 'int' or
embedding_data_type == 'bigint'):
raise InfinityException(
3051,
f"Invalid embedding data, type should be embedded, but get {type(embedding_data)}",
)

if (
embedding_data_type == "tinyint"
or embedding_data_type == "smallint"
or embedding_data_type == "int"
or embedding_data_type == "bigint"
):
embedding_data = [int(x) for x in embedding_data]

data = EmbeddingData()
elem_type = EmbeddingDataType.kElemFloat
if embedding_data_type == 'bit':
if embedding_data_type == "bit":
elem_type = EmbeddingDataType.kElemBit
raise Exception(f"Invalid embedding {embedding_data[0]} type")
elif embedding_data_type == 'tinyint':
elif embedding_data_type == "tinyint":
elem_type = EmbeddingDataType.kElemInt8
data.i8_array_value = embedding_data
elif embedding_data_type == 'smallint':
elif embedding_data_type == "smallint":
elem_type = EmbeddingDataType.kElemInt16
data.i16_array_value = embedding_data
elif embedding_data_type == 'int':
elif embedding_data_type == "int":
elem_type = EmbeddingDataType.kElemInt32
data.i32_array_value = embedding_data
elif embedding_data_type == 'bigint':
elif embedding_data_type == "bigint":
elem_type = EmbeddingDataType.kElemInt64
data.i64_array_value = embedding_data
elif embedding_data_type == 'float':
elif embedding_data_type == "float":
elem_type = EmbeddingDataType.kElemFloat
data.f32_array_value = embedding_data
elif embedding_data_type == 'double':
elif embedding_data_type == "double":
elem_type = EmbeddingDataType.kElemDouble
data.f64_array_value = embedding_data
else:
raise InfinityException(3057, f"Invalid embedding {embedding_data[0]} type")

dist_type = KnnDistanceType.kInvalid
if distance_type == 'l2':
if distance_type == "l2":
dist_type = KnnDistanceType.kL2
elif distance_type == 'cosine':
elif distance_type == "cosine":
dist_type = KnnDistanceType.kCosine
elif distance_type == 'ip':
elif distance_type == "ip":
dist_type = KnnDistanceType.kInnerProduct
elif distance_type == 'hamming':
elif distance_type == "hamming":
dist_type = KnnDistanceType.kHamming
else:
raise InfinityException(3056, f"Invalid distance type {distance_type}")
Expand All @@ -130,28 +157,32 @@ def knn(self, vector_column_name: str, embedding_data: VEC, embedding_data_type:
knn_expr.topn = topn
knn_expr.opt_params = knn_opt_params

# not work
# self._search.knn_exprs.append(knn_expr)

knn_exprs = [knn_expr]
self._search.knn_exprs = self._search.knn_exprs + knn_exprs

assert(len(self._search.knn_exprs) > 0)
generic_match_expr = WrapParsedExpr()
generic_match_expr.type = ParsedExprType.kKnn
generic_match_expr.knn_expr = knn_expr
self._search.match_exprs += [generic_match_expr]
return self

def match_sparse(self, vector_column_name: str, sparse_data: SPARSE, metric_type: str, topn: int, opt_params: {} = None) \
-> InfinityLocalQueryBuilder:
def match_sparse(
self,
vector_column_name: str,
sparse_data: SPARSE,
metric_type: str,
topn: int,
opt_params: {} = None,
) -> InfinityLocalQueryBuilder:
if self._search is None:
self._search = WrapSearchExpr()
if self._search.match_sparse_exprs is None:
self._search.match_sparse_exprs = list()
self._search.match_exprs = list()

column_expr = WrapColumnExpr()
column_expr.names = [vector_column_name]
column_expr.star = False

if not isinstance(topn, int):
raise InfinityException(3073, f"Invalid topn, type should be embedded, but get {type(topn)}")
raise InfinityException(
3073, f"Invalid topn, type should be embedded, but get {type(topn)}"
)

sparse_opt_params = []
if opt_params != None:
Expand All @@ -173,45 +204,50 @@ def match_sparse(self, vector_column_name: str, sparse_data: SPARSE, metric_type
sparse_expr.i64_array_idx = sparse_data["indices"]
sparse_expr.f64_array_value = sparse_data["values"]
else:
raise InfinityException(3058, f"Invalid sparse data {sparse_data['values'][0]} type")
raise InfinityException(
3058, f"Invalid sparse data {sparse_data['values'][0]} type"
)
match_sparse_expr.sparse_expr = sparse_expr

match_sparse_expr.metric_type = metric_type
match_sparse_expr.topn = topn
match_sparse_expr.opt_params = sparse_opt_params

match_sparse_exprs = [match_sparse_expr]
self._search.match_sparse_exprs = self._search.match_sparse_exprs + match_sparse_exprs
assert(len(self._search.match_sparse_exprs) > 0)

generic_match_expr = WrapParsedExpr()
generic_match_expr.type = ParsedExprType.kMatchSparse
generic_match_expr.match_sparse_expr = match_sparse_expr
self._search.match_exprs += [generic_match_expr]
return self

def match(self, fields: str, matching_text: str, options_text: str = '') -> InfinityLocalQueryBuilder:
def match(
self, fields: str, matching_text: str, options_text: str = ""
) -> InfinityLocalQueryBuilder:
if self._search is None:
self._search = WrapSearchExpr()
if self._search.match_exprs is None:
self._search.match_exprs = list()
match_expr = WrapMatchExpr()
match_expr.fields = fields
match_expr.matching_text = matching_text
match_expr.options_text = options_text

match_exprs = [match_expr]
self._search.match_exprs = self._search.match_exprs + match_exprs
assert(len(self._search.match_exprs) > 0)

# self._search.match_exprs.append(match_expr)
generic_match_expr = WrapParsedExpr()
generic_match_expr.type = ParsedExprType.kMatch
generic_match_expr.match_expr = match_expr
self._search.match_exprs += [generic_match_expr]
return self

def fusion(self, method: str, options_text: str = '') -> InfinityLocalQueryBuilder:
def fusion(self, method: str, options_text: str = "") -> InfinityLocalQueryBuilder:
if self._search is None:
self._search = WrapSearchExpr()
fusion_expr = WrapFusionExpr()
fusion_expr.method = method
fusion_expr.options_text = options_text
# self._search.fusion_exprs.append(fusion_expr)
fusion_exprs = [fusion_expr]
self._search.fusion_exprs = self._search.fusion_exprs + fusion_exprs
assert(len(self._search.fusion_exprs) > 0)
# *WARN*: A list in nanobind wrapped object is odd that append() does nothing!
# However `list add list` still works.
# self._search.fusion_exprs.append(fusion_expr)
assert len(self._search.fusion_exprs) > 0
return self

def filter(self, where: Optional[str]) -> InfinityLocalQueryBuilder:
Expand All @@ -220,14 +256,18 @@ def filter(self, where: Optional[str]) -> InfinityLocalQueryBuilder:
return self

def limit(self, limit: Optional[int]) -> InfinityLocalQueryBuilder:
constant_exp = WrapConstantExpr(literal_type=LiteralType.kInteger, i64_value=limit)
constant_exp = WrapConstantExpr(
literal_type=LiteralType.kInteger, i64_value=limit
)
limit_expr = WrapParsedExpr(ParsedExprType.kConstant)
limit_expr.constant_expr = constant_exp
self._limit = limit_expr
return self

def offset(self, offset: Optional[int]) -> InfinityLocalQueryBuilder:
constant_exp = WrapConstantExpr(literal_type=LiteralType.kInteger, i64_value=offset)
constant_exp = WrapConstantExpr(
literal_type=LiteralType.kInteger, i64_value=offset
)
offset_expr = WrapParsedExpr(ParsedExprType.kConstant)
offset_expr.constant_expr = constant_exp
self._offset = offset_expr
Expand Down Expand Up @@ -304,7 +344,7 @@ def to_result(self):
search=self._search,
filter=self._filter,
limit=self._limit,
offset=self._offset
offset=self._offset,
)
self.reset()
return self._table._execute_query(query)
Expand All @@ -330,6 +370,6 @@ def explain(self, explain_type=ExplainType.kPhysical) -> Any:
filter=self._filter,
limit=self._limit,
offset=self._offset,
explain_type=explain_type
explain_type=explain_type,
)
return self._table._explain_query(query)
Loading

0 comments on commit af9a64a

Please sign in to comment.