Skip to content

Commit

Permalink
Add merger for retrieve results (#18948)
Browse files Browse the repository at this point in the history
See also: #18893

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Sep 15, 2022
1 parent 626854c commit ac1982c
Show file tree
Hide file tree
Showing 2 changed files with 442 additions and 25 deletions.
160 changes: 160 additions & 0 deletions internal/querynode/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)

// unlimited is the sentinel passed as a merge limit to indicate that the
// number of merged rows must not be capped.
const unlimited int = -1

func reduceStatisticResponse(results []*internalpb.GetStatisticsResponse) (*internalpb.GetStatisticsResponse, error) {
mergedResults := map[string]interface{}{
"row_count": int64(0),
Expand Down Expand Up @@ -234,6 +238,63 @@ func encodeSearchResultData(searchResultData *schemapb.SearchResultData, nq int6
return
}

// mergeInternalRetrieveResultsV2 merges several internalpb.RetrieveResults
// into a single result, dropping rows whose primary key was already emitted.
//
// Inputs that are nil, carry no field data, or carry no IDs are ignored. The
// merge repeatedly takes the row with the smallest primary key among the
// inputs' current cursor positions (see selectMinPK), so the output is
// globally ordered by primary key only if every input is already ordered
// (assumed; confirm against callers).
//
// limit caps the number of rows in the merged result; pass unlimited to keep
// every distinct row. Rows skipped as duplicates do not count towards limit.
// The returned error is always nil.
func mergeInternalRetrieveResultsV2(ctx context.Context, retrieveResults []*internalpb.RetrieveResults, limit int) (*internalpb.RetrieveResults, error) {
	log.Ctx(ctx).Debug("reduceInternelRetrieveResults", zap.Int("len(retrieveResults)", len(retrieveResults)))
	var (
		ret = &internalpb.RetrieveResults{
			Ids: &schemapb.IDs{},
		}

		skipDupCnt int64
		loopEnd    int
	)

	validRetrieveResults := make([]*internalpb.RetrieveResults, 0, len(retrieveResults))
	for _, r := range retrieveResults {
		size := typeutil.GetSizeOfIDs(r.GetIds())
		if r == nil || len(r.GetFieldsData()) == 0 || size == 0 {
			continue
		}
		validRetrieveResults = append(validRetrieveResults, r)
		// Without a limit, the total row count bounds the merge loop.
		loopEnd += size
	}

	if len(validRetrieveResults) == 0 {
		return ret, nil
	}

	if limit != unlimited {
		loopEnd = limit
	}

	// One output column per column of the first valid input.
	ret.FieldsData = make([]*schemapb.FieldData, len(validRetrieveResults[0].GetFieldsData()))
	idSet := make(map[interface{}]struct{})
	cursors := make([]int64, len(validRetrieveResults))

	// emitted counts only rows actually appended, so duplicates that are
	// skipped never consume part of the limit.
	for emitted := 0; emitted < loopEnd; {
		sel := selectMinPK(validRetrieveResults, cursors)
		if sel == -1 {
			// Every input has been fully consumed.
			break
		}

		pk := typeutil.GetPK(validRetrieveResults[sel].GetIds(), cursors[sel])
		if _, ok := idSet[pk]; !ok {
			typeutil.AppendPKs(ret.Ids, pk)
			typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
			idSet[pk] = struct{}{}
			emitted++
		} else {
			// primary keys duplicate
			skipDupCnt++
		}
		cursors[sel]++
	}

	if skipDupCnt > 0 {
		log.Ctx(ctx).Debug("skip duplicated query result while reducing internal.RetrieveResults", zap.Int64("count", skipDupCnt))
	}

	return ret, nil
}

// TODO: largely based on function mergeSegcoreRetrieveResults; needs rewriting
func mergeInternalRetrieveResults(ctx context.Context, retrieveResults []*internalpb.RetrieveResults) (*internalpb.RetrieveResults, error) {
var ret *internalpb.RetrieveResults
Expand Down Expand Up @@ -284,6 +345,105 @@ func mergeInternalRetrieveResults(ctx context.Context, retrieveResults []*intern
return ret, nil
}

// mergeSegcoreRetrieveResultsV2 merges several segcorepb.RetrieveResults into
// a single result, dropping rows whose primary key was already emitted.
//
// Inputs that are nil, carry no offsets, or carry no IDs are ignored. The
// merge repeatedly takes the row with the smallest primary key among the
// inputs' current cursor positions (see selectMinPK), so the output is
// globally ordered by primary key only if every input is already ordered
// (assumed; confirm against callers).
//
// limit caps the number of rows in the merged result; pass unlimited to keep
// every distinct row. Rows skipped as duplicates do not count towards limit.
// The returned error is always nil.
func mergeSegcoreRetrieveResultsV2(ctx context.Context, retrieveResults []*segcorepb.RetrieveResults, limit int) (*segcorepb.RetrieveResults, error) {
	log.Ctx(ctx).Debug("reduceSegcoreRetrieveResults", zap.Int("len(retrieveResults)", len(retrieveResults)))
	var (
		ret = &segcorepb.RetrieveResults{
			Ids: &schemapb.IDs{},
		}

		skipDupCnt int64
		loopEnd    int
	)

	validRetrieveResults := make([]*segcorepb.RetrieveResults, 0, len(retrieveResults))
	for _, r := range retrieveResults {
		size := typeutil.GetSizeOfIDs(r.GetIds())
		if r == nil || len(r.GetOffset()) == 0 || size == 0 {
			continue
		}
		validRetrieveResults = append(validRetrieveResults, r)
		// Without a limit, the total row count bounds the merge loop.
		loopEnd += size
	}

	if len(validRetrieveResults) == 0 {
		return ret, nil
	}

	if limit != unlimited {
		loopEnd = limit
	}

	// One output column per column of the first valid input.
	ret.FieldsData = make([]*schemapb.FieldData, len(validRetrieveResults[0].GetFieldsData()))
	idSet := make(map[interface{}]struct{})
	cursors := make([]int64, len(validRetrieveResults))

	// emitted counts only rows actually appended, so duplicates that are
	// skipped never consume part of the limit.
	for emitted := 0; emitted < loopEnd; {
		sel := selectMinPK(validRetrieveResults, cursors)
		if sel == -1 {
			// Every input has been fully consumed.
			break
		}

		pk := typeutil.GetPK(validRetrieveResults[sel].GetIds(), cursors[sel])
		if _, ok := idSet[pk]; !ok {
			typeutil.AppendPKs(ret.Ids, pk)
			typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].GetFieldsData(), cursors[sel])
			idSet[pk] = struct{}{}
			emitted++
		} else {
			// primary keys duplicate
			skipDupCnt++
		}
		cursors[sel]++
	}

	if skipDupCnt > 0 {
		log.Ctx(ctx).Debug("skip duplicated query result while reducing segcore.RetrieveResults", zap.Int64("count", skipDupCnt))
	}

	return ret, nil
}

// ResultWithID is implemented by retrieve results that expose their primary
// keys, which lets helpers such as selectMinPK operate on either result type.
type ResultWithID interface {
	// GetIds returns the IDs carried by the result.
	GetIds() *schemapb.IDs
}

// Compile-time checks that both retrieve result types satisfy ResultWithID.
var (
	_ ResultWithID = (*internalpb.RetrieveResults)(nil)
	_ ResultWithID = (*segcorepb.RetrieveResults)(nil)
)

// selectMinPK returns the index of the result whose primary key at its current
// cursor position is the smallest, or -1 when no result has a usable row left.
//
// cursors[i] is the read position within results[i]; a result whose cursor has
// reached the end of its IDs is skipped. String and int64 primary keys are
// tracked independently, and keys of any other type are ignored. On ties the
// lowest index wins, because only a strictly smaller key replaces the current
// selection.
func selectMinPK[T ResultWithID](results []T, cursors []int64) int {
	var (
		sel = -1

		// The first* flags let the first key of each kind win unconditionally,
		// so boundary values (math.MaxInt64, the empty string) stay selectable
		// instead of being mistaken for "nothing found".
		firstInt       = true
		minIntPK int64 = math.MaxInt64

		firstStr = true
		minStrPK = ""
	)

	for i, cursor := range cursors {
		if int(cursor) >= typeutil.GetSizeOfIDs(results[i].GetIds()) {
			// results[i] is exhausted.
			continue
		}

		pkInterface := typeutil.GetPK(results[i].GetIds(), cursor)
		switch pk := pkInterface.(type) {
		case string:
			if firstStr || pk < minStrPK {
				firstStr = false
				minStrPK = pk
				sel = i
			}
		case int64:
			if firstInt || pk < minIntPK {
				firstInt = false
				minIntPK = pk
				sel = i
			}
		default:
			continue
		}
	}

	return sel
}

func mergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcorepb.RetrieveResults) (*segcorepb.RetrieveResults, error) {

var (
Expand Down
Loading

0 comments on commit ac1982c

Please sign in to comment.