Skip to content

Commit

Permalink
[feature](jni) map c++ block to java vector table (apache#18566)
Browse files Browse the repository at this point in the history
PR(apache#17960) has introduced vector table which can map java table to c++ block.
In some cases(java udf & jdbc exector), we should map c++ block to java table. This PR implements this function.

The memory structure of java vector table and c++ block is consistent,
so the implementation doesn't copy the block, just passes the memory address.
  • Loading branch information
AshinGau authored Apr 16, 2023
1 parent 0788ff1 commit ddbff2a
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 11 deletions.
78 changes: 78 additions & 0 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,4 +329,82 @@ std::string JniConnector::get_hive_type(const TypeDescriptor& desc) {
return "unsupported";
}
}

Status JniConnector::generate_meta_info(Block* block, std::unique_ptr<long[]>& meta) {
std::vector<long> meta_data;
// insert number of rows
meta_data.emplace_back(block->rows());
for (int i = 0; i < block->columns(); ++i) {
auto& column_with_type_and_name = block->get_by_position(i);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
TypeIndex logical_type = remove_nullable(column_type)->get_type_id();

// insert null map address
MutableColumnPtr data_column;
if (column_ptr->is_nullable()) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
column_ptr->assume_mutable().get());
data_column = nullable_column->get_nested_column_ptr();
NullMap& null_map = nullable_column->get_null_map_data();
meta_data.emplace_back((long)null_map.data());
} else {
meta_data.emplace_back(0);
data_column = column_ptr->assume_mutable();
}

switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
case NUMERIC_TYPE: { \
meta_data.emplace_back(_get_numeric_data_address<CPP_NUMERIC_TYPE>(data_column)); \
break; \
}
FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
case TypeIndex::Decimal128:
[[fallthrough]];
case TypeIndex::Decimal128I: {
meta_data.emplace_back(_get_decimal_data_address<Int128>(data_column));
break;
}
case TypeIndex::Decimal32: {
meta_data.emplace_back(_get_decimal_data_address<Int32>(data_column));
break;
}
case TypeIndex::Decimal64: {
meta_data.emplace_back(_get_decimal_data_address<Int64>(data_column));
break;
}
case TypeIndex::DateV2: {
meta_data.emplace_back(_get_time_data_address<UInt32>(data_column));
break;
}
case TypeIndex::DateTimeV2: {
meta_data.emplace_back(_get_time_data_address<UInt64>(data_column));
break;
}
case TypeIndex::String:
[[fallthrough]];
case TypeIndex::FixedString: {
auto& string_column = static_cast<ColumnString&>(*data_column);
// inert offsets
meta_data.emplace_back((long)string_column.get_offsets().data());
meta_data.emplace_back((long)string_column.get_chars().data());
break;
}
case TypeIndex::Array:
[[fallthrough]];
case TypeIndex::Struct:
[[fallthrough]];
case TypeIndex::Map:
return Status::IOError("Unhandled type {}", getTypeName(logical_type));
default:
return Status::IOError("Unsupported type {}", getTypeName(logical_type));
}
}

meta.reset(new long[meta_data.size()]);
memcpy(meta.get(), &meta_data[0], meta_data.size() * 8);
return Status::OK();
}
} // namespace doris::vectorized
19 changes: 19 additions & 0 deletions be/src/vec/exec/jni_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class JniConnector {
*/
static std::string get_hive_type(const TypeDescriptor& desc);

static Status generate_meta_info(Block* block, std::unique_ptr<long[]>& meta);

private:
std::string _connector_class;
std::map<std::string, std::string> _scanner_params;
Expand Down Expand Up @@ -233,6 +235,11 @@ class JniConnector {
return Status::OK();
}

template <typename CppType>
static long _get_numeric_data_address(MutableColumnPtr& doris_column) {
return (long)static_cast<ColumnVector<CppType>&>(*doris_column).get_data().data();
}

template <typename DecimalPrimitiveType>
Status _fill_decimal_column(MutableColumnPtr& doris_column, DecimalPrimitiveType* ptr,
size_t num_rows) {
Expand All @@ -245,6 +252,13 @@ class JniConnector {
return Status::OK();
}

template <typename DecimalPrimitiveType>
static long _get_decimal_data_address(MutableColumnPtr& doris_column) {
return (long)static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column)
.get_data()
.data();
}

template <typename CppType>
Status _decode_time_column(MutableColumnPtr& doris_column, CppType* ptr, size_t num_rows) {
auto& column_data = static_cast<ColumnVector<CppType>&>(*doris_column).get_data();
Expand All @@ -254,6 +268,11 @@ class JniConnector {
return Status::OK();
}

template <typename CppType>
static long _get_time_data_address(MutableColumnPtr& doris_column) {
return (long)static_cast<ColumnVector<CppType>&>(*doris_column).get_data().data();
}

Status _fill_string_column(MutableColumnPtr& doris_column, size_t num_rows);

void _generate_predicates(
Expand Down
59 changes: 59 additions & 0 deletions be/src/vec/exec/vjdbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "vec/columns/column_string.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_string.h"
#include "vec/exec/jni_connector.h"
#include "vec/exec/scan/new_jdbc_scanner.h"
#include "vec/functions/simple_function_factory.h"

Expand All @@ -41,6 +42,7 @@ namespace vectorized {
const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor";
const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V";
const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I";
const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I";
const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;";
const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;";
Expand Down Expand Up @@ -570,6 +572,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
_executor_ctor_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_WRITE_SIGNATURE,
_executor_write_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_STMT_WRITE_SIGNATURE,
_executor_stmt_write_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "read", "()I", _executor_read_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE,
_executor_close_id));
Expand Down Expand Up @@ -692,6 +696,61 @@ Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt,
return Status::OK();
}

Status JdbcConnector::exec_stmt_write(
Block* block, const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs) {
SCOPED_TIMER(_result_send_timer);
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));

// prepare table schema
std::ostringstream required_fields;
std::ostringstream columns_types;
for (int i = 0; i < block->columns(); ++i) {
// column name maybe empty or has special characters
// std::string field = block->get_by_position(i).name;
std::string type = JniConnector::get_hive_type(output_vexpr_ctxs[i]->root()->type());
if (i == 0) {
required_fields << "_col" << i;
columns_types << type;
} else {
required_fields << ","
<< "_col" << i;
columns_types << "#" << type;
}
}

// prepare table meta information
std::unique_ptr<long[]> meta_data;
RETURN_IF_ERROR(JniConnector::generate_meta_info(block, meta_data));
long meta_address = (long)meta_data.get();

// prepare constructor parameters
std::map<String, String> write_params = {{"meta_address", std::to_string(meta_address)},
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()},
{"write_sql", "/* todo */"}};
jclass hashmap_class = env->FindClass("java/util/HashMap");
jmethodID hashmap_constructor = env->GetMethodID(hashmap_class, "<init>", "(I)V");
jobject hashmap_object =
env->NewObject(hashmap_class, hashmap_constructor, write_params.size());
jmethodID hashmap_put = env->GetMethodID(
hashmap_class, "put", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;");
RETURN_ERROR_IF_EXC(env);
for (const auto& it : write_params) {
jstring key = env->NewStringUTF(it.first.c_str());
jstring value = env->NewStringUTF(it.second.c_str());
env->CallObjectMethod(hashmap_object, hashmap_put, key, value);
env->DeleteLocalRef(key);
env->DeleteLocalRef(value);
}
env->DeleteLocalRef(hashmap_class);
env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_stmt_write_id,
hashmap_object);
env->DeleteLocalRef(hashmap_object);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
return Status::OK();
}

std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) {
jobject jstr = env->CallObjectMethod(jobj, _to_string_id);
auto coding = env->NewStringUTF("UTF-8");
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/vjdbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class JdbcConnector : public TableConnector {
Status exec_write_sql(const std::u16string& insert_stmt,
const fmt::memory_buffer& insert_stmt_buffer) override;

Status exec_stmt_write(Block* block,
const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs);

Status get_next(bool* eos, std::vector<MutableColumnPtr>& columns, Block* block,
int batch_size);

Expand Down Expand Up @@ -99,6 +102,7 @@ class JdbcConnector : public TableConnector {
jobject _executor_obj;
jmethodID _executor_ctor_id;
jmethodID _executor_write_id;
jmethodID _executor_stmt_write_id;
jmethodID _executor_read_id;
jmethodID _executor_has_next_id;
jmethodID _executor_block_rows_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,61 @@ public VectorColumn(ColumnType columnType, int capacity) {
}
} else if (columnType.isStringType()) {
childColumns = new VectorColumn[1];
childColumns[0] = new VectorColumn(new ColumnType("#data", Type.BYTE), capacity * DEFAULT_STRING_LENGTH);
childColumns[0] = new VectorColumn(new ColumnType("#stringBytes", Type.BYTE),
capacity * DEFAULT_STRING_LENGTH);
}

reserveCapacity(capacity);
}

// restore the child of string column & restore meta column
public VectorColumn(long address, int capacity, ColumnType columnType) {
this.columnType = columnType;
this.capacity = capacity;
this.nullMap = 0;
this.data = address;
this.offsets = 0;
this.numNulls = 0;
this.appendIndex = capacity;
}

// restore block column
public VectorColumn(ColumnType columnType, int numRows, long columnMetaAddress) {
if (columnType.isUnsupported()) {
throw new RuntimeException("Unsupported type for column: " + columnType.getName());
}
long address = columnMetaAddress;
this.capacity = numRows;
this.columnType = columnType;
this.nullMap = OffHeap.getLong(null, address);
address += 8;
this.numNulls = 0;
if (this.nullMap != 0) {
for (int i = 0; i < numRows; ++i) {
if (isNullAt(i)) {
this.numNulls++;
}
}
}
this.appendIndex = numRows;

if (columnType.isComplexType()) {
// todo: support complex type
throw new RuntimeException("Unhandled type: " + columnType);
} else if (columnType.isStringType()) {
this.offsets = OffHeap.getLong(null, address);
address += 8;
this.data = 0;
int length = OffHeap.getInt(null, this.offsets + (numRows - 1) * 4L);
childColumns = new VectorColumn[1];
childColumns[0] = new VectorColumn(OffHeap.getLong(null, address), length,
new ColumnType("#stringBytes", Type.BYTE));
} else {
this.data = OffHeap.getLong(null, address);
this.offsets = 0;
}
}

public long nullMapAddress() {
return nullMap;
}
Expand All @@ -90,6 +139,10 @@ public long offsetAddress() {
return offsets;
}

public ColumnType.Type getColumnTyp() {
return columnType.getType();
}

/**
* Release columns and meta information
*/
Expand Down Expand Up @@ -159,8 +212,10 @@ private void reserveCapacity(int newCapacity) {
throw new RuntimeException("Unhandled type: " + columnType);
}
// todo: support complex type
this.nullMap = OffHeap.reallocateMemory(nullMap, oldCapacity, newCapacity);
OffHeap.setMemory(nullMap + oldCapacity, (byte) 0, newCapacity - oldCapacity);
if (!"#stringBytes".equals(columnType.getName())) {
this.nullMap = OffHeap.reallocateMemory(nullMap, oldCapacity, newCapacity);
OffHeap.setMemory(nullMap + oldCapacity, (byte) 0, newCapacity - oldCapacity);
}
capacity = newCapacity;
}

Expand All @@ -178,7 +233,11 @@ public void reset() {
}

public boolean isNullAt(int rowId) {
return OffHeap.getByte(null, nullMap + rowId) == 1;
if (nullMap == 0) {
return false;
} else {
return OffHeap.getByte(null, nullMap + rowId) == 1;
}
}

public boolean hasNull() {
Expand Down
Loading

0 comments on commit ddbff2a

Please sign in to comment.