Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2107,6 +2107,12 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
<< " promoting UnprotectedReadEdge to " << version);
SnapshotManager.PromoteUnprotectedReadEdge(version);

// Make sure pending distributed transactions are marked incomplete,
// since we just protected up to and including version from writes,
// we need to make sure new immediate conflicting writes are blocked
// and don't perform writes with out-of-order versions.
res.HadWrites |= Pipeline.MarkPlannedLogicallyIncompleteUpTo(version, txc);

// We want to promote the complete edge when protected reads are
// used or when we're already writing something anyway.
if (res.HadWrites) {
Expand Down
222 changes: 116 additions & 106 deletions ydb/core/tx/datashard/datashard__read_iterator.cpp

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions ydb/core/tx/datashard/datashard_dep_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ namespace {
return a.GetStep() < b.Step || (a.GetStep() == b.Step && a.GetTxId() < b.TxId);
}

bool IsLessEqual(const TOperation& a, const TRowVersion& b) {
return a.GetStep() < b.Step || (a.GetStep() == b.Step && a.GetTxId() <= b.TxId);
bool IsEqual(const TOperation& a, const TRowVersion& b) {
return a.GetStep() == b.Step && a.GetTxId() == b.TxId;
}
}

Expand Down Expand Up @@ -799,8 +799,10 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
Y_ABORT_UNLESS(!conflict.IsImmediate());
if (snapshot.IsMax()) {
conflict.AddImmediateConflict(op);
} else if (snapshotRepeatable ? IsLessEqual(conflict, snapshot) : IsLess(conflict, snapshot)) {
} else if (IsLess(conflict, snapshot)) {
op->AddDependency(&conflict);
} else if (IsEqual(conflict, snapshot)) {
op->AddRepeatableReadConflict(&conflict);
}
};

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ TPipeline::~TPipeline()
pr.second->ClearSpecialDependencies();
pr.second->ClearPlannedConflicts();
pr.second->ClearImmediateConflicts();
pr.second->ClearRepeatableReadConflicts();
}
}

Expand Down Expand Up @@ -487,6 +488,7 @@ void TPipeline::UnblockNormalDependencies(const TOperation::TPtr &op)
op->ClearDependencies();
op->ClearPlannedConflicts();
op->ClearImmediateConflicts();
op->ClearRepeatableReadConflicts();
DepTracker.RemoveOperation(op);
}

Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/datashard/datashard_ut_common_kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,13 @@ namespace NKqpHelpers {
return FormatResult(result);
}

inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
inline auto KqpSimpleSendCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
Y_ABORT_UNLESS(!txId.empty(), "commit on empty transaction");
auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */)));
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */));
}

inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
auto response = AwaitResponse(runtime, KqpSimpleSendCommit(runtime, sessionId, txId, query));
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
return TStringBuilder() << "ERROR: " << response.operation().status();
}
Expand Down
Loading