Skip to content

Commit 0e6efa9

Browse files
authored
Merge 2211f62 into 53f75b6
2 parents 53f75b6 + 2211f62 commit 0e6efa9

File tree

5 files changed

+99
-8
lines changed

5 files changed

+99
-8
lines changed

ydb/core/tx/datashard/datashard.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "datashard_impl.h"
22
#include "datashard_txs.h"
3+
#include "datashard_locks_db.h"
34
#include "probes.h"
45

56
#include <ydb/core/base/interconnect_channels.h>
@@ -1620,7 +1621,9 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD
16201621
newTableInfo->StatsUpdateInProgress = false;
16211622
newTableInfo->StatsNeedUpdate = true;
16221623

1623-
RemoveUserTable(prevId);
1624+
TDataShardLocksDb locksDb(*this, txc);
1625+
1626+
RemoveUserTable(prevId, &locksDb);
16241627
AddUserTable(newId, newTableInfo);
16251628

16261629
for (auto& [_, record] : ChangesQueue) {

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1607,10 +1607,10 @@ class TDataShard
16071607
return nullptr;
16081608
}
16091609

1610-
void RemoveUserTable(const TPathId& tableId) {
1611-
TableInfos.erase(tableId.LocalPathId);
1612-
SysLocks.RemoveSchema(tableId);
1610+
void RemoveUserTable(const TPathId& tableId, ILocksDb* locksDb) {
1611+
SysLocks.RemoveSchema(tableId, locksDb);
16131612
Pipeline.GetDepTracker().RemoveSchema(tableId);
1613+
TableInfos.erase(tableId.LocalPathId);
16141614
}
16151615

16161616
void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) {

ydb/core/tx/datashard/datashard_locks.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,12 +624,23 @@ void TLockLocker::UpdateSchema(const TPathId& tableId, const TUserTable& tableIn
624624
table->UpdateKeyColumnsTypes(tableInfo.KeyColumnTypes);
625625
}
626626

627-
void TLockLocker::RemoveSchema(const TPathId& tableId) {
627+
void TLockLocker::RemoveSchema(const TPathId& tableId, ILocksDb* db) {
628+
// Make sure all persistent locks are removed from the database
629+
for (auto& pr : Locks) {
630+
if (pr.second->IsPersistent()) {
631+
pr.second->PersistRemoveLock(db);
632+
}
633+
pr.second->OnRemoved();
634+
}
635+
628636
Tables.erase(tableId);
629637
Y_ABORT_UNLESS(Tables.empty());
630638
Locks.clear();
631639
ShardLocks.clear();
640+
ExpireQueue.Clear();
632641
BrokenLocks.Clear();
642+
BrokenPersistentLocks.Clear();
643+
BrokenLocksCount_ = 0;
633644
CleanupPending.clear();
634645
CleanupCandidates.clear();
635646
PendingSubscribeLocks.clear();

ydb/core/tx/datashard/datashard_locks.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ class TLockLocker {
601601
}
602602

603603
void UpdateSchema(const TPathId& tableId, const TUserTable& tableInfo);
604-
void RemoveSchema(const TPathId& tableId);
604+
void RemoveSchema(const TPathId& tableId, ILocksDb* db);
605605
bool ForceShardLock(const TPathId& tableId) const;
606606
bool ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const;
607607

@@ -840,8 +840,8 @@ class TSysLocks {
840840
Locker.UpdateSchema(tableId, tableInfo);
841841
}
842842

843-
void RemoveSchema(const TPathId& tableId) {
844-
Locker.RemoveSchema(tableId);
843+
void RemoveSchema(const TPathId& tableId, ILocksDb* db) {
844+
Locker.RemoveSchema(tableId, db);
845845
}
846846

847847
TVector<TLock> ApplyLocks();

ydb/core/tx/datashard/datashard_ut_snapshot.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4896,6 +4896,83 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
48964896
"{ items { int32_value: 2 } items { int32_value: 20 } }");
48974897
}
48984898

4899+
Y_UNIT_TEST(UncommittedChangesRenameTable) {
4900+
TPortManager pm;
4901+
TServerSettings serverSettings(pm.GetPort(2134));
4902+
serverSettings.SetDomainName("Root")
4903+
.SetUseRealThreads(false)
4904+
.SetDomainPlanResolution(100)
4905+
.SetEnableDataShardVolatileTransactions(true);
4906+
4907+
Tests::TServer::TPtr server = new TServer(serverSettings);
4908+
auto &runtime = *server->GetRuntime();
4909+
auto sender = runtime.AllocateEdgeActor();
4910+
4911+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
4912+
4913+
InitRoot(server, sender);
4914+
4915+
TDisableDataShardLogBatching disableDataShardLogBatching;
4916+
4917+
UNIT_ASSERT_VALUES_EQUAL(
4918+
KqpSchemeExec(runtime, R"(
4919+
CREATE TABLE `/Root/table1` (key int, value int, PRIMARY KEY (key));
4920+
)"),
4921+
"SUCCESS");
4922+
4923+
ExecSQL(server, sender, "UPSERT INTO `/Root/table1` (key, value) VALUES (2, 22);");
4924+
4925+
TString sessionId = CreateSessionRPC(runtime);
4926+
TString txId;
4927+
UNIT_ASSERT_VALUES_EQUAL(
4928+
KqpSimpleBegin(runtime, sessionId, txId, R"(
4929+
UPSERT INTO `/Root/table1` (key, value) VALUES (1, 11), (3, 33);
4930+
SELECT key, value FROM `/Root/table1` ORDER BY key;
4931+
)"),
4932+
"{ items { int32_value: 1 } items { int32_value: 11 } }, "
4933+
"{ items { int32_value: 2 } items { int32_value: 22 } }, "
4934+
"{ items { int32_value: 3 } items { int32_value: 33 } }");
4935+
4936+
auto shards = GetTableShards(server, sender, "/Root/table1");
4937+
auto tableId1 = ResolveTableId(server, sender, "/Root/table1");
4938+
4939+
// Check shard has open transactions
4940+
{
4941+
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId1.PathId));
4942+
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
4943+
UNIT_ASSERT_C(!ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
4944+
}
4945+
4946+
WaitTxNotification(server, sender, AsyncMoveTable(server, "/Root/table1", "/Root/table1moved"));
4947+
auto tableId2 = ResolveTableId(server, sender, "/Root/table1moved");
4948+
4949+
runtime.SimulateSleep(TDuration::Seconds(1));
4950+
4951+
// Check shard doesn't have open transactions
4952+
{
4953+
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId2.PathId));
4954+
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
4955+
UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
4956+
}
4957+
4958+
RebootTablet(runtime, shards.at(0), sender);
4959+
4960+
// The original table was removed
4961+
// We must not be able to commit the transaction
4962+
UNIT_ASSERT_VALUES_EQUAL(
4963+
KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1"),
4964+
"ERROR: ABORTED");
4965+
4966+
runtime.SimulateSleep(TDuration::Seconds(1));
4967+
4968+
// Check shard doesn't have open transactions
4969+
{
4970+
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId2.PathId));
4971+
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
4972+
UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
4973+
}
4974+
}
4975+
48994976
}
49004977

49014978
} // namespace NKikimr

0 commit comments

Comments
 (0)