Skip to content

Commit 54353c4

Browse files
committed
Update StarRocksSinkOptions.java
auto add `__op` for primary key table even when user specified `sink.properties.columns` Signed-off-by: uicosp <uicosp@gmail.com> auto add `__op` for primary key table even when user specified `sink.properties.columns` auto add `__op` for primary key table even when user specified `sink.properties.columns`
1 parent 08de67a commit 54353c4

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,15 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) {
514514
.enableUpsertDelete(supportUpsertDelete());
515515

516516
if (hasColumnMappingProperty()) {
517-
defaultTablePropertiesBuilder.columns(streamLoadProps.get("columns"));
517+
List<String> columns = new ArrayList<>(Arrays.asList(streamLoadProps.get("columns").split(",")));
518+
if (supportUpsertDelete()) {
519+
// auto add `__op` for primary key table even when user specified `sink.properties.columns`.
520+
// in case user use a bitmap datatype and need set up `sink.properties.columns`, may forget to add `__op`.
521+
if (columns.stream().noneMatch(it -> it.equals("__op"))) {
522+
columns.add("__op");
523+
}
524+
}
525+
defaultTablePropertiesBuilder.columns(String.join(",", columns));
518526
} else if (getTableSchemaFieldNames() != null) {
519527
// don't need to add "columns" header in following cases
520528
// 1. use csv format but the flink and starrocks schemas are aligned

0 commit comments

Comments
 (0)