Skip to content

Commit

Permalink
[Enhancement] improve error message handling of JNI connector and sup…
Browse files Browse the repository at this point in the history
…port char type of paimon connector (StarRocks#28044)

Fixes StarRocks#27992

- fail fast for unsupported types of paimon table
- output error message of JNI to client terminal
- support char type of paimon connector
- some minor refactor

Signed-off-by: miomiocat <284487410@qq.com>
  • Loading branch information
miomiocat authored Jul 28, 2023
1 parent 282bb77 commit 865b915
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 59 deletions.
4 changes: 4 additions & 0 deletions be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
RETURN_IF_ERROR(scanner->init(state, scanner_params));
Status st = scanner->open(state);
if (!st.ok()) {
if (scanner->is_jni_scanner()) {
return st;
}

auto msg = fmt::format("file = {}", native_file_path);

// After catching the AWS 404 file not found error and returning it to the FE,
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/hdfs_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ class HdfsScanner {
virtual Status do_get_next(RuntimeState* runtime_state, ChunkPtr* chunk) = 0;
virtual Status do_init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params) = 0;
virtual void do_update_counter(HdfsScanProfile* profile);
// Reports whether this scanner is JNI-backed. The base implementation
// returns false; subclasses may override it.
virtual bool is_jni_scanner() { return false; }

void enter_pending_queue();
// how long it stays inside pending queue.
Expand Down
38 changes: 23 additions & 15 deletions be/src/exec/jni_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
namespace starrocks {

Status JniScanner::_check_jni_exception(JNIEnv* _jni_env, const std::string& message) {
if (_jni_env->ExceptionCheck()) {
if (jthrowable thr = _jni_env->ExceptionOccurred(); thr) {
std::string jni_error_message = JVMFunctionHelper::getInstance().dumpExceptionString(thr);
_jni_env->ExceptionDescribe();
_jni_env->ExceptionClear();
return Status::InternalError(message);
_jni_env->DeleteLocalRef(thr);
return Status::InternalError(message + " java exception details: " + jni_error_message);
}
return Status::OK();
}
Expand Down Expand Up @@ -139,10 +141,11 @@ Status JniScanner::_get_next_chunk(JNIEnv* _jni_env, long* chunk_meta) {
return Status::OK();
}

template <LogicalType type, typename CppType>
template <LogicalType type>
Status JniScanner::_append_primitive_data(const FillColumnArgs& args) {
char* column_ptr = static_cast<char*>(next_chunk_meta_as_ptr());
using ColumnType = typename starrocks::RunTimeColumnType<type>;
using CppType = typename starrocks::RunTimeCppType<type>;
auto* runtime_column = down_cast<ColumnType*>(args.column);
runtime_column->resize_uninitialized(args.num_rows);
memcpy(runtime_column->get_data().data(), column_ptr, args.num_rows * sizeof(CppType));
Expand All @@ -169,12 +172,13 @@ Status JniScanner::_append_string_data(const FillColumnArgs& args) {
return Status::OK();
}

template <LogicalType type, typename CppType>
template <LogicalType type>
Status JniScanner::_append_decimal_data(const FillColumnArgs& args) {
int* offset_ptr = static_cast<int*>(next_chunk_meta_as_ptr());
char* column_ptr = static_cast<char*>(next_chunk_meta_as_ptr());

using ColumnType = typename starrocks::RunTimeColumnType<type>;
using CppType = typename starrocks::RunTimeCppType<type>;
auto* runtime_column = down_cast<ColumnType*>(args.column);
runtime_column->resize_uninitialized(args.num_rows);
CppType* runtime_data = runtime_column->get_data().data();
Expand Down Expand Up @@ -365,37 +369,41 @@ Status JniScanner::_fill_column(FillColumnArgs* pargs) {
pargs->column = data_column;
pargs->nulls = null_data.data();
} else {
// otherwise we skil this chunk meta, because in Java side
// we assume every column starswith `null_column`.
// otherwise we skip this chunk meta, because in Java side
// we assume every column starts with `null_column`.
}

LogicalType column_type = args.slot_type.type;
if (column_type == LogicalType::TYPE_BOOLEAN) {
RETURN_IF_ERROR((_append_primitive_data<TYPE_BOOLEAN, uint8_t>(args)));
RETURN_IF_ERROR((_append_primitive_data<TYPE_BOOLEAN>(args)));
} else if (column_type == LogicalType::TYPE_TINYINT) {
RETURN_IF_ERROR((_append_primitive_data<TYPE_TINYINT>(args)));
} else if (column_type == LogicalType::TYPE_SMALLINT) {
RETURN_IF_ERROR((_append_primitive_data<TYPE_SMALLINT, int16_t>(args)));
RETURN_IF_ERROR((_append_primitive_data<TYPE_SMALLINT>(args)));
} else if (column_type == LogicalType::TYPE_INT) {
RETURN_IF_ERROR((_append_primitive_data<TYPE_INT, int32_t>(args)));
RETURN_IF_ERROR((_append_primitive_data<TYPE_INT>(args)));
} else if (column_type == LogicalType::TYPE_FLOAT) {
RETURN_IF_ERROR((_append_primitive_data<TYPE_FLOAT, float>(args)));
RETURN_IF_ERROR((_append_primitive_data<TYPE_FLOAT>(args)));
} else if (column_type == LogicalType::TYPE_BIGINT) {
RETURN_IF_ERROR((_append_primitive_data<TYPE_BIGINT, int64_t>(args)));
RETURN_IF_ERROR((_append_primitive_data<TYPE_BIGINT>(args)));
} else if (column_type == LogicalType::TYPE_DOUBLE) {
RETURN_IF_ERROR((_append_primitive_data<TYPE_DOUBLE, double>(args)));
RETURN_IF_ERROR((_append_primitive_data<TYPE_DOUBLE>(args)));
} else if (column_type == LogicalType::TYPE_VARCHAR) {
RETURN_IF_ERROR((_append_string_data<TYPE_VARCHAR>(args)));
} else if (column_type == LogicalType::TYPE_CHAR) {
RETURN_IF_ERROR((_append_string_data<TYPE_CHAR>(args)));
} else if (column_type == LogicalType::TYPE_VARBINARY) {
RETURN_IF_ERROR((_append_string_data<TYPE_VARBINARY>(args)));
} else if (column_type == LogicalType::TYPE_DATE) {
RETURN_IF_ERROR((_append_date_data(args)));
} else if (column_type == LogicalType::TYPE_DATETIME) {
RETURN_IF_ERROR((_append_datetime_data(args)));
} else if (column_type == LogicalType::TYPE_DECIMAL32) {
RETURN_IF_ERROR((_append_decimal_data<TYPE_DECIMAL32, int32_t>(args)));
RETURN_IF_ERROR((_append_decimal_data<TYPE_DECIMAL32>(args)));
} else if (column_type == LogicalType::TYPE_DECIMAL64) {
RETURN_IF_ERROR((_append_decimal_data<TYPE_DECIMAL64, int64_t>(args)));
RETURN_IF_ERROR((_append_decimal_data<TYPE_DECIMAL64>(args)));
} else if (column_type == LogicalType::TYPE_DECIMAL128) {
RETURN_IF_ERROR((_append_decimal_data<TYPE_DECIMAL128, int128_t>(args)));
RETURN_IF_ERROR((_append_decimal_data<TYPE_DECIMAL128>(args)));
} else if (column_type == LogicalType::TYPE_ARRAY) {
RETURN_IF_ERROR((_append_array_data(args)));
} else if (column_type == LogicalType::TYPE_MAP) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/jni_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class JniScanner : public HdfsScanner {
void do_close(RuntimeState* runtime_state) noexcept override;
Status do_get_next(RuntimeState* runtime_state, ChunkPtr* chunk) override;
Status do_init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params) override;
// Overrides the base-class hook to identify this scanner as JNI-backed.
bool is_jni_scanner() override { return true; }

private:
struct FillColumnArgs {
Expand All @@ -57,10 +58,10 @@ class JniScanner : public HdfsScanner {

Status _get_next_chunk(JNIEnv* _jni_env, long* chunk_meta);

template <LogicalType type, typename CppType>
template <LogicalType type>
Status _append_primitive_data(const FillColumnArgs& args);

template <LogicalType type, typename CppType>
template <LogicalType type>
Status _append_decimal_data(const FillColumnArgs& args);

template <LogicalType type>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,17 @@
import org.apache.avro.Schema;
import org.apache.iceberg.types.Types;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataTypeDefaultVisitor;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.FloatType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarCharType;

import java.util.ArrayList;
Expand Down Expand Up @@ -394,6 +393,14 @@ private static class PaimonToHiveTypeVisitor extends DataTypeDefaultVisitor<Type

private static final PaimonToHiveTypeVisitor INSTANCE = new PaimonToHiveTypeVisitor();

// Maps a Paimon BinaryType to the VARBINARY scalar type.
// The binaryType argument itself is not inspected.
public Type visit(BinaryType binaryType) {
return ScalarType.createType(PrimitiveType.VARBINARY);
}

// Maps a Paimon CharType to the type produced by ScalarType.createCharType,
// passing through the length reported by charType.getLength().
public Type visit(CharType charType) {
return ScalarType.createCharType(charType.getLength());
}

// Maps a Paimon VarCharType to the default external-table string type.
// The varCharType argument (including any declared length) is not inspected.
public Type visit(VarCharType varCharType) {
return ScalarType.createDefaultExternalTableString();
}
Expand All @@ -406,9 +413,10 @@ public Type visit(DecimalType decimalType) {
return ScalarType.createUnifiedDecimalType(decimalType.getPrecision(), decimalType.getScale());
}

public Type visit(TinyIntType tinyIntType) {
return ScalarType.createType(PrimitiveType.TINYINT);
}
// TODO: uncomment this and unit test case when this type is supported in paimon connector
//public Type visit(TinyIntType tinyIntType) {
// return ScalarType.createType(PrimitiveType.TINYINT);
//}

public Type visit(SmallIntType smallIntType) {
return ScalarType.createType(PrimitiveType.SMALLINT);
Expand Down Expand Up @@ -438,24 +446,27 @@ public Type visit(TimestampType timestampType) {
return ScalarType.createType(PrimitiveType.DATETIME);
}

public Type visit(org.apache.paimon.types.ArrayType arrayType) {
return new ArrayType(fromPaimonType(arrayType.getElementType()));
}

public Type visit(org.apache.paimon.types.MapType mapType) {
return new MapType(fromPaimonType(mapType.getKeyType()), fromPaimonType(mapType.getValueType()));
}

public Type visit(RowType rowType) {
List<DataField> fields = rowType.getFields();
ArrayList<StructField> structFields = new ArrayList<>(fields.size());
for (DataField field : fields) {
String fieldName = field.name();
Type fieldType = fromPaimonType(field.type());
structFields.add(new StructField(fieldName, fieldType));
}
return new StructType(structFields);
}
// TODO: uncomment this and unit test case when this type is supported in paimon connector
//public Type visit(org.apache.paimon.types.ArrayType arrayType) {
// return new ArrayType(fromPaimonType(arrayType.getElementType()));
//}

// TODO: uncomment this and unit test case when this type is supported in paimon connector
//public Type visit(org.apache.paimon.types.MapType mapType) {
// return new MapType(fromPaimonType(mapType.getKeyType()), fromPaimonType(mapType.getValueType()));
//}

// TODO: uncomment this and unit test case when this type is supported in paimon connector
//public Type visit(RowType rowType) {
// List<DataField> fields = rowType.getFields();
// ArrayList<StructField> structFields = new ArrayList<>(fields.size());
// for (DataField field : fields) {
// String fieldName = field.name();
// Type fieldType = fromPaimonType(field.type());
// structFields.add(new StructField(fieldName, fieldType));
// }
// return new StructType(structFields);
//}

@Override
protected Type defaultMethod(org.apache.paimon.types.DataType dataType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.starrocks.connector.ColumnTypeConverter;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
Expand All @@ -34,6 +36,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarCharType;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.util.Arrays;
Expand All @@ -42,7 +45,22 @@
public class PaimonColumnConverterTest {

@Test
public void testConvertString() {
public void testConvertBinary() {
BinaryType paimonType = new BinaryType();
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
Assert.assertEquals(result, Type.VARBINARY);
}

// Checks that a Paimon CharType of length 10 converts to the same type that
// ScalarType.createCharType(10) builds.
@Test
public void testConvertChar() {
CharType paimonType = new CharType(10);
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
Type srType = ScalarType.createCharType(10);
// NOTE(review): JUnit's signature is assertEquals(expected, actual); the
// arguments here are reversed, which mainly affects the failure message.
Assert.assertEquals(result, srType);
}

@Test
public void testConvertVarchar() {
VarCharType paimonType = new VarCharType();
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
Type srType = ScalarType.createDefaultExternalTableString();
Expand All @@ -56,13 +74,52 @@ public void testConvertBool() {
Assert.assertEquals(result, Type.BOOLEAN);
}

// Checks that a Paimon DecimalType(precision=9, scale=5) converts to the type
// built by ScalarType.createUnifiedDecimalType with the same precision/scale.
@Test
public void testConvertDecimal() {
int precision = 9;
int scale = 5;
DecimalType paimonType = new DecimalType(precision, scale);
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
Type srType = ScalarType.createUnifiedDecimalType(precision, scale);
// NOTE(review): JUnit's signature is assertEquals(expected, actual); the
// arguments here are reversed, which mainly affects the failure message.
Assert.assertEquals(result, srType);
}


// Disabled check that a Paimon TinyIntType converts to Type.TINYINT.
// NOTE(review): only @Ignore is visible on this method. Under JUnit 4 a method
// needs @Test to be discovered at all, so without it this will not be reported
// as "ignored" -- confirm whether @Test should accompany @Ignore.
@Ignore
public void testConvertTinyInt() {
TinyIntType paimonType = new TinyIntType();
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
// NOTE(review): JUnit's signature is assertEquals(expected, actual); the
// arguments here are reversed, which mainly affects the failure message.
Assert.assertEquals(result, Type.TINYINT);
}

// Checks that a Paimon SmallIntType converts to Type.SMALLINT.
@Test
public void testConvertSmallint() {
SmallIntType paimonType = new SmallIntType();
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
// NOTE(review): JUnit's signature is assertEquals(expected, actual); the
// arguments here are reversed, which mainly affects the failure message.
Assert.assertEquals(result, Type.SMALLINT);
}

// Checks that a Paimon IntType converts to Type.INT.
@Test
public void testConvertInt() {
IntType paimonType = new IntType();
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
// NOTE(review): JUnit's signature is assertEquals(expected, actual); the
// arguments here are reversed, which mainly affects the failure message.
Assert.assertEquals(result, Type.INT);
}

// Checks that a Paimon BigIntType converts to Type.BIGINT.
@Test
public void testConvertBigint() {
BigIntType paimonType = new BigIntType();
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
// NOTE(review): JUnit's signature is assertEquals(expected, actual); the
// arguments here are reversed, which mainly affects the failure message.
Assert.assertEquals(result, Type.BIGINT);
}

// Checks that a Paimon FloatType converts to Type.FLOAT.
// NOTE(review): the method name looks like a typo for "testConvertFloat".
@Test
public void testConvertFlout() {
FloatType paimonType = new FloatType();
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
// NOTE(review): JUnit's signature is assertEquals(expected, actual); the
// arguments here are reversed, which mainly affects the failure message.
Assert.assertEquals(result, Type.FLOAT);
}

@Test
public void testConvertDouble() {
DoubleType paimonType = new DoubleType();
Expand All @@ -84,17 +141,7 @@ public void testConvertDatetime() {
Assert.assertEquals(result, Type.DATETIME);
}

@Test
public void testConvertDecimal() {
int precision = 9;
int scale = 5;
DecimalType paimonType = new DecimalType(precision, scale);
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
Type srType = ScalarType.createUnifiedDecimalType(precision, scale);
Assert.assertEquals(result, srType);
}

@Test
@Ignore
public void testConvertArray() {
ArrayType paimonType = new ArrayType(new SmallIntType());
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
Expand All @@ -103,7 +150,7 @@ public void testConvertArray() {
Assert.assertEquals(Type.SMALLINT, srType.getItemType());
}

@Test
@Ignore
public void testConvertMap() {
MapType paimonType = new MapType(new VarCharType(20), new TimestampType());
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
Expand All @@ -113,19 +160,19 @@ public void testConvertMap() {
Assert.assertEquals(Type.DATETIME, srType.getValueType());
}

@Test
@Ignore
public void testConvertStruct() {
List<DataField> fields =
Arrays.asList(
new DataField(0, "f0", new TinyIntType()),
new DataField(0, "f0", new BinaryType()),
new DataField(1, "f1", new BigIntType()),
new DataField(2, "f2", new FloatType()));
RowType paimonType = new RowType(fields);
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
Assert.assertTrue(result instanceof StructType);
StructType srType = (StructType) result;
Assert.assertEquals(3, srType.getFields().size());
Assert.assertEquals(Type.TINYINT, srType.getField("f0").getType());
Assert.assertEquals(Type.VARBINARY, srType.getField("f0").getType());
Assert.assertEquals(Type.BIGINT, srType.getField("f1").getType());
Assert.assertEquals(Type.FLOAT, srType.getField("f2").getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private void parse(StringScanner scanner) {
}

if (typeValue == null) {
throw new RuntimeException("Unknown type: " + t);
throw new RuntimeException("Unsupported type: " + t);
}
}

Expand Down
Loading

0 comments on commit 865b915

Please sign in to comment.