Skip to content

Commit 98ffaf5

Browse files
authored
Merge e8c8b8e into 9bef86a
2 parents 9bef86a + e8c8b8e commit 98ffaf5

File tree

6 files changed

+523
-439
lines changed

6 files changed

+523
-439
lines changed

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,11 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
134134
YQL_ENSURE(stage.SinksSize() == 1);
135135
meta.TableId = MakeTableId(settings.GetTable());
136136
meta.TablePath = settings.GetTable().GetPath();
137-
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Update);
137+
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_DELETE) {
138+
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Erase);
139+
} else {
140+
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Update);
141+
}
138142
meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
139143
}
140144
}

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,6 +1114,32 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
11141114
}
11151115
}
11161116

1117+
THashMap<TStringBuf, ui32> CreateColumnToOrder(
1118+
const TVector<TStringBuf>& columns,
1119+
const TKikimrTableMetadataPtr& tableMeta,
1120+
bool keysFirst) {
1121+
THashSet<TStringBuf> usedColumns;
1122+
for (const auto& columnName : columns) {
1123+
usedColumns.insert(columnName);
1124+
}
1125+
1126+
THashMap<TStringBuf, ui32> columnToOrder;
1127+
ui32 number = 0;
1128+
if (keysFirst) {
1129+
for (const auto& columnName : tableMeta->KeyColumnNames) {
1130+
YQL_ENSURE(usedColumns.contains(columnName));
1131+
columnToOrder[columnName] = number++;
1132+
}
1133+
}
1134+
for (const auto& columnName : tableMeta->ColumnOrder) {
1135+
if (usedColumns.contains(columnName) && !columnToOrder.contains(columnName)) {
1136+
columnToOrder[columnName] = number++;
1137+
}
1138+
}
1139+
1140+
return columnToOrder;
1141+
}
1142+
11171143
void FillKqpSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, const TDqPhyStage& stage) {
11181144
if (auto settings = sink.Settings().Maybe<TKqpTableSinkSettings>()) {
11191145
NKqpProto::TKqpInternalSink& internalSinkProto = *protoSink->MutableInternalSink();
@@ -1165,6 +1191,14 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
11651191
fillColumnProto(columnName, columnMeta, columnProto);
11661192
}
11671193

1194+
const auto columnToOrder = CreateColumnToOrder(
1195+
columns,
1196+
tableMeta,
1197+
settings.TableType().Cast().StringValue() == "oltp");
1198+
for (const auto& columnName : columns) {
1199+
settingsProto.AddWriteIndexes(columnToOrder.at(columnName));
1200+
}
1201+
11681202
if (const auto inconsistentWrite = settings.InconsistentWrite().Cast(); inconsistentWrite.StringValue() == "true") {
11691203
settingsProto.SetInconsistentTx(true);
11701204
}

0 commit comments

Comments
 (0)