Skip to content

Commit

Permalink
Add consistency_level paramter in search/query request (milvus-io#24499)
Browse files Browse the repository at this point in the history
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
  • Loading branch information
czs007 authored May 30, 2023
1 parent 5958b42 commit f792472
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 31 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/golang/protobuf v1.5.3
github.com/klauspost/compress v1.14.4
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/minio/minio-go/v7 v7.0.17
github.com/panjf2000/ants/v2 v2.7.2
Expand Down
12 changes: 2 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -579,16 +579,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230421091228-eaa38c831a61 h1:EX2oknwzltpw6yZ7QZEphVHM3YTysjdbb5keleFjs3M=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230421091228-eaa38c831a61/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230515081521-d963c95b041f h1:uZzVaSbUtxMdEix9By6z+M/H/XNkXRQJdZQ9HP/wHtc=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230515081521-d963c95b041f/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63 h1:V962mSHjOFUbuMgAXziBdbYPOCVZmN1MkqEeKpME+MA=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083323-3400e837ef47 h1:Dp5AAbOSTq31QLatGXamBMk/o670MkbRi8NoW17ypew=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083323-3400e837ef47/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87 h1:LdDHjEjus1NdC9ELbpQa6DfUHJotJUW2kD4S+8nvjw4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723 h1:VWwQdHN1JuM/Q+9QK1bOyTpEqPHTdAKw5qOK0Lgua/c=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
3 changes: 3 additions & 0 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type collectionInfo struct {
shardLeaders *shardLeaders
createdTimestamp uint64
createdUtcTimestamp uint64
consistencyLevel commonpb.ConsistencyLevel
}

func (info *collectionInfo) isCollectionCached() bool {
Expand Down Expand Up @@ -337,6 +338,7 @@ func (m *MetaCache) updateCollection(coll *milvuspb.DescribeCollectionResponse,
m.collInfo[collectionName].collID = coll.CollectionID
m.collInfo[collectionName].createdTimestamp = coll.CreatedTimestamp
m.collInfo[collectionName].createdUtcTimestamp = coll.CreatedUtcTimestamp
m.collInfo[collectionName].consistencyLevel = coll.ConsistencyLevel
}

func (m *MetaCache) GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
Expand Down Expand Up @@ -477,6 +479,7 @@ func (m *MetaCache) describeCollection(ctx context.Context, collectionName strin
PhysicalChannelNames: coll.PhysicalChannelNames,
CreatedTimestamp: coll.CreatedTimestamp,
CreatedUtcTimestamp: coll.CreatedUtcTimestamp,
ConsistencyLevel: coll.ConsistencyLevel,
}
for _, field := range coll.Schema.Fields {
if field.FieldID >= common.StartOfUserFieldID {
Expand Down
39 changes: 33 additions & 6 deletions internal/proxy/task_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,16 @@ func (t *searchTask) PreExecute(ctx context.Context) error {

if t.request.GetDslType() == commonpb.DslType_BoolExprV1 {
annsField, err := funcutil.GetAttrByKeyFromRepeatedKV(AnnsFieldKey, t.request.GetSearchParams())
if err != nil {
return errors.New(AnnsFieldKey + " not found in search_params")
if err != nil || len(annsField) == 0 {
if enableMultipleVectorFields {
return errors.New(AnnsFieldKey + " not found in search_params")
}
vecFieldSchema, err2 := typeutil.GetVectorFieldSchema(t.schema)
if err2 != nil {
return errors.New(AnnsFieldKey + " not found in schema")
}
annsField = vecFieldSchema.Name
}

queryInfo, offset, err := parseSearchInfo(t.request.GetSearchParams())
if err != nil {
return err
Expand Down Expand Up @@ -330,10 +336,29 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
if err != nil {
return err
}
t.SearchRequest.TravelTimestamp = travelTimestamp

collectionInfo, err2 := globalMetaCache.GetCollectionInfo(ctx, collectionName)
if err2 != nil {
log.Ctx(ctx).Debug("Proxy::searchTask::PreExecute failed to GetCollectionInfo from cache",
zap.Any("collectionName", collectionName), zap.Error(err2))
return err2
}
guaranteeTs := t.request.GetGuaranteeTimestamp()
guaranteeTs = parseGuaranteeTs(guaranteeTs, t.BeginTs())
t.SearchRequest.TravelTimestamp = travelTimestamp
var consistencyLevel commonpb.ConsistencyLevel
useDefaultConsistency := t.request.GetUseDefaultConsistency()
if useDefaultConsistency {
consistencyLevel = collectionInfo.consistencyLevel
guaranteeTs = parseGuaranteeTsFromConsistency(guaranteeTs, t.BeginTs(), consistencyLevel)
} else {
consistencyLevel = t.request.GetConsistencyLevel()
//Compatibility logic, parse guarantee timestamp
if consistencyLevel == 0 && guaranteeTs > 0 {
guaranteeTs = parseGuaranteeTs(guaranteeTs, t.BeginTs())
} else {
// parse from guarantee timestamp and user input consistency level
guaranteeTs = parseGuaranteeTsFromConsistency(guaranteeTs, t.BeginTs(), consistencyLevel)
}
}
t.SearchRequest.GuaranteeTimestamp = guaranteeTs

if deadline, ok := t.TraceCtx().Deadline(); ok {
Expand All @@ -345,6 +370,8 @@ func (t *searchTask) PreExecute(ctx context.Context) error {

log.Ctx(ctx).Debug("search PreExecute done.",
zap.Uint64("travel_ts", travelTimestamp), zap.Uint64("guarantee_ts", guaranteeTs),
zap.Bool("use_default_consistency", useDefaultConsistency),
zap.Any("consistency level", consistencyLevel),
zap.Uint64("timeout_ts", t.SearchRequest.GetTimeoutTimestamp()))

return nil
Expand Down
13 changes: 13 additions & 0 deletions internal/proxy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,19 @@ func ReplaceID2Name(oldStr string, id int64, name string) string {
return strings.ReplaceAll(oldStr, strconv.FormatInt(id, 10), name)
}

func parseGuaranteeTsFromConsistency(ts, tMax typeutil.Timestamp, consistency commonpb.ConsistencyLevel) typeutil.Timestamp {
switch consistency {
case commonpb.ConsistencyLevel_Strong:
ts = tMax
case commonpb.ConsistencyLevel_Bounded:
ratio := Params.CommonCfg.GracefulTime.GetAsDuration(time.Millisecond)
ts = tsoutil.AddPhysicalDurationOnTs(tMax, -ratio)
case commonpb.ConsistencyLevel_Eventually:
ts = 1
}
return ts
}

func parseGuaranteeTs(ts, tMax typeutil.Timestamp) typeutil.Timestamp {
switch ts {
case strongTS:
Expand Down
32 changes: 32 additions & 0 deletions internal/proxy/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1703,3 +1703,35 @@ func Test_UpsertTaskCheckPrimaryFieldData(t *testing.T) {
_, err = checkPrimaryFieldData(case6.schema, case6.result, case6.insertMsg, false)
assert.NotEqual(t, nil, err)
}

func Test_ParseGuaranteeTs(t *testing.T) {
strongTs := typeutil.Timestamp(0)
boundedTs := typeutil.Timestamp(2)
tsNow := tsoutil.GetCurrentTime()
tsMax := tsoutil.GetCurrentTime()

assert.Equal(t, tsMax, parseGuaranteeTs(strongTs, tsMax))
ratio := Params.CommonCfg.GracefulTime.GetAsDuration(time.Millisecond)
assert.Equal(t, tsoutil.AddPhysicalDurationOnTs(tsMax, -ratio), parseGuaranteeTs(boundedTs, tsMax))
assert.Equal(t, tsNow, parseGuaranteeTs(tsNow, tsMax))
}

func Test_ParseGuaranteeTsFromConsistency(t *testing.T) {
strong := commonpb.ConsistencyLevel_Strong
bounded := commonpb.ConsistencyLevel_Bounded
eventually := commonpb.ConsistencyLevel_Eventually
session := commonpb.ConsistencyLevel_Session
customized := commonpb.ConsistencyLevel_Customized

tsDefault := typeutil.Timestamp(0)
tsEventually := typeutil.Timestamp(1)
tsNow := tsoutil.GetCurrentTime()
tsMax := tsoutil.GetCurrentTime()

assert.Equal(t, tsMax, parseGuaranteeTsFromConsistency(tsDefault, tsMax, strong))
ratio := Params.CommonCfg.GracefulTime.GetAsDuration(time.Millisecond)
assert.Equal(t, tsoutil.AddPhysicalDurationOnTs(tsMax, -ratio), parseGuaranteeTsFromConsistency(tsDefault, tsMax, bounded))
assert.Equal(t, tsNow, parseGuaranteeTsFromConsistency(tsNow, tsMax, session))
assert.Equal(t, tsNow, parseGuaranteeTsFromConsistency(tsNow, tsMax, customized))
assert.Equal(t, tsEventually, parseGuaranteeTsFromConsistency(tsDefault, tsMax, eventually))
}
2 changes: 1 addition & 1 deletion pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.14.4
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723
github.com/panjf2000/ants/v2 v2.4.8
github.com/prometheus/client_golang v1.11.1
github.com/samber/lo v1.27.0
Expand Down
12 changes: 2 additions & 10 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -465,16 +465,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230416064425-aec3e83865b2 h1:G5uN68X/7eoCfHUkNvkbNueFhHuohCZG94te+ApLAOY=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230416064425-aec3e83865b2/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230515081521-d963c95b041f h1:uZzVaSbUtxMdEix9By6z+M/H/XNkXRQJdZQ9HP/wHtc=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230515081521-d963c95b041f/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63 h1:V962mSHjOFUbuMgAXziBdbYPOCVZmN1MkqEeKpME+MA=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083323-3400e837ef47 h1:Dp5AAbOSTq31QLatGXamBMk/o670MkbRi8NoW17ypew=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083323-3400e837ef47/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87 h1:LdDHjEjus1NdC9ELbpQa6DfUHJotJUW2kD4S+8nvjw4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723 h1:VWwQdHN1JuM/Q+9QK1bOyTpEqPHTdAKw5qOK0Lgua/c=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/typeutil/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,16 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error
return nil
}

// GetVectorFieldSchema get vector field schema from collection schema.
func GetVectorFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.FieldSchema, error) {
for _, fieldSchema := range schema.Fields {
if IsVectorType(fieldSchema.DataType) {
return fieldSchema, nil
}
}
return nil, errors.New("vector field is not found")
}

// GetPrimaryFieldSchema get primary field schema from collection schema
func GetPrimaryFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.FieldSchema, error) {
for _, fieldSchema := range schema.Fields {
Expand Down
58 changes: 58 additions & 0 deletions pkg/util/typeutil/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,64 @@ func TestSchema(t *testing.T) {
})
}

func TestSchema_GetVectorFieldSchema(t *testing.T) {

schemaNormal := &schemapb.CollectionSchema{
Name: "testColl",
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "field_int64",
IsPrimaryKey: true,
Description: "",
DataType: 5,
},
{
FieldID: 107,
Name: "field_float_vector",
IsPrimaryKey: false,
Description: "",
DataType: 101,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
}

t.Run("GetVectorFieldSchema", func(t *testing.T) {
fieldSchema, err := GetVectorFieldSchema(schemaNormal)
assert.Equal(t, "field_float_vector", fieldSchema.Name)
assert.Nil(t, err)
})

schemaInvalid := &schemapb.CollectionSchema{
Name: "testColl",
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "field_int64",
IsPrimaryKey: true,
Description: "",
DataType: 5,
},
},
}

t.Run("GetVectorFieldSchemaInvalid", func(t *testing.T) {
_, err := GetVectorFieldSchema(schemaInvalid)
assert.Error(t, err)
})

}

func TestSchema_invalid(t *testing.T) {
t.Run("Duplicate field name", func(t *testing.T) {
schema := &schemapb.CollectionSchema{
Expand Down
2 changes: 1 addition & 1 deletion scripts/run_go_unittest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ function test_util()
{
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/funcutil/..." -failfast -count=1
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/paramtable/..." -failfast -count=1
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/retry/..." -failfast -count=1
go test -race -cover ${APPLE_SILICON_FLAG} "${PKG_DIR}/util/retry/..." -failfast -count=1
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/sessionutil/..." -failfast -count=1
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/typeutil/..." -failfast -count=1
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/importutil/..." -failfast -count=1
Expand Down
2 changes: 1 addition & 1 deletion tests/python_client/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ allure-pytest==2.7.0
pytest-print==0.2.1
pytest-level==0.1.1
pytest-xdist==2.5.0
pymilvus==2.4.0.dev27
pymilvus==2.4.0.dev59
pytest-rerunfailures==9.1.1
git+https://github.com/Projectplace/pytest-tags
ndg-httpsclient
Expand Down
2 changes: 1 addition & 1 deletion tests/python_client/testcases/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ def test_collection_multi_primary_fields(self):
self._connect()
int_field_one = cf.gen_int64_field(is_primary=True)
int_field_two = cf.gen_int64_field(name="int2", is_primary=True)
error = {ct.err_code: 0, ct.err_msg: "Primary key field can only be one."}
error = {ct.err_code: 0, ct.err_msg: "Expected only one primary key field"}
self.collection_schema_wrap.init_collection_schema(
fields=[int_field_one, int_field_two, cf.gen_float_vec_field()],
check_task=CheckTasks.err_res, check_items=error)
Expand Down
36 changes: 36 additions & 0 deletions tests/python_client/testcases/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,42 @@ def test_search_normal(self, nq, dim, auto_id, is_flush):
"ids": insert_ids,
"limit": default_limit})

@pytest.mark.tags(CaseLabel.L0)
def test_search_normal_without_specify_anns_field(self):
"""
target: test search normal case
method: create connection, collection, insert and search
expected: 1. search returned with 0 before travel timestamp
2. search successfully with limit(topK) after travel timestamp
"""
nq = 2
dim = 32
auto_id = True
# 1. initialize with data
collection_w, _, _, insert_ids, time_stamp = \
self.init_collection_general(prefix, True, auto_id=auto_id, dim=dim, is_flush=True)[0:5]
# 2. search before insert time_stamp
log.info("test_search_normal: searching collection %s" % collection_w.name)
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
collection_w.search(vectors[:nq], "",
default_search_params, default_limit,
default_search_exp,
travel_timestamp=time_stamp - 1,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": f"only support to travel back to 0s so far"})
# 3. search after insert time_stamp
collection_w.search(vectors[:nq], "",
default_search_params, default_limit,
default_search_exp,
travel_timestamp=0,
guarantee_timestamp=0,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit})


@pytest.mark.tags(CaseLabel.L0)
def test_search_with_hit_vectors(self, nq, dim, auto_id):
"""
Expand Down

0 comments on commit f792472

Please sign in to comment.