Skip to content

Commit

Permalink
[feature](paimon)paimon catalog supports complex types (apache#25364)
Browse files (browse the repository at this point in the history)
  • Loading branch information
DongLiang-0 authored Oct 23, 2023
1 parent 6a6e10c commit 267c112
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,9 @@ public static ColumnType parseType(String columnName, String hiveType) {
String keyValue = lowerCaseType.substring(4, lowerCaseType.length() - 1);
int index = findNextNestedField(keyValue);
if (index != keyValue.length() && index != 0) {
ColumnType keyType = parseType("key", keyValue.substring(0, index));
ColumnType valueType = parseType("value", keyValue.substring(index + 1));
ColumnType keyType = parseType("key", keyValue.substring(0, index).trim());
ColumnType valueType =
parseType("value", keyValue.substring(index + 1).trim());
ColumnType mapType = new ColumnType(columnName, Type.MAP);
mapType.setChildTypes(Arrays.asList(keyType, valueType));
return mapType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValue;

import org.apache.paimon.data.DataGetters;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;

import java.math.BigDecimal;
Expand All @@ -32,12 +35,18 @@

public class PaimonColumnValue implements ColumnValue {
private int idx;
private InternalRow record;
ColumnType dorisType;
private DataGetters record;
private ColumnType dorisType;

/**
 * Creates a value without assigning a record, index, or Doris type;
 * the constructor body is intentionally empty.
 */
public PaimonColumnValue() {
}

/**
 * Creates a value bound to the given record, position, and Doris type.
 *
 * @param record     data container stored as this value's backing record
 * @param idx        position stored as this value's index
 * @param columnType Doris column type stored for this value
 */
public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType) {
    this.record = record;
    this.dorisType = columnType;
    this.idx = idx;
}

public void setIdx(int idx, ColumnType dorisType) {
this.idx = idx;
this.dorisType = dorisType;
Expand Down Expand Up @@ -130,12 +139,29 @@ public byte[] getBytes() {

/**
 * Appends one {@link PaimonColumnValue} per slot of the array stored at this
 * value's index to {@code values}. Each appended value is typed with the
 * first child type of this value's Doris type.
 *
 * @param values output list that receives the element values, in slot order
 */
@Override
public void unpackArray(List<ColumnValue> values) {
    InternalArray elements = record.getArray(idx);
    int pos = 0;
    while (pos < elements.size()) {
        // Wrap the array itself as the backing container and address the slot by position.
        DataGetters source = (DataGetters) elements;
        ColumnType elementType = dorisType.getChildTypes().get(0);
        values.add(new PaimonColumnValue(source, pos, elementType));
        pos++;
    }
}

/**
 * Splits the map stored at this value's index into its key and value arrays
 * and appends one {@link PaimonColumnValue} per slot to the matching output list.
 * Keys are typed with child type 0 of this value's Doris type, values with child type 1.
 *
 * @param keys   output list that receives the key values, in slot order
 * @param values output list that receives the value values, in slot order
 */
@Override
public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
    InternalMap entries = record.getMap(idx);

    // Keys first: every slot of the key array becomes its own column value.
    InternalArray keySlots = entries.keyArray();
    int keyPos = 0;
    while (keyPos < keySlots.size()) {
        DataGetters keySource = (DataGetters) keySlots;
        ColumnType keyType = dorisType.getChildTypes().get(0);
        keys.add(new PaimonColumnValue(keySource, keyPos, keyType));
        keyPos++;
    }

    // Then values, read from the map's value array.
    InternalArray valueSlots = entries.valueArray();
    int valuePos = 0;
    while (valuePos < valueSlots.size()) {
        DataGetters valueSource = (DataGetters) valueSlots;
        ColumnType valueType = dorisType.getChildTypes().get(1);
        values.add(new PaimonColumnValue(valueSource, valuePos, valueType));
        valuePos++;
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.Map;
import java.util.stream.Collectors;


public class PaimonJniScanner extends JniScanner {
private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
private static final String PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;


public class PaimonTableCache {
private static final Logger LOG = LoggerFactory.getLogger(PaimonTableCache.class);
// Max cache num of paimon table
Expand Down Expand Up @@ -83,7 +82,6 @@ public static TableExt getTable(PaimonTableCacheKey key) {
}
}


public static class TableExt {
private Table table;
private long createTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
* Convert paimon type to doris type.
*/
Expand All @@ -56,9 +60,11 @@ private PaimonTypeUtils() {

/**
 * Converts a Paimon {@code DataType} into a {@code ColumnType} named {@code columnName}.
 * The type is visited with {@code PaimonToDorisTypeVisitor.INSTANCE}; the resulting
 * type, length, precision, and scale are passed to the {@code ColumnType} constructor,
 * and the visitor's child types are then copied onto the result.
 *
 * @param columnName name given to the returned column type
 * @param type       Paimon type to convert
 * @return the converted column type, with child types set from the visitor result
 */
public static ColumnType fromPaimonType(String columnName, DataType type) {
PaimonColumnType paimonColumnType = type.accept(PaimonToDorisTypeVisitor.INSTANCE);
// NOTE(review): this page shows removed and added diff lines together; the
// `return new ColumnType(...` line below is presumably the pre-change line and
// the `ColumnType columnType = ...` line its replacement — confirm against the diff.
return new ColumnType(columnName, paimonColumnType.getType(), paimonColumnType.getLength(),
ColumnType columnType = new ColumnType(columnName, paimonColumnType.getType(), paimonColumnType.getLength(),
paimonColumnType.getPrecision(),
paimonColumnType.getScale());
// Carry over nested element/key/value types collected by the visitor.
columnType.setChildTypes(paimonColumnType.getChildTypes());
return columnType;
}

private static class PaimonToDorisTypeVisitor extends DataTypeDefaultVisitor<PaimonColumnType> {
Expand Down Expand Up @@ -153,7 +159,10 @@ public PaimonColumnType visit(LocalZonedTimestampType localZonedTimestampType) {

/**
 * Maps a Paimon {@code ArrayType} to a {@code PaimonColumnType} of {@code Type.ARRAY}
 * whose single child type is the element type converted through
 * {@code fromPaimonType} under the placeholder name "dummy-element".
 */
@Override
public PaimonColumnType visit(ArrayType arrayType) {
// NOTE(review): the next line is presumably the pre-change line of this diff hunk
// (removed and added lines are shown together here) — confirm against the diff.
return this.defaultMethod(arrayType);
PaimonColumnType paimonColumnType = new PaimonColumnType(Type.ARRAY);
// The element type is converted recursively via fromPaimonType.
ColumnType elementColumnType = fromPaimonType("dummy-element", arrayType.getElementType());
paimonColumnType.setChildTypes(Collections.singletonList(elementColumnType));
return paimonColumnType;
}

@Override
Expand All @@ -163,7 +172,11 @@ public PaimonColumnType visit(MultisetType multisetType) {

/**
 * Maps a Paimon {@code MapType} to a {@code PaimonColumnType} of {@code Type.MAP}
 * with two child types, key first and value second, each converted through
 * {@code fromPaimonType} under the placeholder names "dummy-key" and "dummy-value".
 */
@Override
public PaimonColumnType visit(MapType mapType) {
// NOTE(review): the next line is presumably the pre-change line of this diff hunk
// (removed and added lines are shown together here) — confirm against the diff.
return this.defaultMethod(mapType);
PaimonColumnType paimonColumnType = new PaimonColumnType(Type.MAP);
ColumnType key = fromPaimonType("dummy-key", mapType.getKeyType());
ColumnType value = fromPaimonType("dummy-value", mapType.getValueType());
// Child order is [key, value].
paimonColumnType.setChildTypes(Arrays.asList(key, value));
return paimonColumnType;
}

@Override
Expand All @@ -184,6 +197,7 @@ private static class PaimonColumnType {
private int length;
private int precision;
private int scale;
private List<ColumnType> childTypes;

public PaimonColumnType(Type type) {
this.type = type;
Expand Down Expand Up @@ -225,5 +239,13 @@ public int getScale() {
public void setPrecision(int precision) {
this.precision = precision;
}

/**
 * Stores the given list as this column type's child types.
 *
 * @param childTypes child types to store; the list reference is kept as-is, not copied
 */
public void setChildTypes(List<ColumnType> childTypes) {
this.childTypes = childTypes;
}

/**
 * Returns the stored child types.
 *
 * @return the list last passed to {@code setChildTypes}, returned without copying;
 *         NOTE(review): presumably {@code null} when never set — no initializer is visible here
 */
public List<ColumnType> getChildTypes() {
return childTypes;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -111,6 +113,14 @@ private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dat
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return ScalarType.createDatetimeV2Type(PAIMON_DATETIME_SCALE_MS);
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType());
return org.apache.doris.catalog.ArrayType.create(innerType, true);
case MAP:
MapType mapType = (MapType) dataType;
return new org.apache.doris.catalog.MapType(
paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType()));
case TIME_WITHOUT_TIME_ZONE:
return Type.UNSUPPORTED;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530

-- !c19 --
11 22 aa bb cc
1 2 a b c
11 22 aa bb cc

-- !c20 --
1 2 a b c
Expand All @@ -66,3 +66,21 @@

-- !c22 --

-- !c23 --
1 [1111, 2222, 3333] {"a_test":1}
2 [4444, 5555, 6666] {"b_test":0, "bbb":1}
3 [7777, 8888, 9999] {"c_test":1, "ccc":0}

-- !c24 --
1 [1111, 2222, 3333] {"a_test":1}

-- !c25 --
true \N \N \N
\N false true \N
\N \N \N false

-- !c26 --
3333
6666
9999

Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
def c15 = """select * from all_table where c15='a';"""
def c16 = """select * from all_table where c16=true;"""
def c18 = """select * from all_table where c18='2023-08-13 09:32:38.53';"""
def c19 = """select * from auto_bucket;"""
def c19 = """select * from auto_bucket order by user_id;"""
def c20 = """select * from auto_bucket where dt="b";"""
def c21 = """select * from auto_bucket where dt="b" and hh="c";"""
def c22 = """select * from auto_bucket where dt="d";"""
// Queries against complex_tab (array column c2, map column c3).
def c23 = """select * from complex_tab order by c1;"""
def c24 = """select * from complex_tab where c1=1;"""
// c25 and c26 return several rows that qt_c25/qt_c26 compare in order, so they are
// sorted by c1 (the same way c19 and c23 are) to keep the compared row order deterministic.
def c25 = """select c3['a_test'], c3['b_test'], c3['bbb'], c3['ccc'] from complex_tab order by c1"""
def c26 = """select array_max(c2) from complex_tab order by c1"""

String hdfs_port = context.config.otherConfigs.get("hdfs_port")
String catalog_name = "paimon1"
Expand Down Expand Up @@ -111,5 +115,9 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
qt_c20 c20
qt_c21 c21
qt_c22 c22
qt_c23 c23
qt_c24 c24
qt_c25 c25
qt_c26 c26
}
}

0 comments on commit 267c112

Please sign in to comment.