From 55266d8d03b305e7018a9d99d1faa42336dde583 Mon Sep 17 00:00:00 2001 From: yanliang567 <82361606+yanliang567@users.noreply.github.com> Date: Wed, 7 Jun 2023 15:44:36 +0800 Subject: [PATCH] Add partition_key tests (#24701) Signed-off-by: yanliang567 --- tests/python_client/common/common_func.py | 40 +- .../testcases/test_partition_key.py | 681 ++++++++++++++++++ .../testcases/test_resourcegroup.py | 10 +- 3 files changed, 706 insertions(+), 25 deletions(-) create mode 100644 tests/python_client/testcases/test_partition_key.py diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py index be948ab8d9030..daf32ee91d122 100644 --- a/tests/python_client/common/common_func.py +++ b/tests/python_client/common/common_func.py @@ -124,23 +124,23 @@ def gen_double_field(name=ct.default_double_field_name, is_primary=False, descri def gen_float_vec_field(name=ct.default_float_vec_field_name, is_primary=False, dim=ct.default_dim, - description=ct.default_desc): + description=ct.default_desc, **kwargs): float_vec_field, _ = ApiFieldSchemaWrapper().init_field_schema(name=name, dtype=DataType.FLOAT_VECTOR, description=description, dim=dim, - is_primary=is_primary) + is_primary=is_primary, **kwargs) return float_vec_field def gen_binary_vec_field(name=ct.default_binary_vec_field_name, is_primary=False, dim=ct.default_dim, - description=ct.default_desc): + description=ct.default_desc, **kwargs): binary_vec_field, _ = ApiFieldSchemaWrapper().init_field_schema(name=name, dtype=DataType.BINARY_VECTOR, description=description, dim=dim, - is_primary=is_primary) + is_primary=is_primary, **kwargs) return binary_vec_field def gen_default_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name, - auto_id=False, dim=ct.default_dim, enable_dynamic_field=False, with_json=True): + auto_id=False, dim=ct.default_dim, enable_dynamic_field=False, with_json=True, **kwargs): if enable_dynamic_field: if primary_field is ct.default_int64_field_name: fields = [gen_int64_field(), gen_float_vec_field(dim=dim)] @@ -157,50 +157,50 @@ def gen_default_collection_schema(description=ct.default_desc, primary_field=ct. schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description, primary_field=primary_field, auto_id=auto_id, - enable_dynamic_field=enable_dynamic_field) + enable_dynamic_field=enable_dynamic_field, **kwargs) return schema def gen_general_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name, - auto_id=False, is_binary=False, dim=ct.default_dim): + auto_id=False, is_binary=False, dim=ct.default_dim, **kwargs): if is_binary: fields = [gen_int64_field(), gen_float_field(), gen_string_field(), gen_binary_vec_field(dim=dim)] else: fields = [gen_int64_field(), gen_float_field(), gen_string_field(), gen_float_vec_field(dim=dim)] schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description, - primary_field=primary_field, auto_id=auto_id) + primary_field=primary_field, auto_id=auto_id, **kwargs) return schema def gen_string_pk_default_collection_schema(description=ct.default_desc, primary_field=ct.default_string_field_name, - auto_id=False, dim=ct.default_dim): + auto_id=False, dim=ct.default_dim, **kwargs): fields = [gen_int64_field(), gen_float_field(), gen_string_field(), gen_json_field(), gen_float_vec_field(dim=dim)] schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description, - primary_field=primary_field, auto_id=auto_id) + primary_field=primary_field, auto_id=auto_id, **kwargs) return schema def gen_json_default_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name, - auto_id=False, dim=ct.default_dim): + auto_id=False, dim=ct.default_dim, **kwargs): fields = [gen_int64_field(), gen_float_field(), gen_string_field(), gen_json_field(), gen_float_vec_field(dim=dim)] schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description, - primary_field=primary_field, auto_id=auto_id) + primary_field=primary_field, auto_id=auto_id, **kwargs) return schema def gen_multiple_json_default_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name, - auto_id=False, dim=ct.default_dim): + auto_id=False, dim=ct.default_dim, **kwargs): fields = [gen_int64_field(), gen_float_field(), gen_string_field(), gen_json_field(name="json1"), gen_json_field(name="json2"), gen_float_vec_field(dim=dim)] schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description, - primary_field=primary_field, auto_id=auto_id) + primary_field=primary_field, auto_id=auto_id, **kwargs) return schema def gen_collection_schema_all_datatype(description=ct.default_desc, primary_field=ct.default_int64_field_name, auto_id=False, dim=ct.default_dim, - enable_dynamic_field=False, with_json=True): + enable_dynamic_field=False, with_json=True, **kwargs): if enable_dynamic_field: fields = [gen_int64_field(), gen_float_vec_field(dim=dim)] else: @@ -211,22 +211,22 @@ def gen_collection_schema_all_datatype(description=ct.default_desc, fields.remove(gen_json_field()) schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description, primary_field=primary_field, auto_id=auto_id, - enable_dynamic_field=enable_dynamic_field) + enable_dynamic_field=enable_dynamic_field, **kwargs) return schema -def gen_collection_schema(fields, primary_field=None, description=ct.default_desc, auto_id=False): +def gen_collection_schema(fields, primary_field=None, description=ct.default_desc, auto_id=False, **kwargs): schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, primary_field=primary_field, - description=description, auto_id=auto_id) + description=description, auto_id=auto_id, **kwargs) return schema def gen_default_binary_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name, - auto_id=False, dim=ct.default_dim): + auto_id=False, dim=ct.default_dim, **kwargs): fields = [gen_int64_field(), gen_float_field(), gen_string_field(), gen_binary_vec_field(dim=dim)] binary_schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description, primary_field=primary_field, - auto_id=auto_id) + auto_id=auto_id, **kwargs) return binary_schema diff --git a/tests/python_client/testcases/test_partition_key.py b/tests/python_client/testcases/test_partition_key.py new file mode 100644 index 0000000000000..80ab75822b31f --- /dev/null +++ b/tests/python_client/testcases/test_partition_key.py @@ -0,0 +1,681 @@ +import pytest + +from base.client_base import TestcaseBase +from common import common_func as cf +from common import common_type as ct +from common.common_type import CaseLabel, CheckTasks +from utils.util_pymilvus import * + + +class TestPartitionKeyParams(TestcaseBase): + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("par_key_field", [ct.default_int64_field_name, ct.default_string_field_name]) + def test_partition_key_on_field_schema(self, par_key_field): + """ + Method: + 1. create a collection with partition key on + 2. verify insert, build, load and search successfully + 3. drop collection + """ + self._connect() + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field(is_partition_key=(par_key_field == ct.default_int64_field_name)) + string_field = cf.gen_string_field(is_partition_key=(par_key_field == ct.default_string_field_name)) + vector_field = cf.gen_float_vec_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], auto_id=True) + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema) + + # insert + nb = 1000 + string_prefix = cf.gen_str_by_length(length=6) + entities_per_parkey = 10 + for _ in range(entities_per_parkey): + int64_values = [i for i in range(0, nb)] + string_values = [string_prefix + str(i) for i in range(0, nb)] + float_vec_values = gen_vectors(nb, ct.default_dim) + data = [int64_values, string_values, float_vec_values] + collection_w.insert(data) + + # flush + collection_w.flush() + # build index + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + # load + collection_w.load() + # search + nq = 10 + search_vectors = gen_vectors(nq, ct.default_dim) + # search with mixed filtered + res1 = collection_w.search(data=search_vectors, anns_field=vector_field.name, + param=ct.default_search_params, limit=entities_per_parkey, + expr=f'{int64_field.name} in [1,3,5] && {string_field.name} in ["{string_prefix}1","{string_prefix}3","{string_prefix}5"]', + output_fields=[int64_field.name, string_field.name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": ct.default_limit})[0] + # search with partition key filter only or with non partition key + res2 = collection_w.search(data=search_vectors, anns_field=vector_field.name, + param=ct.default_search_params, limit=entities_per_parkey, + expr=f'{int64_field.name} in [1,3,5]', + output_fields=[int64_field.name, string_field.name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": ct.default_limit})[0] + # search with partition key filter only or with non partition key + res3 = collection_w.search(data=search_vectors, anns_field=vector_field.name, + param=ct.default_search_params, limit=entities_per_parkey, + expr=f'{string_field.name} in ["{string_prefix}1","{string_prefix}3","{string_prefix}5"]', + output_fields=[int64_field.name, string_field.name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": ct.default_limit})[0] + # assert the results persist + assert res1.ids == res2.ids == res3.ids + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("par_key_field", [ct.default_int64_field_name, ct.default_string_field_name]) + @pytest.mark.parametrize("index_on_par_key_field", [True, False]) + def test_partition_key_on_collection_schema(self, par_key_field, index_on_par_key_field): + """ + Method: + 1. create a collection with partition key on collection schema with customized num_partitions + 2. verify insert, build, load and search successfully + 3. drop collection + """ + self._connect() + pk_field = cf.gen_string_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field() + vector_field = cf.gen_float_vec_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + auto_id=False, partition_key_field=par_key_field) + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, num_partitions=9) + + # insert + nb = 1000 + string_prefix = cf.gen_str_by_length(length=6) + entities_per_parkey = 20 + for n in range(entities_per_parkey): + pk_values = [str(i) for i in range(n * nb, (n+1)*nb)] + int64_values = [i for i in range(0, nb)] + string_values = [string_prefix + str(i) for i in range(0, nb)] + float_vec_values = gen_vectors(nb, ct.default_dim) + data = [pk_values, int64_values, string_values, float_vec_values] + collection_w.insert(data) + + # flush + collection_w.flush() + # build index + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + if index_on_par_key_field: + collection_w.create_index(field_name=par_key_field, index_params={}) + # load + collection_w.load() + # search + nq = 10 + search_vectors = gen_vectors(nq, ct.default_dim) + # search with mixed filtered + res1 = collection_w.search(data=search_vectors, anns_field=vector_field.name, + param=ct.default_search_params, limit=entities_per_parkey, + expr=f'{int64_field.name} in [1,3,5] && {string_field.name} in ["{string_prefix}1","{string_prefix}3","{string_prefix}5"]', + output_fields=[int64_field.name, string_field.name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": ct.default_limit})[0] + + @pytest.mark.tags(CaseLabel.L1) + def test_partition_key_off_in_field_but_enable_in_schema(self): + """ + Method: + 1. create a collection with partition key off in field schema but enable in collection schema + 2. verify the collection created successfully + """ + self._connect() + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field(is_partition_key=False) + string_field = cf.gen_string_field() + vector_field = cf.gen_float_vec_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + partition_key_field=int64_field.name, auto_id=True) + + err_msg = "fail to create collection" + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + num_partitions=10) + assert len(collection_w.partitions) == 10 + + @pytest.mark.skip("need more investigation") + @pytest.mark.tags(CaseLabel.L1) + def test_partition_key_bulk_insert(self): + """ + Method: + 1. create a collection with partition key on + 2. bulk insert data + 3. verify the data bulk inserted and be searched successfully + """ + self._connect() + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field(is_partition_key=True) + vector_field = cf.gen_float_vec_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + auto_id=True) + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + num_partitions=10) + # bulk insert + nb = 1000 + string_prefix = cf.gen_str_by_length(length=6) + entities_per_parkey = 20 + for n in range(entities_per_parkey): + pk_values = [str(i) for i in range(n * nb, (n+1)*nb)] + int64_values = [i for i in range(0, nb)] + string_values = [string_prefix + str(i) for i in range(0, nb)] + float_vec_values = gen_vectors(nb, ct.default_dim) + data = [pk_values, int64_values, string_values, float_vec_values] + collection_w.insert(data) + + # flush + collection_w.flush() + # build index + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + # load + collection_w.load() + # search + nq = 10 + search_vectors = gen_vectors(nq, ct.default_dim) + # search with mixed filtered + res1 = collection_w.search(data=search_vectors, anns_field=vector_field.name, + param=ct.default_search_params, limit=entities_per_parkey, + expr=f'{int64_field.name} in [1,3,5] && {string_field.name} in ["{string_prefix}1","{string_prefix}3","{string_prefix}5"]', + output_fields=[int64_field.name, string_field.name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": ct.default_limit})[0] + + +class TestPartitionKeyInvalidParams(TestcaseBase): + @pytest.mark.tags(CaseLabel.L1) + def test_max_partitions(self): + """ + Method: + 1. create a collection with max partitions + 2. insert, build, load and search + 3. drop collection + 4. create a collection with max partitions + 1 + 5. verify the error raised + """ + max_partition = 4096 + self._connect() + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field(is_partition_key=True) + vector_field = cf.gen_float_vec_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + auto_id=True) + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + num_partitions=max_partition) + assert len(collection_w.partitions) == max_partition + + # insert + nb = 100 + string_prefix = cf.gen_str_by_length(length=6) + for _ in range(5): + int64_values = [i for i in range(0, nb)] + string_values = [string_prefix + str(i) for i in range(0, nb)] + float_vec_values = gen_vectors(nb, ct.default_dim) + data = [int64_values, string_values, float_vec_values] + collection_w.insert(data) + + # drop collection + collection_w.drop() + + # create a collection with min partitions - 1 + num_partitions = max_partition + 1 + err_msg = f"partition number ({num_partitions}) exceeds max configuration ({max_partition})" + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + num_partitions=num_partitions, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + @pytest.mark.tags(CaseLabel.L1) + def test_min_partitions(self): + """ + Method: + 1. create a collection with min partitions + 2. insert, build, load and search + 3. drop collection + 4. create a collection with min partitions - 1 + 5. verify the error raised + """ + min_partition = 1 + self._connect() + pk_field = cf.gen_string_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field() + vector_field = cf.gen_float_vec_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + partition_key_field=int64_field.name) + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + num_partitions=min_partition) + assert len(collection_w.partitions) == min_partition + + # insert + nb = 100 + string_prefix = cf.gen_str_by_length(length=6) + for _ in range(5): + pk_values = [str(i) for i in range(0, nb)] + int64_values = [i for i in range(0, nb)] + string_values = [string_prefix + str(i) for i in range(0, nb)] + float_vec_values = gen_vectors(nb, ct.default_dim) + data = [pk_values, int64_values, string_values, float_vec_values] + collection_w.insert(data) + collection_w.flush() + + # drop collection + collection_w.drop() + + # create a collection with min partitions - 1 + err_msg = "The specified num_partitions should be greater than or equal to 1" + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + num_partitions=min_partition - 1, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + num_partitions=min_partition - 3, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("is_par_key", [None, "", "invalid", 0.1, [], {}, ()]) + def test_invalid_partition_key_values(self, is_par_key): + """ + Method: + 1. create a collection with invalid partition keys + 2. verify the error raised + """ + self._connect() + err_msg = "Param is_partition_key must be bool type" + int64_field = cf.gen_int64_field(is_partition_key=is_par_key, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("num_partitions", [True, False, "", "invalid", 0.1, [], {}, ()]) + def test_invalid_partitions_values(self, num_partitions): + """ + Method: + 1. create a collection with invalid num_partitions + 2. verify the error raised + """ + self._connect() + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field(is_partition_key=True) + string_field = cf.gen_string_field() + vector_field = cf.gen_float_vec_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field]) + + err_msg = "invalid num_partitions type" + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + num_partitions=num_partitions, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + @pytest.mark.tags(CaseLabel.L0) + def test_partition_key_on_multi_fields(self): + """ + Method: + 1. create a collection with partition key on multi fields + 2. verify the error raised + """ + self._connect() + # both defined in field schema + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field(is_partition_key=True) + string_field = cf.gen_string_field(is_partition_key=True) + vector_field = cf.gen_float_vec_field() + err_msg = "Expected only one partition key field" + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + auto_id=True, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + # both defined in collection schema + err_msg = "Param partition_key_field must be str type" + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + partition_key_field=[int64_field.name, string_field.name], + auto_id=True, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + # one defined in field schema, one defined in collection schema + err_msg = "Expected only one partition key field" + int64_field = cf.gen_int64_field(is_partition_key=True) + string_field = cf.gen_string_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + partition_key_field=string_field.name, + auto_id=True, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("is_int64_primary", [True, False]) + def test_partition_key_on_primary_key(self, is_int64_primary): + """ + Method: + 1. create a collection with partition key on primary key + 2. verify the error raised + """ + self._connect() + if is_int64_primary: + pk_field = cf.gen_int64_field(name='pk', is_primary=True, is_partition_key=True) + else: + pk_field = cf.gen_string_field(name='pk', is_primary=True, is_partition_key=True) + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field() + vector_field = cf.gen_float_vec_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + auto_id=False) + + err_msg = "the partition key field must not be primary field" + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + # if settings on collection schema + if is_int64_primary: + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + else: + pk_field = cf.gen_string_field(name='pk', is_primary=True) + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + partition_key_field=pk_field.name, + auto_id=False) + + err_msg = "the partition key field must not be primary field" + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + @pytest.mark.tags(CaseLabel.L0) + def test_partition_key_on_and_off(self): + """ + Method: + 1. create a collection with one field partition key on and the other field partition key on in schema + 2. verify the error raised + """ + self._connect() + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field(is_partition_key=True) + string_field = cf.gen_string_field() + vector_field = cf.gen_float_vec_field() + err_msg = "Expected only one partition key field" + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + partition_key_field=vector_field.name, + auto_id=True, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + # if two fields with same type + string_field = cf.gen_string_field(name="string1", is_partition_key=True) + string_field2 = cf.gen_string_field(name="string2") + err_msg = "Expected only one partition key field" + schema = cf.gen_collection_schema(fields=[pk_field, string_field, string_field2, vector_field], + partition_key_field=string_field2.name, + auto_id=True, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("field_type", [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR, DataType.FLOAT, + DataType.DOUBLE, DataType.BOOL, DataType.INT8, + DataType.INT16, DataType.INT32, DataType.JSON]) + def test_partition_key_on_invalid_type_fields(self, field_type): + """ + Method: + 1. create a collection with partition key on invalid type fields + 2. verify the error raised + """ + self._connect() + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int8_field = cf.gen_int8_field(is_partition_key=(field_type == DataType.INT8)) + int16_field = cf.gen_int16_field(is_partition_key=(field_type == DataType.INT16)) + int32_field = cf.gen_int32_field(is_partition_key=(field_type == DataType.INT32)) + bool_field = cf.gen_bool_field(is_partition_key=(field_type == DataType.BOOL)) + float_field = cf.gen_float_field(is_partition_key=(field_type == DataType.FLOAT)) + double_field = cf.gen_double_field(is_partition_key=(field_type == DataType.DOUBLE)) + json_field = cf.gen_json_field(is_partition_key=(field_type == DataType.JSON)) + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field() + vector_field = cf.gen_float_vec_field(is_partition_key=(field_type == DataType.FLOAT_VECTOR)) + if field_type == DataType.BINARY_VECTOR: + vector_field = cf.gen_binary_vec_field(is_partition_key=(field_type == DataType.BINARY_VECTOR)) + + err_msg = "Partition key field type must be DataType.INT64 or DataType.VARCHAR" + schema = cf.gen_collection_schema(fields=[pk_field, int8_field, int16_field, int32_field, + bool_field, float_field, double_field, json_field, + int64_field, string_field, vector_field], + auto_id=True, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + @pytest.mark.tags(CaseLabel.L1) + def test_partition_key_on_not_existed_fields(self): + """ + Method: + 1. create a collection with partition key on not existed fields + 2. verify the error raised + """ + self._connect() + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field() + vector_field = cf.gen_float_vec_field() + err_msg = "the specified partition key field {non_existing_field} not exist" + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + partition_key_field="non_existing_field", + auto_id=True, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + @pytest.mark.tags(CaseLabel.L1) + def test_partition_key_on_empty_and_num_partitions_set(self): + """ + Method: + 1. create a collection with partition key on empty and num_partitions set + 2. verify the error raised + """ + self._connect() + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field() + vector_field = cf.gen_float_vec_field() + err_msg = "the specified partition key field {} not exist" + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + partition_key_field="", auto_id=True, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + schema = cf.gen_default_collection_schema() + err_msg = "num_partitions should only be specified with partition key field enabled" + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema, + num_partitions=200, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("invalid_data", [99, True, None, [], {}, ()]) + def test_partition_key_insert_invalid_data(self, invalid_data): + """ + Method: + 1. create a collection with partition key on int64 field + 2. insert entities with invalid partition key + 3. verify the error raised + """ + self._connect() + pk_field = cf.gen_string_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field() + vector_field = cf.gen_float_vec_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], + partition_key_field=string_field.name, auto_id=False) + + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema) + + # insert + nb = 10 + string_prefix = cf.gen_str_by_length(length=6) + pk_values = [str(i) for i in range(0, nb)] + int64_values = [i for i in range(0, nb)] + string_values = [string_prefix + str(i) for i in range(0, nb)] + string_values[1] = invalid_data # inject invalid data + float_vec_values = gen_vectors(nb, ct.default_dim) + data = [pk_values, int64_values, string_values, float_vec_values] + + err_msg = "expect string input" + self.collection_wrap.insert(data, check_task=CheckTasks.err_res, check_items={"err_code": 2, "err_msg": err_msg}) + + +class TestPartitionApiForbidden(TestcaseBase): + @pytest.mark.tags(CaseLabel.L1) + def test_create_partition(self): + """ + Method: + 1. return error if create partition when partition key is on + 2. return error if insert partition when partition key is on + 3. return error if drop partition when partition key is on + 4. return success if show partition when partition key is on + 5. return error if load partition when partition key is on + 6. return error if release partition when partition key is on + Expected: raise exception + """ + self._connect() + pk_field = cf.gen_int64_field(name='pk', is_primary=True) + int64_field = cf.gen_int64_field() + string_field = cf.gen_string_field(is_partition_key=True) + vector_field = cf.gen_float_vec_field() + schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], auto_id=True) + c_name = cf.gen_unique_str("par_key") + collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema) + + # create partition + err_msg = "disable create partition if partition key mode is used" + partition_name = cf.gen_unique_str("partition") + self.collection_wrap.create_partition(partition_name, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + self.partition_wrap.init_partition(collection_w, partition_name, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + # get partition is allowed + partitions = self.collection_wrap.partitions + collection_w.partition(partitions[0].name) + self.partition_wrap.init_partition(collection_w, partitions[0].name) + assert self.partition_wrap.name == partitions[0].name + # has partition is allowed + assert collection_w.has_partition(partitions[0].name) + assert self.utility_wrap.has_partition(collection_w.name, partitions[0].name) + + # insert + nb = 100 + string_prefix = cf.gen_str_by_length(length=6) + entities_per_parkey = 10 + for _ in range(entities_per_parkey): + int64_values = [i for i in range(0, nb)] + string_values = [string_prefix + str(i) for i in range(0, nb)] + float_vec_values = gen_vectors(nb, ct.default_dim) + data = [int64_values, string_values, float_vec_values] + self.collection_wrap.insert(data) + + err_msg = "not support manually specifying the partition names if partition key mode is used" + self.partition_wrap.insert(data, check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + self.collection_wrap.insert(data, partition_name=partitions[0].name, + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + err_msg = "disable load partitions if partition key mode is used" + self.partition_wrap.load(check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + self.collection_wrap.load(partition_names=[partitions[0].name], + check_task=CheckTasks.err_res, + check_items={"err_code": 2, "err_msg": err_msg}) + + # flush + collection_w.flush() + + # build index + collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index) + # load + collection_w.load() + # search + nq = 10 + search_vectors = gen_vectors(nq, ct.default_dim) + # search with mixed filtered + res1 = self.collection_wrap.search(data=search_vectors, anns_field=vector_field.name, + param=ct.default_search_params, limit=entities_per_parkey, + expr=f'{int64_field.name} in [1,3,5] && {string_field.name} in ["{string_prefix}1","{string_prefix}3","{string_prefix}5"]', + output_fields=[int64_field.name, string_field.name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": ct.default_limit})[0] + pks = res1[0].ids[:3] + err_msg = "not support manually specifying the partition names if partition key mode is used" + self.collection_wrap.search(data=search_vectors, anns_field=vector_field.name, partition_names=[partitions[0].name], + param=ct.default_search_params, limit=entities_per_parkey, + expr=f'{int64_field.name} in [1,3,5]', + output_fields=[int64_field.name, string_field.name], + check_task=CheckTasks.err_res, + check_items={"err_code": nq, "err_msg": err_msg}) + self.partition_wrap.search(data=search_vectors, anns_field=vector_field.name, + params=ct.default_search_params, limit=entities_per_parkey, + expr=f'{string_field.name} in ["{string_prefix}1","{string_prefix}3","{string_prefix}5"]', + output_fields=[int64_field.name, string_field.name], + check_task=CheckTasks.err_res, + check_items={"err_code": nq, "err_msg": err_msg}) + + # partition loading progress is allowed + self.utility_wrap.loading_progress(collection_name=collection_w.name) + self.utility_wrap.loading_progress(collection_name=collection_w.name, + partition_names=[partitions[0].name]) + + # partition wait for loading complete is allowed + self.utility_wrap.wait_for_loading_complete(collection_name=collection_w.name) + self.utility_wrap.wait_for_loading_complete(collection_name=collection_w.name, + partition_names=[partitions[0].name]) + # partition flush is allowed: #24165 + self.partition_wrap.flush() + + # partition delete is not allowed + self.partition_wrap.delete(expr=f'{pk_field.name} in {pks}', + check_task=CheckTasks.err_res, check_items={"err_code": 2, "err_msg": err_msg}) + self.collection_wrap.delete(expr=f'{pk_field.name} in {pks}', partition_name=partitions[0].name, + check_task=CheckTasks.err_res, check_items={"err_code": 2, "err_msg": err_msg}) + # partition query is not allowed + self.partition_wrap.query(expr=f'{pk_field.name} in {pks}', + check_task=CheckTasks.err_res, check_items={"err_code": 2, "err_msg": err_msg}) + self.collection_wrap.query(expr=f'{pk_field.name} in {pks}', partition_names=[partitions[0].name], + check_task=CheckTasks.err_res, check_items={"err_code": 2, "err_msg": err_msg}) + # partition upsert is not allowed + # self.partition_wrap.upsert(data=data, check_task=CheckTasks.err_res, + # check_items={"err_code": 2, "err_msg": err_msg}) + # self.collection_wrap.upsert(data=data, partition_name=partitions[0].name, + # chek_task=CheckTasks.err_res, check_items={"err_code": 2, "err_msg": err_msg}) + # partition release + err_msg = "disable release partitions if partition key mode is used" + self.partition_wrap.release(check_task=CheckTasks.err_res, check_items={"err_code": 2, "err_msg": err_msg}) + # partition drop + err_msg = "disable drop partition if partition key mode is used" + self.partition_wrap.drop(check_task=CheckTasks.err_res, check_items={"err_code": 2, "err_msg": err_msg}) + + # # partition bulk insert + # self.utility_wrap.do_bulk_insert(collection_w.name, files, partition_names=[partitions[0].name], + # check_task=CheckTasks.err_res, + # check_items={"err_code": 2, "err_msg": err_msg}) diff --git a/tests/python_client/testcases/test_resourcegroup.py b/tests/python_client/testcases/test_resourcegroup.py index 7ae60557f7ded..e53f329327194 100644 --- a/tests/python_client/testcases/test_resourcegroup.py +++ b/tests/python_client/testcases/test_resourcegroup.py @@ -183,7 +183,7 @@ def test_create_rg_dup_name(self): check_task=ct.CheckTasks.check_rg_property, check_items={"name": rg_name}) error = {ct.err_code: 999, - ct.err_msg: "failed to create resource group, err=resource group already exist"} + ct.err_msg: "resource group already exist"} self.init_resource_group(name=rg_name, check_task=ct.CheckTasks.err_res, check_items=error) @@ -243,7 +243,7 @@ def test_create_max_number_rgs(self): rgs = self.utility_wrap.list_resource_groups()[0] assert len(rgs) == max_rg_num - error = {ct.err_code: 999, ct.err_msg: 'failed to create resource group, err=resource group num reach limit'} + error = {ct.err_code: 999, ct.err_msg: 'resource group num reach limit'} self.init_resource_group(name=cf.gen_unique_str('rg'), check_task=CheckTasks.err_res, check_items=error) @@ -577,7 +577,7 @@ def test_transfer_node_from_default_rg(self, with_growing): check_items=default_rg_info) @pytest.mark.tags(CaseLabel.MultiQueryNodes) - @pytest.mark.parametrize("num_node", [0, 99, -1, 0.5, True, "str"]) + @pytest.mark.parametrize("num_node", [0, 99, -1, 0.5, "str"]) def test_transfer_node_with_wrong_num_node(self, num_node): """ Method: @@ -1426,7 +1426,7 @@ def test_transfer_multi_replicas_into_one_rg(self, with_growing): rgB_name = cf.gen_unique_str('rgB') self.init_resource_group(name=rgB_name) - # transfer 1 nodes to rgA, 2 nodes to rgB + # transfer 2 nodes to rgA, 3 nodes to rgB self.utility_wrap.transfer_node(source=ct.default_resource_group_name, target=rgA_name, num_node=2) @@ -1509,7 +1509,7 @@ def test_transfer_multi_replicas_into_one_rg(self, with_growing): ) # wait transfer completed for replicas - time.sleep(60) + time.sleep(120) # make growing if with_growing: res, _ = collection_w.insert(cf.gen_default_list_data(nb, dim=dim, start=7 * nb))