diff --git a/tests/python_client/testcases/test_database.py b/tests/python_client/testcases/test_database.py index f4fe890abb443..0b4dea6d46426 100644 --- a/tests/python_client/testcases/test_database.py +++ b/tests/python_client/testcases/test_database.py @@ -1,7 +1,6 @@ import pytest from base.client_base import TestcaseBase -from base.collection_wrapper import ApiCollectionWrapper from common.common_type import CheckTasks, CaseLabel from common import common_func as cf from common import common_type as ct @@ -19,6 +18,7 @@ def teardown_method(self, method): teardown method: drop collection and db """ log.info("[database_teardown_method] Start teardown database test cases ...") + self._connect() # clear db for db in self.database_wrap.list_database()[0]: @@ -28,9 +28,7 @@ def teardown_method(self, method): # drop db collections colls, _ = self.utility_wrap.list_collections() for coll in colls: - cw = ApiCollectionWrapper() - cw.init_collection(coll) - cw.drop() + self.utility_wrap.drop_collection(coll) # drop db if db != ct.default_db: @@ -182,7 +180,7 @@ def test_list_db_with_invalid_timeout(self, timeout): # list db with not existed using self.database_wrap.list_database(timeout=timeout, check_task=CheckTasks.err_res, check_items={ct.err_code: 1, - ct.err_msg: "rpc deadline exceeded: Retry timeout"}) + ct.err_msg: "StatusCode.DEADLINE_EXCEEDED"}) @pytest.mark.parametrize("invalid_db_name", [(), [], 1, [1, "2", 3], (1,), {1: 1}]) def test_using_invalid_db(self, invalid_db_name): @@ -206,9 +204,9 @@ def test_using_invalid_db(self, invalid_db_name): assert collection_w.name in collections @pytest.mark.parametrize("invalid_db_name", ["12-s", "12 s", "(mn)", "中文", "%$#"]) - def test_using_invalid_db_2(self, host, port, invalid_db_name): + def test_using_invalid_db_2(self, invalid_db_name): # connect with default alias using - self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password) + self._connect() # create collection in default db collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) @@ -226,7 +224,7 @@ def teardown_method(self, method): teardown method: drop collection and db """ log.info("[database_teardown_method] Start teardown database test cases ...") - + self._connect() # clear db for db in self.database_wrap.list_database()[0]: # using db @@ -235,9 +233,7 @@ def teardown_method(self, method): # drop db collections colls, _ = self.utility_wrap.list_collections() for coll in colls: - cw = ApiCollectionWrapper() - cw.init_collection(coll) - cw.drop() + self.utility_wrap.drop_collection(coll) # drop db if db != ct.default_db: @@ -507,14 +503,14 @@ def test_drop_using_db(self): using_collections, _ = self.utility_wrap.list_collections() assert collection_w_default.name in using_collections - def test_using_db_not_existed(self, host, port): + def test_using_db_not_existed(self): """ target: test using a not existed db method: using a not existed db expected: exception """ # create db - self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password) + self._connect() collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix)) # list collection with not exist using db -> exception @@ -547,9 +543,7 @@ def teardown_method(self, method): # drop db collections colls, _ = self.utility_wrap.list_collections() for coll in colls: - cw = ApiCollectionWrapper() - cw.init_collection(coll) - cw.drop() + self.utility_wrap.drop_collection(coll) # drop db if db != ct.default_db: