|
6 | 6 | #include <ydb/core/tx/long_tx_service/public/lock_handle.h>
|
7 | 7 | #include <ydb/core/tx/tx_proxy/proxy.h>
|
8 | 8 | #include <ydb/core/tx/tx_proxy/upload_rows.h>
|
| 9 | +#include <ydb/core/testlib/actors/block_events.h> |
9 | 10 |
|
10 | 11 | #include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD)
|
11 | 12 | #include <ydb/core/mind/local.h>
|
@@ -5149,6 +5150,135 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
|
5149 | 5150 | }
|
5150 | 5151 | }
|
5151 | 5152 |
|
| 5153 | + Y_UNIT_TEST(ShardRestartAfterDropTable) { |
| 5154 | + TPortManager pm; |
| 5155 | + TServerSettings serverSettings(pm.GetPort(2134)); |
| 5156 | + serverSettings.SetDomainName("Root") |
| 5157 | + .SetUseRealThreads(false) |
| 5158 | + .SetDomainPlanResolution(100); |
| 5159 | + |
| 5160 | + Tests::TServer::TPtr server = new TServer(serverSettings); |
| 5161 | + auto &runtime = *server->GetRuntime(); |
| 5162 | + auto sender = runtime.AllocateEdgeActor(); |
| 5163 | + |
| 5164 | + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); |
| 5165 | + |
| 5166 | + InitRoot(server, sender); |
| 5167 | + |
| 5168 | + TDisableDataShardLogBatching disableDataShardLogBatching; |
| 5169 | + |
| 5170 | + UNIT_ASSERT_VALUES_EQUAL( |
| 5171 | + KqpSchemeExec(runtime, R"( |
| 5172 | + CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key)); |
| 5173 | + )"), |
| 5174 | + "SUCCESS"); |
| 5175 | + |
| 5176 | + const auto shards = GetTableShards(server, sender, "/Root/table"); |
| 5177 | + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); |
| 5178 | + |
| 5179 | + ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 11);"); |
| 5180 | + |
| 5181 | + TString sessionId, txId; |
| 5182 | + UNIT_ASSERT_VALUES_EQUAL( |
| 5183 | + KqpSimpleBegin(runtime, sessionId, txId, R"( |
| 5184 | + UPSERT INTO `/Root/table` (key, value) VALUES (2, 22); |
| 5185 | +
|
| 5186 | + SELECT key, value FROM `/Root/table` |
| 5187 | + WHERE key <= 5 |
| 5188 | + ORDER BY key; |
| 5189 | + )"), |
| 5190 | + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " |
| 5191 | + "{ items { uint32_value: 2 } items { uint32_value: 22 } }"); |
| 5192 | + |
| 5193 | + // Copy table (this will prevent shard deletion) |
| 5194 | + { |
| 5195 | + auto senderCopy = runtime.AllocateEdgeActor(); |
| 5196 | + ui64 txId = AsyncCreateCopyTable(server, senderCopy, "/Root", "table-copy", "/Root/table"); |
| 5197 | + WaitTxNotification(server, senderCopy, txId); |
| 5198 | + } |
| 5199 | + |
| 5200 | + // Drop the original table |
| 5201 | + { |
| 5202 | + auto senderDrop = runtime.AllocateEdgeActor(); |
| 5203 | + ui64 txId = AsyncDropTable(server, senderDrop, "/Root", "table"); |
| 5204 | + WaitTxNotification(server, senderDrop, txId); |
| 5205 | + } |
| 5206 | + |
| 5207 | + // Reboot the original table shard and sleep a little |
| 5208 | + // The bug was causing shard to crash in UndoShardLock |
| 5209 | + RebootTablet(runtime, shards.at(0), sender); |
| 5210 | + runtime.SimulateSleep(TDuration::Seconds(1)); |
| 5211 | + } |
| 5212 | + |
| 5213 | + Y_UNIT_TEST(ShardRestartAfterDropTableAndAbort) { |
| 5214 | + TPortManager pm; |
| 5215 | + TServerSettings serverSettings(pm.GetPort(2134)); |
| 5216 | + serverSettings.SetDomainName("Root") |
| 5217 | + .SetUseRealThreads(false) |
| 5218 | + .SetDomainPlanResolution(100); |
| 5219 | + |
| 5220 | + Tests::TServer::TPtr server = new TServer(serverSettings); |
| 5221 | + auto &runtime = *server->GetRuntime(); |
| 5222 | + auto sender = runtime.AllocateEdgeActor(); |
| 5223 | + |
| 5224 | + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); |
| 5225 | + |
| 5226 | + InitRoot(server, sender); |
| 5227 | + |
| 5228 | + TDisableDataShardLogBatching disableDataShardLogBatching; |
| 5229 | + |
| 5230 | + UNIT_ASSERT_VALUES_EQUAL( |
| 5231 | + KqpSchemeExec(runtime, R"( |
| 5232 | + CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key)); |
| 5233 | + )"), |
| 5234 | + "SUCCESS"); |
| 5235 | + |
| 5236 | + const auto shards = GetTableShards(server, sender, "/Root/table"); |
| 5237 | + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); |
| 5238 | + |
| 5239 | + ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 11);"); |
| 5240 | + |
| 5241 | + TString sessionId, txId; |
| 5242 | + UNIT_ASSERT_VALUES_EQUAL( |
| 5243 | + KqpSimpleBegin(runtime, sessionId, txId, R"( |
| 5244 | + UPSERT INTO `/Root/table` (key, value) VALUES (2, 22); |
| 5245 | +
|
| 5246 | + SELECT key, value FROM `/Root/table` |
| 5247 | + WHERE key <= 5 |
| 5248 | + ORDER BY key; |
| 5249 | + )"), |
| 5250 | + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " |
| 5251 | + "{ items { uint32_value: 2 } items { uint32_value: 22 } }"); |
| 5252 | + |
| 5253 | + // Copy table (this will prevent shard deletion) |
| 5254 | + { |
| 5255 | + auto senderCopy = runtime.AllocateEdgeActor(); |
| 5256 | + ui64 txId = AsyncCreateCopyTable(server, senderCopy, "/Root", "table-copy", "/Root/table"); |
| 5257 | + WaitTxNotification(server, senderCopy, txId); |
| 5258 | + } |
| 5259 | + |
| 5260 | + // Drop the original table |
| 5261 | + { |
| 5262 | + auto senderDrop = runtime.AllocateEdgeActor(); |
| 5263 | + ui64 txId = AsyncDropTable(server, senderDrop, "/Root", "table"); |
| 5264 | + WaitTxNotification(server, senderDrop, txId); |
| 5265 | + } |
| 5266 | + |
| 5267 | + TBlockEvents<TEvLongTxService::TEvLockStatus> blockedLockStatus(runtime); |
| 5268 | + |
| 5269 | + UNIT_ASSERT_VALUES_EQUAL( |
| 5270 | + KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1"), |
| 5271 | + "ERROR: UNAVAILABLE"); |
| 5272 | + |
| 5273 | + runtime.WaitFor("blocked lock status", [&]{ return blockedLockStatus.size() > 0; }); |
| 5274 | + blockedLockStatus.Stop().clear(); |
| 5275 | + |
| 5276 | + // Reboot the original table shard and sleep a little |
| 5277 | + // The bug was causing shard to crash in RemoveSubscribedLock |
| 5278 | + RebootTablet(runtime, shards.at(0), sender); |
| 5279 | + runtime.SimulateSleep(TDuration::Seconds(1)); |
| 5280 | + } |
| 5281 | + |
5152 | 5282 | Y_UNIT_TEST(BrokenLockChangesDontLeak) {
|
5153 | 5283 | TPortManager pm;
|
5154 | 5284 | TServerSettings serverSettings(pm.GetPort(2134));
|
|
0 commit comments