Skip to content

Commit

Permalink
[Feature]Support array and map type for paimon table (StarRocks#29854)
Browse files Browse the repository at this point in the history
Fixes StarRocks#29853

Signed-off-by: leoyy0316 <571684903@qq.com>
  • Loading branch information
leoyy0316 authored Aug 25, 2023
1 parent 567dec2 commit 449aa81
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -546,15 +546,13 @@ public Type visit(TimestampType timestampType) {
return ScalarType.createType(PrimitiveType.DATETIME);
}

// TODO: uncomment this and unit test case when this type is supported in paimon connector
//public Type visit(org.apache.paimon.types.ArrayType arrayType) {
// return new ArrayType(fromPaimonType(arrayType.getElementType()));
//}
public Type visit(org.apache.paimon.types.ArrayType arrayType) {
return new ArrayType(fromPaimonType(arrayType.getElementType()));
}

// TODO: uncomment this and unit test case when this type is supported in paimon connector
//public Type visit(org.apache.paimon.types.MapType mapType) {
// return new MapType(fromPaimonType(mapType.getKeyType()), fromPaimonType(mapType.getValueType()));
//}
public Type visit(org.apache.paimon.types.MapType mapType) {
return new MapType(fromPaimonType(mapType.getKeyType()), fromPaimonType(mapType.getValueType()));
}

// TODO: uncomment this and unit test case when this type is supported in paimon connector
//public Type visit(RowType rowType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void testConvertDatetime() {
Assert.assertEquals(result, Type.DATETIME);
}

@Ignore
@Test
public void testConvertArray() {
ArrayType paimonType = new ArrayType(new SmallIntType());
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
Expand All @@ -149,7 +149,7 @@ public void testConvertArray() {
Assert.assertEquals(Type.SMALLINT, srType.getItemType());
}

@Ignore
@Test
public void testConvertMap() {
MapType paimonType = new MapType(new VarCharType(20), new TimestampType());
Type result = ColumnTypeConverter.fromPaimonType(paimonType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,24 @@

import com.starrocks.jni.connector.ColumnType;
import com.starrocks.jni.connector.ColumnValue;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.utils.InternalRowUtils;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;

public class PaimonColumnValue implements ColumnValue {
private final Object fieldData;
public PaimonColumnValue(Object fieldData) {
private final DataType dataType;
public PaimonColumnValue(Object fieldData, DataType dataType) {
this.fieldData = fieldData;
this.dataType = dataType;
}
@Override
public boolean getBoolean() {
Expand Down Expand Up @@ -86,12 +94,27 @@ public byte[] getBytes() {

@Override
public void unpackArray(List<ColumnValue> values) {

InternalArray array = (InternalArray) fieldData;
toPaimonColumnValue(values, array, ((ArrayType) dataType).getElementType());
}

@Override
public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
InternalMap map = (InternalMap) fieldData;
DataType keyType;
DataType valueType;
if (dataType instanceof MapType) {
keyType = ((MapType) dataType).getKeyType();
valueType = ((MapType) dataType).getValueType();
} else {
throw new UnsupportedOperationException("Unsupported type: " + dataType);
}

InternalArray keyArray = map.keyArray();
toPaimonColumnValue(keys, keyArray, keyType);

InternalArray valueArray = map.valueArray();
toPaimonColumnValue(values, valueArray, valueType);
}

@Override
Expand All @@ -103,4 +126,11 @@ public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> value
public byte getByte() {
return (byte) fieldData;
}

private void toPaimonColumnValue(List<ColumnValue> values, InternalArray array, DataType dataType) {
for (int i = 0; i < array.size(); i++) {
PaimonColumnValue cv = new PaimonColumnValue(InternalRowUtils.get(array, i, dataType), dataType);
values.add(cv);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public int getNext() throws IOException {
if (fieldData == null) {
appendData(i, null);
} else {
ColumnValue fieldValue = new PaimonColumnValue(fieldData);
ColumnValue fieldValue = new PaimonColumnValue(fieldData, logicalTypes[i]);
appendData(i, fieldValue);
}
}
Expand Down

0 comments on commit 449aa81

Please sign in to comment.