Skip to content

Commit

Permalink
[Feature] Support struct type for Paimon table (StarRocks#30162)
Browse files Browse the repository at this point in the history
Signed-off-by: leoyy0316 <571684903@qq.com>
  • Loading branch information
leoyy0316 authored Sep 5, 2023
1 parent 1aab391 commit b4a742e
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 17 deletions.
12 changes: 11 additions & 1 deletion be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,16 @@ HdfsScanner* HiveDataSource::_create_paimon_jni_scanner(FSOptions& options) {
required_fields.append(",");
}
required_fields = required_fields.substr(0, required_fields.size() - 1);

std::string nested_fields;
for (auto slot : _tuple_desc->slots()) {
const TypeDescriptor& type = slot->type();
if (type.is_complex_type()) {
build_nested_fields(type, slot->col_name(), &nested_fields);
}
}
if (!nested_fields.empty()) {
nested_fields = nested_fields.substr(0, nested_fields.size() - 1);
}
std::map<std::string, std::string> jni_scanner_params;
jni_scanner_params["catalog_type"] = paimon_table->get_catalog_type();
jni_scanner_params["metastore_uri"] = paimon_table->get_metastore_uri();
Expand All @@ -438,6 +447,7 @@ HdfsScanner* HiveDataSource::_create_paimon_jni_scanner(FSOptions& options) {
jni_scanner_params["required_fields"] = required_fields;
jni_scanner_params["split_info"] = _scan_range.paimon_split_info;
jni_scanner_params["predicate_info"] = _scan_range.paimon_predicate_info;
jni_scanner_params["nested_fields"] = nested_fields;

string option_info = "";
if (options.cloud_configuration != nullptr && options.cloud_configuration->cloud_type == TCloudType::AWS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeDefaultVisitor;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.FloatType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.TinyIntType;
Expand Down Expand Up @@ -554,17 +556,16 @@ public Type visit(org.apache.paimon.types.MapType mapType) {
return new MapType(fromPaimonType(mapType.getKeyType()), fromPaimonType(mapType.getValueType()));
}

// TODO: uncomment this and unit test case when this type is supported in paimon connector
//public Type visit(RowType rowType) {
// List<DataField> fields = rowType.getFields();
// ArrayList<StructField> structFields = new ArrayList<>(fields.size());
// for (DataField field : fields) {
// String fieldName = field.name();
// Type fieldType = fromPaimonType(field.type());
// structFields.add(new StructField(fieldName, fieldType));
// }
// return new StructType(structFields);
//}
@Override
public Type visit(RowType rowType) {
    // Converts a Paimon ROW type into a StarRocks STRUCT type by converting
    // each field recursively, preserving field order and field names.
    // NOTE(review): @Override added for consistency with defaultMethod below —
    // confirm DataTypeDefaultVisitor declares visit(RowType) in the Paimon
    // version this module builds against.
    List<DataField> fields = rowType.getFields();
    ArrayList<StructField> structFields = new ArrayList<>(fields.size());
    for (DataField field : fields) {
        // Recursive conversion handles arbitrarily nested complex types.
        structFields.add(new StructField(field.name(), fromPaimonType(field.type())));
    }
    return new StructType(structFields);
}

@Override
protected Type defaultMethod(org.apache.paimon.types.DataType dataType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarCharType;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.util.Arrays;
Expand Down Expand Up @@ -159,7 +158,7 @@ public void testConvertMap() {
Assert.assertEquals(Type.DATETIME, srType.getValueType());
}

@Ignore
@Test
public void testConvertStruct() {
List<DataField> fields =
Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowUtils;

import java.time.LocalDate;
Expand Down Expand Up @@ -119,7 +122,20 @@ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {

@Override
public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
    // Unpacks the selected fields of this struct value into `values`, one
    // entry per requested position; a null entry marks an unselected index
    // or a null field value.
    // NOTE(review): assumes fieldData is always a ColumnarRow here — confirm
    // no other InternalRow implementation can reach this path.
    ColumnarRow row = (ColumnarRow) fieldData;
    List<DataField> fields = ((RowType) dataType).getFields();
    for (int i = 0; i < structFieldIndex.size(); i++) {
        Integer idx = structFieldIndex.get(i);
        PaimonColumnValue cv = null;
        if (idx != null) {
            DataField dataField = fields.get(idx);
            // NOTE(review): the value is read at output position i but typed
            // by field index idx — confirm the two stay aligned when nested
            // fields are pruned.
            Object o = InternalRowUtils.get(row, i, dataField.type());
            if (o != null) {
                // Fix: wrap the extracted value `o` itself; the original
                // passed the constant 0, discarding the value despite the
                // non-null check above.
                cv = new PaimonColumnValue(o, dataField.type());
            }
        }
        values.add(cv);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.starrocks.jni.connector.ColumnType;
import com.starrocks.jni.connector.ColumnValue;
import com.starrocks.jni.connector.ConnectorScanner;
import com.starrocks.jni.connector.SelectedFields;
import com.starrocks.utils.loader.ThreadContextClassLoader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class PaimonSplitScanner extends ConnectorScanner {
private RecordReaderIterator<InternalRow> iterator;
private final int fetchSize;
private final ClassLoader classLoader;
private final String[] nestedFields;

public PaimonSplitScanner(int fetchSize, Map<String, String> params) {
this.fetchSize = fetchSize;
Expand All @@ -75,6 +77,7 @@ public PaimonSplitScanner(int fetchSize, Map<String, String> params) {
this.databaseName = params.get("database_name");
this.tableName = params.get("table_name");
this.requiredFields = params.get("required_fields").split(",");
this.nestedFields = params.getOrDefault("nested_fields", "").split(",");
this.splitInfo = params.get("split_info");
this.predicateInfo = params.get("predicate_info");

Expand Down Expand Up @@ -127,6 +130,17 @@ private void parseRequiredTypes() {
requiredTypes[i] = new ColumnType(type);
logicalTypes[i] = dataType;
}

// prune fields
SelectedFields ssf = new SelectedFields();
for (String nestField : nestedFields) {
ssf.addNestedPath(nestField);
}
for (int i = 0; i < requiredFields.length; i++) {
ColumnType type = requiredTypes[i];
String name = requiredFields[i];
type.pruneOnField(ssf, name);
}
}

private void initReader() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public String visit(MapType mapType) {
}

public String visit(RowType rowType) {
String type = rowType.getFields().stream().map(f -> f.type().accept(this) + ",")
String type = rowType.getFields().stream().map(f -> f.name() + ":" + f.type().accept(this))
.collect(Collectors.joining(","));
return String.format("struct<%s>", type.substring(0, type.length() - 1));
return String.format("struct<%s>", type);
}

@Override
Expand Down

0 comments on commit b4a742e

Please sign in to comment.