Skip to content

Commit 6def903

Browse files
Merge 50f9e9f into c42eeee
2 parents c42eeee + 50f9e9f commit 6def903

File tree

31 files changed

+458
-96
lines changed

31 files changed

+458
-96
lines changed

ydb/core/grpc_services/rpc_list_operations.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
4242
return "[ListIndexBuilds]";
4343
case TOperationId::SCRIPT_EXECUTION:
4444
return "[ListScriptExecutions]";
45+
case TOperationId::SS_BG_TASKS:
46+
return "[SchemeShardTasks]";
4547
default:
4648
return "[Untagged]";
4749
}
@@ -165,6 +167,7 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
165167
case TOperationId::EXPORT:
166168
case TOperationId::IMPORT:
167169
case TOperationId::BUILD_INDEX:
170+
case TOperationId::SS_BG_TASKS:
168171
break;
169172
case TOperationId::SCRIPT_EXECUTION:
170173
SendListScriptExecutions();
@@ -181,6 +184,7 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
181184
switch (ev->GetTypeRewrite()) {
182185
hFunc(TEvExport::TEvListExportsResponse, Handle);
183186
hFunc(TEvImport::TEvListImportsResponse, Handle);
187+
hFunc(NSchemeShard::NBackground::TEvListResponse, Handle);
184188
hFunc(TEvIndexBuilder::TEvListResponse, Handle);
185189
hFunc(NKqp::TEvListScriptExecutionOperationsResponse, Handle);
186190
default:

ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_sharding.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ TConclusionStatus TAlterShardingOperation::DoDeserialize(NYql::TObjectSettingsIm
1111
if (*modification == "SPLIT") {
1212
Increase = true;
1313
} else if (*modification == "MERGE") {
14-
return TConclusionStatus::Fail("modification is impossible yet");
14+
Increase = false;
1515
} else {
1616
return TConclusionStatus::Fail("undefined modification: \"" + *modification + "\"");
1717
}
@@ -20,9 +20,10 @@ TConclusionStatus TAlterShardingOperation::DoDeserialize(NYql::TObjectSettingsIm
2020

2121
void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TModifyScheme& scheme, const bool isStandalone) const {
2222
AFL_VERIFY(!isStandalone);
23+
AFL_VERIFY(!!Increase);
2324
scheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable);
2425
scheme.MutableAlterColumnTable()->SetName(GetStoreName());
25-
*scheme.MutableAlterColumnTable()->MutableReshardColumnTable() = NKikimrSchemeOp::TReshardColumnTable();
26+
scheme.MutableAlterColumnTable()->MutableReshardColumnTable()->SetIncrease(*Increase);
2627
}
2728

2829
}

ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#include "helpers/typed_local.h"
2+
#include "helpers/local.h"
3+
#include "helpers/writer.h"
24
#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
35
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
46
#include <ydb/core/tx/columnshard/common/snapshot.h>
@@ -7,6 +9,8 @@
79
#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
810
#include <ydb/core/tx/columnshard/data_sharing/destination/events/control.h>
911
#include <ydb/core/base/tablet_pipecache.h>
12+
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
13+
#include <ydb/public/sdk/cpp/client/ydb_ss_tasks/task.h>
1014

1115
namespace NKikimr::NKqp {
1216

@@ -261,6 +265,151 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
261265
tester.Execute(0, {1, 2}, false, NOlap::TSnapshot(TInstant::Now().MilliSeconds(), 1232123), {0});
262266
tester.WaitNormalization();
263267
}
268+
269+
class TReshardingTest {
270+
private:
271+
YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64");
272+
273+
void WaitResharding() {
274+
const TInstant start = TInstant::Now();
275+
bool clean = false;
276+
while (TInstant::Now() - start < TDuration::Seconds(200)) {
277+
NYdb::NOperation::TOperationClient operationClient(Kikimr.GetDriver());
278+
auto result = operationClient.List<NYdb::NSchemeShard::TBackgroundProcessesResponse>().GetValueSync();
279+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
280+
if (result.GetList().size() == 0) {
281+
Cerr << "RESHARDING_FINISHED" << Endl;
282+
clean = true;
283+
break;
284+
}
285+
UNIT_ASSERT_VALUES_EQUAL(result.GetList().size(), 1);
286+
Sleep(TDuration::Seconds(1));
287+
Cerr << "WAIT_FINISHED..." << Endl;
288+
}
289+
AFL_VERIFY(clean);
290+
}
291+
292+
void CheckCount(const ui32 expectation) {
293+
auto it = Kikimr.GetTableClient().StreamExecuteScanQuery(R"(
294+
--!syntax_v1
295+
296+
SELECT
297+
COUNT(*)
298+
FROM `/Root/olapStore/olapTable`
299+
)").GetValueSync();
300+
301+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
302+
TString result = StreamResultToYson(it);
303+
Cerr << result << Endl;
304+
CompareYson(result, "[[" + ::ToString(expectation) + "u;]]");
305+
}
306+
307+
TKikimrRunner Kikimr;
308+
public:
309+
310+
TReshardingTest()
311+
: Kikimr(TKikimrSettings().SetWithSampleTables(false))
312+
{
313+
314+
}
315+
316+
void Execute() {
317+
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
318+
csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
319+
csController->SetLagForCompactionBeforeTierings(TDuration::Seconds(1));
320+
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
321+
322+
TLocalHelper(Kikimr).SetShardingMethod(ShardingType).CreateTestOlapTable("olapTable", "olapStore", 16, 4);
323+
auto tableClient = Kikimr.GetTableClient();
324+
325+
Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();
326+
327+
std::vector<TString> uids;
328+
std::vector<TString> resourceIds;
329+
std::vector<ui32> levels;
330+
331+
{
332+
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
333+
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
334+
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000);
335+
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000);
336+
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
337+
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
338+
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);
339+
340+
const auto filler = [&](const ui32 startRes, const ui32 startUid, const ui32 count) {
341+
for (ui32 i = 0; i < count; ++i) {
342+
uids.emplace_back("uid_" + ::ToString(startUid + i));
343+
resourceIds.emplace_back(::ToString(startRes + i));
344+
levels.emplace_back(i % 5);
345+
}
346+
};
347+
348+
filler(1000000, 300000000, 10000);
349+
filler(1100000, 300100000, 10000);
350+
filler(1200000, 300200000, 10000);
351+
filler(1300000, 300300000, 10000);
352+
filler(1400000, 300400000, 10000);
353+
filler(2000000, 200000000, 70000);
354+
filler(3000000, 100000000, 110000);
355+
356+
}
357+
358+
CheckCount(230000);
359+
{
360+
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)";
361+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
362+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
363+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
364+
}
365+
366+
WaitResharding();
367+
AFL_VERIFY(csController->GetShardingFiltersCount().Val() == 0);
368+
CheckCount(230000);
369+
370+
i64 count = csController->GetShardingFiltersCount().Val();
371+
AFL_VERIFY(count == 16)("count", count);
372+
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
373+
csController->WaitIndexation(TDuration::Seconds(5));
374+
csController->WaitCompactions(TDuration::Seconds(5));
375+
376+
csController->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable);
377+
378+
CheckCount(230000);
379+
380+
AFL_VERIFY(count == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
381+
const ui32 portionsCount = 8;
382+
for (ui32 i = 0; i < 3; ++i) {
383+
{
384+
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
385+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
386+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
387+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
388+
}
389+
WaitResharding();
390+
csController->WaitCleaning(TDuration::Seconds(5));
391+
392+
CheckCount(230000);
393+
AFL_VERIFY(count + portionsCount == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
394+
count += portionsCount;
395+
}
396+
{
397+
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
398+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
399+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
400+
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
401+
}
402+
}
403+
};
404+
405+
Y_UNIT_TEST(TableReshardingConsistency64) {
406+
TReshardingTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute();
407+
}
408+
409+
Y_UNIT_TEST(TableReshardingModuloN) {
410+
TReshardingTest().SetShardingType("HASH_FUNCTION_MODULO_N").Execute();
411+
}
412+
264413
}
265414

266415
}

ydb/core/kqp/ut/olap/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ PEERDIR(
3333
ydb/core/tx/columnshard
3434
ydb/core/kqp/ut/olap/helpers
3535
ydb/core/tx/datashard/ut_common
36+
ydb/public/sdk/cpp/client/ydb_operation
3637
)
3738

3839
YQL_LAST_ABI_VERSION()

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1626,6 +1626,7 @@ message TShardingModification {
16261626
message TShardingTransfer {
16271627
optional uint64 DestinationTabletId = 1;
16281628
repeated uint64 SourceTabletIds = 2;
1629+
optional bool Moving = 3 [default = false];
16291630
}
16301631

16311632
message TShardingTransfers {

ydb/core/tx/columnshard/columnshard.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ struct TEvColumnShard {
110110
ui64 txId, TString txBody, const ui32 flags = 0)
111111
: TEvProposeTransaction(txKind, source, txId, std::move(txBody), flags)
112112
{
113-
Y_ABORT_UNLESS(txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA);
113+
// Y_ABORT_UNLESS(txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA);
114114
Record.SetSchemeShardId(ssId);
115115
}
116116

ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(THashMap<ui64, NEve
1717
auto it = PathIds.find(i.first);
1818
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
1919
for (auto&& portion : i.second.DetachPortions()) {
20-
portion.ResetShardingVersion();
2120
portion.SetPathId(it->second);
2221
index.UpsertPortion(std::move(portion));
2322
}

ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ TTxDataFromSource::TTxDataFromSource(NColumnShard::TColumnShard* self, const std
4747
++p;
4848
} else {
4949
i.second.MutablePortions()[p] = std::move(i.second.MutablePortions().back());
50+
i.second.MutablePortions()[p].ResetShardingVersion();
5051
i.second.MutablePortions().pop_back();
5152
}
5253
}

ydb/core/tx/columnshard/data_sharing/modification/tasks/modification.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,10 @@ class TBlobSharing {
318318
if (i != selfTabletId) {
319319
TStorageTabletTask task(StorageId, i);
320320
task.AddRemapOwner(BlobId, selfTabletId, toTabletId);
321-
AFL_VERIFY(result.emplace(i, std::move(task)).second);
321+
auto info = result.emplace(i, task);
322+
if (!info.second) {
323+
info.first->second.Merge(task);
324+
}
322325
}
323326

324327
{

ydb/core/tx/columnshard/engines/column_engine.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ class IColumnEngine {
276276
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const;
277277

278278
virtual bool HasDataInPathId(const ui64 pathId) const = 0;
279+
virtual bool ErasePathId(const ui64 pathId) = 0;
279280
virtual bool Load(IDbWrapper& db) = 0;
280281
void RegisterTable(const ui64 pathId) {
281282
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "RegisterTable")("path_id", pathId);

0 commit comments

Comments
 (0)