Skip to content

Commit 2722753

Browse files
committed
fix
1 parent 938631d commit 2722753

File tree

2 files changed

+25
-36
lines changed

2 files changed

+25
-36
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 24 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -310,40 +310,33 @@ bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
310310
return false;
311311
}
312312

313-
bool HasOltpTableWriteInStage(const NKqpProto::TKqpPhyStage& stage, const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables) {
314-
for (const auto &tableOp : stage.GetTableOps()) {
315-
if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kUpsertRows
316-
|| tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kDeleteRows) {
317-
return true;
318-
}
319-
}
320-
321-
for (const auto& sink : stage.GetSinks()) {
322-
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
323-
NKikimrKqp::TKqpTableSinkSettings settings;
324-
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
325-
326-
const bool isOltpSink = std::any_of(
327-
std::begin(tables),
328-
std::end(tables),
329-
[&](const NKqpProto::TKqpPhyTable& table) {
330-
return table.GetKind() == NKqpProto::EKqpPhyTableKind::TABLE_KIND_DS
331-
&& google::protobuf::util::MessageDifferencer::Equals(table.GetId(), settings.GetTable());
332-
});
333-
334-
if (isOltpSink) {
335-
return true;
336-
}
337-
}
338-
}
339-
return false;
340-
}
341-
342313
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
343314
for (const auto &tx : physicalQuery.GetTransactions()) {
344315
for (const auto &stage : tx.GetStages()) {
345-
if (HasOltpTableWriteInStage(stage, tx.GetTables())) {
346-
return true;
316+
for (const auto &tableOp : stage.GetTableOps()) {
317+
if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kUpsertRows
318+
|| tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kDeleteRows) {
319+
return true;
320+
}
321+
}
322+
323+
for (const auto& sink : stage.GetSinks()) {
324+
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
325+
NKikimrKqp::TKqpTableSinkSettings settings;
326+
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
327+
328+
const bool isOltpSink = std::any_of(
329+
std::begin(tx.GetTables()),
330+
std::end(tx.GetTables()),
331+
[&](const NKqpProto::TKqpPhyTable& table) {
332+
return table.GetKind() == NKqpProto::EKqpPhyTableKind::TABLE_KIND_DS
333+
&& google::protobuf::util::MessageDifferencer::Equals(table.GetId(), settings.GetTable());
334+
});
335+
336+
if (isOltpSink) {
337+
return true;
338+
}
339+
}
347340
}
348341
}
349342
}

ydb/core/kqp/common/kqp_tx.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -454,14 +454,10 @@ std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TTyp
454454
bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfiguration& config, bool rollbackTx,
455455
bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery);
456456

457+
bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
457458
bool HasOlapTableWriteInStage(
458459
const NKqpProto::TKqpPhyStage& stage,
459460
const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables);
460-
bool HasOltpTableWriteInStage(
461-
const NKqpProto::TKqpPhyStage& stage,
462-
const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables);
463-
464-
bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
465461
bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
466462
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
467463
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);

0 commit comments

Comments
 (0)