Skip to content

Commit dc9317b

Browse files
authored
Merge ae23199 into 95f0b21
2 parents 95f0b21 + ae23199 commit dc9317b

File tree

4 files changed

+147
-10
lines changed

4 files changed

+147
-10
lines changed

ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp

Lines changed: 129 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
1-
#include "helpers/typed_local.h"
21
#include "helpers/local.h"
2+
#include "helpers/typed_local.h"
33
#include "helpers/writer.h"
4-
#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
5-
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
4+
5+
#include <ydb/core/base/tablet_pipecache.h>
66
#include <ydb/core/tx/columnshard/common/snapshot.h>
7-
#include <ydb/core/tx/columnshard/data_sharing/initiator/status/abstract.h>
87
#include <ydb/core/tx/columnshard/data_sharing/common/context/context.h>
9-
#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
108
#include <ydb/core/tx/columnshard/data_sharing/destination/events/control.h>
11-
#include <ydb/core/base/tablet_pipecache.h>
9+
#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
10+
#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
11+
#include <ydb/core/tx/columnshard/data_sharing/initiator/status/abstract.h>
12+
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
13+
1214
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
1315
#include <ydb/public/sdk/cpp/client/ydb_ss_tasks/task.h>
1416

17+
#include <chrono>
18+
#include <thread>
19+
1520
namespace NKikimr::NKqp {
1621

1722
Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
@@ -276,7 +281,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
276281
void WaitResharding(const TString& hint = "") {
277282
const TInstant start = TInstant::Now();
278283
bool clean = false;
279-
while (TInstant::Now() - start < TDuration::Seconds(20)) {
284+
while (TInstant::Now() - start < TDuration::Seconds(200)) {
280285
NYdb::NOperation::TOperationClient operationClient(Kikimr.GetDriver());
281286
auto result = operationClient.List<NYdb::NSchemeShard::TBackgroundProcessesResponse>().GetValueSync();
282287
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
@@ -408,7 +413,8 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
408413

409414
public:
410415
TAsyncReshardingTest() {
411-
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 128, 4);
416+
Tests::NCommon::TLoggerInit(Kikimr).SetPriority(NLog::PRI_CRIT).Initialize();
417+
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 1024, 32);
412418
}
413419

414420
void AddBatch(int numRows) {
@@ -561,5 +567,120 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
561567

562568
tester.CheckCount();
563569
}
570+
571+
Y_UNIT_TEST(MultipleMerge) {
572+
TAsyncReshardingTest tester;
573+
tester.DisableCompaction();
574+
575+
tester.AddBatch(10000);
576+
577+
for (int i = 0; i < 4; i++) {
578+
tester.StartResharding("MERGE");
579+
tester.WaitResharding();
580+
}
581+
582+
tester.RestartAllShards();
583+
584+
tester.CheckCount();
585+
}
586+
587+
Y_UNIT_TEST(MultipleSplits) {
588+
TAsyncReshardingTest tester;
589+
tester.DisableCompaction();
590+
591+
tester.AddBatch(10000);
592+
593+
for (int i = 0; i < 4; i++) {
594+
tester.StartResharding("SPLIT");
595+
tester.WaitResharding();
596+
}
597+
598+
tester.RestartAllShards();
599+
600+
tester.CheckCount();
601+
}
602+
603+
Y_UNIT_TEST(MultipleSplitsThenMerges) {
604+
TAsyncReshardingTest tester;
605+
tester.DisableCompaction();
606+
607+
tester.AddBatch(10000);
608+
609+
for (int i = 0; i < 4; i++) {
610+
tester.StartResharding("SPLIT");
611+
tester.WaitResharding();
612+
}
613+
614+
for (int i = 0; i < 8; i++) {
615+
tester.StartResharding("MERGE");
616+
tester.WaitResharding();
617+
}
618+
619+
tester.RestartAllShards();
620+
621+
tester.CheckCount();
622+
}
623+
624+
Y_UNIT_TEST(MultipleSplitsWithRestartsAfterWait) {
625+
TAsyncReshardingTest tester;
626+
tester.DisableCompaction();
627+
628+
tester.AddBatch(10000);
629+
630+
for (int i = 0; i < 4; i++) {
631+
tester.StartResharding("SPLIT");
632+
tester.WaitResharding();
633+
tester.RestartAllShards();
634+
}
635+
636+
tester.CheckCount();
637+
}
638+
639+
Y_UNIT_TEST(MultipleSplitsWithRestartsWhenWait) {
640+
TAsyncReshardingTest tester;
641+
tester.DisableCompaction();
642+
643+
tester.AddBatch(10000);
644+
645+
for (int i = 0; i < 4; i++) {
646+
tester.StartResharding("SPLIT");
647+
tester.RestartAllShards();
648+
tester.WaitResharding();
649+
}
650+
tester.RestartAllShards();
651+
652+
tester.CheckCount();
653+
}
654+
655+
Y_UNIT_TEST(MultipleMergesWithRestartsAfterWait) {
656+
TAsyncReshardingTest tester;
657+
tester.DisableCompaction();
658+
659+
tester.AddBatch(10000);
660+
661+
for (int i = 0; i < 4; i++) {
662+
tester.StartResharding("MERGE");
663+
tester.WaitResharding();
664+
tester.RestartAllShards();
665+
}
666+
667+
tester.CheckCount();
668+
}
669+
670+
Y_UNIT_TEST(MultipleMergesWithRestartsWhenWait) {
671+
TAsyncReshardingTest tester;
672+
tester.DisableCompaction();
673+
674+
tester.AddBatch(10000);
675+
676+
for (int i = 0; i < 4; i++) {
677+
tester.StartResharding("MERGE");
678+
tester.RestartAllShards();
679+
tester.WaitResharding();
680+
}
681+
tester.RestartAllShards();
682+
683+
tester.CheckCount();
684+
}
564685
}
565686
}

ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ NKikimr::TConclusionStatus TDestinationSession::DeserializeDataFromProto(
108108
for (auto&& i : proto.GetPathIds()) {
109109
auto g = index.GetGranuleOptional(i.GetDestPathId());
110110
if (!g) {
111-
return TConclusionStatus::Fail("Incorrect remapping into undefined path id: " + ::ToString(i.GetDestPathId()));
111+
return TConclusionStatus::Fail("Incorrect remapping into undefined path id: " + ::ToString(i.GetDestPathId()) + " " + GetSessionId());
112112
}
113113
if (!i.GetSourcePathId() || !i.GetDestPathId()) {
114114
return TConclusionStatus::Fail("PathIds remapping contains incorrect ids: " + i.DebugString());

ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_ack_from_initiator.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@ namespace NKikimr::NOlap::NDataSharing {
55
bool TTxFinishAckFromInitiator::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
66
using namespace NKikimr::NColumnShard;
77
NIceDb::TNiceDb db(txc.DB);
8+
9+
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("TTxFinishAckFromInitiator::DoExecute", Session->GetSessionId());
810
db.Table<Schema::DestinationSessions>().Key(Session->GetSessionId()).Delete();
911
return true;
1012
}
1113

1214
void TTxFinishAckFromInitiator::DoComplete(const TActorContext& /*ctx*/) {
15+
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("TTxFinishAckFromInitiator::DoComplete", Session->GetSessionId());
16+
1317
Self->SharingSessionsManager->RemoveDestinationSession(Session->GetSessionId());
1418
}
1519

ydb/core/tx/columnshard/transactions/operators/sharing.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,23 @@ void TSharingTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner
5252
}
5353

5454
bool TSharingTransactionOperator::ProgressOnExecute(
55-
TColumnShard& /*owner*/, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) {
55+
TColumnShard& owner, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) {
56+
if (!SharingTask) {
57+
return true;
58+
}
59+
if (!TxAbort) {
60+
TxAbort = SharingTask->AckInitiatorFinished(&owner, SharingTask).DetachResult();
61+
}
62+
5663
return true;
5764
}
5865

5966
bool TSharingTransactionOperator::ProgressOnComplete(TColumnShard& owner, const TActorContext& ctx) {
67+
if (!SharingTask) {
68+
return true;
69+
}
70+
AFL_VERIFY(!!TxAbort);
71+
TxAbort->Complete(ctx);
6072
for (TActorId subscriber : NotifySubscribers) {
6173
auto event = MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(owner.TabletID(), GetTxId());
6274
ctx.Send(subscriber, event.Release(), 0, 0);

0 commit comments

Comments
 (0)