Skip to content

Commit 8192d38

Browse files
committed
Fix volatile result sent before it is fully committed (#2598)
1 parent 7b61b05 commit 8192d38

File tree

6 files changed

+146
-21
lines changed

6 files changed

+146
-21
lines changed

ydb/core/tx/datashard/datashard_active_transaction.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ TValidatedDataTx::TPtr TActiveTransaction::BuildDataTx(TDataShard *self,
404404
if (!DataTx) {
405405
Y_ABORT_UNLESS(TxBody);
406406
DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(),
407-
GetReceivedAt(), TxBody, MvccSnapshotRepeatable);
407+
GetReceivedAt(), TxBody, IsMvccSnapshotRepeatable());
408408
if (DataTx->HasStreamResponse())
409409
SetStreamSink(DataTx->GetSink());
410410
}
@@ -635,7 +635,7 @@ ERestoreDataStatus TActiveTransaction::RestoreTxData(
635635

636636
bool extractKeys = DataTx->IsTxInfoLoaded();
637637
DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(),
638-
GetReceivedAt(), TxBody, MvccSnapshotRepeatable);
638+
GetReceivedAt(), TxBody, IsMvccSnapshotRepeatable());
639639
if (DataTx->Ready() && extractKeys) {
640640
DataTx->ExtractKeys(true);
641641
}

ydb/core/tx/datashard/datashard_ut_common_kqp.h

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ namespace NKqpHelpers {
4242
inline TString CreateSessionRPC(TTestActorRuntime& runtime, const TString& database = {}) {
4343
Ydb::Table::CreateSessionRequest request;
4444
auto future = NRpcService::DoLocalRpc<TEvCreateSessionRequest>(
45-
std::move(request), database, "", /* token */ runtime.GetActorSystem(0));
45+
std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
4646
TString sessionId;
4747
auto response = AwaitResponse(runtime, future);
4848
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
@@ -71,7 +71,7 @@ namespace NKqpHelpers {
7171
TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {})
7272
{
7373
return NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>(
74-
std::move(request), database, "" /* token */, runtime.GetActorSystem(0));
74+
std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
7575
}
7676

7777
inline Ydb::Table::ExecuteDataQueryRequest MakeSimpleRequestRPC(
@@ -119,7 +119,7 @@ namespace NKqpHelpers {
119119
Ydb::Table::DeleteSessionRequest request;
120120
request.set_session_id(sessionId);
121121
auto future = NRpcService::DoLocalRpc<TEvDeleteSessionRequest>(
122-
std::move(request), "", "", /* token */ runtime.GetActorSystem(0));
122+
std::move(request), "", /* token */ "", runtime.GetActorSystem(0));
123123
}
124124

125125
inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeStreamRequest(
@@ -168,17 +168,15 @@ namespace NKqpHelpers {
168168
return FormatResult(result);
169169
}
170170

171-
inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
171+
inline auto KqpSimpleSend(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
172172
TString sessionId = CreateSessionRPC(runtime, database);
173173
TString txId;
174-
auto response = AwaitResponse(
175-
runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */, staleRo), database));
176-
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
177-
return TStringBuilder() << "ERROR: " << response.operation().status();
178-
}
179-
Ydb::Table::ExecuteQueryResult result;
180-
response.operation().result().UnpackTo(&result);
181-
return FormatResult(result);
174+
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, /* commitTx */ true, staleRo), database);
175+
}
176+
177+
inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
178+
auto response = AwaitResponse(runtime, KqpSimpleSend(runtime, query, staleRo, database));
179+
return FormatResult(response);
182180
}
183181

184182
inline TString KqpSimpleStaleRoExec(TTestActorRuntime& runtime, const TString& query, const TString& database = {}) {

ydb/core/tx/datashard/datashard_ut_volatile.cpp

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2283,6 +2283,120 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
22832283
UNIT_ASSERT_VALUES_EQUAL(volatileTxs, 2u);
22842284
}
22852285

2286+
// Regression test for KIKIMR-21156
2287+
Y_UNIT_TEST(VolatileCommitOnBlobStorageFailure) {
2288+
TPortManager pm;
2289+
TServerSettings serverSettings(pm.GetPort(2134));
2290+
serverSettings.SetDomainName("Root")
2291+
.SetUseRealThreads(false)
2292+
.SetDomainPlanResolution(1000)
2293+
.SetEnableDataShardVolatileTransactions(true);
2294+
2295+
Tests::TServer::TPtr server = new TServer(serverSettings);
2296+
auto &runtime = *server->GetRuntime();
2297+
auto sender = runtime.AllocateEdgeActor();
2298+
2299+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
2300+
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
2301+
runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE);
2302+
runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE);
2303+
2304+
InitRoot(server, sender);
2305+
2306+
TDisableDataShardLogBatching disableDataShardLogBatching;
2307+
CreateShardedTable(server, sender, "/Root", "table-1", 1);
2308+
CreateShardedTable(server, sender, "/Root", "table-2", 1);
2309+
2310+
// Make sure read flags are persisted by performing a snapshot read
2311+
UNIT_ASSERT_VALUES_EQUAL(
2312+
KqpSimpleExec(runtime, R"(
2313+
SELECT key, value FROM `/Root/table-1`
2314+
UNION ALL
2315+
SELECT key, value FROM `/Root/table-2`
2316+
ORDER BY key
2317+
)"),
2318+
"");
2319+
2320+
// Insert initial values
2321+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);"));
2322+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);"));
2323+
2324+
// Start blocking commits for table-1
2325+
const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
2326+
UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
2327+
std::deque<TEvBlobStorage::TEvPut::TPtr> blockedPuts;
2328+
auto blockCommits = runtime.AddObserver<TEvBlobStorage::TEvPut>([&](TEvBlobStorage::TEvPut::TPtr& ev) {
2329+
auto* msg = ev->Get();
2330+
// Drop all put requests for table-1
2331+
if (msg->Id.TabletID() == shards1.at(0)) {
2332+
Cerr << "... blocking put " << msg->Id << Endl;
2333+
blockedPuts.push_back(std::move(ev));
2334+
}
2335+
});
2336+
2337+
// Start an upsert to table-1, this will block further readonly localdb tx completions
2338+
Cerr << "... starting an upsert to table-1" << Endl;
2339+
auto firstUpsertFuture = KqpSimpleSend(runtime, R"(
2340+
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
2341+
)");
2342+
2343+
// Wait until puts are blocked
2344+
WaitFor(runtime, [&]{ return blockedPuts.size() > 0; }, "blocked puts");
2345+
auto firstUpsertPuts = std::move(blockedPuts);
2346+
UNIT_ASSERT(blockedPuts.empty());
2347+
2348+
// Read from table-2 and write to table-1 based on the result
2349+
// This will result in a two-shard volatile tx writing to table-1
2350+
Cerr << "... starting distributed tx between table-1 and table-2" << Endl;
2351+
auto volatileFuture = KqpSimpleSend(runtime, R"(
2352+
UPSERT INTO `/Root/table-1`
2353+
SELECT key + 2u AS key, value + 2u AS value
2354+
FROM `/Root/table-2`;
2355+
)");
2356+
2357+
// Wait until it also tries to commit
2358+
WaitFor(runtime, [&]{ return blockedPuts.size() > 0; }, "blocked puts");
2359+
2360+
// Now unblock the first upsert puts
2361+
blockCommits.Remove();
2362+
for (auto& ev : firstUpsertPuts) {
2363+
runtime.Send(ev.Release(), 0, true);
2364+
}
2365+
firstUpsertPuts.clear();
2366+
2367+
// And wait for it to finish successfully
2368+
Cerr << "... waiting for first upsert result" << Endl;
2369+
UNIT_ASSERT_VALUES_EQUAL(
2370+
FormatResult(AwaitResponse(runtime, std::move(firstUpsertFuture))),
2371+
"<empty>");
2372+
2373+
// Reply to everything previously blocked with an error, the shard will restart
2374+
for (auto& ev : blockedPuts) {
2375+
auto proxy = ev->Recipient;
2376+
ui32 groupId = GroupIDFromBlobStorageProxyID(proxy);
2377+
auto res = ev->Get()->MakeErrorResponse(NKikimrProto::ERROR, "Something went wrong", groupId);
2378+
runtime.Send(new IEventHandle(ev->Sender, proxy, res.release()), 0, true);
2379+
}
2380+
2381+
// Wait for the volatile tx result
2382+
Cerr << "... waiting for volatile tx result" << Endl;
2383+
auto result = FormatResult(AwaitResponse(runtime, std::move(volatileFuture)));
2384+
if (result == "<empty>") {
2385+
// A success result is not ok now, but in the future we might migrate state
2386+
// Check that the supposedly committed row actually exists
2387+
UNIT_ASSERT_VALUES_EQUAL(
2388+
KqpSimpleExec(runtime, R"(
2389+
SELECT key, value FROM `/Root/table-1` ORDER BY key;
2390+
)"),
2391+
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
2392+
"{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
2393+
"{ items { uint32_value: 4 } items { uint32_value: 22 } }");
2394+
} else {
2395+
// Otherwise the result must be undetermined
2396+
UNIT_ASSERT_VALUES_EQUAL(result, "ERROR: UNDETERMINED");
2397+
}
2398+
}
2399+
22862400
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
22872401

22882402
} // namespace NKikimr

ydb/core/tx/datashard/finish_propose_unit.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,11 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
9999
op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease());
100100
}
101101

102-
if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op)))
102+
if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op))) {
103+
DataShard.IncCounter(COUNTER_PREPARE_COMPLETE);
104+
op->SetProposeResultSentEarly();
103105
CompleteRequest(op, ctx);
106+
}
104107

105108
if (!DataShard.IsFollower())
106109
DataShard.PlanCleanup(ctx);
@@ -128,7 +131,7 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
128131
void TFinishProposeUnit::Complete(TOperation::TPtr op,
129132
const TActorContext &ctx)
130133
{
131-
if (!op->HasResultSentFlag()) {
134+
if (!op->HasResultSentFlag() && !op->IsProposeResultSentEarly()) {
132135
DataShard.IncCounter(COUNTER_PREPARE_COMPLETE);
133136

134137
if (op->Result())

ydb/core/tx/datashard/finish_propose_write_unit.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,11 @@ EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op,
9797
op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease());
9898
}
9999

100-
if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op)))
100+
if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op))) {
101+
DataShard.IncCounter(COUNTER_PREPARE_COMPLETE);
102+
op->SetProposeResultSentEarly();
101103
CompleteRequest(op, ctx);
104+
}
102105

103106
if (!DataShard.IsFollower())
104107
DataShard.PlanCleanup(ctx);
@@ -127,7 +130,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext
127130
{
128131
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
129132

130-
if (!op->HasResultSentFlag()) {
133+
if (!op->HasResultSentFlag() && !op->IsProposeResultSentEarly()) {
131134
DataShard.IncCounter(COUNTER_WRITE_COMPLETE);
132135

133136
if (writeOp->GetWriteResult())

ydb/core/tx/datashard/operation.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,12 +402,15 @@ class TBasicOpInfo {
402402

403403
bool IsMvccSnapshotRead() const { return !MvccSnapshot.IsMax(); }
404404
const TRowVersion& GetMvccSnapshot() const { return MvccSnapshot; }
405-
bool IsMvccSnapshotRepeatable() const { return MvccSnapshotRepeatable; }
405+
bool IsMvccSnapshotRepeatable() const { return MvccSnapshotRepeatable_; }
406406
void SetMvccSnapshot(const TRowVersion& snapshot, bool isRepeatable = true) {
407407
MvccSnapshot = snapshot;
408-
MvccSnapshotRepeatable = isRepeatable;
408+
MvccSnapshotRepeatable_ = isRepeatable;
409409
}
410410

411+
bool IsProposeResultSentEarly() const { return ProposeResultSentEarly_; }
412+
void SetProposeResultSentEarly(bool value = true) { ProposeResultSentEarly_ = value; }
413+
411414
///////////////////////////////////
412415
// DEBUG AND MONITORING //
413416
///////////////////////////////////
@@ -429,7 +432,11 @@ class TBasicOpInfo {
429432

430433
TSnapshotKey AcquiredSnapshotKey;
431434
TRowVersion MvccSnapshot = TRowVersion::Max();
432-
bool MvccSnapshotRepeatable = false;
435+
436+
private:
437+
// Runtime flags
438+
ui8 MvccSnapshotRepeatable_ : 1 = 0;
439+
ui8 ProposeResultSentEarly_ : 1 = 0;
433440
};
434441

435442
struct TRSData {

0 commit comments

Comments
 (0)