diff --git a/tests/python_client/bulk_load/bulk_load_data.py b/tests/python_client/bulk_load/bulk_load_data.py index 93007128ee990..7eb89dea9515d 100644 --- a/tests/python_client/bulk_load/bulk_load_data.py +++ b/tests/python_client/bulk_load/bulk_load_data.py @@ -1,15 +1,14 @@ import time import os -from minio import Minio -from minio.error import S3Error import numpy as np import random from sklearn import preprocessing from common.common_func import gen_unique_str +from minio_comm import copy_files_to_minio - -minio = "10.96.1.23:9000" # TODO update hardcode -bucket_name = "yanliang-bulk-load" # TODO update hardcode +# TODO: remove hardcode with input configurations +minio = "minio_address:port" # minio service and port +bucket_name = "milvus-bulk-load" # bucket name of milvus is using data_source = "/tmp/bulk_load_data" @@ -17,17 +16,34 @@ FLOAT = "float" +class DataField: + pk_field = "uid" + vec_field = "vectors" + int_field = "int_scalar" + string_field = "string_scalar" + bool_field = "bool_scalar" + float_field = "float_scalar" + + +class DataErrorType: + one_entity_wrong_dim = "one_entity_wrong_dim" + str_on_int_pk = "str_on_int_pk" + int_on_float_scalar = "int_on_float_scalar" + float_on_int_pk = "float_on_int_pk" + typo_on_bool = "typo_on_bool" + + def gen_file_prefix(row_based=True, auto_id=True, prefix=""): if row_based: if auto_id: - return f"{prefix}row_auto" + return f"{prefix}_row_auto" else: - return f"{prefix}row_cust" + return f"{prefix}_row_cust" else: if auto_id: - return f"{prefix}col_auto" + return f"{prefix}_col_auto" else: - return f"{prefix}col_cust" + return f"{prefix}_col_cust" def entity_suffix(rows): @@ -54,7 +70,15 @@ def gen_binary_vectors(nb, dim): return vectors -def gen_row_based_json_file(row_file, str_pk, multi_scalars, float_vect, rows, dim, autoid): +def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect, + rows, dim, start_uid=0, err_type="", **kwargs): + + if err_type == DataErrorType.str_on_int_pk: + str_pk = True + if err_type == DataErrorType.one_entity_wrong_dim: + wrong_dim = dim + 8 # add 8 to compatible with binary vectors + wrong_row = kwargs.get("wrong_position", start_uid) + with open(row_file, "w") as f: f.write("{") f.write("\n") @@ -64,28 +88,44 @@ def gen_row_based_json_file(row_file, str_pk, multi_scalars, float_vect, rows, d if i > 0: f.write(",") f.write("\n") - # pk fields - if not autoid: - if str_pk: - f.write('{"uid":"' + str(gen_unique_str()) + '",') - else: - f.write('{"uid":' + str(i) + ',') - else: - f.write('{') # scalar fields - if multi_scalars: - f.write('"int_scalar":' + str(random.randint(-999999, 9999999)) + ',') - f.write('"float_scalar":' + str(random.random()) + ',') - f.write('"string_scalar":"' + str(gen_unique_str()) + '",') - f.write('"bool_scalar":' + str(random.choice(["true", "false"])) + ',') - - # vector field - if float_vect: - vectors = gen_float_vectors(1, dim) - else: - vectors = gen_binary_vectors(1, (dim//8)) - f.write('"vectors":' + ",".join(str(x) for x in vectors) + "}") + f.write('{') + for j in range(len(data_fields)): + data_field = data_fields[j] + if data_field == DataField.pk_field: + if str_pk: + f.write('"uid":"' + str(gen_unique_str()) + '"') + else: + if err_type == DataErrorType.float_on_int_pk: + f.write('"uid":' + str(i + start_uid + random.random()) + '') + else: + f.write('"uid":' + str(i + start_uid) + '') + if data_field == DataField.int_field: + f.write('"int_scalar":' + str(random.randint(-999999, 9999999)) + '') + if data_field == DataField.float_field: + if 
err_type == DataErrorType.int_on_float_scalar: + f.write('"float_scalar":' + str(random.randint(-999999, 9999999)) + '') + else: + f.write('"float_scalar":' + str(random.random()) + '') + if data_field == DataField.string_field: + f.write('"string_scalar":"' + str(gen_unique_str()) + '"') + if data_field == DataField.bool_field: + if err_type == DataErrorType.typo_on_bool: + f.write('"bool_scalar":' + str(random.choice(["True", "False", "TRUE", "FALSE", "0", "1"])) + '') + else: + f.write('"bool_scalar":' + str(random.choice(["true", "false"])) + '') + if data_field == DataField.vec_field: + # vector field + if err_type == DataErrorType.one_entity_wrong_dim and i == wrong_row: + vectors = gen_float_vectors(1, wrong_dim) if float_vect else gen_binary_vectors(1, (wrong_dim // 8)) + else: + vectors = gen_float_vectors(1, dim) if float_vect else gen_binary_vectors(1, (dim//8)) + f.write('"vectors":' + ",".join(str(x) for x in vectors) + '') + # not write common for the last field + if j != len(data_fields) - 1: + f.write(',') + f.write('}') f.write("\n") f.write("]") f.write("\n") @@ -93,217 +133,305 @@ def gen_row_based_json_file(row_file, str_pk, multi_scalars, float_vect, rows, d f.write("\n") -def gen_column_base_json_file(col_file, str_pk, float_vect, multi_scalars, rows, dim, autoid): +def gen_column_base_json_file(col_file, str_pk, data_fields, float_vect, + rows, dim, start_uid=0, err_type="", **kwargs): + if err_type == DataErrorType.str_on_int_pk: + str_pk = True + with open(col_file, "w") as f: f.write("{") f.write("\n") - # pk columns - if not autoid: - if str_pk == "str_pk": - f.write('"uid":["' + ',"'.join(str(gen_unique_str()) + '"' for i in range(rows)) + '],') - f.write("\n") - else: - f.write('"uid":[' + ",".join(str(i) for i in range(rows)) + "],") - f.write("\n") - - # scalar columns - if multi_scalars: - f.write('"int_scalar":[' + ",".join(str(random.randint(-999999, 9999999)) for i in range(rows)) + "],") - f.write("\n") - f.write('"float_scalar":[' + ",".join(str(random.random()) for i in range(rows)) + "],") - f.write("\n") - f.write('"string_scalar":["' + ',"'.join(str(gen_unique_str()) + '"' for i in range(rows)) + '],') - f.write("\n") - f.write('"bool_scalar":[' + ",".join(str(random.choice(["true", "false"])) for i in range(rows)) + "],") - f.write("\n") - - # vector columns - if float_vect: - vectors = gen_float_vectors(rows, dim) - f.write('"vectors":[' + ",".join(str(x) for x in vectors) + "]") - f.write("\n") - else: - vectors = gen_binary_vectors(rows, (dim//8)) - f.write('"vectors":[' + ",".join(str(x) for x in vectors) + "]") - f.write("\n") - + if rows > 0: + # data columns + for j in range(len(data_fields)): + data_field = data_fields[j] + if data_field == DataField.pk_field: + if str_pk: + f.write('"uid":["' + ',"'.join(str(gen_unique_str()) + '"' for i in range(rows)) + ']') + f.write("\n") + else: + if err_type == DataErrorType.float_on_int_pk: + f.write('"uid":[' + ",".join( + str(i + random.random()) for i in range(start_uid, start_uid + rows)) + "]") + else: + f.write('"uid":[' + ",".join(str(i) for i in range(start_uid, start_uid + rows)) + "]") + f.write("\n") + if data_field == DataField.int_field: + f.write('"int_scalar":[' + ",".join(str( + random.randint(-999999, 9999999)) for i in range(rows)) + "]") + f.write("\n") + if data_field == DataField.float_field: + if err_type == DataErrorType.int_on_float_scalar: + f.write('"float_scalar":[' + ",".join( + str(random.randint(-999999, 9999999)) for i in range(rows)) + "]") + else: + 
f.write('"float_scalar":[' + ",".join( + str(random.random()) for i in range(rows)) + "]") + f.write("\n") + if data_field == DataField.string_field: + f.write('"string_scalar":["' + ',"'.join(str( + gen_unique_str()) + '"' for i in range(rows)) + ']') + f.write("\n") + if data_field == DataField.bool_field: + if err_type == DataErrorType.typo_on_bool: + f.write('"bool_scalar":[' + ",".join( + str(random.choice(["True", "False", "TRUE", "FALSE", "1", "0"])) for i in range(rows)) + "]") + else: + f.write('"bool_scalar":[' + ",".join( + str(random.choice(["true", "false"])) for i in range(rows)) + "]") + f.write("\n") + if data_field == DataField.vec_field: + # vector columns + if err_type == DataErrorType.one_entity_wrong_dim: + wrong_dim = dim + 8 # add 8 to compatible with binary vectors + wrong_row = kwargs.get("wrong_position", 0) + if wrong_row <= 0: + vectors1 = [] + else: + vectors1 = gen_float_vectors(wrong_row, dim) if float_vect else gen_binary_vectors(wrong_row, (dim//8)) + if wrong_row >= rows -1: + vectors2 = [] + else: + vectors2 = gen_float_vectors(rows-wrong_row-1, dim) if float_vect else gen_binary_vectors(rows-wrong_row-1, (dim//8)) + + vectors_wrong_dim = gen_float_vectors(1, wrong_dim) if float_vect else gen_binary_vectors(1, (wrong_dim//8)) + vectors = vectors1 + vectors_wrong_dim + vectors2 + else: + vectors = gen_float_vectors(rows, dim) if float_vect else gen_binary_vectors(rows, (dim//8)) + f.write('"vectors":[' + ",".join(str(x) for x in vectors) + "]") + f.write("\n") + if j != len(data_fields) - 1: + f.write(",") f.write("}") f.write("\n") -def gen_vectors_in_numpy_file(dir, vector_type, rows, dim, num): - # vector columns - if vector_type == FLOAT: - vectors = gen_float_vectors(rows, dim) - else: - vectors = gen_binary_vectors(rows, (dim // 8)) +def gen_vectors_in_numpy_file(dir, float_vector, rows, dim, force=False): + file_name = f"{DataField.vec_field}.npy" + file = f'{dir}/{file_name}' - suffix = entity_suffix(rows) - # print(vectors) - arr = np.array(vectors) - path = f"{dir}/{dim}d_{suffix}_{num}" - if not os.path.isdir(path): - os.mkdir(path) - file = f'{path}/vectors_{dim}d_{suffix}.npy' - np.save(file, arr) - - -def gen_scalars_in_numpy_file(dir, vector_type, rows, dim, num, start): - # scalar columns - if vector_type == FLOAT: - data = [random.random() for i in range(rows)] - elif vector_type == "int": - data = [i for i in range(start, start + rows)] - - suffix = entity_suffix(rows) - path = f"{dir}/{dim}d_{suffix}_{num}" - arr = np.array(data) - file = f'{path}/uid.npy' - np.save(file, arr) + if not os.path.exists(file) or force: + # vector columns + vectors = [] + if rows > 0: + if float_vector: + vectors = gen_float_vectors(rows, dim) + else: + vectors = gen_binary_vectors(rows, (dim // 8)) + arr = np.array(vectors) + np.save(file, arr) + return file_name + + +def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False): + file_name = f"{data_field}.npy" + file = f"{dir}/{file_name}" + if not os.path.exists(file) or force: + # non vector columns + data = [] + if rows > 0: + if data_field == DataField.float_field: + data = [random.random() for _ in range(rows)] + elif data_field == DataField.pk_field: + data = [i for i in range(start, start + rows)] + elif data_field == DataField.int_field: + data = [random.randint(-999999, 9999999) for _ in range(rows)] + arr = np.array(data) + np.save(file, arr) + return file_name + + +def gen_file_name(row_based, rows, dim, auto_id, str_pk, + float_vector, data_fields, file_num, file_type, 
err_type): + row_suffix = entity_suffix(rows) + field_suffix = "" + if len(data_fields) > 3: + field_suffix = "multi_scalars_" + else: + for data_field in data_fields: + if data_field != DataField.vec_field: + field_suffix += f"{data_field}_" + vt = "" + if DataField.vec_field in data_fields: + vt = "float_vectors_" if float_vector else "binary_vectors_" -def gen_json_file_name(row_based, rows, dim, auto_id, str_pk, float_vector, multi_scalars, file_num): - suffix = entity_suffix(rows) - scalars = "only" - if multi_scalars: - scalars = "multi_scalars" - vt = FLOAT - if not float_vector: - vt = BINARY pk = "" if str_pk: pk = "str_pk_" - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id) - return f"{prefix}_{pk}{vt}_vectors_{scalars}_{dim}d_{suffix}_{file_num}.json" + prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id, prefix=err_type) + file_name = f"{prefix}_{pk}{vt}{field_suffix}{dim}d_{row_suffix}_{file_num}{file_type}" + return file_name + + +def gen_subfolder(root, dim, rows, file_num): + suffix = entity_suffix(rows) + subfolder = f"{dim}d_{suffix}_{file_num}" + path = f"{root}/{subfolder}" + if not os.path.isdir(path): + os.mkdir(path) + return subfolder -def gen_json_files(row_based, rows, dim, auto_id, str_pk, float_vector, multi_scalars, file_nums, force=False): - """ - row_based: Boolean - generate row based json file if True - generate column base json file if False - rows: entities of data - dim: dim of vector data - auto_id: Boolean - generate primary key data if False, else not - str_pk: Boolean - generate string as primary key if True, else generate INT64 as pk - float_vector: Boolean - generate float vectors if True, else binary vectors - multi_scalars: Boolean - only generate vector data (and pk data depended on auto_switches) if False - besides vector data, generate INT, STRING, BOOLEAN, etc scalar data if True - file_nums: file numbers that to be generated - """ +def gen_json_files(row_based, rows, dim, auto_id, str_pk, + float_vector, data_fields, file_nums, multi_folder, + file_type, err_type, force, **kwargs): # gen json files + files = [] + start_uid = 0 + # make sure pk field exists when not auto_id + if not auto_id and DataField.pk_field not in data_fields: + data_fields.append(DataField.pk_field) for i in range(file_nums): - file_name = gen_json_file_name(row_based=row_based, rows=rows,dim=dim, - auto_id=auto_id,str_pk=str_pk,float_vector=float_vector, - multi_scalars=multi_scalars, file_num=i) + file_name = gen_file_name(row_based=row_based, rows=rows, dim=dim, + auto_id=auto_id, str_pk=str_pk, float_vector=float_vector, + data_fields=data_fields, file_num=i, file_type=file_type, err_type=err_type) file = f"{data_source}/{file_name}" + if multi_folder: + subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i) + file = f"{data_source}/{subfolder}/{file_name}" if not os.path.exists(file) or force: if row_based: - gen_row_based_json_file(row_file=file, str_pk=str_pk, - float_vect=float_vector, multi_scalars=multi_scalars, - rows=rows, dim=dim, autoid=auto_id) + gen_row_based_json_file(row_file=file, str_pk=str_pk, float_vect=float_vector, + data_fields=data_fields, rows=rows, dim=dim, + start_uid=start_uid, err_type=err_type, **kwargs) else: - gen_column_base_json_file(col_file=file, str_pk=str_pk, - float_vect=float_vector, multi_scalars=multi_scalars, - rows=rows, dim=dim, autoid=auto_id) - -def gen_npy_files(): - # # gen numpy files - # uid = 0 - # for i in range(file_nums): - # gen_vectors_in_numpy_file(data_dir, FLOAT, 
rows=rows_list[0], dim=dim_list[0], num=i) - # gen_scalars_in_numpy_file(data_dir, "int", rows=rows_list[0], dim=dim_list[0], num=i, start=uid) - # uid += rows_list[0] - pass - - -def copy_files_to_bucket(client, r_source, bucket_name, force=False): - # check 'xxx' bucket exist. - found = client.bucket_exists(bucket_name) - if not found: - print(f"Bucket {bucket_name} not found, create it.") - client.make_bucket(bucket_name) - - # copy files from root source folder - os.chdir(r_source) - onlyfiles = [f for f in os.listdir(r_source) if - os.path.isfile(os.path.join(r_source, f))] - for file in onlyfiles: - if not file.startswith("."): - found = False - try: - result = client.stat_object(bucket_name, file) - found = True - except S3Error as exc: - pass - - if force: - res = client.fput_object(bucket_name, file, f"{r_source}/{file}") - print(res.object_name) - elif not found: - res = client.fput_object(bucket_name, file, f"{r_source}/{file}") - print(res.object_name) - - # copy subfolders - sub_folders = [f.name for f in os.scandir(r_source) if f.is_dir()] - for sub_folder in sub_folders: - if sub_folder not in ["backup", "tested"]: - source = f"{r_source}/{sub_folder}" - os.chdir(source) - onlyfiles = [f for f in os.listdir(source) if - os.path.isfile(os.path.join(source, f))] - for file in onlyfiles: - if not file.startswith("."): - found = False - try: - result = client.stat_object(bucket_name, f"{sub_folder}/{file}") - found = True - except S3Error as exc: - pass - - if force: - res = client.fput_object(bucket_name, f"{sub_folder}/{file}", f"{source}/{file}") - print(res.object_name) - elif not found: - res = client.fput_object(bucket_name, f"{sub_folder}/{file}", f"{source}/{file}") - print(res.object_name) - - -def copy_files_to_minio(host, bucket_name, access_key="minioadmin", secret_key="minioadmin", secure=False): - client = Minio( - host, - access_key=access_key, - secret_key=secret_key, - secure=secure, - ) - try: - # TODO: not copy all the files, just copy the new generated files - copy_files_to_bucket(client, r_source=data_source, bucket_name=bucket_name, force=False) - except S3Error as exc: - print("error occurred.", exc) - - -def parpar_bulk_load_data(json_file, row_based, rows, dim, auto_id, str_pk, float_vector, multi_scalars, file_nums, force=False): - if json_file: - gen_json_files(row_based=row_based, rows=rows, dim=dim, - auto_id=auto_id, str_pk=str_pk, float_vector=float_vector, - multi_scalars=multi_scalars, file_nums=file_nums, force=force) - - copy_files_to_minio(host=minio, bucket_name=bucket_name) + gen_column_base_json_file(col_file=file, str_pk=str_pk, float_vect=float_vector, + data_fields=data_fields, rows=rows, dim=dim, + start_uid=start_uid, err_type=err_type, **kwargs) + start_uid += rows + if multi_folder: + files.append(f"{subfolder}/{file_name}") + else: + files.append(file_name) + return files + + +def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, force=False): + # gen numpy files + files = [] + start_uid = 0 + if file_nums == 1: + # gen the numpy file without subfolders if only one set of files + for data_field in data_fields: + if data_field == DataField.vec_field: + file_name = gen_vectors_in_numpy_file(dir=data_source, float_vector=float_vector, + rows=rows, dim=dim, force=force) + else: + file_name = gen_int_or_float_in_numpy_file(dir=data_source, data_field=data_field, + rows=rows, force=force) + files.append(file_name) else: - # TODO: for npy files - # gen_npy_files() - # copy() - pass - -# if __name__ == '__main__': -# 
gen_json_files(row_based=True, rows=10,
-#                    dim=4, auto_id=False, str_pk=False,
-#                    float_vector=True, multi_scalars=False, file_nums=2)
-#
-#     copy_files_to_minio(host=minio, bucket_name=bucket_name)
+        for i in range(file_nums):
+            subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i)
+            dir = f"{data_source}/{subfolder}"
+            for data_field in data_fields:
+                if data_field == DataField.vec_field:
+                    file_name = gen_vectors_in_numpy_file(dir=dir, float_vector=float_vector, rows=rows, dim=dim, force=force)
+                else:
+                    file_name = gen_int_or_float_in_numpy_file(dir=dir, data_field=data_field, rows=rows, start=start_uid, force=force)
+                files.append(f"{subfolder}/{file_name}")
+            start_uid += rows
+    return files
+
+
+def prepare_bulk_load_json_files(row_based=True, rows=100, dim=128,
+                                 auto_id=True, str_pk=False, float_vector=True,
+                                 data_fields=[], file_nums=1, multi_folder=False,
+                                 file_type=".json", err_type="", force=False, **kwargs):
+    """
+    Generate file(s) in json format based on the params and copy them to minio
+
+    :param row_based: indicate whether the file(s) to be generated are row based or not
+    :type row_based: boolean
+
+    :param rows: the number of entities to be generated in the file(s)
+    :type rows: int
+
+    :param dim: dim of vector data
+    :type dim: int
+
+    :param auto_id: generate primary key data or not
+    :type auto_id: boolean
+
+    :param str_pk: generate string or int as primary key
+    :type str_pk: boolean
+
+    :param float_vector: generate float vectors or binary vectors
+    :type float_vector: boolean
+
+    :param data_fields: data fields to be generated in the file(s):
+            It supports one or all of [pk, vectors, int, float, string, boolean]
+            Note: it automatically adds the pk field if auto_id=False
+    :type data_fields: list
+
+    :param file_nums: the number of files to be generated
+    :type file_nums: int
+
+    :param multi_folder: generate the files in the bucket root folder or in new subfolders
+    :type multi_folder: boolean
+
+    :param file_type: specify the file suffix to be generated
+    :type file_type: str
+
+    :param err_type: inject some errors into the file(s).
+            All errors should be predefined in DataErrorType
+    :type err_type: str
+
+    :param force: re-generate the file(s) regardless of whether they already exist
+    :type force: boolean
+
+    :param **kwargs
+        * *wrong_position* (``int``) --
+        indicate the error entity in the file if DataErrorType.one_entity_wrong_dim
+
+    :return: list
+        file names list
+    """
+    files = gen_json_files(row_based=row_based, rows=rows, dim=dim,
+                           auto_id=auto_id, str_pk=str_pk, float_vector=float_vector,
+                           data_fields=data_fields, file_nums=file_nums, multi_folder=multi_folder,
+                           file_type=file_type, err_type=err_type, force=force, **kwargs)
+
+    copy_files_to_minio(host=minio, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
+    return files
+
+
+def prepare_bulk_load_numpy_files(rows, dim, data_fields=[DataField.vec_field],
+                                  float_vector=True, file_nums=1, force=False):
+    """
+    Generate column based file(s) in numpy format based on the params and copy them to minio
+    Note: one numpy file is generated for each field in data_fields.
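+
+    Example (a hypothetical call for illustration, assuming the minio settings configured at the top of this file):
+        files = prepare_bulk_load_numpy_files(rows=1000, dim=128,
+                                              data_fields=[DataField.vec_field, DataField.pk_field],
+                                              float_vector=True, file_nums=1)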
+
+    :param rows: the number of entities to be generated in the file(s)
+    :type rows: int
+
+    :param dim: dim of vector data
+    :type dim: int
+
+    :param float_vector: generate float vectors or binary vectors
+    :type float_vector: boolean
+
+    :param data_fields: data fields to be generated in the file(s):
+            it supports one or all of [int_pk, vectors, int, float]
+            Note: it does not automatically add the pk field
+    :type data_fields: list
+
+    :param file_nums: the number of files to be generated
+            The file(s) would be generated in the data_source folder if file_nums = 1
+            The file(s) would be generated in different subfolders if file_nums > 1
+    :type file_nums: int
+
+    :param force: re-generate the file(s) regardless of whether they already exist
+    :type force: boolean
+
+    :return: list
+        file name list, or file names with subfolder prefix
+    """
+    files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector,
+                          data_fields=data_fields,
+                          file_nums=file_nums, force=force)
+
+    copy_files_to_minio(host=minio, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
+    return files
diff --git a/tests/python_client/bulk_load/minio_comm.py b/tests/python_client/bulk_load/minio_comm.py
new file mode 100644
index 0000000000000..4ead5852ec742
--- /dev/null
+++ b/tests/python_client/bulk_load/minio_comm.py
@@ -0,0 +1,43 @@
+import os
+from minio import Minio
+from minio.error import S3Error
+from utils.util_log import test_log as log
+
+
+def copy_files_to_bucket(client, r_source, target_files, bucket_name, force=False):
+    # check that the bucket exists
+    found = client.bucket_exists(bucket_name)
+    if not found:
+        log.error(f"Bucket {bucket_name} not found.")
+        return
+
+    # copy target files from the root source folder
+    os.chdir(r_source)
+    for target_file in target_files:
+        found = False
+        try:
+            result = client.stat_object(bucket_name, target_file)
+            found = True
+        except S3Error as exc:
+            pass
+
+        if force or not found:
+            res = client.fput_object(bucket_name, target_file, f"{r_source}/{target_file}")
+            log.info(f"copied {res.object_name} to minio")
+        else:
+            log.info(f"skip copying {target_file} to minio, it already exists in the bucket")
+
+
+def copy_files_to_minio(host, r_source, files, bucket_name, access_key="minioadmin", secret_key="minioadmin",
+                        secure=False, force=False):
+    client = Minio(
+        host,
+        access_key=access_key,
+        secret_key=secret_key,
+        secure=secure,
+    )
+    try:
+        copy_files_to_bucket(client, r_source=r_source, target_files=files, bucket_name=bucket_name, force=force)
+    except S3Error as exc:
+        log.error(f"fail to copy files to minio: {exc}")
+
diff --git a/tests/python_client/bulk_load/test_bulk_load.py b/tests/python_client/bulk_load/test_bulk_load.py
index b694ac570c084..7dd2bfbf7f4ac 100644
--- a/tests/python_client/bulk_load/test_bulk_load.py
+++ b/tests/python_client/bulk_load/test_bulk_load.py
@@ -8,15 +8,14 @@
 from common import common_type as ct
 from common.common_type import CaseLabel, CheckTasks, BulkLoadStates
 from utils.util_log import test_log as log
-from bulk_load_data import parpar_bulk_load_data, gen_json_file_name
+from bulk_load_data import prepare_bulk_load_json_files, prepare_bulk_load_numpy_files,\
+    DataField as df, DataErrorType
-
-vec_field = "vectors"
-pk_field = "uid"
-float_field = "float_scalar"
-int_field = "int_scalar"
-bool_field = "bool_scalar"
-string_field = "string_scalar"
+default_vec_only_fields = [df.vec_field]
+default_multi_fields = [df.vec_field, df.int_field, df.string_field,
+                        df.bool_field, df.float_field]
+default_vec_n_int_fields = [df.vec_field, df.int_field]
 
 def entity_suffix(entities):
@@
-42,7 +41,7 @@ def gen_file_prefix(row_based=True, auto_id=True, prefix=""): return f"{prefix}col_cust" -class TestImport(TestcaseBase): +class TestBulkLoad(TestcaseBase): def setup_class(self): log.info("[setup_import] Start setup class...") @@ -71,18 +70,13 @@ def test_float_vector_only(self, row_based, auto_id, dim, entities): 5. verify search successfully 6. verify query successfully """ - parpar_bulk_load_data(json_file=True, row_based=row_based, rows=entities, dim=dim, - auto_id=auto_id, str_pk=False, float_vector=True, - multi_scalars=False, file_nums=1) - # TODO: file names shall be return by gen_json_files - file_name = gen_json_file_name(row_based=row_based, rows=entities, - dim=dim, auto_id=auto_id, str_pk=False, - float_vector=True, multi_scalars=False, file_num=0) - files = [file_name] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_vec_only_fields, force=True) self._connect() c_name = cf.gen_unique_str("bulkload") - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim)] + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # import data @@ -92,11 +86,12 @@ def test_float_vector_only(self, row_based, auto_id, dim, entities): row_based=row_based, files=files) logging.info(f"bulk load task ids:{task_ids}") - completed, _ = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, - timeout=30) + success, _ = self.utility_wrap.wait_for_bulk_load_tasks_completed( + task_ids=task_ids, + timeout=30) tt = time.time() - t0 - log.info(f"bulk load state:{completed} in {tt}") - assert completed + log.info(f"bulk load state:{success} in {tt}") + assert success num_entities = self.collection_wrap.num_entities log.info(f" collection entities: {num_entities}") @@ -104,10 +99,10 @@ def test_float_vector_only(self, row_based, auto_id, dim, entities): # verify imported data is available for search self.collection_wrap.load() - log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") + # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") search_data = cf.gen_vectors(1, dim) search_params = {"metric_type": "L2", "params": {"nprobe": 2}} - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, @@ -118,7 +113,6 @@ def test_float_vector_only(self, row_based, auto_id, dim, entities): @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("dim", [8]) # 8 @pytest.mark.parametrize("entities", [100]) # 100 - @pytest.mark.xfail(reason="milvus crash issue #16858") def test_str_pk_float_vector_only(self, row_based, dim, entities): """ collection schema: [str_pk, float_vector] @@ -131,12 +125,14 @@ def test_str_pk_float_vector_only(self, row_based, dim, entities): 6. 
verify query successfully """ auto_id = False # no auto id for string_pk schema - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id) - files = [f"{prefix}_str_pk_float_vectors_only_{dim}d_{entities}.json"] + string_pk = True + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, str_pk=string_pk, + data_fields=default_vec_only_fields) self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_string_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim)] + c_name = cf.gen_unique_str() + fields = [cf.gen_string_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # import data @@ -160,7 +156,7 @@ def test_str_pk_float_vector_only(self, row_based, dim, entities): log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") search_data = cf.gen_vectors(1, dim) search_params = {"metric_type": "L2", "params": {"nprobe": 2}} - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, @@ -171,7 +167,7 @@ def test_str_pk_float_vector_only(self, row_based, dim, entities): @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [4]) - @pytest.mark.parametrize("entities", [10000]) + @pytest.mark.parametrize("entities", [3000]) def test_partition_float_vector_int_scalar(self, row_based, auto_id, dim, entities): """ collection: customized partitions @@ -183,13 +179,14 @@ def test_partition_float_vector_int_scalar(self, row_based, auto_id, dim, entiti 5. verify index status 6. 
verify search and query """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id) - files = [f"{prefix}_float_vectors_int_scalar_{dim}d_{entities}.json"] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_vec_n_int_fields, file_nums=1) self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name="int_scalar")] + c_name = cf.gen_unique_str() + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # create a partition @@ -197,7 +194,7 @@ def test_partition_float_vector_int_scalar(self, row_based, auto_id, dim, entiti m_partition, _ = self.collection_wrap.create_partition(partition_name=p_name) # build index before bulk load index_params = {"index_type": "IVF_SQ8", "params": {"nlist": 128}, "metric_type": "L2"} - self.collection_wrap.create_index(field_name=vec_field, index_params=index_params) + self.collection_wrap.create_index(field_name=df.vec_field, index_params=index_params) # load before bulk load self.collection_wrap.load(partition_names=[p_name]) @@ -208,9 +205,9 @@ def test_partition_float_vector_int_scalar(self, row_based, auto_id, dim, entiti row_based=row_based, files=files) logging.info(f"bulk load task ids:{task_ids}") - success, _ = self.utility_wrap.\ + success, state = self.utility_wrap.\ wait_for_bulk_load_tasks_completed(task_ids=task_ids, - target_state=BulkLoadStates.BulkLoadDataQueryable, + target_state=BulkLoadStates.BulkLoadPersisted, timeout=30) tt = time.time() - t0 log.info(f"bulk load state:{success} in {tt}") @@ -219,24 +216,30 @@ def test_partition_float_vector_int_scalar(self, row_based, auto_id, dim, entiti assert m_partition.num_entities == entities assert self.collection_wrap.num_entities == entities + # TODO: remove sleep when indexed state ready + sleep(5) + res, _ = self.utility_wrap.index_building_progress(c_name) + exp_res = {'total_rows': entities, 'indexed_rows': entities} + assert res == exp_res + log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") search_data = cf.gen_vectors(1, dim) search_params = {"metric_type": "L2", "params": {"nprobe": 16}} - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, "limit": 1}) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("row_based", [True]) - @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("row_based", [True, False]) + @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [16]) - @pytest.mark.parametrize("entities", [10]) + @pytest.mark.parametrize("entities", [200]) + @pytest.mark.xfail(reason="issue #16890") def test_binary_vector_only(self, row_based, auto_id, dim, entities): """ - collection: auto_id collection schema: [pk, binary_vector] Steps: 1. create collection @@ -248,18 +251,20 @@ def test_binary_vector_only(self, row_based, auto_id, dim, entities): 7. verify search successfully 6. 
verify query successfully """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id) - files = [f"{prefix}_binary_vectors_only_{dim}d_{entities}.json"] + float_vec = False + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, float_vector=float_vec, + data_fields=default_vec_only_fields) self._connect() c_name = cf.gen_unique_str() - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_binary_vec_field(name=vec_field, dim=dim)] + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_binary_vec_field(name=df.vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # build index before bulk load binary_index_params = {"index_type": "BIN_IVF_FLAT", "metric_type": "JACCARD", "params": {"nlist": 64}} - self.collection_wrap.create_index(field_name=vec_field, index_params=binary_index_params) + self.collection_wrap.create_index(field_name=df.vec_field, index_params=binary_index_params) # import data t0 = time.time() @@ -267,20 +272,21 @@ def test_binary_vector_only(self, row_based, auto_id, dim, entities): row_based=row_based, files=files) logging.info(f"bulk load task ids:{task_ids}") + # TODO: Update to BulkLoadDataIndexed when issue #16889 fixed success, _ = self.utility_wrap.wait_for_bulk_load_tasks_completed( task_ids=task_ids, - target_state=BulkLoadStates.BulkLoadDataIndexed, + target_state=BulkLoadStates.BulkLoadPersisted, timeout=30) tt = time.time() - t0 log.info(f"bulk load state:{success} in {tt}") assert success - # verify build index status - # sleep(3) - # TODO: verify build index after index_building_progress() refactor - res, _ = self.utility_wrap.index_building_progress(c_name) - exp_res = {'total_rows': entities, 'indexed_rows': entities} - assert res == exp_res + # # verify build index status + # sleep(10) + # # TODO: verify build index after issue #16890 fixed + # res, _ = self.utility_wrap.index_building_progress(c_name) + # exp_res = {'total_rows': entities, 'indexed_rows': entities} + # assert res == exp_res # TODO: verify num entities assert self.collection_wrap.num_entities == entities @@ -291,7 +297,7 @@ def test_binary_vector_only(self, row_based, auto_id, dim, entities): # verify search and query search_data = cf.gen_binary_vectors(1, dim)[1] search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}} - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, @@ -301,8 +307,9 @@ def test_binary_vector_only(self, row_based, auto_id, dim, entities): @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("fields_num_in_file", ["equal", "more", "less"]) # "equal", "more", "less" - @pytest.mark.parametrize("dim", [1024]) # 1024 - @pytest.mark.parametrize("entities", [5000]) # 5000 + @pytest.mark.parametrize("dim", [16]) + @pytest.mark.parametrize("entities", [500]) + # it occasionally fails due to issue #16947 def test_float_vector_multi_scalars(self, row_based, auto_id, fields_num_in_file, dim, entities): """ collection schema: [pk, float_vector, @@ -319,16 +326,17 @@ def test_float_vector_multi_scalars(self, row_based, auto_id, fields_num_in_file 7. verify search successfully 6. 
verify query successfully """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id) - files = [f"{prefix}_float_vectors_multi_scalars_{dim}d_{entities}.json"] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_multi_fields) additional_field = "int_scalar_add" self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field), - cf.gen_string_field(name=string_field), - cf.gen_bool_field(name=bool_field)] + c_name = cf.gen_unique_str() + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field)] if fields_num_in_file == "more": fields.pop() elif fields_num_in_file == "less": @@ -345,12 +353,12 @@ def test_float_vector_multi_scalars(self, row_based, auto_id, fields_num_in_file logging.info(f"bulk load task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_load_tasks_completed( task_ids=task_ids, - target_state=BulkLoadStates.BulkLoadDataQueryable, + target_state=BulkLoadStates.BulkLoadPersisted, timeout=30) tt = time.time() - t0 log.info(f"bulk load state:{success} in {tt}") if fields_num_in_file == "less": - assert not success # TODO: check error msg + assert not success if row_based: failed_reason = f"JSON row validator: field {additional_field} missed at the row 0" else: @@ -361,9 +369,9 @@ def test_float_vector_multi_scalars(self, row_based, auto_id, fields_num_in_file else: assert success - # TODO: assert num entities - log.info(f" collection entities: {self.collection_wrap.num_entities}") - assert self.collection_wrap.num_entities == entities + num_entities = self.collection_wrap.num_entities + log.info(f" collection entities: {num_entities}") + assert num_entities == entities # verify no index res, _ = self.collection_wrap.has_index() @@ -371,7 +379,7 @@ def test_float_vector_multi_scalars(self, row_based, auto_id, fields_num_in_file # verify search and query search_data = cf.gen_vectors(1, dim) search_params = {"metric_type": "L2", "params": {"nprobe": 2}} - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, @@ -381,7 +389,7 @@ def test_float_vector_multi_scalars(self, row_based, auto_id, fields_num_in_file # build index index_params = {"index_type": "HNSW", "params": {"M": 8, "efConstruction": 100}, "metric_type": "IP"} - self.collection_wrap.create_index(field_name=vec_field, index_params=index_params) + self.collection_wrap.create_index(field_name=df.vec_field, index_params=index_params) # release collection and reload self.collection_wrap.release() @@ -393,7 +401,7 @@ def test_float_vector_multi_scalars(self, row_based, auto_id, fields_num_in_file # search and query search_params = {"params": {"ef": 64}, "metric_type": "IP"} - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, @@ -401,10 +409,9 @@ def test_float_vector_multi_scalars(self, row_based, auto_id, fields_num_in_file @pytest.mark.tags(CaseLabel.L3) 
@pytest.mark.parametrize("row_based", [True, False]) - @pytest.mark.parametrize("fields_num_in_file", ["equal"]) # "equal", "more", "less" - @pytest.mark.parametrize("dim", [4]) # 1024 - @pytest.mark.parametrize("entities", [10]) # 5000 - @pytest.mark.xfail(reason="milvus crash issue #16858") + @pytest.mark.parametrize("fields_num_in_file", ["equal", "more", "less"]) # "equal", "more", "less" + @pytest.mark.parametrize("dim", [16]) # 1024 + @pytest.mark.parametrize("entities", [500]) # 5000 def test_string_pk_float_vector_multi_scalars(self, row_based, fields_num_in_file, dim, entities): """ collection schema: [str_pk, float_vector, @@ -421,21 +428,24 @@ def test_string_pk_float_vector_multi_scalars(self, row_based, fields_num_in_fil 7. verify search successfully 6. verify query successfully """ - prefix = gen_file_prefix(row_based=row_based, auto_id=False) - files = [f"{prefix}_str_pk_float_vectors_multi_scalars_{dim}d_{entities}.json"] + string_pk = True + auto_id = False + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, str_pk=string_pk, + data_fields=default_multi_fields) additional_field = "int_scalar_add" self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_string_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field), - cf.gen_string_field(name=string_field), - cf.gen_bool_field(name=bool_field)] + c_name = cf.gen_unique_str() + fields = [cf.gen_string_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field)] if fields_num_in_file == "more": fields.pop() elif fields_num_in_file == "less": fields.append(cf.gen_int32_field(name=additional_field)) - schema = cf.gen_collection_schema(fields=fields, auto_id=False) + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # load collection self.collection_wrap.load() @@ -447,7 +457,7 @@ def test_string_pk_float_vector_multi_scalars(self, row_based, fields_num_in_fil logging.info(f"bulk load task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_load_tasks_completed( task_ids=task_ids, - target_state=BulkLoadStates.BulkLoadDataQueryable, + target_state=BulkLoadStates.BulkLoadPersisted, timeout=30) tt = time.time() - t0 log.info(f"bulk load state:{success} in {tt}") @@ -473,7 +483,7 @@ def test_string_pk_float_vector_multi_scalars(self, row_based, fields_num_in_fil # verify search and query search_data = cf.gen_vectors(1, dim) search_params = {"metric_type": "L2", "params": {"nprobe": 2}} - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, @@ -483,7 +493,7 @@ def test_string_pk_float_vector_multi_scalars(self, row_based, fields_num_in_fil # build index index_params = {"index_type": "HNSW", "params": {"M": 8, "efConstruction": 100}, "metric_type": "IP"} - self.collection_wrap.create_index(field_name=vec_field, index_params=index_params) + self.collection_wrap.create_index(field_name=df.vec_field, index_params=index_params) # release collection and reload self.collection_wrap.release() @@ -495,20 +505,20 @@ def test_string_pk_float_vector_multi_scalars(self, row_based, 
fields_num_in_fil # search and query search_params = {"params": {"ef": 64}, "metric_type": "IP"} - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, "limit": 1}) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("row_based", [True]) # True, False - @pytest.mark.parametrize("auto_id", [True]) # True, False + @pytest.mark.parametrize("row_based", [True, False]) # True, False + @pytest.mark.parametrize("auto_id", [True, False]) # True, False @pytest.mark.parametrize("dim", [16]) # 16 - @pytest.mark.parametrize("entities", [3000]) # 3000 - @pytest.mark.parametrize("file_nums", [10]) # 10, max task nums 32? need improve - @pytest.mark.parametrize("multi_folder", [False]) # True, False - @pytest.mark.xfail(reason="BulkloadIndexed cannot be reached for issue ##16848") + @pytest.mark.parametrize("entities", [100]) # 3000 + @pytest.mark.parametrize("file_nums", [2]) # 10 + @pytest.mark.parametrize("multi_folder", [True, False]) # True, False + # TODO: reason="BulkloadIndexed cannot be reached for issue #16889") def test_float_vector_from_multi_files(self, row_based, auto_id, dim, entities, file_nums, multi_folder): """ collection: auto_id @@ -523,40 +533,35 @@ def test_float_vector_from_multi_files(self, row_based, auto_id, dim, entities, 6. verify search successfully 7. verify query successfully """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id) - files = [] - if not multi_folder: - for i in range(file_nums): - files.append(f"{prefix}_float_vectors_multi_scalars_{dim}d_{entities}_{i}.json") - else: - # sub_folder index 20 to 29 - for i in range(20, 30): - files.append(f"/sub{i}/{prefix}_float_vectors_multi_scalars_{dim}d_{entities}_{i}.json") + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_multi_fields, + file_nums=file_nums, multi_folder=multi_folder) self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field), - cf.gen_string_field(name=string_field), - cf.gen_bool_field(name=bool_field) + c_name = cf.gen_unique_str() + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field) ] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # build index index_params = ct.default_index - self.collection_wrap.create_index(field_name=vec_field, index_params=index_params) + self.collection_wrap.create_index(field_name=df.vec_field, index_params=index_params) # load collection self.collection_wrap.load() # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, - partition_name='', row_based=row_based, files=files) logging.info(f"bulk load task ids:{task_ids}") + # TODO: update to BulkLoadDataIndexed after issue #16889 fixed success, states = self.utility_wrap.wait_for_bulk_load_tasks_completed( task_ids=task_ids, - target_state=BulkLoadStates.BulkLoadDataIndexed, + target_state=BulkLoadStates.BulkLoadPersisted, timeout=300) tt = time.time() - t0 log.info(f"bulk load state:{success} in {tt}") @@ 
-572,14 +577,14 @@ def test_float_vector_from_multi_files(self, row_based, auto_id, dim, entities, assert self.collection_wrap.num_entities == entities * file_nums # verify index built - res, _ = self.utility_wrap.index_building_progress(c_name) - exp_res = {'total_rows': entities * file_nums, 'indexed_rows': entities * file_nums} - assert res == exp_res + # res, _ = self.utility_wrap.index_building_progress(c_name) + # exp_res = {'total_rows': entities * file_nums, 'indexed_rows': entities * file_nums} + # assert res == exp_res # verify search and query search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, @@ -591,10 +596,10 @@ def test_float_vector_from_multi_files(self, row_based, auto_id, dim, entities, @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("multi_fields", [True, False]) - @pytest.mark.parametrize("dim", [128]) # 128 - @pytest.mark.parametrize("entities", [1000]) # 1000 + @pytest.mark.parametrize("dim", [15]) + @pytest.mark.parametrize("entities", [200]) # TODO: string data shall be re-generated - def test_float_vector_from_npy_file(self, row_based, auto_id, multi_fields, dim, entities): + def test_float_vector_from_numpy_file(self, row_based, auto_id, multi_fields, dim, entities): """ collection schema 1: [pk, float_vector] schema 2: [pk, float_vector, int_scalar, string_scalar, float_scalar, bool_scalar] @@ -607,33 +612,41 @@ def test_float_vector_from_npy_file(self, row_based, auto_id, multi_fields, dim, 4.1 verify the data entities equal the import data 4.2 verify search and query successfully """ - vec_field = f"vectors_{dim}d_{entities}" - self._connect() - c_name = cf.gen_unique_str() + data_fields = [df.vec_field] + np_files = prepare_bulk_load_numpy_files(rows=entities, dim=dim, data_fields=data_fields, + force=True) if not multi_fields: - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim)] + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim)] + if not auto_id: + scalar_fields = [df.pk_field] + else: + scalar_fields = None else: - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field), - cf.gen_string_field(name=string_field), - cf.gen_bool_field(name=bool_field) + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field) ] + if not auto_id: + scalar_fields = [df.pk_field, df.float_field, df.int_field, df.string_field, df.bool_field] + else: + scalar_fields = [df.int_field, df.string_field, df.bool_field, df.float_field] + + files = np_files + if scalar_fields is not None: + json_files = prepare_bulk_load_json_files(row_based=row_based, dim=dim, + auto_id=auto_id, rows=entities, + data_fields=scalar_fields, force=True) + files = np_files + json_files + + self._connect() + c_name = cf.gen_unique_str() schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # import data - 
files = [f"{vec_field}.npy"] # npy file name shall be the vector field name - if not multi_fields: - if not auto_id: - files.append(f"col_uid_only_{dim}d_{entities}.json") - files.reverse() - else: - if not auto_id: - files.append(f"col_uid_multi_scalars_{dim}d_{entities}.json") - else: - files.append(f"col_multi_scalars_{dim}d_{entities}.json") t0 = time.time() task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, row_based=row_based, @@ -647,17 +660,13 @@ def test_float_vector_from_npy_file(self, row_based, auto_id, multi_fields, dim, if row_based: assert not success failed_reason1 = "unsupported file type for row-based mode" - if auto_id: - failed_reason2 = f"invalid row-based JSON format, the key {int_field} is not found" - else: - failed_reason2 = f"invalid row-based JSON format, the key {pk_field} is not found" + failed_reason2 = f"JSON row validator: field {df.vec_field} missed at the row 0" for state in states.values(): assert state.state_name == "BulkLoadFailed" assert failed_reason1 in state.infos.get("failed_reason", "") or \ failed_reason2 in state.infos.get("failed_reason", "") else: assert success - # TODO: assert num entities log.info(f" collection entities: {self.collection_wrap.num_entities}") assert self.collection_wrap.num_entities == entities @@ -666,7 +675,7 @@ def test_float_vector_from_npy_file(self, row_based, auto_id, multi_fields, dim, log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") search_data = cf.gen_vectors(1, dim) search_params = {"metric_type": "L2", "params": {"nprobe": 2}} - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, @@ -688,16 +697,18 @@ def test_data_type_float_on_int_pk(self, row_based, dim, entities): 3. verify the data entities 4. 
verify query successfully """ - prefix = gen_file_prefix(row_based=row_based, auto_id=False, prefix="float_on_int_pk_") - files = [f"{prefix}_float_vectors_multi_scalars_{dim}d_{entities}_0.json"] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=False, + data_fields=default_multi_fields, + err_type=DataErrorType.float_on_int_pk, force=True) self._connect() - c_name = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str() # TODO: add string pk - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field), - cf.gen_string_field(name=string_field), - cf.gen_bool_field(name=bool_field) + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field) ] schema = cf.gen_collection_schema(fields=fields, auto_id=False) self.collection_wrap.init_collection(c_name, schema=schema) @@ -716,8 +727,8 @@ def test_data_type_float_on_int_pk(self, row_based, dim, entities): self.collection_wrap.load() # the pk value was automatically convert to int from float - res, _ = self.collection_wrap.query(expr=f"{pk_field} in [3]", output_fields=[pk_field]) - assert [{pk_field: 3}] == res + res, _ = self.collection_wrap.query(expr=f"{df.pk_field} in [3]", output_fields=[df.pk_field]) + assert [{df.pk_field: 3}] == res @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("row_based", [True, False]) @@ -735,17 +746,20 @@ def test_data_type_int_on_float_scalar(self, row_based, auto_id, dim, entities): 3. verify the data entities 4. verify query successfully """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id, prefix="int_on_float_scalar_") - files = [f"{prefix}_float_vectors_multi_scalars_{dim}d_{entities}_0.json"] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_multi_fields, + err_type=DataErrorType.int_on_float_scalar, force=True) + self._connect() - c_name = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str() # TODO: add string pk - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field), - cf.gen_float_field(name=float_field), - cf.gen_string_field(name=string_field), - cf.gen_bool_field(name=bool_field) + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field), + cf.gen_float_field(name=df.float_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field) ] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) @@ -763,20 +777,84 @@ def test_data_type_int_on_float_scalar(self, row_based, auto_id, dim, entities): self.collection_wrap.load() - # the pk value was automatically convert to int from float - res, _ = self.collection_wrap.query(expr=f"{float_field} in [1.0]", output_fields=[float_field]) - assert res[0].get(float_field, 0) == 1.0 + # it was automatically converted from int to float + search_data = cf.gen_vectors(1, dim) + search_params = {"metric_type": "L2", "params": {"nprobe": 2}} + res, _ = self.collection_wrap.search(search_data, df.vec_field, + param=search_params, limit=1, + 
check_task=CheckTasks.check_search_results, + check_items={"nq": 1, + "limit": 1}) + uids = res[0].ids + res, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {uids}", output_fields=[df.float_field]) + assert isinstance(res[0].get(df.float_field, 1), float) @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("dim", [16]) # 128 - @pytest.mark.parametrize("entities", [100]) # 1000 - @pytest.mark.parametrize("file_nums", [32]) # 32, max task nums 32? need improve - @pytest.mark.skip(season="redesign after issue #16698 fixed") - def test_multi_numpy_files_from_multi_folders(self, auto_id, dim, entities, file_nums): + @pytest.mark.parametrize("dim", [128]) # 128 + @pytest.mark.parametrize("entities", [1000]) # 1000 + @pytest.mark.parametrize("with_int_field", [True, False]) + def test_with_uid_n_int_numpy(self, auto_id, dim, entities, with_int_field): """ collection schema 1: [pk, float_vector] - data file: .npy files + data file: vectors.npy and uid.npy + Steps: + 1. create collection + 2. import data + 3. verify failed with errors + """ + data_fields = [df.vec_field] + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim)] + if not auto_id: + data_fields.append(df.pk_field) + if with_int_field: + data_fields.append(df.int_field) + fields.append(cf.gen_int64_field(name=df.int_field)) + files = prepare_bulk_load_numpy_files(rows=entities, dim=dim, + data_fields=data_fields, + force=True) + self._connect() + c_name = cf.gen_unique_str() + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) + self.collection_wrap.init_collection(c_name, schema=schema) + + # import data + t0 = time.time() + task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, + row_based=False, + files=files) + logging.info(f"bulk load task ids:{task_ids}") + success, states = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, + timeout=30) + tt = time.time() - t0 + log.info(f"bulk load state:{success} in {tt}") + assert success + num_entities = self.collection_wrap.num_entities + log.info(f" collection entities: {num_entities}") + assert num_entities == entities + + # verify imported data is available for search + self.collection_wrap.load() + # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") + search_data = cf.gen_vectors(1, dim) + search_params = {"metric_type": "L2", "params": {"nprobe": 2}} + res, _ = self.collection_wrap.search(search_data, df.vec_field, + param=search_params, limit=1, + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, + "limit": 1}) + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("auto_id", [True]) + @pytest.mark.parametrize("dim", [6]) + @pytest.mark.parametrize("entities", [10]) + @pytest.mark.parametrize("file_nums", [2]) # 32, max task nums 32? need improve + @pytest.mark.xfail(reason="only one numpy file imported successfully, issue #16992") + def test_multi_numpy_files_from_diff_folders(self, auto_id, dim, entities, file_nums): + """ + collection schema 1: [pk, float_vector] + data file: .npy files in different folders Steps: 1. create collection 2. 
import data @@ -785,30 +863,32 @@ def test_multi_numpy_files_from_multi_folders(self, auto_id, dim, entities, file 4.1 verify the data entities equal the import data 4.2 verify search and query successfully """ - vec_field = f"vectors_{dim}d_{entities}" + row_based = False # numpy files supports only column based + data_fields = [df.vec_field] + if not auto_id: + data_fields.append(df.pk_field) + files = prepare_bulk_load_numpy_files(rows=entities, dim=dim, + data_fields=data_fields, + file_nums=file_nums, force=True) self._connect() c_name = cf.gen_unique_str() - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim)] + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # build index index_params = ct.default_index - self.collection_wrap.create_index(field_name=vec_field, index_params=index_params) + self.collection_wrap.create_index(field_name=df.vec_field, index_params=index_params) # load collection self.collection_wrap.load() - # import data - for i in range(file_nums): - files = [f"/{i}/{vec_field}.npy"] # npy file name shall be the vector field name - if not auto_id: - files.append(f"/{i}/{pk_field}.npy") - t0 = time.time() - task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, - row_based=False, - files=files) - logging.info(f"bulk load task ids:{task_ids}") - success, states = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, - timeout=30) + t0 = time.time() + task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, + row_based=row_based, + files=files) + success, states = self.utility_wrap.\ + wait_for_bulk_load_tasks_completed(task_ids=task_ids, + target_state=BulkLoadStates.BulkLoadPersisted, + timeout=30) tt = time.time() - t0 log.info(f"bulk load state:{success} in {tt}") @@ -817,10 +897,10 @@ def test_multi_numpy_files_from_multi_folders(self, auto_id, dim, entities, file assert self.collection_wrap.num_entities == entities * file_nums # verify search and query - sleep(10) + # sleep(10) search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params - res, _ = self.collection_wrap.search(search_data, vec_field, + res, _ = self.collection_wrap.search(search_data, df.vec_field, param=search_params, limit=1, check_task=CheckTasks.check_search_results, check_items={"nq": 1, @@ -908,7 +988,7 @@ def test_from_customize_bucket(self): # pass -class TestImportInvalidParams(TestcaseBase): +class TestBulkLoadInvalidParams(TestcaseBase): @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("row_based", [True, False]) def test_non_existing_file(self, row_based): @@ -923,8 +1003,8 @@ def test_non_existing_file(self, row_based): files = ["not_existing.json"] self._connect() c_name = cf.gen_unique_str() - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=ct.default_dim)] + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=ct.default_dim)] schema = cf.gen_collection_schema(fields=fields) self.collection_wrap.init_collection(c_name, schema=schema) @@ -938,7 +1018,7 @@ def test_non_existing_file(self, row_based): success, states = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, timeout=30) assert not success - failed_reason = "minio file manage 
cannot be found" + failed_reason = f"the file {files[0]} is empty" for state in states.values(): assert state.state_name == "BulkLoadFailed" assert failed_reason in state.infos.get("failed_reason", "") @@ -948,20 +1028,23 @@ def test_non_existing_file(self, row_based): @pytest.mark.parametrize("auto_id", [True, False]) def test_empty_json_file(self, row_based, auto_id): """ - collection: either auto_id or not collection schema: [pk, float_vector] + data file: empty file Steps: 1. create collection 2. import data, but the data file(s) is empty 3. verify import fail if column based 4. verify import successfully if row based """ - # set the wrong row based params - files = ["empty.json"] + # set 0 entities + entities = 0 + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=ct.default_dim, auto_id=auto_id, + data_fields=default_vec_only_fields) self._connect() c_name = cf.gen_unique_str() - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=ct.default_dim)] + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=ct.default_dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) @@ -973,19 +1056,16 @@ def test_empty_json_file(self, row_based, auto_id): logging.info(f"bulk load task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, timeout=30) - if row_based: - assert success - else: - assert not success - failed_reason = "JSON column consumer: row count is 0" - for state in states.values(): - assert state.state_name == "BulkLoadFailed" - assert failed_reason in state.infos.get("failed_reason", "") + assert not success + failed_reason = "JSON parse: row count is 0" + for state in states.values(): + assert state.state_name == "BulkLoadFailed" + assert failed_reason in state.infos.get("failed_reason", "") @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("dim", [128]) # 8 + @pytest.mark.parametrize("dim", [8]) # 8 @pytest.mark.parametrize("entities", [100]) # 100 def test_wrong_file_type(self, row_based, auto_id, dim, entities): """ @@ -996,22 +1076,25 @@ def test_wrong_file_type(self, row_based, auto_id, dim, entities): 2. import data 3. 
verify import failed with errors """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id, prefix="err_file_type_") if row_based: if auto_id: - data_type = ".csv" + file_type = ".csv" else: - data_type = "" + file_type = "" else: if auto_id: - data_type = ".npy" + file_type = ".npy" else: - data_type = ".txt" - files = [f"{prefix}_float_vectors_only_{dim}d_{entities}{data_type}"] + file_type = ".txt" + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_vec_only_fields, + file_type=file_type) + self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim)] + c_name = cf.gen_unique_str() + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # import data @@ -1040,20 +1123,22 @@ def test_wrong_file_type(self, row_based, auto_id, dim, entities): @pytest.mark.parametrize("entities", [100]) def test_wrong_row_based_values(self, row_based, auto_id, dim, entities): """ - collection: either auto_id or not - import data: not existing file(s) + collection schema: [pk, float_vector] + data files: wrong row based values Steps: 1. create collection - 3. import data, but the data file(s) not exists + 3. import data with wrong row based value 4. verify import failed with errors """ # set the wrong row based params - prefix = gen_file_prefix(row_based=not row_based) - files = [f"{prefix}_float_vectors_only_{dim}d_{entities}.json"] + wrong_row_based = not row_based + files = prepare_bulk_load_json_files(row_based=wrong_row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_vec_only_fields) self._connect() c_name = cf.gen_unique_str() - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim)] + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) @@ -1067,9 +1152,10 @@ def test_wrong_row_based_values(self, row_based, auto_id, dim, entities): timeout=30) assert not success if row_based: - failed_reason = "invalid row-based JSON format, the key vectors is not found" + value = df.vec_field # if auto_id else df.pk_field + failed_reason = f"JSON parse: invalid row-based JSON format, the key {value} is not found" else: - failed_reason = "JSON column consumer: row count is 0" + failed_reason = "JSON parse: row count is 0" for state in states.values(): assert state.state_name == "BulkLoadFailed" assert failed_reason in state.infos.get("failed_reason", "") @@ -1081,21 +1167,22 @@ def test_wrong_row_based_values(self, row_based, auto_id, dim, entities): @pytest.mark.parametrize("entities", [100]) # 100 def test_wrong_pk_field_name(self, row_based, auto_id, dim, entities): """ - collection: auto_id, customized_id - import data: [pk, float_vector] + collection schema: [pk, float_vector] + data files: wrong primary key field name Steps: 1. create collection with a dismatch_uid as pk 2. import data 3. verify import data successfully if collection with auto_id 4. 
verify import error if collection with auto_id=False """ - prefix = gen_file_prefix(row_based, auto_id) - files = [f"{prefix}_float_vectors_only_{dim}d_{entities}.json"] - pk_field = "dismatch_pk" + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_vec_only_fields) + dismatch_pk_field = "dismatch_pk" self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim)] + c_name = cf.gen_unique_str() + fields = [cf.gen_int64_field(name=dismatch_pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # import data @@ -1114,10 +1201,9 @@ def test_wrong_pk_field_name(self, row_based, auto_id, dim, entities): else: assert not success if row_based: - failed_reason = f"field {pk_field} missed at the row 0" + failed_reason = f"field {dismatch_pk_field} missed at the row 0" else: - # TODO: improve the failed msg: issue #16722 - failed_reason = f"import error: field {pk_field} row count 0 is not equal to other fields" + failed_reason = f"import error: field {dismatch_pk_field} row count 0 is not equal to other fields" for state in states.values(): assert state.state_name == "BulkLoadFailed" assert failed_reason in state.infos.get("failed_reason", "") @@ -1136,13 +1222,14 @@ def test_wrong_vector_field_name(self, row_based, auto_id, dim, entities): 3. verify import data successfully if collection with auto_id 4. verify import error if collection with auto_id=False """ - prefix = gen_file_prefix(row_based, auto_id) - files = [f"{prefix}_float_vectors_only_{dim}d_{entities}.json"] - vec_field = "dismatched_vectors" + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_vec_only_fields) + dismatch_vec_field = "dismatched_vectors" self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim)] + c_name = cf.gen_unique_str() + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=dismatch_vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # import data @@ -1159,13 +1246,12 @@ def test_wrong_vector_field_name(self, row_based, auto_id, dim, entities): assert not success if row_based: - failed_reason = f"field {vec_field} missed at the row 0" + failed_reason = f"field {dismatch_vec_field} missed at the row 0" else: if auto_id: failed_reason = f"JSON column consumer: row count is 0" else: - # TODO: improve the failed msg: issue #16722 - failed_reason = f"import error: field {vec_field} row count 0 is not equal to other fields 100" + failed_reason = f"import error: field {dismatch_vec_field} row count 0 is not equal to other fields" for state in states.values(): assert state.state_name == "BulkLoadFailed" assert failed_reason in state.infos.get("failed_reason", "") @@ -1174,23 +1260,24 @@ def test_wrong_vector_field_name(self, row_based, auto_id, dim, entities): @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [4]) - @pytest.mark.parametrize("entities", [10000]) + @pytest.mark.parametrize("entities", 
[200]) def test_wrong_scalar_field_name(self, row_based, auto_id, dim, entities): """ - collection: customized partitions collection schema: [pk, float_vectors, int_scalar] + data file: with dismatched int scalar 1. create collection 2. import data that one scalar field name is dismatched 3. verify that import fails with errors """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id) - files = [f"{prefix}_float_vectors_int_scalar_{dim}d_{entities}.json"] - scalar_field = "dismatched_scalar" + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_vec_n_int_fields) + dismatch_scalar_field = "dismatched_scalar" self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=scalar_field)] + c_name = cf.gen_unique_str() + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=dismatch_scalar_field)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) @@ -1208,10 +1295,9 @@ def test_wrong_scalar_field_name(self, row_based, auto_id, dim, entities): log.info(f"bulk load state:{success} in {tt}") assert not success if row_based: - failed_reason = f"field {scalar_field} missed at the row 0" + failed_reason = f"field {dismatch_scalar_field} missed at the row 0" else: - # TODO: improve the failed msg: issue #16722 - failed_reason = f"import error: field {scalar_field} row count 0 is not equal to other fields 100" + failed_reason = f"import error: field {dismatch_scalar_field} row count 0 is not equal to other fields" for state in states.values(): assert state.state_name == "BulkLoadFailed" assert failed_reason in state.infos.get("failed_reason", "") @@ -1220,22 +1306,23 @@ def test_wrong_scalar_field_name(self, row_based, auto_id, dim, entities): @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [4]) - @pytest.mark.parametrize("entities", [10000]) + @pytest.mark.parametrize("entities", [200]) def test_wrong_dim_in_schema(self, row_based, auto_id, dim, entities): """ - collection: create a collection with a dim that dismatch with json file collection schema: [pk, float_vectors, int_scalar] + data file: with wrong dim of vectors 1. import data the collection 2. 
verify that import fails with errors """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id) - files = [f"{prefix}_float_vectors_int_scalar_{dim}d_{entities}.json"] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_vec_n_int_fields) self._connect() - c_name = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str() wrong_dim = dim + 1 - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=wrong_dim), - cf.gen_int32_field(name=int_field)] + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=wrong_dim), + cf.gen_int32_field(name=df.int_field)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # import data @@ -1256,7 +1343,7 @@ def test_wrong_dim_in_schema(self, row_based, auto_id, dim, entities): @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("dim", [4]) - @pytest.mark.parametrize("entities", [10000]) + @pytest.mark.parametrize("entities", [200]) def test_non_existing_collection(self, row_based, dim, entities): """ collection: not create collection @@ -1264,10 +1351,10 @@ def test_non_existing_collection(self, row_based, dim, entities): 1. import data into a non existing collection 2. verify that import fails with errors """ - prefix = gen_file_prefix(row_based=row_based) - files = [f"{prefix}_float_vectors_int_scalar_{dim}d_{entities}.json"] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, data_fields=default_vec_n_int_fields) self._connect() - c_name = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str() # import data into a non existing collection err_msg = f"can't find collection: {c_name}" task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, @@ -1281,7 +1368,7 @@ def test_non_existing_collection(self, row_based, dim, entities): @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("dim", [4]) - @pytest.mark.parametrize("entities", [10000]) + @pytest.mark.parametrize("entities", [200]) def test_non_existing_partition(self, row_based, dim, entities): """ collection: create a collection @@ -1289,13 +1376,13 @@ def test_non_existing_partition(self, row_based, dim, entities): 1. import data into a non existing partition 2. 
verify that import fails with errors """ - prefix = gen_file_prefix(row_based=row_based) - files = [f"{prefix}_float_vectors_int_scalar_{dim}d_{entities}.json"] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, data_fields=default_vec_n_int_fields) self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field)] + c_name = cf.gen_unique_str() + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field)] schema = cf.gen_collection_schema(fields=fields) self.collection_wrap.init_collection(c_name, schema=schema) # import data into a non existing partition @@ -1314,22 +1401,25 @@ def test_non_existing_partition(self, row_based, dim, entities): @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [4]) - @pytest.mark.parametrize("entities", [10000]) - @pytest.mark.parametrize("position", ["first", "middle", "end"]) + @pytest.mark.parametrize("entities", [1000]) + @pytest.mark.parametrize("position", [0, 500, 999]) # the index of wrong dim entity def test_wrong_dim_in_one_entities_of_file(self, row_based, auto_id, dim, entities, position): """ - collection: create a collection - collection schema: [pk, float_vectors, int_scalar], one of entities has wrong dim data + collection schema: [pk, float_vectors, int_scalar] + data file: one of entities has wrong dim data 1. import data the collection 2. verify that import fails with errors """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id, prefix=f"err_{position}_dim_") - files = [f"{prefix}_float_vectors_int_scalar_{dim}d_{entities}.json"] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_vec_n_int_fields, + err_type=DataErrorType.one_entity_wrong_dim, + wrong_position=position, force=True) self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field)] + c_name = cf.gen_unique_str() + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # import data @@ -1351,9 +1441,10 @@ def test_wrong_dim_in_one_entities_of_file(self, row_based, auto_id, dim, entiti @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("row_based", [True, False]) @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("dim", [16]) # 16 - @pytest.mark.parametrize("entities", [3000]) # 3000 + @pytest.mark.parametrize("dim", [16]) + @pytest.mark.parametrize("entities", [300]) @pytest.mark.parametrize("file_nums", [10]) # max task nums 32? need improve + @pytest.mark.xfail(reason="not all correct data file imported successfully, issue #16923") def test_float_vector_one_of_files_fail(self, row_based, auto_id, dim, entities, file_nums): """ collection schema: [pk, float_vectors, int_scalar], one of entities has wrong dim data @@ -1361,21 +1452,26 @@ def test_float_vector_one_of_files_fail(self, row_based, auto_id, dim, entities, 1. 
import data 11 files(10 correct and 1 with errors) into the collection 2. verify that import fails with errors and no data imported """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id) - files = [] - for i in range(file_nums): - files.append(f"{prefix}_float_vectors_multi_scalars_{dim}d_{entities}_{i}.json") + correct_files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=auto_id, + data_fields=default_multi_fields, + file_nums=file_nums, force=True) + # append a file that has errors - files.append(f"err_{prefix}_float_vectors_multi_scalars_{dim}d_{entities}_101.json") + dismatch_dim = dim + 1 + err_files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dismatch_dim, auto_id=auto_id, + data_fields=default_multi_fields, file_nums=1) + files = correct_files + err_files random.shuffle(files) # mix up the file order self._connect() - c_name = cf.gen_unique_str(prefix) - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field), - cf.gen_string_field(name=string_field), - cf.gen_bool_field(name=bool_field) + c_name = cf.gen_unique_str() + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field) ] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) @@ -1383,7 +1479,6 @@ def test_float_vector_one_of_files_fail(self, row_based, auto_id, dim, entities, # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, - partition_name='', row_based=row_based, files=files) logging.info(f"bulk load task ids:{task_ids}") @@ -1396,52 +1491,48 @@ def test_float_vector_one_of_files_fail(self, row_based, auto_id, dim, entities, # all correct files shall be imported successfully assert self.collection_wrap.num_entities == entities * file_nums else: - # TODO: Update assert after #16707 fixed assert self.collection_wrap.num_entities == 0 @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("same_field", [True, False]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [1000]) # 1000 - @pytest.mark.xfail(reason="issue #16698") - def test_float_vector_from_multi_npy_files(self, auto_id, same_field, dim, entities): + def test_wrong_dim_in_numpy(self, auto_id, dim, entities): """ collection schema 1: [pk, float_vector] - data file: .npy files + data file: .npy file with wrong dim Steps: 1. create collection - 2. import data with row_based=False from multiple .npy files - 3. verify import failed with errors + 2. import data + 3. 
verify failed with errors """ - vec_field = f"vectors_{dim}d_{entities}_0" + data_fields = [df.vec_field] + if not auto_id: + data_fields.append(df.pk_field) + files = prepare_bulk_load_numpy_files(rows=entities, dim=dim, + data_fields=data_fields, + force=True) self._connect() c_name = cf.gen_unique_str() - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim)] - if not same_field: - fields.append(cf.gen_float_field(name=f"vectors_{dim}d_{entities}_1")) + wrong_dim = dim + 1 + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=wrong_dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # import data - files = [f"{vec_field}.npy", f"{vec_field}.npy"] - if not same_field: - files = [f"{vec_field}.npy", f"vectors_{dim}d_{entities}_1.npy"] - if not auto_id: - files.append(f"col_uid_only_{dim}d_{entities}.json") - - # import data + t0 = time.time() task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, row_based=False, files=files) logging.info(f"bulk load task ids:{task_ids}") - success, states = self.utility_wrap.wait_for_bulk_load_tasks_completed( - task_ids=task_ids, - timeout=30) - log.info(f"bulk load state:{success}") + success, states = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, + timeout=30) + tt = time.time() - t0 + log.info(f"bulk load state:{success} in {tt}") + assert not success - failed_reason = f"Numpy parse: illegal data type" + failed_reason = f"Numpy parse: illegal row width {dim} for field {df.vec_field} dimension {wrong_dim}" for state in states.values(): assert state.state_name == "BulkLoadFailed" assert failed_reason in state.infos.get("failed_reason", "") @@ -1449,30 +1540,35 @@ def test_float_vector_from_multi_npy_files(self, auto_id, dim, entit @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("dim", [128]) # 128 - @pytest.mark.parametrize("entities", [1000]) # 1000 - def test_wrong_dim_in_numpy(self, auto_id, dim, entities): + @pytest.mark.parametrize("dim", [15]) + @pytest.mark.parametrize("entities", [100]) + def test_wrong_field_name_in_numpy(self, auto_id, dim, entities): """ collection schema 1: [pk, float_vector] - data file: .npy file with wrong dim + data file: .npy file Steps: 1. create collection 2. import data - 3. verify failed with errors + 3. verify import failed with errors: + 3.1 the task state is BulkLoadFailed + 3.2 the failed reason is that the field name in + the numpy file does not exist in the collection schema """ - vec_field = f"vectors_{dim}d_{entities}" + data_fields = [df.vec_field] + if not auto_id: + data_fields.append(df.pk_field) + files = prepare_bulk_load_numpy_files(rows=entities, dim=dim, + data_fields=data_fields, + force=True) self._connect() c_name = cf.gen_unique_str() - wrong_dim = dim + 1 - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=wrong_dim)] + wrong_vec_field = f"wrong_{df.vec_field}" + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=wrong_vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) - # import data - files = [f"{vec_field}.npy"] # npy file name shall be the vector field name - if not auto_id: - files.append(f"col_uid_only_{dim}d_{entities}.json") t0 = time.time() task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, row_based=False, @@ -1484,7 +1580,7 @@ def test_wrong_dim_in_numpy(self, auto_id, dim, entities): log.info(f"bulk load state:{success} in {tt}") assert not success - failed_reason = f"Numpy parse: illegal row width {dim} for field {vec_field} dimension {wrong_dim}" + failed_reason = f"Numpy parse: the field {df.vec_field} doesn't exist" for state in states.values(): assert state.state_name == "BulkLoadFailed" assert failed_reason in state.infos.get("failed_reason", "") @@ -1492,33 +1588,30 @@ @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("dim", [128]) # 128 - @pytest.mark.parametrize("entities", [1000]) # 1000 - def test_wrong_field_name_in_numpy(self, auto_id, dim, entities): + @pytest.mark.parametrize("dim", [16]) # 128 + @pytest.mark.parametrize("entities", [100]) # 1000 + def test_duplicate_numpy_files(self, auto_id, dim, entities): """ collection schema 1: [pk, float_vector] - data file: .npy file + data file: .npy files Steps: 1. create collection - 2. import data - 3. if row_based: verify import failed - 4. if column_based: - 4.1 verify the data entities equal the import data - 4.2 verify search and query successfully + 2. import data with duplicate npy files + 3. 
verify fail to import with errors """ - vec_field = f"vectors_{dim}d_{entities}" + data_fields = [df.vec_field] + if not auto_id: + data_fields.append(df.pk_field) + files = prepare_bulk_load_numpy_files(rows=entities, dim=dim, + data_fields=data_fields) + files += files self._connect() c_name = cf.gen_unique_str() - wrong_vec_field = f"wrong_{vec_field}" - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=wrong_vec_field, dim=dim)] + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) - # import data - files = [f"{vec_field}.npy"] # npy file name shall be the vector field name - if not auto_id: - files.append(f"col_uid_only_{dim}d_{entities}.json") t0 = time.time() task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, row_based=False, @@ -1528,9 +1621,8 @@ def test_wrong_field_name_in_numpy(self, auto_id, dim, entities): timeout=30) tt = time.time() - t0 log.info(f"bulk load state:{success} in {tt}") - assert not success - failed_reason = f"Numpy parse: the field {vec_field} doesn't exist" + failed_reason = "duplicate file" for state in states.values(): assert state.state_name == "BulkLoadFailed" assert failed_reason in state.infos.get("failed_reason", "") @@ -1542,24 +1634,26 @@ def test_wrong_field_name_in_numpy(self, auto_id, dim, entities): @pytest.mark.parametrize("entities", [10]) def test_data_type_string_on_int_pk(self, row_based, dim, entities): """ - collection schema: [pk, float_vectors, int_scalar], one of entities has wrong dim data + collection schema: default multi scalars data file: json file with one of entities has string on int pk Steps: 1. create collection 2. import data with row_based=False 3. 
verify import failed """ - err_string_on_pk = "iamstring" - prefix = gen_file_prefix(row_based=row_based, auto_id=False, prefix="err_str_on_int_pk_") - files = [f"{prefix}_float_vectors_multi_scalars_{dim}d_{entities}_0.json"] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=False, + data_fields=default_multi_fields, + err_type=DataErrorType.str_on_int_pk, force=True) + self._connect() - c_name = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str() # TODO: add string pk - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field), - cf.gen_string_field(name=string_field), - cf.gen_bool_field(name=bool_field) + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field) ] schema = cf.gen_collection_schema(fields=fields, auto_id=False) self.collection_wrap.init_collection(c_name, schema=schema) @@ -1573,7 +1667,7 @@ def test_data_type_string_on_int_pk(self, row_based, dim, entities): timeout=30) log.info(f"bulk load state:{success}") assert not success - failed_reason = f"illegal numeric value {err_string_on_pk} at the row" + failed_reason = f"illegal numeric value" for state in states.values(): assert state.state_name == "BulkLoadFailed" assert failed_reason in state.infos.get("failed_reason", "") @@ -1584,7 +1678,7 @@ def test_data_type_string_on_int_pk(self, row_based, dim, entities): @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) @pytest.mark.parametrize("entities", [10]) - def test_data_type_int_on_float_scalar(self, row_based, auto_id, dim, entities): + def test_data_type_typo_on_bool(self, row_based, auto_id, dim, entities): """ collection schema: [pk, float_vector, float_scalar, int_scalar, string_scalar, bool_scalar] @@ -1594,17 +1688,20 @@ def test_data_type_int_on_float_scalar(self, row_based, auto_id, dim, entities): 2. import data 3. 
verify import failed with errors """ - prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id, prefix="err_typo_on_bool_") - files = [f"{prefix}_float_vectors_multi_scalars_{dim}d_{entities}_0.json"] + files = prepare_bulk_load_json_files(row_based=row_based, rows=entities, + dim=dim, auto_id=False, + data_fields=default_multi_fields, + err_type=DataErrorType.typo_on_bool, + scalars=default_multi_fields) self._connect() - c_name = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str() # TODO: add string pk - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), - cf.gen_float_vec_field(name=vec_field, dim=dim), - cf.gen_int32_field(name=int_field), - cf.gen_float_field(name=float_field), - cf.gen_string_field(name=string_field), - cf.gen_bool_field(name=bool_field) + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field), + cf.gen_float_field(name=df.float_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field) ] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) @@ -1640,7 +1737,8 @@ def test_data_type_int_on_float_scalar(self, row_based, auto_id, dim, entities): # TODO: string data on float field -class TestImportAdvanced(TestcaseBase): +@pytest.mark.skip() +class TestBulkLoadAdvanced(TestcaseBase): def setup_class(self): log.info("[setup_import] Start setup class...") @@ -1655,7 +1753,7 @@ def teardown_class(self): @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [50000]) # 1m*3; 50k*20; 2m*3, 500k*4 @pytest.mark.xfail(reason="search fail for issue #16784") - def test_float_vector_from_multi_npy_files(self, auto_id, dim, entities): + def test_float_vector_from_multi_numpy_files(self, auto_id, dim, entities): """ collection schema 1: [pk, float_vector] data file: .npy files @@ -1670,7 +1768,7 @@ def test_float_vector_from_multi_npy_files(self, auto_id, dim, entities): vec_field = f"vectors_{dim}d_{suffix}" self._connect() c_name = cf.gen_unique_str() - fields = [cf.gen_int64_field(name=pk_field, is_primary=True), + fields = [cf.gen_int64_field(name=df.pk_field, is_primary=True), cf.gen_float_vec_field(name=vec_field, dim=dim)] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) @@ -1680,7 +1778,7 @@ def test_float_vector_from_multi_npy_files(self, auto_id, dim, entities): for i in range(file_nums): files = [f"{dim}d_{suffix}_{i}/{vec_field}.npy"] # npy file name shall be the vector field name if not auto_id: - files.append(f"{dim}d_{suffix}_{i}/{pk_field}.npy") + files.append(f"{dim}d_{suffix}_{i}/{df.pk_field}.npy") t0 = time.time() task_ids, _ = self.utility_wrap.bulk_load(collection_name=c_name, row_based=False,