Skip to content

Commit

Permalink
Refine type check logic
Browse files Browse the repository at this point in the history
  • Loading branch information
banmoy committed Sep 27, 2022
1 parent 2f0d4cf commit 78c8eb6
Showing 1 changed file with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,25 +145,30 @@ private void genFlinkRows() {
FieldVector fieldVector = fieldVectors.get(i);
StarRocksToFlinkTrans translators = null;
Column column = starRocksSchema.get(fieldVector.getName());
boolean nullable = true;
if (column != null) {
ColumnRichInfo richInfo = columnRichInfos.get(fieldVector.getName());
nullable = richInfo.getDataType().getLogicalType().isNullable();
LogicalTypeRoot flinkTypeRoot = richInfo.getDataType().getLogicalType().getTypeRoot();
String srType = DataUtil.clearBracket(column.getType());
if (Const.DataTypeRelationMap.containsKey(flinkTypeRoot)
&& Const.DataTypeRelationMap.get(flinkTypeRoot).containsKey(srType)) {
translators = Const.DataTypeRelationMap.get(flinkTypeRoot).get(srType);
}
boolean nullable;
if (column == null) {
throw new RuntimeException("Can't find StarRocks' column " + fieldVector.getName());
}

if (translators == null) {
ColumnRichInfo richInfo = columnRichInfos.get(fieldVector.getName());
nullable = richInfo.getDataType().getLogicalType().isNullable();
LogicalTypeRoot flinkTypeRoot = richInfo.getDataType().getLogicalType().getTypeRoot();
String srType = DataUtil.clearBracket(column.getType());

if (!Const.DataTypeRelationMap.containsKey(flinkTypeRoot)) {
throw new RuntimeException(
"Flink type not supported for column " + fieldVector.getName() +
", Flink type is -> [" + flinkTypeRoot.toString() + "]");
}
if (!Const.DataTypeRelationMap.get(flinkTypeRoot).containsKey(srType)) {
throw new RuntimeException(
"Flink type not support when convert data from starrocks to flink, " +
"type is -> [" + fieldVector.getMinorType().toString() + "]"
"StarRocks type can not convert to Flink type for column " + fieldVector.getName() +
", StarRocks type is -> [" + srType + "], " +
"Flink type is -> [" + flinkTypeRoot.toString() + "]"
);
}

translators = Const.DataTypeRelationMap.get(flinkTypeRoot).get(srType);
Object[] result = translators.transToFlinkData(fieldVector.getMinorType(), fieldVector, rowCountOfBatch, i, nullable);
for (int ri = 0; ri < result.length; ri ++) {
setValueToFlinkRows(ri, i, result[ri]);
Expand Down

0 comments on commit 78c8eb6

Please sign in to comment.