Skip to content

24-3: Fix bulk operations breaking frozen locks #12018

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/tx/datashard/datashard__op_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct
UpdateProposeQueueSize();
return;
}
if (Pipeline.HasProposeDelayers()) {
DelayedProposeQueue.emplace_back().Reset(ev.Release());
UpdateProposeQueueSize();
return;
}
if (IsReplicated()) {
return Reject<TEvDataShard::TEvUploadRowsResponse>(this, ev, "bulk upsert",
ERejectReasons::WrongState, "Can't execute bulk upsert at replicated table", &ReadOnly, ctx, TDataShard::ELogThrottlerType::UploadRows_Reject);
Expand All @@ -237,6 +242,11 @@ void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActo
UpdateProposeQueueSize();
return;
}
if (Pipeline.HasProposeDelayers()) {
DelayedProposeQueue.emplace_back().Reset(ev.Release());
UpdateProposeQueueSize();
return;
}
if (IsReplicated()) {
return Reject<TEvDataShard::TEvEraseRowsResponse>(this, ev, "erase",
ERejectReasons::WrongState, "Can't execute erase at replicated table", &ExecError, ctx, TDataShard::ELogThrottlerType::EraseRows_Reject);
Expand Down
184 changes: 184 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3977,6 +3977,190 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { uint32_value: 4 } items { uint32_value: 40 } }");
}

Y_UNIT_TEST(UncommittedWriteRestartDuringCommitThenBulkErase) {
NKikimrConfig::TAppConfig app;

TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(100)
.SetAppConfig(app)
// Bug was with non-volatile transactions
.SetEnableDataShardVolatileTransactions(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;
UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (5));
)"),
"SUCCESS");

// Insert some initial data
ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 10), (5, 50);");

const auto shards = GetTableShards(server, sender, "/Root/table");
const auto tableId = ResolveTableId(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);

TString sessionId, txId;

// Start inserting a couple of rows into the table
Cerr << "... sending initial upsert" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, R"(
SELECT key, value FROM `/Root/table` WHERE key = 1;
UPSERT INTO `/Root/table` (key, value) VALUES (2, 20), (6, 60);
)"),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }");

// We want to block readsets next
std::vector<std::unique_ptr<IEventHandle>> readSets;
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
readSets.emplace_back(ev.Release());
});

// Start committing an additional read/write
// Note: select on the table flushes accumulated changes first
Cerr << "... sending commit request" << Endl;
auto commitFuture = SendRequest(runtime, MakeSimpleRequestRPC(R"(
SELECT key, value FROM `/Root/table` ORDER BY key;
)", sessionId, txId, /* commitTx */ true));

WaitFor(runtime, [&]{ return readSets.size() >= 2; }, "readset exchange");
UNIT_ASSERT_VALUES_EQUAL(readSets.size(), 2u);

// We want to make sure we block the first progress message when shards reboot
std::vector<TActorId> shardActors(shards.size());
UNIT_ASSERT_VALUES_EQUAL(shardActors.size(), 2u);
std::vector<std::unique_ptr<IEventHandle>> blockedProgress;
auto blockProgressQueue = runtime.AddObserver([&](TAutoPtr<IEventHandle>& ev) noexcept {
switch (ev->GetTypeRewrite()) {
case TEvTablet::TEvBoot::EventType: {
auto* msg = ev->Get<TEvTablet::TEvBoot>();
Cerr << "... observed TEvBoot for " << msg->TabletID << " at " << ev->GetRecipientRewrite() << Endl;
auto it = std::find(shards.begin(), shards.end(), msg->TabletID);
if (it != shards.end()) {
shardActors.at(it - shards.begin()) = ev->GetRecipientRewrite();
}
break;
}
case EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0 /* EvProgressTransaction */: {
auto it = std::find(shardActors.begin(), shardActors.end(), ev->GetRecipientRewrite());
if (it != shardActors.end()) {
ui64 shardId = shards.at(it - shardActors.begin());
Cerr << "... blocking TEvProgressTranasction at " << ev->GetRecipientRewrite() << " shard " << shardId << Endl;
blockedProgress.emplace_back(ev.Release());
return;
}
break;
}
}
});

// Clear old readsets and reboot both shards with TEvPoison
// This way shards don't have a chance to reply causing an UNDETERMINED error
readSets.clear();
for (ui64 shardId : shards) {
Cerr << "... sending TEvPoison to " << shardId << Endl;
ForwardToTablet(runtime, shardId, sender, new TEvents::TEvPoison);
}

// Note: we cannot wait for the commit result, since KQP is blocked trying to abort

// Sleep a little to make sure everything settles
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

UNIT_ASSERT_VALUES_EQUAL(readSets.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 2u);

// Send an erase rows request before the progress queue resumes
{
Cerr << "... sending TEvEraseRowsRequest to shard 1 for key 1" << Endl;
auto req = std::make_unique<TEvDataShard::TEvEraseRowsRequest>();
req->Record.SetTableId(tableId.PathId.LocalPathId);
req->Record.SetSchemaVersion(tableId.SchemaVersion);
req->Record.AddKeyColumnIds(1);
ui32 key = 1;
TCell keyCell = TCell::Make(key);
req->Record.AddKeyColumns(TSerializedCellVec::Serialize(TArrayRef<const TCell>(&keyCell, 1)));
runtime.Send(new IEventHandle(shardActors.at(0), sender, req.release()), 0, true);
// Give shard 1 a chance to process this request incorrectly
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));
}

// Unblock progress queue and resend blocked messages
Cerr << "... resending progress queue" << Endl;
blockProgressQueue.Remove();
for (auto& ev : blockedProgress) {
runtime.Send(ev.release(), 0, true);
}
blockedProgress.clear();

// This insert must run after the currently committing transaction, so it must fail: either read happens before
// the commit and is broken later by the commit, or the read finds a duplicate row and insert fails. Due to a
// bug the commit lock might already be broken, causing conflicts not to work properly, and allowing the insert
// to overwrite key = 2.
Cerr << "... sending an insert" << Endl;
auto insertFuture = KqpSimpleSend(runtime, R"(
INSERT INTO `/Root/table` (key, value) VALUES (2, 22);
)");

// Sleep a little to make sure everything settles
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// Unblock readsets letting transaction to complete
Cerr << "... resending readsets" << Endl;
blockReadSets.Remove();
for (auto& ev : readSets) {
runtime.Send(ev.release(), 0, true);
}
readSets.clear();

// Sleep a little to make sure everything settles
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// We expect erase to succeed by this point
Cerr << "... checking the erase result" << Endl;
{
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvEraseRowsResponse>(sender);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), NKikimrTxDataShard::TEvEraseRowsResponse::OK);
}

// We expect commit to fail with an UNDETERMINED error
Cerr << "... checking the commit result" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(commitFuture))),
"ERROR: UNDETERMINED");

// Now make a read query, we must not observe any partial commits
Cerr << "... checking final table state" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/table`
ORDER BY key;
)"),
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
"{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
"{ items { uint32_value: 6 } items { uint32_value: 60 } }");
}

/**
* This observer forces newly created nodes to start on particular nodes
*/
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/locks/locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ enum class ELockRangeFlags : ui8 {
using ELockRangeFlagsRaw = std::underlying_type<ELockRangeFlags>::type;

inline ELockRangeFlags operator|(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) | ELockRangeFlagsRaw(b)); }
inline ELockRangeFlags operator&(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) | ELockRangeFlagsRaw(b)); }
inline ELockRangeFlags operator&(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) & ELockRangeFlagsRaw(b)); }
inline ELockRangeFlags& operator|=(ELockRangeFlags& a, ELockRangeFlags b) { return a = a | b; }
inline ELockRangeFlags& operator&=(ELockRangeFlags& a, ELockRangeFlags b) { return a = a & b; }
inline bool operator!(ELockRangeFlags c) { return ELockRangeFlagsRaw(c) == 0; }
Expand Down
Loading