File tree Expand file tree Collapse file tree 1 file changed +9
-1
lines changed
src/main/java/com/starrocks/connector/flink/table/sink Expand file tree Collapse file tree 1 file changed +9
-1
lines changed Original file line number Diff line number Diff line change @@ -514,7 +514,15 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) {
514
514
.enableUpsertDelete (supportUpsertDelete ());
515
515
516
516
if (hasColumnMappingProperty ()) {
517
- defaultTablePropertiesBuilder .columns (streamLoadProps .get ("columns" ));
517
+ String columns = streamLoadProps .get ("columns" );
518
+ if (supportUpsertDelete ()) {
519
+ // auto add `__op` for primary key table even when user specified `sink.properties.columns`.
520
+ // in case user use a bitmap datatype and need set up `sink.properties.columns`, may forget to add `__op`.
521
+ if (!columns .endsWith (",__op" )) {
522
+ columns = columns + ",__op" ;
523
+ }
524
+ }
525
+ defaultTablePropertiesBuilder .columns (columns );
518
526
} else if (getTableSchemaFieldNames () != null ) {
519
527
// don't need to add "columns" header in following cases
520
528
// 1. use csv format but the flink and starrocks schemas are aligned
You can’t perform that action at this time.
0 commit comments