Skip to content

Commit e525df6

Browse files
authored
Allow create_pq op to be a cancelable part of multipart operation (#4438)
1 parent 18ea57a commit e525df6

File tree

3 files changed

+59
-23
lines changed

3 files changed

+59
-23
lines changed

ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,6 @@ void ApplySharding(TTxId txId,
243243
pqGroup->InitSplitMergeGraph();
244244
}
245245

246-
247246
class TCreatePQ: public TSubOperation {
248247
static TTxState::ETxState NextState() {
249248
return TTxState::CreateParts;
@@ -279,6 +278,11 @@ class TCreatePQ: public TSubOperation {
279278
}
280279
}
281280

281+
ui64 PqShardCountChange = 0;
282+
ui64 StreamReservedThroughputChange = 0;
283+
ui64 StreamReservedStorageChange = 0;
284+
ui64 StreamShardsCountChange = 0;
285+
282286
public:
283287
using TSubOperation::TSubOperation;
284288

@@ -467,12 +471,10 @@ class TCreatePQ: public TSubOperation {
467471

468472
ApplySharding(OperationId.GetTxId(), pathId, pqGroup, txState, tabletChannelsBinding, pqChannelsBinding, context.SS);
469473

470-
NIceDb::TNiceDb db(context.GetDB());
471-
472474
for (auto& shard : pqGroup->Shards) {
473475
auto shardIdx = shard.first;
474476
for (const auto& pqInfo : shard.second->Partitions) {
475-
context.SS->PersistPersQueue(db, pathId, shardIdx, *pqInfo);
477+
context.DbChanges.PersistPersQueue(pathId, shardIdx, *pqInfo);
476478
}
477479
}
478480

@@ -483,16 +485,17 @@ class TCreatePQ: public TSubOperation {
483485
context.SS->Topics[pathId]->AlterData = pqGroup;
484486
context.SS->IncrementPathDbRefCount(pathId);
485487

486-
context.SS->PersistPersQueueGroup(db, pathId, emptyGroup);
487-
context.SS->PersistAddPersQueueGroupAlter(db, pathId, pqGroup);
488+
context.DbChanges.PersistPersQueueGroup(pathId, emptyGroup);
489+
context.DbChanges.PersistAddPersQueueGroupAlter(pathId, pqGroup);
488490

489491
for (auto shard : txState.Shards) {
490492
Y_ABORT_UNLESS(shard.Operation == TTxState::CreateParts);
491-
context.SS->PersistShardMapping(db, shard.Idx, InvalidTabletId, pathId, OperationId.GetTxId(), shard.TabletType);
492-
context.SS->PersistChannelsBinding(db, shard.Idx, tabletChannelsBinding);
493+
context.DbChanges.PersistShard(shard.Idx);
493494
}
495+
494496
Y_ABORT_UNLESS(txState.Shards.size() == shardsToCreate);
495-
context.SS->TabletCounters->Simple()[COUNTER_PQ_SHARD_COUNT].Add(shardsToCreate-1);
497+
PqShardCountChange = shardsToCreate - 1;
498+
context.SS->TabletCounters->Simple()[COUNTER_PQ_SHARD_COUNT].Add(PqShardCountChange);
496499
context.SS->TabletCounters->Simple()[COUNTER_PQ_RB_SHARD_COUNT].Add(1);
497500

498501
dstPath.Base()->CreateTxId = OperationId.GetTxId();
@@ -517,22 +520,19 @@ class TCreatePQ: public TSubOperation {
517520
}
518521
}
519522

520-
context.SS->ChangeTxState(db, OperationId, TTxState::CreateParts);
521523
context.OnComplete.ActivateTx(OperationId);
522524

523-
context.SS->PersistTxState(db, OperationId);
524-
525+
context.DbChanges.PersistTxState(OperationId);
525526

526527
if (!acl.empty()) {
527528
dstPath.Base()->ApplyACL(acl);
528529
}
529-
context.SS->PersistPath(db, dstPath.Base()->PathId);
530-
531-
context.SS->PersistUpdateNextPathId(db);
532-
context.SS->PersistUpdateNextShardIdx(db);
530+
context.MemChanges.GrabPath(context.SS, dstPath.Base()->PathId);
531+
context.DbChanges.PersistPath(dstPath.Base()->PathId);
533532

534533
++parentPath.Base()->DirAlterVersion;
535-
context.SS->PersistPathDirAlterVersion(db, parentPath.Base());
534+
context.MemChanges.GrabPath(context.SS, parentPath.Base()->PathId);
535+
context.DbChanges.PersistPath(parentPath.Base()->PathId);
536536
context.SS->ClearDescribePathCaches(parentPath.Base());
537537
context.OnComplete.PublishToSchemeBoard(OperationId, parentPath.Base()->PathId);
538538

@@ -544,10 +544,12 @@ class TCreatePQ: public TSubOperation {
544544
dstPath.DomainInfo()->IncPQPartitionsInside(partitionsToCreate);
545545
dstPath.DomainInfo()->IncPQReservedStorage(reserve.Storage);
546546

547-
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Add(reserve.Throughput);
548-
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Add(reserve.Storage);
549-
550-
context.SS->TabletCounters->Simple()[COUNTER_STREAM_SHARDS_COUNT].Add(partitionsToCreate);
547+
StreamReservedThroughputChange = reserve.Throughput;
548+
StreamReservedStorageChange = reserve.Storage;
549+
StreamShardsCountChange = partitionsToCreate;
550+
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Add(StreamReservedThroughputChange);
551+
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Add(StreamReservedStorageChange);
552+
context.SS->TabletCounters->Simple()[COUNTER_STREAM_SHARDS_COUNT].Add(StreamShardsCountChange);
551553

552554
dstPath.Base()->IncShardsInside(shardsToCreate);
553555
parentPath.Base()->IncAliveChildren();
@@ -556,8 +558,13 @@ class TCreatePQ: public TSubOperation {
556558
return result;
557559
}
558560

559-
void AbortPropose(TOperationContext&) override {
560-
Y_ABORT("no AbortPropose for TCreatePQ");
561+
void AbortPropose(TOperationContext& context) override {
562+
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(StreamReservedThroughputChange);
563+
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(StreamReservedStorageChange);
564+
context.SS->TabletCounters->Simple()[COUNTER_STREAM_SHARDS_COUNT].Sub(StreamShardsCountChange);
565+
566+
context.SS->TabletCounters->Simple()[COUNTER_PQ_SHARD_COUNT].Sub(PqShardCountChange);
567+
context.SS->TabletCounters->Simple()[COUNTER_PQ_RB_SHARD_COUNT].Sub(1);
561568
}
562569

563570
void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override {

ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,18 @@ void TStorageChanges::Apply(TSchemeShard* ss, NTabletFlatExecutor::TTransactionC
8585

8686
ss->PersistUpdateNextPathId(db);
8787
ss->PersistUpdateNextShardIdx(db);
88+
89+
for (const auto& [pathId, shardIdx, pqInfo] : PersQueue) {
90+
ss->PersistPersQueue(db, pathId, shardIdx, pqInfo);
91+
}
92+
93+
for (const auto& [pathId, pqGroup] : PersQueueGroup) {
94+
ss->PersistPersQueueGroup(db, pathId, pqGroup);
95+
}
96+
97+
for (const auto& [pathId, pqGroup] : AddPersQueueGroupAlter) {
98+
ss->PersistAddPersQueueGroupAlter(db, pathId, pqGroup);
99+
}
88100
}
89101

90102
}

ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,26 @@ class TStorageChanges: public TSimpleRefCount<TStorageChanges> {
3737

3838
TDeque<TPathId> Views;
3939

40+
//PQ part
41+
TDeque<std::tuple<TPathId, TShardIdx, TTopicTabletInfo::TTopicPartitionInfo>> PersQueue;
42+
TDeque<std::pair<TPathId, TTopicInfo::TPtr>> PersQueueGroup;
43+
TDeque<std::pair<TPathId, TTopicInfo::TPtr>> AddPersQueueGroupAlter;
44+
4045
public:
4146
~TStorageChanges() = default;
4247

48+
void PersistPersQueue(const TPathId& pathId, const TShardIdx& shardIdx, const TTopicTabletInfo::TTopicPartitionInfo& pqInfo) {
49+
PersQueue.emplace_back(pathId, shardIdx, pqInfo);
50+
}
51+
52+
void PersistPersQueueGroup(const TPathId& pathId, const TTopicInfo::TPtr pqGroup) {
53+
PersQueueGroup.emplace_back(pathId, pqGroup);
54+
}
55+
56+
void PersistAddPersQueueGroupAlter(TPathId pathId, const TTopicInfo::TPtr alterData) {
57+
AddPersQueueGroupAlter.emplace_back(pathId, alterData);
58+
}
59+
4360
void PersistPath(const TPathId& pathId) {
4461
Paths.push_back(pathId);
4562
}

0 commit comments

Comments
 (0)