diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index 25b6301c1390f2..89e5d8e5d85396 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -171,25 +171,13 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* return Status::OK(); } - auto column_size = _tuple_desc->slots().size(); - std::vector columns(column_size); - bool mem_reuse = block->mem_reuse(); // only empty block should be here DCHECK(block->rows() == 0); do { RETURN_IF_CANCELLED(state); - columns.resize(column_size); - for (auto i = 0; i < column_size; i++) { - if (mem_reuse) { - columns[i] = std::move(*block->get_by_position(i).column).mutate(); - } else { - columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); - } - } - - RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, columns, block, state->batch_size())); + RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, block, state->batch_size())); if (_jdbc_eos == true) { if (block->rows() == 0) { @@ -199,18 +187,6 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* break; } - // Before really use the Block, must clear other ptr of column in block - // So here need do std::move and clear in `columns` - if (!mem_reuse) { - int column_index = 0; - for (const auto slot_desc : _tuple_desc->slots()) { - block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); - } - } else { - columns.clear(); - } VLOG_ROW << "NewJdbcScanNode output rows: " << block->rows(); } while (block->rows() == 0 && !(*eof)); return Status::OK(); diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 6c0857f237b4a2..b28717eafcf6c2 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -23,7 +23,6 @@ #include // IWYU pragma: no_include #include // IWYU pragma: keep -#include #include #include #include @@ -33,7 +32,6 @@ #include "exec/table_connector.h" #include "gutil/strings/substitute.h" #include "jni.h" -#include "jni_md.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" @@ -41,37 +39,23 @@ #include "runtime/user_function_cache.h" #include "util/jni-util.h" #include "util/runtime_profile.h" -#include "vec/columns/column.h" #include "vec/columns/column_nullable.h" -#include "vec/columns/column_string.h" -#include "vec/common/string_ref.h" #include "vec/core/block.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/core/columns_with_type_and_name.h" -#include "vec/core/field.h" -#include "vec/core/types.h" #include "vec/data_types/data_type_nullable.h" -#include "vec/data_types/data_type_number.h" #include "vec/data_types/data_type_string.h" #include "vec/exec/jni_connector.h" #include "vec/exprs/vexpr.h" -#include "vec/exprs/vexpr_context.h" -#include "vec/functions/function.h" #include "vec/functions/simple_function_factory.h" +#include "vec/io/reader_buffer.h" -namespace doris { -namespace vectorized { +namespace doris::vectorized { const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/jdbc/JdbcExecutor"; const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V"; -const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I"; const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I"; const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z"; -const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;"; -const char* JDBC_EXECUTOR_GET_BLOCK_WITH_TYPES_SIGNATURE = "(ILjava/lang/Object;)Ljava/util/List;"; const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;"; const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V"; const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V"; -const char* JDBC_EXECUTOR_COPY_BATCH_SIGNATURE = "(Ljava/lang/Object;ZIJJ)V"; JdbcConnector::JdbcConnector(const JdbcConnectorParam& param) : TableConnector(param.tuple_desc, param.use_transaction, param.table_name, @@ -90,7 +74,7 @@ JdbcConnector::~JdbcConnector() { #define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz); -Status JdbcConnector::close(Status) { +Status JdbcConnector::close(Status /*unused*/) { SCOPED_RAW_TIMER(&_jdbc_statistic._connector_close_timer); _closed = true; if (!_is_open) { @@ -112,15 +96,6 @@ Status JdbcConnector::close(Status) { return Status::OK(); } -Status JdbcConnector::append(vectorized::Block* block, - const vectorized::VExprContextSPtrs& output_vexpr_ctxs, - uint32_t start_send_row, uint32_t* num_rows_sent, - TOdbcTableType::type table_type) { - RETURN_IF_ERROR(exec_stmt_write(block, output_vexpr_ctxs, num_rows_sent)); - COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent); - return Status::OK(); -} - Status JdbcConnector::open(RuntimeState* state, bool read) { if (_is_open) { LOG(INFO) << "this scanner of jdbc already opened"; @@ -223,6 +198,158 @@ Status JdbcConnector::query() { return Status::OK(); } +Status JdbcConnector::get_next(bool* eos, Block* block, int batch_size) { + if (!_is_open) { + return Status::InternalError("get_next before open of jdbc connector."); + } + SCOPED_RAW_TIMER(&_jdbc_statistic._get_data_timer); + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + jboolean has_next = + env->CallNonvirtualBooleanMethod(_executor_obj, _executor_clazz, _executor_has_next_id); + if (has_next != JNI_TRUE) { + *eos = true; + return Status::OK(); + } + + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + + auto column_size = _tuple_desc->slots().size(); + auto slots = _tuple_desc->slots(); + + jobject map = _get_reader_params(block, env, column_size); + long address = + env->CallLongMethod(_executor_obj, _executor_get_block_address_id, batch_size, map); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + env->DeleteLocalRef(map); + + std::vector all_columns; + for (size_t i = 0; i < column_size; ++i) { + all_columns.push_back(i); + } + Status fill_block_status = JniConnector::fill_block(block, all_columns, address); + if (!fill_block_status) { + return fill_block_status; + } + + Status cast_status = _cast_string_to_special(block, env, column_size); + + if (!cast_status) { + return cast_status; + } + + return JniUtil::GetJniExceptionMsg(env); +} + +Status JdbcConnector::append(vectorized::Block* block, + const vectorized::VExprContextSPtrs& output_vexpr_ctxs, + uint32_t start_send_row, uint32_t* num_rows_sent, + TOdbcTableType::type table_type) { + RETURN_IF_ERROR(exec_stmt_write(block, output_vexpr_ctxs, num_rows_sent)); + COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent); + return Status::OK(); +} + +Status JdbcConnector::exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs, + uint32_t* num_rows_sent) { + SCOPED_TIMER(_result_send_timer); + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + + // prepare table meta information + std::unique_ptr meta_data; + RETURN_IF_ERROR(JniConnector::to_java_table(block, meta_data)); + long meta_address = (long)meta_data.get(); + auto table_schema = JniConnector::parse_table_schema(block); + + // prepare constructor parameters + std::map write_params = {{"meta_address", std::to_string(meta_address)}, + {"required_fields", table_schema.first}, + {"columns_types", table_schema.second}}; + jobject hashmap_object = JniUtil::convert_to_java_map(env, write_params); + env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_stmt_write_id, + hashmap_object); + env->DeleteLocalRef(hashmap_object); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + *num_rows_sent = block->rows(); + return Status::OK(); +} + +Status JdbcConnector::begin_trans() { + if (_use_tranaction) { + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_begin_trans_id); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + _is_in_transaction = true; + } + return Status::OK(); +} + +Status JdbcConnector::abort_trans() { + if (!_is_in_transaction) { + return Status::InternalError("Abort transaction before begin trans."); + } + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_abort_trans_id); + return JniUtil::GetJniExceptionMsg(env); +} + +Status JdbcConnector::finish_trans() { + if (_use_tranaction && _is_in_transaction) { + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_finish_trans_id); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + _is_in_transaction = false; + } + return Status::OK(); +} + +Status JdbcConnector::_register_func_id(JNIEnv* env) { + auto register_id = [&](jclass clazz, const char* func_name, const char* func_sign, + jmethodID& func_id) { + func_id = env->GetMethodID(clazz, func_name, func_sign); + Status s = JniUtil::GetJniExceptionMsg(env); + if (!s.ok()) { + return Status::InternalError(strings::Substitute( + "Jdbc connector _register_func_id meet error and error is $0", s.to_string())); + } + return s; + }; + + RETURN_IF_ERROR(register_id(_executor_clazz, "", JDBC_EXECUTOR_CTOR_SIGNATURE, + _executor_ctor_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_STMT_WRITE_SIGNATURE, + _executor_stmt_write_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "read", "()I", _executor_read_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE, + _executor_close_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE, + _executor_has_next_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "getBlockAddress", "(ILjava/util/Map;)J", + _executor_get_block_address_id)); + RETURN_IF_ERROR( + register_id(_executor_clazz, "getCurBlockRows", "()I", _executor_block_rows_id)); + RETURN_IF_ERROR(register_id(_executor_list_clazz, "get", "(I)Ljava/lang/Object;", + _executor_get_list_id)); + RETURN_IF_ERROR(register_id(_executor_string_clazz, "getBytes", "(Ljava/lang/String;)[B", + _get_bytes_id)); + RETURN_IF_ERROR( + register_id(_executor_object_clazz, "toString", "()Ljava/lang/String;", _to_string_id)); + + RETURN_IF_ERROR(register_id(_executor_clazz, "openTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE, + _executor_begin_trans_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "commitTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE, + _executor_finish_trans_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "rollbackTrans", + JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames", + JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id)); + return Status::OK(); +} + Status JdbcConnector::_check_column_type() { SCOPED_RAW_TIMER(&_jdbc_statistic._check_type_timer); JNIEnv* env = nullptr; @@ -247,13 +374,14 @@ Status JdbcConnector::_check_column_type() { env->DeleteLocalRef(type_lists); return JniUtil::GetJniExceptionMsg(env); } + /* type mapping: https://doris.apache.org/zh-CN/docs/dev/ecosystem/external-table/jdbc-of-doris?_highlight=jdbc Doris MYSQL PostgreSQL Oracle SQLServer BOOLEAN java.lang.Boolean java.lang.Boolean java.lang.Boolean -TINYINT java.lang.Integer java.lang.Short -SMALLINT java.lang.Integer java.lang.Integer java.math.BigDecimal java.lang.Short +TINYINT java.lang.Integer java.lang.Short +SMALLINT java.lang.Integer java.lang.Integer java.math.BigDecimal java.lang.Short INT java.lang.Integer java.lang.Integer java.math.BigDecimal java.lang.Integer BIGINT java.lang.Long java.lang.Long java.lang.Long LARGET java.math.BigInteger @@ -347,19 +475,6 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& type_str != "java.lang.Object") { return Status::InternalError(error_msg); } - if (!slot_desc->type().children[0].children.empty()) { - return Status::InternalError("Now doris not support nested array type in array {}.", - slot_desc->type().debug_string()); - } - _map_column_idx_to_cast_idx[column_index] = _input_array_string_types.size(); - if (slot_desc->is_nullable()) { - _input_array_string_types.push_back(make_nullable(std::make_shared())); - } else { - _input_array_string_types.push_back(std::make_shared()); - } - str_array_cols.push_back( - _input_array_string_types[_map_column_idx_to_cast_idx[column_index]] - ->create_column()); break; } case TYPE_JSONB: { @@ -419,432 +534,94 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& return Status::OK(); } -Status JdbcConnector::get_next(bool* eos, std::vector& columns, Block* block, - int batch_size) { - if (!_is_open) { - return Status::InternalError("get_next before open of jdbc connector."); - } - SCOPED_RAW_TIMER(&_jdbc_statistic._get_data_timer); - JNIEnv* env = nullptr; - RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - jboolean has_next = - env->CallNonvirtualBooleanMethod(_executor_obj, _executor_clazz, _executor_has_next_id); - if (has_next != JNI_TRUE) { - *eos = true; - return Status::OK(); - } +std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) { + jobject jstr = env->CallObjectMethod(jobj, _to_string_id); + auto coding = env->NewStringUTF("UTF-8"); + const jbyteArray stringJbytes = (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, coding); + size_t length = (size_t)env->GetArrayLength(stringJbytes); + jbyte* pBytes = env->GetByteArrayElements(stringJbytes, nullptr); + std::string str = std::string((char*)pBytes, length); + env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT); + env->DeleteLocalRef(stringJbytes); + env->DeleteLocalRef(jstr); + env->DeleteLocalRef(coding); + return str; +} - jobject block_obj; - // if contain HLL column, pass the column type to jni env - if (_tuple_desc->has_hll_slot() || _tuple_desc->has_bitmap_slot()) { - auto column_size = _tuple_desc->slots().size(); - // Find ArrayList and Integer - jclass arrayListClass = env->FindClass("java/util/ArrayList"); - jclass integerClass = env->FindClass("java/lang/Integer"); - - // Get method id of the constructor and the add in ArrayList - jmethodID arrayListConstructor = env->GetMethodID(arrayListClass, "", "()V"); - jmethodID arrayListAddMethod = - env->GetMethodID(arrayListClass, "add", "(Ljava/lang/Object;)Z"); - - // Create an ArrayList object - jobject arrayListObject = env->NewObject(arrayListClass, arrayListConstructor); - for (int column_index = 0; column_index < column_size; ++column_index) { - auto slot_desc = _tuple_desc->slots()[column_index]; - if (slot_desc->type().is_hll_type() || slot_desc->type().is_bitmap_type()) { - // Create an Integer object - jobject integerObject = env->NewObject( - integerClass, env->GetMethodID(integerClass, "", "(I)V"), - (int)slot_desc->type().type); - // Add Integer into ArrayList - env->CallBooleanMethod(arrayListObject, arrayListAddMethod, integerObject); - - } else { - jobject integerObject = env->NewObject( - integerClass, env->GetMethodID(integerClass, "", "(I)V"), 0); - env->CallBooleanMethod(arrayListObject, arrayListAddMethod, integerObject); +jobject JdbcConnector::_get_reader_params(Block* block, JNIEnv* env, size_t column_size) { + std::ostringstream columns_nullable; + std::ostringstream columns_replace_string; + std::ostringstream required_fields; + std::ostringstream columns_types; + + for (int i = 0; i < column_size; ++i) { + auto* slot = _tuple_desc->slots()[i]; + if (slot->is_materialized()) { + auto type = slot->type(); + // Record if column is nullable + columns_nullable << (slot->is_nullable() ? "true" : "false") << ","; + // Check column type and replace accordingly + std::string replace_type = "not_replace"; + if (type.is_bitmap_type()) { + replace_type = "bitmap"; + } else if (type.is_hll_type()) { + replace_type = "hll"; + } else if (type.is_json_type()) { + replace_type = "jsonb"; + } + columns_replace_string << replace_type << ","; + if (replace_type != "not_replace") { + block->get_by_position(i).column = std::make_shared() + ->create_column() + ->convert_to_full_column_if_const(); + block->get_by_position(i).type = std::make_shared(); + if (slot->is_nullable()) { + block->get_by_position(i).column = + make_nullable(block->get_by_position(i).column); + block->get_by_position(i).type = make_nullable(block->get_by_position(i).type); + } } } - - block_obj = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz, - _executor_get_blocks_new_id, batch_size, - arrayListObject); - } else { - block_obj = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz, - _executor_get_blocks_id, batch_size); + // Record required fields and column types + std::string field = slot->col_name(); + std::string jni_type; + if (slot->type().is_bitmap_type() || slot->type().is_hll_type() || + slot->type().is_json_type()) { + jni_type = "string"; + } else { + jni_type = JniConnector::get_jni_type(slot->type()); + } + required_fields << (i != 0 ? "," : "") << field; + columns_types << (i != 0 ? "#" : "") << jni_type; } - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + std::map reader_params = {{"is_nullable", columns_nullable.str()}, + {"replace_string", columns_replace_string.str()}, + {"required_fields", required_fields.str()}, + {"columns_types", columns_types.str()}}; + return JniUtil::convert_to_java_map(env, reader_params); +} - auto column_size = _tuple_desc->slots().size(); - for (int column_index = 0, materialized_column_index = 0; column_index < column_size; - ++column_index) { - auto slot_desc = _tuple_desc->slots()[column_index]; +Status JdbcConnector::_cast_string_to_special(Block* block, JNIEnv* env, size_t column_size) { + for (size_t column_index = 0; column_index < column_size; ++column_index) { + auto* slot_desc = _tuple_desc->slots()[column_index]; // because the fe planner filter the non_materialize column if (!slot_desc->is_materialized()) { continue; } - jobject column_data = - env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index); jint num_rows = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_block_rows_id); - RETURN_IF_ERROR(_convert_batch_result_set( - env, column_data, slot_desc, columns[column_index].get(), num_rows, column_index)); - env->DeleteLocalRef(column_data); - //here need to cast string to array type - if (slot_desc->type().is_array_type()) { - static_cast(_cast_string_to_array(slot_desc, block, column_index, num_rows)); - } else if (slot_desc->type().is_hll_type()) { + + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + + if (slot_desc->type().is_hll_type()) { static_cast(_cast_string_to_hll(slot_desc, block, column_index, num_rows)); } else if (slot_desc->type().is_json_type()) { static_cast(_cast_string_to_json(slot_desc, block, column_index, num_rows)); } else if (slot_desc->type().is_bitmap_type()) { static_cast(_cast_string_to_bitmap(slot_desc, block, column_index, num_rows)); } - materialized_column_index++; - } - // All Java objects returned by JNI functions are local references. - env->DeleteLocalRef(block_obj); - return JniUtil::GetJniExceptionMsg(env); -} - -Status JdbcConnector::_convert_batch_result_set(JNIEnv* env, jobject jcolumn_data, - const SlotDescriptor* slot_desc, - vectorized::IColumn* column_ptr, int num_rows, - int column_index) { - vectorized::IColumn* col_ptr = column_ptr; - col_ptr->resize(num_rows); - int64_t address[2] = {0, 0}; - bool column_is_nullable = slot_desc->is_nullable(); - if (column_is_nullable) { - auto* nullable_column = reinterpret_cast(column_ptr); - auto& null_map = nullable_column->get_null_map_data(); - memset(null_map.data(), 0, num_rows); - address[0] = reinterpret_cast(null_map.data()); - col_ptr = &nullable_column->get_nested_column(); - } - switch (slot_desc->type().type) { - case TYPE_BOOLEAN: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_boolean_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_TINYINT: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_tinyint_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_SMALLINT: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_smallint_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_INT: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_int_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_BIGINT: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_bigint_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_LARGEINT: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_largeint_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_FLOAT: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_float_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_DOUBLE: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_double_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_CHAR: { - bool need_trim_spaces = false; - if ((_conn_param.table_type == TOdbcTableType::POSTGRESQL) || - (_conn_param.table_type == TOdbcTableType::ORACLE)) { - need_trim_spaces = true; - } - auto column_string = reinterpret_cast(col_ptr); - address[1] = reinterpret_cast(column_string->get_offsets().data()); - auto chars_addres = reinterpret_cast(&column_string->get_chars()); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_char_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1], chars_addres, need_trim_spaces); - break; - } - case TYPE_STRING: - case TYPE_VARCHAR: { - auto column_string = reinterpret_cast(col_ptr); - address[1] = reinterpret_cast(column_string->get_offsets().data()); - auto chars_addres = reinterpret_cast(&column_string->get_chars()); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_string_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1], chars_addres); - break; - } - case TYPE_DATE: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_date_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_DATEV2: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_datev2_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_DATETIME: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_datetime_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1]); - break; - } - case TYPE_DATETIMEV2: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, - _executor_get_datetimev2_result, jcolumn_data, - column_is_nullable, num_rows, address[0], address[1]); - break; - } - case TYPE_DECIMALV2: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, - _executor_get_decimalv2_result, jcolumn_data, - column_is_nullable, num_rows, address[0], address[1]); - break; - } - case TYPE_DECIMAL32: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod( - _executor_obj, _executor_clazz, _executor_get_decimal32_result, jcolumn_data, - column_is_nullable, num_rows, address[0], address[1], slot_desc->type().scale); - break; - } - case TYPE_DECIMAL64: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod( - _executor_obj, _executor_clazz, _executor_get_decimal64_result, jcolumn_data, - column_is_nullable, num_rows, address[0], address[1], slot_desc->type().scale); - break; - } - case TYPE_DECIMAL128I: { - address[1] = reinterpret_cast(col_ptr->get_raw_data().data); - env->CallNonvirtualVoidMethod( - _executor_obj, _executor_clazz, _executor_get_decimal128_result, jcolumn_data, - column_is_nullable, num_rows, address[0], address[1], slot_desc->type().scale); - break; - } - case TYPE_ARRAY: { - str_array_cols[_map_column_idx_to_cast_idx[column_index]]->resize(num_rows); - if (column_is_nullable) { - auto* nullable_column = reinterpret_cast( - str_array_cols[_map_column_idx_to_cast_idx[column_index]].get()); - auto& null_map = nullable_column->get_null_map_data(); - memset(null_map.data(), 0, num_rows); - address[0] = reinterpret_cast(null_map.data()); - col_ptr = &nullable_column->get_nested_column(); - } else { - col_ptr = str_array_cols[_map_column_idx_to_cast_idx[column_index]].get(); - } - auto column_string = reinterpret_cast(col_ptr); - address[1] = reinterpret_cast(column_string->get_offsets().data()); - auto chars_addres = reinterpret_cast(&column_string->get_chars()); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_array_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1], chars_addres); - break; - } - case TYPE_JSONB: { - str_json_cols[_map_column_idx_to_cast_idx_json[column_index]]->resize(num_rows); - if (column_is_nullable) { - auto* nullbale_column = reinterpret_cast( - str_json_cols[_map_column_idx_to_cast_idx_json[column_index]].get()); - auto& null_map = nullbale_column->get_null_map_data(); - memset(null_map.data(), 0, num_rows); - address[0] = reinterpret_cast(null_map.data()); - col_ptr = &nullbale_column->get_nested_column(); - } else { - col_ptr = str_json_cols[_map_column_idx_to_cast_idx_json[column_index]].get(); - } - auto column_string = reinterpret_cast(col_ptr); - address[1] = reinterpret_cast(column_string->get_offsets().data()); - auto chars_addres = reinterpret_cast(&column_string->get_chars()); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_json_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1], chars_addres); - break; - } - case TYPE_HLL: { - str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]]->resize(num_rows); - if (column_is_nullable) { - auto* nullable_column = reinterpret_cast( - str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]].get()); - auto& null_map = nullable_column->get_null_map_data(); - memset(null_map.data(), 0, num_rows); - address[0] = reinterpret_cast(null_map.data()); - col_ptr = &nullable_column->get_nested_column(); - } else { - col_ptr = str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]].get(); - } - auto column_string = reinterpret_cast(col_ptr); - address[1] = reinterpret_cast(column_string->get_offsets().data()); - auto chars_addres = reinterpret_cast(&column_string->get_chars()); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_hll_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1], chars_addres); - break; } - //BITMAP - case TYPE_OBJECT: { - str_bitmap_cols[_map_column_idx_to_cast_idx_bitmap[column_index]]->resize(num_rows); - if (column_is_nullable) { - auto* nullable_column = reinterpret_cast( - str_bitmap_cols[_map_column_idx_to_cast_idx_bitmap[column_index]].get()); - auto& null_map = nullable_column->get_null_map_data(); - memset(null_map.data(), 0, num_rows); - address[0] = reinterpret_cast(null_map.data()); - col_ptr = &nullable_column->get_nested_column(); - } else { - col_ptr = str_bitmap_cols[_map_column_idx_to_cast_idx_bitmap[column_index]].get(); - } - auto column_string = reinterpret_cast(col_ptr); - address[1] = reinterpret_cast(column_string->get_offsets().data()); - auto chars_addres = reinterpret_cast(&column_string->get_chars()); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_bitmap_result, - jcolumn_data, column_is_nullable, num_rows, address[0], - address[1], chars_addres); - break; - } - default: { - const std::string& error_msg = - fmt::format("Fail to convert jdbc value to {} on column: {}", - slot_desc->type().debug_string(), slot_desc->col_name()); - return Status::InternalError(std::string(error_msg)); - } - } - return JniUtil::GetJniExceptionMsg(env); -} - -Status JdbcConnector::_register_func_id(JNIEnv* env) { - auto register_id = [&](jclass clazz, const char* func_name, const char* func_sign, - jmethodID& func_id) { - func_id = env->GetMethodID(clazz, func_name, func_sign); - Status s = JniUtil::GetJniExceptionMsg(env); - if (!s.ok()) { - return Status::InternalError(strings::Substitute( - "Jdbc connector _register_func_id meet error and error is $0", s.to_string())); - } - return s; - }; - - RETURN_IF_ERROR(register_id(_executor_clazz, "", JDBC_EXECUTOR_CTOR_SIGNATURE, - _executor_ctor_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_WRITE_SIGNATURE, - _executor_write_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_STMT_WRITE_SIGNATURE, - _executor_stmt_write_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "read", "()I", _executor_read_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE, - _executor_close_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE, - _executor_has_next_id)); - RETURN_IF_ERROR( - register_id(_executor_clazz, "getCurBlockRows", "()I", _executor_block_rows_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchBooleanResult", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_boolean_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchTinyIntResult", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_tinyint_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchSmallIntResult", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_smallint_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchIntResult", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_int_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchBigIntResult", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_bigint_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchLargeIntResult", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_largeint_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchFloatResult", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_float_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDoubleResult", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_double_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchStringResult", - "(Ljava/lang/Object;ZIJJJ)V", _executor_get_string_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchArrayResult", - "(Ljava/lang/Object;ZIJJJ)V", _executor_get_array_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchHllResult", "(Ljava/lang/Object;ZIJJJ)V", - _executor_get_hll_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchBitMapResult", - "(Ljava/lang/Object;ZIJJJ)V", _executor_get_bitmap_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchJsonResult", - "(Ljava/lang/Object;ZIJJJ)V", _executor_get_json_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchCharResult", - "(Ljava/lang/Object;ZIJJJZ)V", _executor_get_char_result)); - - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateResult", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_date_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateV2Result", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_datev2_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateTimeResult", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_datetime_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateTimeV2Result", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, - _executor_get_datetimev2_result)); - - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimalV2Result", - JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, - _executor_get_decimalv2_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal32Result", - "(Ljava/lang/Object;ZIJJI)V", _executor_get_decimal32_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal64Result", - "(Ljava/lang/Object;ZIJJI)V", _executor_get_decimal64_result)); - RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal128Result", - "(Ljava/lang/Object;ZIJJI)V", _executor_get_decimal128_result)); - - RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", JDBC_EXECUTOR_GET_BLOCK_SIGNATURE, - _executor_get_blocks_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", - JDBC_EXECUTOR_GET_BLOCK_WITH_TYPES_SIGNATURE, - _executor_get_blocks_new_id)); - RETURN_IF_ERROR(register_id(_executor_list_clazz, "get", "(I)Ljava/lang/Object;", - _executor_get_list_id)); - RETURN_IF_ERROR(register_id(_executor_string_clazz, "getBytes", "(Ljava/lang/String;)[B", - _get_bytes_id)); - RETURN_IF_ERROR( - register_id(_executor_object_clazz, "toString", "()Ljava/lang/String;", _to_string_id)); - - RETURN_IF_ERROR(register_id(_executor_clazz, "openTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE, - _executor_begin_trans_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "commitTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE, - _executor_finish_trans_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "rollbackTrans", - JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames", - JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id)); return Status::OK(); } @@ -855,10 +632,12 @@ Status JdbcConnector::_cast_string_to_hll(const SlotDescriptor* slot_desc, Block DataTypePtr _cast_param_data_type = _target_data_type; ColumnPtr _cast_param = _cast_param_data_type->create_column_const_with_default_value(1); + auto& input_col = block->get_by_position(column_index).column; + ColumnsWithTypeAndName argument_template; argument_template.reserve(2); argument_template.emplace_back( - std::move(str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]]), + std::move(input_col), _input_hll_string_types[_map_column_idx_to_cast_idx_hll[column_index]], "java.sql.String"); argument_template.emplace_back(_cast_param, _cast_param_data_type, _target_data_type_name); @@ -871,6 +650,7 @@ Status JdbcConnector::_cast_string_to_hll(const SlotDescriptor* slot_desc, Block static_cast(func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows)); auto res_col = cast_block.get_by_position(result_idx).column; + block->get_by_position(column_index).type = _target_data_type; if (_target_data_type->is_nullable()) { block->replace_by_position(column_index, res_col); } else { @@ -878,8 +658,7 @@ Status JdbcConnector::_cast_string_to_hll(const SlotDescriptor* slot_desc, Block ->get_nested_column_ptr(); block->replace_by_position(column_index, nested_ptr); } - str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]] = - _input_hll_string_types[_map_column_idx_to_cast_idx_hll[column_index]]->create_column(); + return Status::OK(); } @@ -890,10 +669,12 @@ Status JdbcConnector::_cast_string_to_bitmap(const SlotDescriptor* slot_desc, Bl DataTypePtr _cast_param_data_type = _target_data_type; ColumnPtr _cast_param = _cast_param_data_type->create_column_const_with_default_value(1); + auto& input_col = block->get_by_position(column_index).column; + ColumnsWithTypeAndName argument_template; argument_template.reserve(2); argument_template.emplace_back( - std::move(str_bitmap_cols[_map_column_idx_to_cast_idx_bitmap[column_index]]), + std::move(input_col), _input_bitmap_string_types[_map_column_idx_to_cast_idx_bitmap[column_index]], "java.sql.String"); argument_template.emplace_back(_cast_param, _cast_param_data_type, _target_data_type_name); @@ -906,6 +687,7 @@ Status JdbcConnector::_cast_string_to_bitmap(const SlotDescriptor* slot_desc, Bl static_cast(func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows)); auto res_col = cast_block.get_by_position(result_idx).column; + block->get_by_position(column_index).type = _target_data_type; if (_target_data_type->is_nullable()) { block->replace_by_position(column_index, res_col); } else { @@ -913,44 +695,7 @@ Status JdbcConnector::_cast_string_to_bitmap(const SlotDescriptor* slot_desc, Bl ->get_nested_column_ptr(); block->replace_by_position(column_index, nested_ptr); } - str_bitmap_cols[_map_column_idx_to_cast_idx_bitmap[column_index]] = - _input_bitmap_string_types[_map_column_idx_to_cast_idx_bitmap[column_index]] - ->create_column(); - return Status::OK(); -} - -Status JdbcConnector::_cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, - int column_index, int rows) { - DataTypePtr _target_data_type = slot_desc->get_data_type_ptr(); - std::string _target_data_type_name = _target_data_type->get_name(); - DataTypePtr _cast_param_data_type = _target_data_type; - ColumnPtr _cast_param = _cast_param_data_type->create_column_const_with_default_value(1); - - ColumnsWithTypeAndName argument_template; - argument_template.reserve(2); - argument_template.emplace_back( - std::move(str_array_cols[_map_column_idx_to_cast_idx[column_index]]), - _input_array_string_types[_map_column_idx_to_cast_idx[column_index]], - "java.sql.String"); - argument_template.emplace_back(_cast_param, _cast_param_data_type, _target_data_type_name); - FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function( - "CAST", argument_template, make_nullable(_target_data_type)); - Block cast_block(argument_template); - int result_idx = cast_block.columns(); - cast_block.insert({nullptr, make_nullable(_target_data_type), "cast_result"}); - static_cast(func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows)); - - auto res_col = cast_block.get_by_position(result_idx).column; - if (_target_data_type->is_nullable()) { - block->replace_by_position(column_index, res_col); - } else { - auto nested_ptr = reinterpret_cast(res_col.get()) - ->get_nested_column_ptr(); - block->replace_by_position(column_index, nested_ptr); - } - str_array_cols[_map_column_idx_to_cast_idx[column_index]] = - _input_array_string_types[_map_column_idx_to_cast_idx[column_index]]->create_column(); return Status::OK(); } @@ -961,10 +706,12 @@ Status JdbcConnector::_cast_string_to_json(const SlotDescriptor* slot_desc, Bloc DataTypePtr _cast_param_data_type = _target_data_type; ColumnPtr _cast_param = _cast_param_data_type->create_column_const(1, "{}"); + auto& input_col = block->get_by_position(column_index).column; + ColumnsWithTypeAndName argument_template; argument_template.reserve(2); argument_template.emplace_back( - std::move(str_json_cols[_map_column_idx_to_cast_idx_json[column_index]]), + std::move(input_col), _input_json_string_types[_map_column_idx_to_cast_idx_json[column_index]], "java.sql.String"); argument_template.emplace_back(_cast_param, _cast_param_data_type, _target_data_type_name); @@ -977,6 +724,7 @@ Status JdbcConnector::_cast_string_to_json(const SlotDescriptor* slot_desc, Bloc static_cast(func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows)); auto res_col = cast_block.get_by_position(result_idx).column; + block->get_by_position(column_index).type = _target_data_type; if (_target_data_type->is_nullable()) { block->replace_by_position(column_index, res_col); } else { @@ -984,82 +732,8 @@ Status JdbcConnector::_cast_string_to_json(const SlotDescriptor* slot_desc, Bloc ->get_nested_column_ptr(); block->replace_by_position(column_index, nested_ptr); } - str_json_cols[_map_column_idx_to_cast_idx_json[column_index]] = - _input_json_string_types[_map_column_idx_to_cast_idx_json[column_index]] - ->create_column(); - return Status::OK(); -} - -Status JdbcConnector::exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs, - uint32_t* num_rows_sent) { - SCOPED_TIMER(_result_send_timer); - JNIEnv* env = nullptr; - RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - // prepare table meta information - std::unique_ptr meta_data; - RETURN_IF_ERROR(JniConnector::to_java_table(block, meta_data)); - long meta_address = (long)meta_data.get(); - auto table_schema = JniConnector::parse_table_schema(block); - - // prepare constructor parameters - std::map write_params = {{"meta_address", std::to_string(meta_address)}, - {"required_fields", table_schema.first}, - {"columns_types", table_schema.second}}; - jobject hashmap_object = JniUtil::convert_to_java_map(env, write_params); - env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_stmt_write_id, - hashmap_object); - env->DeleteLocalRef(hashmap_object); - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); - *num_rows_sent = block->rows(); - return Status::OK(); -} - -std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) { - jobject jstr = env->CallObjectMethod(jobj, _to_string_id); - auto coding = env->NewStringUTF("UTF-8"); - const jbyteArray stringJbytes = (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, coding); - size_t length = (size_t)env->GetArrayLength(stringJbytes); - jbyte* pBytes = env->GetByteArrayElements(stringJbytes, nullptr); - std::string str = std::string((char*)pBytes, length); - env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT); - env->DeleteLocalRef(stringJbytes); - env->DeleteLocalRef(jstr); - env->DeleteLocalRef(coding); - return str; -} - -Status JdbcConnector::begin_trans() { - if (_use_tranaction) { - JNIEnv* env = nullptr; - RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_begin_trans_id); - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); - _is_in_transaction = true; - } - return Status::OK(); -} - -Status JdbcConnector::abort_trans() { - if (!_is_in_transaction) { - return Status::InternalError("Abort transaction before begin trans."); - } - JNIEnv* env = nullptr; - RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_abort_trans_id); - return JniUtil::GetJniExceptionMsg(env); -} - -Status JdbcConnector::finish_trans() { - if (_use_tranaction && _is_in_transaction) { - JNIEnv* env = nullptr; - RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_finish_trans_id); - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); - _is_in_transaction = false; - } return Status::OK(); } -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index d1587af87f5864..02da0587f7b981 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -79,6 +79,8 @@ class JdbcConnector : public TableConnector { Status query() override; + Status get_next(bool* eos, Block* block, int batch_size); + Status append(vectorized::Block* block, const vectorized::VExprContextSPtrs& _output_vexpr_ctxs, uint32_t start_send_row, uint32_t* num_rows_sent, TOdbcTableType::type table_type = TOdbcTableType::MYSQL) override; @@ -91,9 +93,6 @@ class JdbcConnector : public TableConnector { Status exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs, uint32_t* num_rows_sent) override; - Status get_next(bool* eos, std::vector& columns, Block* block, - int batch_size); - // use in JDBC transaction Status begin_trans() override; // should be call after connect and before query or init_to_write Status abort_trans() override; // should be call after transaction abort @@ -116,17 +115,16 @@ class JdbcConnector : public TableConnector { Status _check_column_type(); Status _check_type(SlotDescriptor*, const std::string& type_str, int column_index); std::string _jobject_to_string(JNIEnv* env, jobject jobj); - Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index, - int rows); + + jobject _get_reader_params(Block* block, JNIEnv* env, size_t column_size); + + Status _cast_string_to_special(Block* block, JNIEnv* env, size_t column_size); Status _cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows); Status _cast_string_to_bitmap(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows); Status _cast_string_to_json(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows); - Status _convert_batch_result_set(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc, - vectorized::IColumn* column_ptr, int num_rows, - int column_index); bool _closed = false; jclass _executor_clazz; @@ -135,35 +133,11 @@ class JdbcConnector : public TableConnector { jclass _executor_string_clazz; jobject _executor_obj; jmethodID _executor_ctor_id; - jmethodID _executor_write_id; jmethodID _executor_stmt_write_id; jmethodID _executor_read_id; jmethodID _executor_has_next_id; + jmethodID _executor_get_block_address_id; jmethodID _executor_block_rows_id; - jmethodID _executor_get_blocks_id; - jmethodID _executor_get_blocks_new_id; - jmethodID _executor_get_boolean_result; - jmethodID _executor_get_tinyint_result; - jmethodID _executor_get_smallint_result; - jmethodID _executor_get_int_result; - jmethodID _executor_get_bigint_result; - jmethodID _executor_get_largeint_result; - jmethodID _executor_get_float_result; - jmethodID _executor_get_double_result; - jmethodID _executor_get_char_result; - jmethodID _executor_get_string_result; - jmethodID _executor_get_date_result; - jmethodID _executor_get_datev2_result; - jmethodID _executor_get_datetime_result; - jmethodID _executor_get_datetimev2_result; - jmethodID _executor_get_decimalv2_result; - jmethodID _executor_get_decimal32_result; - jmethodID _executor_get_decimal64_result; - jmethodID _executor_get_decimal128_result; - jmethodID _executor_get_array_result; - jmethodID _executor_get_json_result; - jmethodID _executor_get_hll_result; - jmethodID _executor_get_bitmap_result; jmethodID _executor_get_types_id; jmethodID _executor_close_id; jmethodID _executor_get_list_id; @@ -172,10 +146,6 @@ class JdbcConnector : public TableConnector { jmethodID _executor_begin_trans_id; jmethodID _executor_finish_trans_id; jmethodID _executor_abort_trans_id; - std::map _map_column_idx_to_cast_idx; - std::vector _input_array_string_types; - std::vector - str_array_cols; // for array type to save data like big string [1,2,3] std::map _map_column_idx_to_cast_idx_hll; std::vector _input_hll_string_types; diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java index b03cbd9cdc6a49..ec7d108bd39d9d 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java @@ -17,7 +17,6 @@ package org.apache.doris.common.jni.vec; - import org.apache.doris.common.jni.utils.OffHeap; import org.apache.doris.common.jni.utils.TypeNativeBytes; import org.apache.doris.common.jni.vec.ColumnType.Type; @@ -1060,6 +1059,20 @@ public void appendStringAndOffset(String[] batch, boolean isNullable) { } } + public void appendBinaryAndOffset(byte[][] batch, boolean isNullable) { + reserve(appendIndex + batch.length); + for (byte[] v : batch) { + byte[] bytes = v; + if (bytes == null) { + putNull(appendIndex); + bytes = new byte[0]; + } + int startOffset = childColumns[0].appendBytes(bytes, 0, bytes.length); + OffHeap.putInt(null, offsets + 4L * appendIndex, startOffset + bytes.length); + appendIndex++; + } + } + public byte[] getBytesWithOffset(int rowId) { long endOffsetAddress = offsets + 4L * rowId; int startOffset = rowId == 0 ? 0 : OffHeap.getInt(null, endOffsetAddress - 4); @@ -1391,7 +1404,11 @@ public void appendObjectColumn(Object[] batch, boolean isNullable) { case CHAR: case VARCHAR: case STRING: - appendStringAndOffset((String[]) batch, isNullable); + if (batch instanceof String[]) { + appendStringAndOffset((String[]) batch, isNullable); + } else { + appendBinaryAndOffset((byte[][]) batch, isNullable); + } break; case ARRAY: appendArray((List[]) batch, isNullable); diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java index c6640429bd2734..7aba1fb62b3bc4 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java @@ -19,9 +19,9 @@ import org.apache.doris.common.exception.InternalException; import org.apache.doris.common.exception.UdfRuntimeException; -import org.apache.doris.common.jni.utils.JNINativeMethod; import org.apache.doris.common.jni.utils.UdfUtils; import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.ColumnValueConverter; import org.apache.doris.common.jni.vec.VectorColumn; import org.apache.doris.common.jni.vec.VectorTable; import org.apache.doris.thrift.TJdbcExecutorCtorParams; @@ -44,11 +44,10 @@ import java.lang.reflect.Array; import java.math.BigDecimal; import java.math.BigInteger; -import java.math.RoundingMode; import java.net.Inet4Address; import java.net.Inet6Address; +import java.net.InetAddress; import java.net.MalformedURLException; -import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.Date; import java.sql.DriverManager; @@ -57,18 +56,26 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; public class JdbcExecutor { private static final Logger LOG = Logger.getLogger(JdbcExecutor.class); @@ -80,6 +87,7 @@ public class JdbcExecutor { private ResultSetMetaData resultSetMetaData = null; private List resultColumnTypeNames = null; private List block = null; + private VectorTable outputTable = null; private int batchSizeNum = 0; private int curBlockRows = 0; private static final byte[] emptyBytes = new byte[0]; @@ -115,10 +123,6 @@ public JdbcExecutor(byte[] thriftParams) throws Exception { request.jdbc_url, request.jdbc_user, request.jdbc_password, request.op, request.table_type); } - public boolean isNebula() { - return tableType == TOdbcTableType.NEBULA; - } - public void close() throws Exception { if (resultSet != null) { resultSet.close(); @@ -163,12 +167,55 @@ public int read() throws UdfRuntimeException { } } - public int write(String sql) throws UdfRuntimeException { + public long getBlockAddress(int batchSize, Map outputParams) + throws UdfRuntimeException { try { - return stmt.executeUpdate(sql); - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor sql has error: ", e); + if (outputTable != null) { + outputTable.close(); + } + + String isNullableString = outputParams.get("is_nullable"); + String replaceString = outputParams.get("replace_string"); + + if (isNullableString == null || replaceString == null) { + throw new IllegalArgumentException( + "Output parameters 'is_nullable' and 'replace_string' are required."); + } + + String[] nullableList = isNullableString.split(","); + String[] replaceStringList = replaceString.split(","); + curBlockRows = 0; + int columnCount = resultSetMetaData.getColumnCount(); + + do { + for (int i = 0; i < columnCount; ++i) { + boolean isBitmapOrHll = + replaceStringList[i].equals("bitmap") + || replaceStringList[i].equals("hll"); + block.get(i)[curBlockRows] = getColumnValue(tableType, i, isBitmapOrHll); + } + curBlockRows++; + } while (curBlockRows < batchSize && resultSet.next()); + + outputTable = VectorTable.createWritableTable(outputParams, curBlockRows); + + for (int i = 0; i < columnCount; ++i) { + Object[] columnData = block.get(i); + Class clz = findNonNullClass(columnData); + Object[] newColumn = (Object[]) Array.newInstance(clz, curBlockRows); + System.arraycopy(columnData, 0, newColumn, 0, curBlockRows); + boolean isNullable = Boolean.parseBoolean(nullableList[i]); + outputTable.appendData( + i, + newColumn, + getOutputConverter(outputTable.getColumnType(i), clz, replaceStringList[i]), + isNullable); + } + } catch (Exception e) { + LOG.warn("jdbc get block address exception: ", e); + throw new UdfRuntimeException("jdbc get block address: ", e); } + return outputTable.getMetaAddress(); } public int write(Map params) throws UdfRuntimeException { @@ -182,126 +229,6 @@ public int write(Map params) throws UdfRuntimeException { return batchTable.getNumRows(); } - private int insert(VectorTable data) throws SQLException { - for (int i = 0; i < data.getNumRows(); ++i) { - for (int j = 0; j < data.getColumns().length; ++j) { - insertColumn(i, j, data.getColumns()[j]); - } - preparedStatement.addBatch(); - } - preparedStatement.executeBatch(); - preparedStatement.clearBatch(); - return data.getNumRows(); - } - - private void insertColumn(int rowIdx, int colIdx, VectorColumn column) throws SQLException { - int parameterIndex = colIdx + 1; - ColumnType.Type dorisType = column.getColumnTyp(); - if (column.isNullAt(rowIdx)) { - insertNullColumn(parameterIndex, dorisType); - return; - } - switch (dorisType) { - case BOOLEAN: - preparedStatement.setBoolean(parameterIndex, column.getBoolean(rowIdx)); - break; - case TINYINT: - preparedStatement.setByte(parameterIndex, column.getByte(rowIdx)); - break; - case SMALLINT: - preparedStatement.setShort(parameterIndex, column.getShort(rowIdx)); - break; - case INT: - preparedStatement.setInt(parameterIndex, column.getInt(rowIdx)); - break; - case BIGINT: - preparedStatement.setLong(parameterIndex, column.getLong(rowIdx)); - break; - case LARGEINT: - preparedStatement.setObject(parameterIndex, column.getBigInteger(rowIdx)); - break; - case FLOAT: - preparedStatement.setFloat(parameterIndex, column.getFloat(rowIdx)); - break; - case DOUBLE: - preparedStatement.setDouble(parameterIndex, column.getDouble(rowIdx)); - break; - case DECIMALV2: - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - preparedStatement.setBigDecimal(parameterIndex, column.getDecimal(rowIdx)); - break; - case DATEV2: - preparedStatement.setDate(parameterIndex, Date.valueOf(column.getDate(rowIdx))); - break; - case DATETIMEV2: - preparedStatement.setTimestamp(parameterIndex, Timestamp.valueOf(column.getDateTime(rowIdx))); - break; - case CHAR: - case VARCHAR: - case STRING: - case BINARY: - preparedStatement.setString(parameterIndex, column.getStringWithOffset(rowIdx)); - break; - default: - throw new RuntimeException("Unknown type value: " + dorisType); - } - } - - private void insertNullColumn(int parameterIndex, ColumnType.Type dorisType) throws SQLException { - switch (dorisType) { - case BOOLEAN: - preparedStatement.setNull(parameterIndex, Types.BOOLEAN); - break; - case TINYINT: - preparedStatement.setNull(parameterIndex, Types.TINYINT); - break; - case SMALLINT: - preparedStatement.setNull(parameterIndex, Types.SMALLINT); - break; - case INT: - preparedStatement.setNull(parameterIndex, Types.INTEGER); - break; - case BIGINT: - preparedStatement.setNull(parameterIndex, Types.BIGINT); - break; - case LARGEINT: - preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT); - break; - case FLOAT: - preparedStatement.setNull(parameterIndex, Types.FLOAT); - break; - case DOUBLE: - preparedStatement.setNull(parameterIndex, Types.DOUBLE); - break; - case DECIMALV2: - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - preparedStatement.setNull(parameterIndex, Types.DECIMAL); - break; - case DATEV2: - preparedStatement.setNull(parameterIndex, Types.DATE); - break; - case DATETIMEV2: - preparedStatement.setNull(parameterIndex, Types.TIMESTAMP); - break; - case CHAR: - case VARCHAR: - case STRING: - case BINARY: - preparedStatement.setNull(parameterIndex, Types.VARCHAR); - break; - default: - throw new RuntimeException("Unknown type value: " + dorisType); - } - } - - public List getResultColumnTypeNames() { - return resultColumnTypeNames; - } - public void openTrans() throws UdfRuntimeException { try { if (conn != null) { @@ -332,55 +259,8 @@ public void rollbackTrans() throws UdfRuntimeException { } } - public List getBlock(int batchSize, Object colsArray) throws UdfRuntimeException { - try { - ArrayList colsTypes = (ArrayList) colsArray; - Integer[] colArray = new Integer[colsTypes.size()]; - colArray = colsTypes.toArray(colArray); - int columnCount = resultSetMetaData.getColumnCount(); - curBlockRows = 0; - do { - for (int i = 0; i < columnCount; ++i) { - // colArray[i] > 0, means the type is Hll/Bitmap, we should read it with getBytes - // instead of getObject, as Hll/Bitmap in JDBC will map to String by default. - if (colArray[i] > 0) { - block.get(i)[curBlockRows] = resultSet.getBytes(i + 1); - } else { - block.get(i)[curBlockRows] = resultSet.getObject(i + 1); - } - } - curBlockRows++; - } while (curBlockRows < batchSize && resultSet.next()); - } catch (SQLException e) { - throw new UdfRuntimeException("get next block failed: ", e); - } - return block; - } - - public List getBlock(int batchSize) throws UdfRuntimeException { - try { - int columnCount = resultSetMetaData.getColumnCount(); - curBlockRows = 0; - - if (isNebula()) { - do { - for (int i = 0; i < columnCount; ++i) { - block.get(i)[curBlockRows] = UdfUtils.convertObject((ValueWrapper) resultSet.getObject(i + 1)); - } - curBlockRows++; - } while (curBlockRows < batchSize && resultSet.next()); - } else { - do { - for (int i = 0; i < columnCount; ++i) { - block.get(i)[curBlockRows] = resultSet.getObject(i + 1); - } - curBlockRows++; - } while (curBlockRows < batchSize && resultSet.next()); - } - } catch (SQLException e) { - throw new UdfRuntimeException("get next block failed: ", e); - } - return block; + public List getResultColumnTypeNames() { + return resultColumnTypeNames; } public int getCurBlockRows() { @@ -482,1071 +362,405 @@ private void setValidationQuery(DruidDataSource ds, TOdbcTableType tableType) { } } - public void booleanPutToByte(Object[] column, boolean isNullable, int numRows, long nullMapAddr, long columnAddr, - int startRow) { - if (isNullable) { - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putByte(columnAddr + i, (Boolean) column[i] ? (byte) 1 : 0); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putByte(columnAddr + i, (Boolean) column[i] ? (byte) 1 : 0); - } - } + public boolean isNebula() { + return tableType == TOdbcTableType.NEBULA; } - public void copyBatchBooleanResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof Boolean) { - booleanPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Integer) { - integerPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Byte) { - bytePutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); + private Class findNonNullClass(Object[] columnData) { + for (Object data : columnData) { + if (data != null) { + return data.getClass(); + } + } + return Object.class; + } + + public Object getColumnValue(TOdbcTableType tableType, int columnIndex, boolean isBitmapOrHll) + throws SQLException { + Object result; + if (tableType == TOdbcTableType.NEBULA) { + result = UdfUtils.convertObject((ValueWrapper) resultSet.getObject(columnIndex + 1)); + } else { + result = + isBitmapOrHll + ? resultSet.getBytes(columnIndex + 1) + : resultSet.getObject(columnIndex + 1); + } + return result; + } + + /* + | Type | Java Array Type | + |---------------------------------------------|----------------------------| + | BOOLEAN | Boolean[] | + | TINYINT | Byte[] | + | SMALLINT | Short[] | + | INT | Integer[] | + | BIGINT | Long[] | + | LARGEINT | BigInteger[] | + | FLOAT | Float[] | + | DOUBLE | Double[] | + | DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128 | BigDecimal[] | + | DATE, DATEV2 | LocalDate[] | + | DATETIME, DATETIMEV2 | LocalDateTime[] | + | CHAR, VARCHAR, STRING | String[] | + | ARRAY | List[] | + | MAP | Map[] | + | STRUCT | Map[] | + */ + + private ColumnValueConverter getOutputConverter( + ColumnType columnType, Class clz, String replaceString) { + switch (columnType.getType()) { + case BOOLEAN: + if (Integer.class.equals(clz)) { + return createConverter(input -> ((Integer) input) != 0, Boolean.class); + } + if (Byte.class.equals(clz)) { + return createConverter(input -> ((Byte) input) != 0, Boolean.class); + } + if (String.class.equals(clz)) { + return createConverter( + input -> + Boolean.parseBoolean( + String.valueOf(input).equals("1") ? "true" : "false"), + Boolean.class); + } + break; + case TINYINT: + if (Integer.class.equals(clz)) { + return createConverter(input -> ((Integer) input).byteValue(), Byte.class); + } + if (Short.class.equals(clz)) { + return createConverter(input -> ((Short) input).byteValue(), Byte.class); + } + if (Object.class.equals(clz)) { + return createConverter( + input -> (byte) Integer.parseInt(String.valueOf(input)), Byte.class); + } + if (BigDecimal.class.equals(clz)) { + return createConverter(input -> ((BigDecimal) input).byteValue(), Byte.class); + } + if (String.class.equals(clz)) { + return createConverter( + input -> Byte.parseByte(String.valueOf(input)), Byte.class); + } + break; + case SMALLINT: + if (Integer.class.equals(clz)) { + return createConverter(input -> ((Integer) input).shortValue(), Short.class); + } + if (BigDecimal.class.equals(clz)) { + return createConverter(input -> ((BigDecimal) input).shortValue(), Short.class); + } + if (String.class.equals(clz)) { + return createConverter( + input -> Short.parseShort(String.valueOf(input)), Short.class); + } + if (Byte.class.equals(clz)) { + return createConverter(input -> ((Byte) input).shortValue(), Short.class); + } + if (com.clickhouse.data.value.UnsignedByte.class.equals(clz)) { + return createConverter( + input -> ((UnsignedByte) input).shortValue(), Short.class); + } + break; + case INT: + if (Long.class.equals(clz)) { + return createConverter(input -> ((Long) input).intValue(), Integer.class); + } + if (BigDecimal.class.equals(clz)) { + return createConverter(input -> ((BigDecimal) input).intValue(), Integer.class); + } + if (String.class.equals(clz)) { + return createConverter( + input -> Integer.parseInt(String.valueOf(input)), Integer.class); + } + if (Short.class.equals(clz)) { + return createConverter(input -> ((Short) input).intValue(), Integer.class); + } + if (com.clickhouse.data.value.UnsignedShort.class.equals(clz)) { + return createConverter( + input -> ((UnsignedShort) input).intValue(), Integer.class); + } + break; + case BIGINT: + if (BigDecimal.class.equals(clz)) { + return createConverter(input -> ((BigDecimal) input).longValue(), Long.class); + } + if (String.class.equals(clz)) { + return createConverter( + input -> Long.parseLong(String.valueOf(input)), Long.class); + } + if (Integer.class.equals(clz)) { + return createConverter(input -> ((Integer) input).longValue(), Long.class); + } + if (com.clickhouse.data.value.UnsignedInteger.class.equals(clz)) { + return createConverter( + input -> ((UnsignedInteger) input).longValue(), Long.class); + } + break; + case LARGEINT: + if (BigDecimal.class.equals(clz)) { + return createConverter( + input -> ((BigDecimal) input).toBigInteger(), BigInteger.class); + } + if (String.class.equals(clz)) { + return createConverter( + input -> new BigInteger(String.valueOf(input)), BigInteger.class); + } + if (Long.class.equals(clz)) { + return createConverter( + input -> BigInteger.valueOf((Long) input), BigInteger.class); + } + if (com.clickhouse.data.value.UnsignedLong.class.equals(clz)) { + return createConverter( + input -> ((UnsignedLong) input).bigIntegerValue(), BigInteger.class); + } + break; + case DOUBLE: + if (BigDecimal.class.equals(clz)) { + return createConverter( + input -> ((BigDecimal) input).doubleValue(), Double.class); + } + if (String.class.equals(clz)) { + return createConverter( + input -> Double.parseDouble(String.valueOf(input)), Double.class); + } + break; + case FLOAT: + return createConverter( + input -> Float.parseFloat(String.valueOf(input)), Float.class); + case DECIMALV2: + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + return createConverter( + input -> new BigDecimal(String.valueOf(input)), BigDecimal.class); + case DATE: + case DATEV2: + if (Date.class.equals(clz)) { + return createConverter(input -> ((Date) input).toLocalDate(), LocalDate.class); + } + if (Timestamp.class.equals(clz)) { + return createConverter( + input -> ((Timestamp) input).toLocalDateTime().toLocalDate(), + LocalDate.class); + } + if (String.class.equals(clz)) { + return createConverter( + input -> LocalDate.parse(String.valueOf(input)), LocalDate.class); + } + break; + case DATETIME: + case DATETIMEV2: + if (Timestamp.class.equals(clz)) { + return createConverter( + input -> ((Timestamp) input).toLocalDateTime(), LocalDateTime.class); + } + if (OffsetDateTime.class.equals(clz)) { + return createConverter( + input -> ((Timestamp) input).toLocalDateTime(), LocalDateTime.class); + } + if (oracle.sql.TIMESTAMP.class.equals(clz)) { + return createConverter( + input -> { + try { + return ((oracle.sql.TIMESTAMP) input) + .timestampValue() + .toLocalDateTime(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }, + LocalDateTime.class); + } + if (String.class.equals(clz)) { + return createConverter( + input -> + LocalDateTime.parse( + String.valueOf(input), + getDateTimeFormatter(String.valueOf(input))), + LocalDateTime.class); + } + break; + case CHAR: + return createConverter( + input -> trimSpaces(tableType, input.toString()), String.class); + case VARCHAR: + case STRING: + if (byte[].class.equals(clz)) { + if (replaceString.equals("bitmap") || replaceString.equals("hll")) { + break; + } else { + return createConverter( + input -> byteArrayToHexString(tableType, (byte[]) input), + String.class); + } + } + if (Time.class.equals(clz)) { + return createConverter( + input -> timeToString((java.sql.Time) input), String.class); + } + if (oracle.sql.CLOB.class.equals(clz)) { + return createConverter( + input -> { + try { + oracle.sql.CLOB clob = (oracle.sql.CLOB) input; + return clob.getSubString(1, (int) clob.length()); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }, + String.class); + } + if (java.net.Inet4Address.class.equals(clz)) { + return createConverter( + input -> ((InetAddress) input).getHostAddress(), String.class); + } + if (java.net.Inet6Address.class.equals(clz)) { + return createConverter( + input -> { + String inetAddress = ((InetAddress) input).getHostAddress(); + return simplifyIPv6Address(inetAddress); + }, + String.class); + } else { + return createConverter(Object::toString, String.class); + } + case ARRAY: + if (java.sql.Array.class.equals(clz)) { + return createConverter( + input -> { + try { + return Arrays.asList( + (Object[]) ((java.sql.Array) input).getArray()); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }, + List.class); + } + if (String.class.equals(clz)) { + return createConverter( + input -> { + List list = parseArray(String.valueOf(input)); + return convertArray(list, columnType.getChildTypes().get(0)); + }, + List.class); + } + if (tableType == TOdbcTableType.CLICKHOUSE) { + return createConverter( + input -> { + List list = convertClickHouseArray(input); + return convertArray(list, columnType.getChildTypes().get(0)); + }, + List.class); + } + break; + default: + throw new IllegalArgumentException( + "Unsupported column type: " + columnType.getType()); } + return null; } - private void bigDecimalPutToByte(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - Short res = ((BigDecimal) column[i]).shortValueExact(); - UdfUtils.UNSAFE.putByte(columnAddr + i, res.byteValue()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - Short res = ((BigDecimal) column[i]).shortValueExact(); - UdfUtils.UNSAFE.putByte(columnAddr + i, res.byteValue()); + private ColumnValueConverter createConverter( + Function converterFunction, Class type) { + return (Object[] columnData) -> { + Object[] result = (Object[]) Array.newInstance(type, columnData.length); + for (int i = 0; i < columnData.length; i++) { + result[i] = columnData[i] != null ? converterFunction.apply(columnData[i]) : null; } - } + return result; + }; } - private void integerPutToByte(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putByte(columnAddr + i, ((Integer) column[i]).byteValue()); - } - } + private String byteArrayToHexString(TOdbcTableType tableType, byte[] columnData) { + if (tableType == TOdbcTableType.MYSQL || tableType == TOdbcTableType.OCEANBASE) { + return mysqlByteArrayToHexString(columnData); + } else if (tableType == TOdbcTableType.POSTGRESQL) { + return pgByteArrayToHexString(columnData); } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putByte(columnAddr + i, ((Integer) column[i]).byteValue()); - } + return defaultByteArrayToHexString(columnData); } } - private void shortPutToByte(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putByte(columnAddr + i, ((Short) column[i]).byteValue()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putByte(columnAddr + i, ((Short) column[i]).byteValue()); - } - } - } - - private void bytePutToByte(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putByte(columnAddr + i, (Byte) column[i]); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putByte(columnAddr + i, (Byte) column[i]); - } - } - } - - private void objectPutToByte(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - String columnStr = String.valueOf(column[i]); - int columnInt = Integer.parseInt(columnStr); - UdfUtils.UNSAFE.putByte(columnAddr + i, (byte) columnInt); - } - } - } else { - for (int i = 0; i < numRows; i++) { - String columnStr = String.valueOf(column[i]); - int columnInt = Integer.parseInt(columnStr); - UdfUtils.UNSAFE.putByte(columnAddr + i, (byte) columnInt); - } - } - } - - public void copyBatchTinyIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof BigDecimal) { - bigDecimalPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Integer) { - integerPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Short) { - shortPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Byte) { - bytePutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof java.lang.Object) { - objectPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } - } - - private void bigDecimalPutToShort(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), ((BigDecimal) column[i]).shortValueExact()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), ((BigDecimal) column[i]).shortValueExact()); - } - } - } - - private void integerPutToShort(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), ((Integer) column[i]).shortValue()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), ((Integer) column[i]).shortValue()); - } - } - } - - private void shortPutToShort(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), (Short) column[i]); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), (Short) column[i]); - } - } - } - - public void clickHouseUInt8ToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), (short) ((UnsignedByte) column[i]).intValue()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), (short) ((UnsignedByte) column[i]).intValue()); - } - } - } - - public void copyBatchSmallIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof BigDecimal) { - bigDecimalPutToShort(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Integer) { - integerPutToShort(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Short) { - shortPutToShort(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedByte) { - clickHouseUInt8ToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } - } - - private void bigDecimalPutToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((BigDecimal) column[i]).intValueExact()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((BigDecimal) column[i]).intValueExact()); - } - } - } - - private void integerPutToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), (Integer) column[i]); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), (Integer) column[i]); - } - } - } - - private void longPutToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((Long) column[i]).intValue()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((Long) column[i]).intValue()); - } - } - } - - public void clickHouseUInt16ToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((UnsignedShort) column[i]).intValue()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((UnsignedShort) column[i]).intValue()); - } - } - } - - public void copyBatchIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof BigDecimal) { - bigDecimalPutToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Integer) { - integerPutToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof java.lang.Long) { - // For mysql view. But don't worry about overflow - longPutToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedShort) { - clickHouseUInt16ToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } - } - - private void bigDecimalPutToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), ((BigDecimal) column[i]).longValueExact()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), ((BigDecimal) column[i]).longValueExact()); - } - } - } - - private void longPutToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), (Long) column[i]); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), (Long) column[i]); - } - } - } - - private void clickHouseUInt32ToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), ((UnsignedInteger) column[i]).longValue()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), ((UnsignedInteger) column[i]).longValue()); - } - } - } - - public void copyBatchBigIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof BigDecimal) { - bigDecimalPutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Long) { - longPutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedInteger) { - clickHouseUInt32ToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } - } - - private void bigDecimalPutToBigInteger(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - BigInteger[] data = new BigInteger[numRows]; - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - data[i] = null; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - data[i] = ((BigDecimal) column[i]).toBigInteger(); - } - } - copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 16, startRowForNullable); - } - - private void bigIntegerPutToByte(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - BigInteger columnValue = (BigInteger) column[i]; - byte[] bytes = UdfUtils.convertByteOrder(columnValue.toByteArray()); - byte[] value = new byte[16]; - if (columnValue.signum() == -1) { - Arrays.fill(value, (byte) -1); - } - for (int index = 0; index < Math.min(bytes.length, value.length); ++index) { - value[index] = bytes[index]; - } - UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + (i * 16L), 16); - } - } - } else { - for (int i = 0; i < numRows; i++) { - BigInteger columnValue = (BigInteger) column[i]; - byte[] bytes = UdfUtils.convertByteOrder(columnValue.toByteArray()); - byte[] value = new byte[16]; - if (columnValue.signum() == -1) { - Arrays.fill(value, (byte) -1); - } - for (int index = 0; index < Math.min(bytes.length, value.length); ++index) { - value[index] = bytes[index]; - } - UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + (i * 16L), 16); - } - } - } - - private void stringPutToBigInteger(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - BigInteger[] data = new BigInteger[numRows]; - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - data[i] = null; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - data[i] = new BigInteger((String) column[i]); - } - } - copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 16, startRowForNullable); - } - - private void clickHouseUInt64ToByte(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UnsignedLong columnValue = (UnsignedLong) column[i]; - BigInteger bigIntValue = columnValue.bigIntegerValue(); - byte[] bytes = UdfUtils.convertByteOrder(bigIntValue.toByteArray()); - byte[] value = new byte[16]; - if (bigIntValue.signum() == -1) { - Arrays.fill(value, (byte) -1); - } - System.arraycopy(bytes, 0, value, 0, Math.min(bytes.length, value.length)); - UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + (i * 16L), 16); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UnsignedLong columnValue = (UnsignedLong) column[i]; - BigInteger bigIntValue = columnValue.bigIntegerValue(); - byte[] bytes = UdfUtils.convertByteOrder(bigIntValue.toByteArray()); - byte[] value = new byte[16]; - if (bigIntValue.signum() == -1) { - Arrays.fill(value, (byte) -1); - } - System.arraycopy(bytes, 0, value, 0, Math.min(bytes.length, value.length)); - UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + (i * 16L), 16); - } - } - } - - public void copyBatchLargeIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof BigDecimal) { - bigDecimalPutToBigInteger(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof BigInteger) { - bigIntegerPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof String) { - stringPutToBigInteger(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedLong) { - clickHouseUInt64ToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } - } - - public void copyBatchFloatResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - if (isNullable) { - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putFloat(columnAddr + (i * 4L), (Float) column[i]); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putFloat(columnAddr + (i * 4L), (Float) column[i]); - } - } - } - - private void bigDecimalPutToDouble(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putDouble(columnAddr + (i * 8L), ((BigDecimal) column[i]).doubleValue()); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putDouble(columnAddr + (i * 8L), ((BigDecimal) column[i]).doubleValue()); - } - } - } - - private void doublePutToDouble(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - UdfUtils.UNSAFE.putDouble(columnAddr + (i * 8L), (Double) column[i]); - } - } - } else { - for (int i = 0; i < numRows; i++) { - UdfUtils.UNSAFE.putDouble(columnAddr + (i * 8L), (Double) column[i]); - } - } - } - - public void copyBatchDoubleResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof BigDecimal) { - bigDecimalPutToDouble(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Double) { - doublePutToDouble(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } - } - - private void localDatePutToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDate date = (LocalDate) column[i]; - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTime(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), 0, 0, 0, true)); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDate date = (LocalDate) column[i]; - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTime(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), 0, 0, 0, true)); - } - } - } - - private void datePutToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDate date = ((Date) column[i]).toLocalDate(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTime(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), 0, 0, 0, true)); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDate date = ((Date) column[i]).toLocalDate(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTime(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), 0, 0, 0, true)); - } - } - } - - public void copyBatchDateResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof LocalDate) { - localDatePutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Date) { - datePutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } - } - - private void localDatePutToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDate date = (LocalDate) column[i]; - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), - UdfUtils.convertToDateV2(date.getYear(), date.getMonthValue(), - date.getDayOfMonth())); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDate date = (LocalDate) column[i]; - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), - UdfUtils.convertToDateV2(date.getYear(), date.getMonthValue(), - date.getDayOfMonth())); - } - } - } - - private void datePutToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDate date = ((Date) column[i]).toLocalDate(); - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), - UdfUtils.convertToDateV2(date.getYear(), date.getMonthValue(), date.getDayOfMonth())); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDate date = ((Date) column[i]).toLocalDate(); - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), - UdfUtils.convertToDateV2(date.getYear(), date.getMonthValue(), date.getDayOfMonth())); - } - } - } - - private void timestampPutToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDateTime date = ((java.sql.Timestamp) column[i]).toLocalDateTime(); - UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), - UdfUtils.convertToDateV2(date.getYear(), date.getMonthValue(), - date.getDayOfMonth())); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDateTime date = ((java.sql.Timestamp) column[i]).toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 4L), - UdfUtils.convertToDateV2(date.getYear(), date.getMonthValue(), - date.getDayOfMonth())); - } - } - } - - public void copyBatchDateV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof LocalDate) { - localDatePutToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Date) { - datePutToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof Timestamp) { - timestampPutToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } - } - - private void localDateTimePutToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDateTime date = (LocalDateTime) column[i]; - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTime(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), date.getHour(), date.getMinute(), - date.getSecond(), false)); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDateTime date = (LocalDateTime) column[i]; - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTime(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), date.getHour(), date.getMinute(), - date.getSecond(), false)); - } - } - } - - private void timestampPutToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDateTime date = ((java.sql.Timestamp) column[i]).toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTime(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), date.getHour(), date.getMinute(), date.getSecond(), false)); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDateTime date = ((java.sql.Timestamp) column[i]).toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTime(date.getYear(), date.getMonthValue(), date.getDayOfMonth(), - date.getHour(), date.getMinute(), date.getSecond(), false)); - } - } - } - - private void oracleTimetampPutToLong(Object[] column, boolean isNullable, int numRows, - long nullMapAddr, - long columnAddr, int startRowForNullable) throws SQLException { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDateTime date = ((oracle.sql.TIMESTAMP) column[i]).timestampValue().toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTime(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), date.getHour(), date.getMinute(), date.getSecond(), false)); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDateTime date = ((oracle.sql.TIMESTAMP) column[i]).timestampValue().toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTime(date.getYear(), date.getMonthValue(), date.getDayOfMonth(), - date.getHour(), date.getMinute(), date.getSecond(), false)); - } - } - } - - public void copyBatchDateTimeResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) throws SQLException { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof LocalDateTime) { - localDateTimePutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof java.sql.Timestamp) { - timestampPutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof oracle.sql.TIMESTAMP) { - oracleTimetampPutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } - } - - private void localDateTimePutToLongV2(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDateTime date = (LocalDateTime) column[i]; - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTimeV2(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), date.getHour(), date.getMinute(), - date.getSecond(), date.getNano() / 1000)); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDateTime date = (LocalDateTime) column[i]; - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTimeV2(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), date.getHour(), date.getMinute(), - date.getSecond(), date.getNano() / 1000)); - } - } - } - - private void timestampPutToLongV2(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDateTime date = ((java.sql.Timestamp) column[i]).toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTimeV2(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), date.getHour(), date.getMinute(), - date.getSecond(), date.getNano() / 1000)); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDateTime date = ((java.sql.Timestamp) column[i]).toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTimeV2(date.getYear(), date.getMonthValue(), date.getDayOfMonth(), - date.getHour(), date.getMinute(), date.getSecond(), date.getNano() / 1000)); + private String mysqlByteArrayToHexString(byte[] bytes) { + StringBuilder hexString = new StringBuilder("0x"); + for (byte b : bytes) { + String hex = Integer.toHexString(0xFF & b); + if (hex.length() == 1) { + hexString.append('0'); } + hexString.append(hex.toUpperCase()); } + return hexString.toString(); } - private void oracleTimetampPutToLongV2(Object[] column, boolean isNullable, int numRows, - long nullMapAddr, long columnAddr, int startRowForNullable) throws SQLException { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDateTime date = ((oracle.sql.TIMESTAMP) column[i]).timestampValue().toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTimeV2(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), date.getHour(), date.getMinute(), - date.getSecond(), date.getNano() / 1000)); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDateTime date = ((oracle.sql.TIMESTAMP) column[i]).timestampValue().toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTimeV2(date.getYear(), date.getMonthValue(), date.getDayOfMonth(), - date.getHour(), date.getMinute(), date.getSecond(), date.getNano() / 1000)); - } + private static String pgByteArrayToHexString(byte[] bytes) { + StringBuilder hexString = new StringBuilder("\\x"); + for (byte b : bytes) { + hexString.append(String.format("%02x", b & 0xff)); } + return hexString.toString(); } - private void offsetDateTimePutToLongV2(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] == null) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - LocalDateTime date = ((OffsetDateTime) column[i]).toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTimeV2(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), date.getHour(), date.getMinute(), - date.getSecond(), date.getNano() / 1000)); - } - } - } else { - for (int i = 0; i < numRows; i++) { - LocalDateTime date = ((OffsetDateTime) column[i]).toLocalDateTime(); - UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), - UdfUtils.convertToDateTimeV2(date.getYear(), date.getMonthValue(), - date.getDayOfMonth(), date.getHour(), date.getMinute(), - date.getSecond(), date.getNano() / 1000)); + private String defaultByteArrayToHexString(byte[] bytes) { + StringBuilder hexString = new StringBuilder(); + for (byte b : bytes) { + String hex = Integer.toHexString(0xFF & b); + if (hex.length() == 1) { + hexString.append('0'); } + hexString.append(hex.toUpperCase()); } + return hexString.toString(); } - public void copyBatchDateTimeV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) throws SQLException { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof LocalDateTime) { - localDateTimePutToLongV2(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof java.sql.Timestamp) { - timestampPutToLongV2(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof OffsetDateTime) { - offsetDateTimePutToLongV2(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } else if (column[firstNotNullIndex] instanceof oracle.sql.TIMESTAMP) { - oracleTimetampPutToLongV2(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); - } - } - - public String trimSpaces(String str) { - int end = str.length() - 1; - while (end >= 0 && str.charAt(end) == ' ') { - end--; - } - return str.substring(0, end + 1); - } - - public void copyBatchCharResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr, boolean needTrimSpaces) { - if (needTrimSpaces == true) { - Object[] column = (Object[]) columnObj; - for (int i = 0; i < numRows; i++) { - if (column[i] != null) { - column[i] = trimSpaces((String) column[i]); - } + private String trimSpaces(TOdbcTableType tableType, String str) { + if (tableType == TOdbcTableType.POSTGRESQL || tableType == TOdbcTableType.ORACLE) { + int end = str.length() - 1; + while (end >= 0 && str.charAt(end) == ' ') { + end--; } - copyBatchStringResult(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); + return str.substring(0, end + 1); } else { - copyBatchStringResult(columnObj, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); + return str; } } - private void hllPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - int[] offsets = new int[numRows]; - byte[][] byteRes = new byte[numRows][]; - int offset = 0; - if (isNullable == true) { - // Here can not loop from startRowForNullable, - // because byteRes will be used later - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - byteRes[i] = emptyBytes; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - byteRes[i] = (byte[]) column[i]; - } - offset += byteRes[i].length; - offsets[i] = offset; - } + public String timeToString(java.sql.Time time) { + long milliseconds = time.getTime() % 1000L; + if (milliseconds > 0) { + return String.format("%s.%03d", time, milliseconds); } else { - for (int i = 0; i < numRows; i++) { - byteRes[i] = (byte[]) column[i]; - offset += byteRes[i].length; - offsets[i] = offset; - } - } - byte[] bytes = new byte[offsets[numRows - 1]]; - long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]); - int dst = 0; - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < byteRes[i].length; j++) { - bytes[dst++] = byteRes[i][j]; - } + return time.toString(); } - UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); - UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); } - private void bitMapPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - int[] offsets = new int[numRows]; - byte[][] byteRes = new byte[numRows][]; - int offset = 0; - if (isNullable == true) { - // Here can not loop from startRowForNullable, - // because byteRes will be used later - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - byteRes[i] = emptyBytes; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - byteRes[i] = (byte[]) column[i]; - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } else { - for (int i = 0; i < numRows; i++) { - byteRes[i] = (byte[]) column[i]; - offset += byteRes[i].length; - offsets[i] = offset; - } - } - byte[] bytes = new byte[offsets[numRows - 1]]; - long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]); - int dst = 0; - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < byteRes[i].length; j++) { - bytes[dst++] = byteRes[i][j]; + private List convertArray(List list, ColumnType childType) { + Class clz = Object.class; + for (Object data : list) { + if (data != null) { + clz = data.getClass(); + break; } } - UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); - UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); - } - - public void copyBatchHllResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - hllPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); - } - - public void copyBatchBitMapResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; + List convertedList = new ArrayList<>(list.size()); + ColumnValueConverter converter = getOutputConverter(childType, clz, "not_replace"); + for (Object element : list) { + if (childType.isComplexType()) { + convertedList.add(convertArray((List) element, childType)); + } else { + if (converter != null) { + convertedList.add(converter.convert(new Object[] {element})[0]); + } else { + convertedList.add(element); + } + } } - bitMapPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); + return convertedList; } private static String simplifyIPv6Address(String address) { @@ -1572,7 +786,7 @@ private static String simplifyIPv6Address(String address) { } } if (longestSeqLen <= 1) { - return address; // No sequences of zeros to replace + return address; // No sequences of zeros to replace } StringBuilder sb = new StringBuilder(); for (int i = 0; i < longestSeqStart; i++) { @@ -1588,598 +802,216 @@ private static String simplifyIPv6Address(String address) { return sb.toString(); } - private void ipPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - int[] offsets = new int[numRows]; - byte[][] byteRes = new byte[numRows][]; - int offset = 0; - if (isNullable) { - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - byteRes[i] = emptyBytes; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - String ip = ((java.net.InetAddress) column[i]).getHostAddress(); - if (column[i] instanceof java.net.Inet6Address) { - ip = simplifyIPv6Address(ip); - } - byteRes[i] = ip.getBytes(StandardCharsets.UTF_8); - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } else { - for (int i = 0; i < numRows; i++) { - String ip = ((java.net.InetAddress) column[i]).getHostAddress(); - if (column[i] instanceof java.net.Inet6Address) { - ip = simplifyIPv6Address(ip); - } - byteRes[i] = ip.getBytes(StandardCharsets.UTF_8); - offset += byteRes[i].length; - offsets[i] = offset; - } - } - byte[] bytes = new byte[offsets[numRows - 1]]; - long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]); - int dst = 0; - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < byteRes[i].length; j++) { - bytes[dst++] = byteRes[i][j]; - } - } - UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); - UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); - } - - private void oracleClobToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - int[] offsets = new int[numRows]; - byte[][] byteRes = new byte[numRows][]; - int offset = 0; - if (isNullable) { - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - byteRes[i] = emptyBytes; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - try { - oracle.sql.CLOB clob = (oracle.sql.CLOB) column[i]; - String result = clob.getSubString(1, (int) clob.length()); - byteRes[i] = result.getBytes(StandardCharsets.UTF_8); - } catch (Exception e) { - LOG.info("clobToString have error when convert " + e.getMessage()); - } - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } else { - for (int i = 0; i < numRows; i++) { - try { - oracle.sql.CLOB clob = (oracle.sql.CLOB) column[i]; - String result = clob.getSubString(1, (int) clob.length()); - byteRes[i] = result.getBytes(StandardCharsets.UTF_8); - } catch (Exception e) { - LOG.info("clobToString have error when convert " + e.getMessage()); - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } - byte[] bytes = new byte[offsets[numRows - 1]]; - long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]); - int dst = 0; - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < byteRes[i].length; j++) { - bytes[dst++] = byteRes[i][j]; - } - } - UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); - UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); - } - - private void objectPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - int[] offsets = new int[numRows]; - byte[][] byteRes = new byte[numRows][]; - int offset = 0; - if (isNullable) { - // Here can not loop from startRowForNullable, - // because byteRes will be used later - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - byteRes[i] = emptyBytes; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - String result = column[i].toString(); - if (column[i] instanceof java.sql.Time) { - // the default toString() method doesn't format the milliseconds in Time. - long milliseconds = ((java.sql.Time) column[i]).getTime() % 1000L; - if (milliseconds > 0) { - result = String.format("%s.%03d", column[i].toString(), milliseconds); - } - } - byteRes[i] = result.getBytes(StandardCharsets.UTF_8); - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } else { - boolean isTime = numRows > 0 && column[0] instanceof java.sql.Time; - for (int i = 0; i < numRows; i++) { - String result = column[i].toString(); - if (isTime) { - // Doc https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-type-conversions.html - // shows that jdbc API use java.sql.Time to hold the TIME type, - // but java.sql.Time can only have millisecond precision. - // the default toString() method doesn't format the milliseconds in Time. - // Doc https://dev.mysql.com/doc/refman/8.0/en/time.html shows that MySQL supports time[0~6], - // so time[4~6] will lose precision - long milliseconds = ((java.sql.Time) column[i]).getTime() % 1000L; - if (milliseconds > 0) { - result = String.format("%s.%03d", column[i].toString(), milliseconds); - } - } - byteRes[i] = result.getBytes(StandardCharsets.UTF_8); - offset += byteRes[i].length; - offsets[i] = offset; - } - } - byte[] bytes = new byte[offsets[numRows - 1]]; - long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]); - int dst = 0; - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < byteRes[i].length; j++) { - bytes[dst++] = byteRes[i][j]; - } - } - UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); - UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); - } - - private void stringPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - int[] offsets = new int[numRows]; - byte[][] byteRes = new byte[numRows][]; - int offset = 0; - if (isNullable) { - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - byteRes[i] = emptyBytes; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - byteRes[i] = ((String) column[i]).getBytes(StandardCharsets.UTF_8); - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } else { - for (int i = 0; i < numRows; i++) { - byteRes[i] = ((String) column[i]).getBytes(StandardCharsets.UTF_8); - offset += byteRes[i].length; - offsets[i] = offset; - } - } - byte[] bytes = new byte[offsets[numRows - 1]]; - long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]); - int dst = 0; - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < byteRes[i].length; j++) { - bytes[dst++] = byteRes[i][j]; - } - } - UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); - UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); - } - - private void byteaPutToHexString(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - int[] offsets = new int[numRows]; - byte[][] byteRes = new byte[numRows][]; - int offset = 0; - if (isNullable) { - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - byteRes[i] = emptyBytes; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - byteRes[i] = byteArrayToHexString((byte[]) column[i]).getBytes(StandardCharsets.UTF_8); - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } else { - for (int i = 0; i < numRows; i++) { - byteRes[i] = byteArrayToHexString((byte[]) column[i]).getBytes(StandardCharsets.UTF_8); - offset += byteRes[i].length; - offsets[i] = offset; - } - } - byte[] bytes = new byte[offsets[numRows - 1]]; - long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]); - int dst = 0; - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < byteRes[i].length; j++) { - bytes[dst++] = byteRes[i][j]; - } - } - UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); - UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); - } - - private static String byteArrayToHexString(byte[] bytes) { - StringBuilder hexString = new StringBuilder("\\x"); - for (byte b : bytes) { - hexString.append(String.format("%02x", b & 0xff)); - } - return hexString.toString(); - } - - private void byteaPutToMySQLString(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - int[] offsets = new int[numRows]; - byte[][] byteRes = new byte[numRows][]; - int offset = 0; - if (isNullable) { - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - byteRes[i] = emptyBytes; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - byteRes[i] = mysqlByteArrayToHexString((byte[]) column[i]).getBytes(StandardCharsets.UTF_8); - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } else { - for (int i = 0; i < numRows; i++) { - byteRes[i] = mysqlByteArrayToHexString((byte[]) column[i]).getBytes(StandardCharsets.UTF_8); - offset += byteRes[i].length; - offsets[i] = offset; - } - } - byte[] bytes = new byte[offsets[numRows - 1]]; - long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]); - int dst = 0; - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < byteRes[i].length; j++) { - bytes[dst++] = byteRes[i][j]; - } - } - UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); - UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); - } - - private static String mysqlByteArrayToHexString(byte[] bytes) { - StringBuilder hexString = new StringBuilder("0x"); - for (byte b : bytes) { - String hex = Integer.toHexString(0xFF & b); - if (hex.length() == 1) { - hexString.append('0'); - } - hexString.append(hex.toUpperCase()); - } - return hexString.toString(); - } - - public void copyBatchStringResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - if (column[firstNotNullIndex] instanceof String) { - stringPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); - } else if (column[firstNotNullIndex] instanceof byte[] && tableType == TOdbcTableType.POSTGRESQL) { - // for postgresql bytea type - byteaPutToHexString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); - } else if ((column[firstNotNullIndex] instanceof java.net.Inet4Address - || column[firstNotNullIndex] instanceof java.net.Inet6Address) - && tableType == TOdbcTableType.CLICKHOUSE) { - // for clickhouse ipv4 and ipv6 type - ipPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); - } else if (column[firstNotNullIndex] instanceof byte[] && (tableType == TOdbcTableType.MYSQL - || tableType == TOdbcTableType.OCEANBASE)) { - // for mysql bytea type - byteaPutToMySQLString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); - } else if (column[firstNotNullIndex] instanceof oracle.sql.CLOB && tableType == TOdbcTableType.ORACLE) { - // for oracle clob type - oracleClobToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); - } else { - // object like in pg type point, polygon, jsonb..... get object is - // org.postgresql.util.PGobject..... - // here object put to string, so the object must have impl toString() function - objectPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); - } - } - - public void copyBatchDecimalV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr) { - Object[] column = (Object[]) columnObj; - BigInteger[] data = new BigInteger[numRows]; - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - data[i] = null; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - data[i] = ((BigDecimal) column[i]).setScale(9, RoundingMode.HALF_EVEN).unscaledValue(); - } - } - copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 16, 0); - } - - public void copyBatchDecimal32Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int scale) { - Object[] column = (Object[]) columnObj; - BigInteger[] data = new BigInteger[numRows]; - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - data[i] = null; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - data[i] = ((BigDecimal) column[i]).setScale(scale, RoundingMode.HALF_EVEN).unscaledValue(); - } - } - copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 4, 0); - } - - public void copyBatchDecimal64Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int scale) { - Object[] column = (Object[]) columnObj; - BigInteger[] data = new BigInteger[numRows]; - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - data[i] = null; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - data[i] = ((BigDecimal) column[i]).setScale(scale, RoundingMode.HALF_EVEN).unscaledValue(); - } - } - copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 8, 0); - } + private static final Pattern MILLIS_PATTERN = Pattern.compile("(\\.\\d+)"); - public void copyBatchDecimal128Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long columnAddr, int scale) { - Object[] column = (Object[]) columnObj; - BigInteger[] data = new BigInteger[numRows]; - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - data[i] = null; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - data[i] = ((BigDecimal) column[i]).setScale(scale, RoundingMode.HALF_EVEN).unscaledValue(); - } + public static DateTimeFormatter getDateTimeFormatter(String dateTimeString) { + Matcher matcher = MILLIS_PATTERN.matcher(dateTimeString); + int fractionDigits = 0; + if (matcher.find()) { + fractionDigits = matcher.group(1).length() - 1; // Subtract 1 to exclude the dot } - copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 16, 0); - } + fractionDigits = Math.min(fractionDigits, 6); // Limit the fraction digits to 6 - private void copyBatchDecimalResult(BigInteger[] column, boolean isNullable, int numRows, - long columnAddr, int typeLen, int startRowForNullable) { - if (isNullable) { - for (int i = startRowForNullable; i < numRows; i++) { - if (column[i] != null) { - byte[] bytes = UdfUtils.convertByteOrder(column[i].toByteArray()); - byte[] value = new byte[typeLen]; - if (column[i].signum() == -1) { - Arrays.fill(value, (byte) -1); - } - for (int index = 0; index < Math.min(bytes.length, value.length); ++index) { - value[index] = bytes[index]; - } - UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + ((long) i * typeLen), - typeLen); - } - } - } else { - for (int i = 0; i < numRows; i++) { - byte[] bytes = UdfUtils.convertByteOrder(column[i].toByteArray()); - byte[] value = new byte[typeLen]; - if (column[i].signum() == -1) { - Arrays.fill(value, (byte) -1); - } - for (int index = 0; index < Math.min(bytes.length, value.length); ++index) { - value[index] = bytes[index]; - } - UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + ((long) i * typeLen), - typeLen); - } - } + return new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .appendFraction(ChronoField.MILLI_OF_SECOND, fractionDigits, fractionDigits, true) + .toFormatter(); } - private static final Map, Function> CK_ARRAY_CONVERTERS = new HashMap<>(); + private static final Map, Function>> CK_ARRAY_CONVERTERS = + new HashMap<>(); static { - CK_ARRAY_CONVERTERS.put(String[].class, res -> Arrays.toString((String[]) res)); - CK_ARRAY_CONVERTERS.put(boolean[].class, res -> Arrays.toString((boolean[]) res)); - CK_ARRAY_CONVERTERS.put(Boolean[].class, res -> Arrays.toString((Boolean[]) res)); - CK_ARRAY_CONVERTERS.put(byte[].class, res -> Arrays.toString((byte[]) res)); - CK_ARRAY_CONVERTERS.put(Byte[].class, res -> Arrays.toString((Byte[]) res)); - CK_ARRAY_CONVERTERS.put(LocalDate[].class, res -> Arrays.toString((LocalDate[]) res)); - CK_ARRAY_CONVERTERS.put(LocalDateTime[].class, res -> Arrays.toString((LocalDateTime[]) res)); - CK_ARRAY_CONVERTERS.put(float[].class, res -> Arrays.toString((float[]) res)); - CK_ARRAY_CONVERTERS.put(Float[].class, res -> Arrays.toString((Float[]) res)); - CK_ARRAY_CONVERTERS.put(double[].class, res -> Arrays.toString((double[]) res)); - CK_ARRAY_CONVERTERS.put(Double[].class, res -> Arrays.toString((Double[]) res)); - CK_ARRAY_CONVERTERS.put(short[].class, res -> Arrays.toString((short[]) res)); - CK_ARRAY_CONVERTERS.put(Short[].class, res -> Arrays.toString((Short[]) res)); - CK_ARRAY_CONVERTERS.put(int[].class, res -> Arrays.toString((int[]) res)); - CK_ARRAY_CONVERTERS.put(Integer[].class, res -> Arrays.toString((Integer[]) res)); - CK_ARRAY_CONVERTERS.put(long[].class, res -> Arrays.toString((long[]) res)); - CK_ARRAY_CONVERTERS.put(Long[].class, res -> Arrays.toString((Long[]) res)); - CK_ARRAY_CONVERTERS.put(BigInteger[].class, res -> Arrays.toString((BigInteger[]) res)); - CK_ARRAY_CONVERTERS.put(BigDecimal[].class, res -> Arrays.toString((BigDecimal[]) res)); - CK_ARRAY_CONVERTERS.put(Inet4Address[].class, res -> { - if (res == null) { - return "null"; - } else { - return Arrays.toString(Arrays.stream((Inet4Address[]) res) - .map(addr -> addr == null ? "null" : addr.getHostAddress()) - .toArray(String[]::new)); - } - }); - - CK_ARRAY_CONVERTERS.put(Inet6Address[].class, res -> { - if (res == null) { - return "null"; - } else { - return Arrays.toString(Arrays.stream((Inet6Address[]) res) - .map(addr -> addr == null ? "null" : simplifyIPv6Address(addr.getHostAddress())) - .toArray(String[]::new)); - } - }); + CK_ARRAY_CONVERTERS.put(String[].class, res -> Arrays.asList((String[]) res)); + CK_ARRAY_CONVERTERS.put(boolean[].class, res -> toList((boolean[]) res)); + CK_ARRAY_CONVERTERS.put(Boolean[].class, res -> Arrays.asList((Boolean[]) res)); + CK_ARRAY_CONVERTERS.put(byte[].class, res -> toList((byte[]) res)); + CK_ARRAY_CONVERTERS.put(Byte[].class, res -> Arrays.asList((Byte[]) res)); + CK_ARRAY_CONVERTERS.put(LocalDate[].class, res -> Arrays.asList((LocalDate[]) res)); + CK_ARRAY_CONVERTERS.put(LocalDateTime[].class, res -> Arrays.asList((LocalDateTime[]) res)); + CK_ARRAY_CONVERTERS.put(float[].class, res -> toList((float[]) res)); + CK_ARRAY_CONVERTERS.put(Float[].class, res -> Arrays.asList((Float[]) res)); + CK_ARRAY_CONVERTERS.put(double[].class, res -> toList((double[]) res)); + CK_ARRAY_CONVERTERS.put(Double[].class, res -> Arrays.asList((Double[]) res)); + CK_ARRAY_CONVERTERS.put(short[].class, res -> toList((short[]) res)); + CK_ARRAY_CONVERTERS.put(Short[].class, res -> Arrays.asList((Short[]) res)); + CK_ARRAY_CONVERTERS.put(int[].class, res -> toList((int[]) res)); + CK_ARRAY_CONVERTERS.put(Integer[].class, res -> Arrays.asList((Integer[]) res)); + CK_ARRAY_CONVERTERS.put(long[].class, res -> toList((long[]) res)); + CK_ARRAY_CONVERTERS.put(Long[].class, res -> Arrays.asList((Long[]) res)); + CK_ARRAY_CONVERTERS.put(BigInteger[].class, res -> Arrays.asList((BigInteger[]) res)); + CK_ARRAY_CONVERTERS.put(BigDecimal[].class, res -> Arrays.asList((BigDecimal[]) res)); + CK_ARRAY_CONVERTERS.put( + Inet4Address[].class, + res -> + Arrays.stream((Inet4Address[]) res) + .map(addr -> addr == null ? null : addr.getHostAddress()) + .collect(Collectors.toList())); + CK_ARRAY_CONVERTERS.put( + Inet6Address[].class, + res -> + Arrays.stream((Inet6Address[]) res) + .map(addr -> addr == null ? null : simplifyIPv6Address(addr.getHostAddress())) + .collect(Collectors.toList())); + CK_ARRAY_CONVERTERS.put(UUID[].class, res -> Arrays.asList((UUID[]) res)); CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedByte[].class, - res -> Arrays.toString((com.clickhouse.data.value.UnsignedByte[]) res)); + res -> Arrays.asList((com.clickhouse.data.value.UnsignedByte[]) res)); CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedShort[].class, - res -> Arrays.toString((com.clickhouse.data.value.UnsignedShort[]) res)); + res -> Arrays.asList((com.clickhouse.data.value.UnsignedShort[]) res)); CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedInteger[].class, - res -> Arrays.toString((com.clickhouse.data.value.UnsignedInteger[]) res)); + res -> Arrays.asList((com.clickhouse.data.value.UnsignedInteger[]) res)); CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedLong[].class, - res -> Arrays.toString((com.clickhouse.data.value.UnsignedLong[]) res)); - CK_ARRAY_CONVERTERS.put(UUID[].class, res -> Arrays.toString((UUID[]) res)); + res -> Arrays.asList((com.clickhouse.data.value.UnsignedLong[]) res)); } - public static Object convertClickHouseArray(Object obj) { - Function converter = CK_ARRAY_CONVERTERS.get(obj.getClass()); - return converter != null ? converter.apply(obj) : obj; + public static List convertClickHouseArray(Object obj) { + Function> converter = CK_ARRAY_CONVERTERS.get(obj.getClass()); + return converter != null ? converter.apply(obj) : Collections.singletonList(obj); } - private void ckArrayPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - int[] offsets = new int[numRows]; - byte[][] byteRes = new byte[numRows][]; - int offset = 0; - if (isNullable) { - // Here can not loop from startRowForNullable, - // because byteRes will be used later - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - byteRes[i] = emptyBytes; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - byteRes[i] = ((String) convertClickHouseArray(column[i])).getBytes(StandardCharsets.UTF_8); - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } else { - for (int i = 0; i < numRows; i++) { - byteRes[i] = ((String) convertClickHouseArray(column[i])).getBytes(StandardCharsets.UTF_8); - offset += byteRes[i].length; - offsets[i] = offset; - } + private static List toList(T array) { + if (array instanceof Object[]) { + return Arrays.asList((Object[]) array); } - byte[] bytes = new byte[offsets[numRows - 1]]; - long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]); - int dst = 0; - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < byteRes[i].length; j++) { - bytes[dst++] = byteRes[i][j]; - } + int length = Array.getLength(array); + List list = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + list.add(Array.get(array, i)); } - UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); - UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); + return list; } - private void arrayPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - int[] offsets = new int[numRows]; - byte[][] byteRes = new byte[numRows][]; - int offset = 0; - if (isNullable) { - // Here can not loop from startRowForNullable, - // because byteRes will be used later - for (int i = 0; i < numRows; i++) { - if (column[i] == null) { - byteRes[i] = emptyBytes; - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { - try { - byteRes[i] = Arrays.toString((Object[]) ((java.sql.Array) column[i]).getArray()) - .getBytes(StandardCharsets.UTF_8); - } catch (SQLException e) { - LOG.info("arrayPutToString have error when convert " + e.getMessage()); - } - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } else { - for (int i = 0; i < numRows; i++) { - try { - byteRes[i] = Arrays.toString((Object[]) ((java.sql.Array) column[i]).getArray()) - .getBytes(StandardCharsets.UTF_8); - } catch (SQLException e) { - LOG.info("arrayPutToString have error when convert " + e.getMessage()); - } - offset += byteRes[i].length; - offsets[i] = offset; - } - } - byte[] bytes = new byte[offsets[numRows - 1]]; - long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]); - int dst = 0; - for (int i = 0; i < numRows; i++) { - for (int j = 0; j < byteRes[i].length; j++) { - bytes[dst++] = byteRes[i][j]; + private static final Pattern ARRAY_PATTERN = Pattern.compile("\"([^\"]*)\"|([^,]+)"); + + private static List parseArray(String input) { + String trimmedInput = input.substring(1, input.length() - 1); + List list = new ArrayList<>(); + Matcher matcher = ARRAY_PATTERN.matcher(trimmedInput); + while (matcher.find()) { + if (matcher.group(1) != null) { + list.add(matcher.group(1)); + } else { + list.add(matcher.group(2)); } } - UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); - UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); + return list; } - public void copyBatchArrayResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { - return; - } - // for doris array - if (column[firstNotNullIndex] instanceof String) { - stringPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); - } else if (column[firstNotNullIndex] instanceof java.sql.Array) { - // for PG array - arrayPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); - } else { - // For the ClickHouse array type - ckArrayPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); + private int insert(VectorTable data) throws SQLException { + for (int i = 0; i < data.getNumRows(); ++i) { + for (int j = 0; j < data.getColumns().length; ++j) { + insertColumn(i, j, data.getColumns()[j]); + } + preparedStatement.addBatch(); } + preparedStatement.executeBatch(); + preparedStatement.clearBatch(); + return data.getNumRows(); } - public void copyBatchJsonResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, - long offsetsAddr, long charsAddr) { - Object[] column = (Object[]) columnObj; - int firstNotNullIndex = 0; - if (isNullable) { - firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr); - } - if (firstNotNullIndex == numRows) { + private void insertColumn(int rowIdx, int colIdx, VectorColumn column) throws SQLException { + int parameterIndex = colIdx + 1; + ColumnType.Type dorisType = column.getColumnTyp(); + if (column.isNullAt(rowIdx)) { + insertNullColumn(parameterIndex, dorisType); return; } - if (column[firstNotNullIndex] instanceof String) { - stringPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); - } else { - objectPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr); + switch (dorisType) { + case BOOLEAN: + preparedStatement.setBoolean(parameterIndex, column.getBoolean(rowIdx)); + break; + case TINYINT: + preparedStatement.setByte(parameterIndex, column.getByte(rowIdx)); + break; + case SMALLINT: + preparedStatement.setShort(parameterIndex, column.getShort(rowIdx)); + break; + case INT: + preparedStatement.setInt(parameterIndex, column.getInt(rowIdx)); + break; + case BIGINT: + preparedStatement.setLong(parameterIndex, column.getLong(rowIdx)); + break; + case LARGEINT: + preparedStatement.setObject(parameterIndex, column.getBigInteger(rowIdx)); + break; + case FLOAT: + preparedStatement.setFloat(parameterIndex, column.getFloat(rowIdx)); + break; + case DOUBLE: + preparedStatement.setDouble(parameterIndex, column.getDouble(rowIdx)); + break; + case DECIMALV2: + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + preparedStatement.setBigDecimal(parameterIndex, column.getDecimal(rowIdx)); + break; + case DATEV2: + preparedStatement.setDate(parameterIndex, Date.valueOf(column.getDate(rowIdx))); + break; + case DATETIMEV2: + preparedStatement.setTimestamp( + parameterIndex, Timestamp.valueOf(column.getDateTime(rowIdx))); + break; + case CHAR: + case VARCHAR: + case STRING: + case BINARY: + preparedStatement.setString(parameterIndex, column.getStringWithOffset(rowIdx)); + break; + default: + throw new RuntimeException("Unknown type value: " + dorisType); } } - private int getFirstNotNullObject(Object[] column, int numRows, long nullMapAddr) { - int i = 0; - for (; i < numRows; ++i) { - if (null == column[i]) { - UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); - } else { + private void insertNullColumn(int parameterIndex, ColumnType.Type dorisType) + throws SQLException { + switch (dorisType) { + case BOOLEAN: + preparedStatement.setNull(parameterIndex, Types.BOOLEAN); break; - } + case TINYINT: + preparedStatement.setNull(parameterIndex, Types.TINYINT); + break; + case SMALLINT: + preparedStatement.setNull(parameterIndex, Types.SMALLINT); + break; + case INT: + preparedStatement.setNull(parameterIndex, Types.INTEGER); + break; + case BIGINT: + preparedStatement.setNull(parameterIndex, Types.BIGINT); + break; + case LARGEINT: + preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT); + break; + case FLOAT: + preparedStatement.setNull(parameterIndex, Types.FLOAT); + break; + case DOUBLE: + preparedStatement.setNull(parameterIndex, Types.DOUBLE); + break; + case DECIMALV2: + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + preparedStatement.setNull(parameterIndex, Types.DECIMAL); + break; + case DATEV2: + preparedStatement.setNull(parameterIndex, Types.DATE); + break; + case DATETIMEV2: + preparedStatement.setNull(parameterIndex, Types.TIMESTAMP); + break; + case CHAR: + case VARCHAR: + case STRING: + case BINARY: + preparedStatement.setNull(parameterIndex, Types.VARCHAR); + break; + default: + throw new RuntimeException("Unknown type value: " + dorisType); } - return i; } } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java index b68ec481d3d2ec..f25fabcc3c8e98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java @@ -43,11 +43,13 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { String ckType = fieldSchema.getDataTypeName(); if (ckType.startsWith("LowCardinality")) { + fieldSchema.setAllowNull(true); ckType = ckType.substring(15, ckType.length() - 1); if (ckType.startsWith("Nullable")) { ckType = ckType.substring(9, ckType.length() - 1); } } else if (ckType.startsWith("Nullable")) { + fieldSchema.setAllowNull(true); ckType = ckType.substring(9, ckType.length() - 1); } diff --git a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out index cadcd8e0e2da62..a92a0f5d71af34 100644 Binary files a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out and b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out differ