Skip to content

Commit

Permalink
Add limit to proto (milvus-io#18950)
Browse files Browse the repository at this point in the history
See also: milvus-io#18893

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Sep 1, 2022
1 parent 6d37b1d commit c054873
Show file tree
Hide file tree
Showing 7 changed files with 592 additions and 476 deletions.
1 change: 1 addition & 0 deletions internal/proto/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ message RetrieveRequest {
uint64 travel_timestamp = 8;
uint64 guarantee_timestamp = 9;
uint64 timeout_timestamp = 10;
int64 limit = 11; // Optional
}

message RetrieveResults {
Expand Down
286 changes: 147 additions & 139 deletions internal/proto/internalpb/internal.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions internal/proto/milvus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ message QueryRequest {
repeated string partition_names = 6;
uint64 travel_timestamp = 7;
uint64 guarantee_timestamp = 8; // guarantee_timestamp
repeated common.KeyValuePair query_params = 9; // optional
}

message QueryResults {
Expand Down
684 changes: 347 additions & 337 deletions internal/proto/milvuspb/milvus.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
SearchParamsKey = "params"
RoundDecimalKey = "round_decimal"
OffsetKey = "offset"
LimitKey = "limit"

InsertTaskName = "InsertTask"
CreateCollectionTaskName = "CreateCollectionTask"
Expand Down
49 changes: 49 additions & 0 deletions internal/proxy/task_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type queryTask struct {
qc types.QueryCoord
ids *schemapb.IDs
collectionName string
queryParams *queryParams

resultBuf chan *internalpb.RetrieveResults
toReduceResults []*internalpb.RetrieveResults
Expand All @@ -52,6 +53,11 @@ type queryTask struct {
shardMgr *shardClientMgr
}

type queryParams struct {
limit int64
offset int64
}

// translateOutputFields translates output fields name to output fields id.
func translateToOutputFieldIDs(outputFields []string, schema *schemapb.CollectionSchema) ([]UniqueID, error) {
outputFieldIDs := make([]UniqueID, 0, len(outputFields)+1)
Expand Down Expand Up @@ -99,6 +105,42 @@ func translateToOutputFieldIDs(outputFields []string, schema *schemapb.Collectio
return outputFieldIDs, nil
}

// parseQueryParams get limit and offset from queryParamsPair, both are optional.
func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, error) {
var (
limit int64
offset int64
err error
)

// if limit is provided
limitStr, err := funcutil.GetAttrByKeyFromRepeatedKV(LimitKey, queryParamsPair)
if err != nil {
return &queryParams{}, nil
}
limit, err = strconv.ParseInt(limitStr, 0, 64)
if err != nil || limit <= 0 {
return nil, fmt.Errorf("%s [%s] is invalid", LimitKey, limitStr)
}

// if offset is provided
if offsetStr, err := funcutil.GetAttrByKeyFromRepeatedKV(OffsetKey, queryParamsPair); err == nil {
offset, err = strconv.ParseInt(offsetStr, 0, 64)
if err != nil || offset < 0 {
return nil, fmt.Errorf("%s [%s] is invalid", OffsetKey, offsetStr)
}
}

if err = validateTopK(limit + offset); err != nil {
return nil, fmt.Errorf("invalid limit[%d] + offset[%d], %w", limit, offset, err)
}

return &queryParams{
limit: limit,
offset: offset,
}, nil
}

func (t *queryTask) PreExecute(ctx context.Context) error {
if t.queryShardPolicy == nil {
t.queryShardPolicy = mergeRoundRobinPolicy
Expand Down Expand Up @@ -150,6 +192,13 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
log.Ctx(ctx).Debug("Get partitions in collection.", zap.Any("collectionName", collectionName),
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))

queryParams, err := parseQueryParams(t.request.GetQueryParams())
if err != nil {
return err
}
t.queryParams = queryParams
t.RetrieveRequest.Limit = queryParams.limit + queryParams.offset

loaded, err := checkIfLoaded(ctx, t.qc, collectionName, t.RetrieveRequest.GetPartitionIDs())
if err != nil {
return fmt.Errorf("checkIfLoaded failed when query, collection:%v, partitions:%v, err = %s", collectionName, t.request.GetPartitionNames(), err)
Expand Down
46 changes: 46 additions & 0 deletions internal/proxy/task_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,49 @@ func Test_translateToOutputFieldIDs(t *testing.T) {
})
}
}

func TestTaskQuery_functions(t *testing.T) {
t.Run("test parseQueryParams", func(t *testing.T) {
tests := []struct {
description string

inKey []string
inValue []string

expectErr bool
outLimit int64
outOffset int64
}{
{"empty input", []string{}, []string{}, false, 0, 0},
{"valid limit=1", []string{LimitKey}, []string{"1"}, false, 1, 0},
{"valid limit=1, offset=2", []string{LimitKey, OffsetKey}, []string{"1", "2"}, false, 1, 2},
{"valid no limit, offset=2", []string{OffsetKey}, []string{"2"}, false, 0, 0},
{"invalid limit str", []string{LimitKey}, []string{"a"}, true, 0, 0},
{"invalid limit zero", []string{LimitKey}, []string{"0"}, true, 0, 0},
{"invalid offset negative", []string{LimitKey, OffsetKey}, []string{"1", "-1"}, true, 0, 0},
{"invalid limit=16384 offset=16384", []string{LimitKey, OffsetKey}, []string{"16384", "16384"}, true, 0, 0},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
var inParams []*commonpb.KeyValuePair
for i := range test.inKey {
inParams = append(inParams, &commonpb.KeyValuePair{
Key: test.inKey[i],
Value: test.inValue[i],
})

}
ret, err := parseQueryParams(inParams)
if test.expectErr {
assert.Error(t, err)
assert.Empty(t, ret)
} else {
assert.NoError(t, err)
assert.Equal(t, test.outLimit, ret.limit)
assert.Equal(t, test.outOffset, ret.offset)
}
})
}
})
}

0 comments on commit c054873

Please sign in to comment.