Skip to content

Commit c72cf7b

Browse files
wait tables clearing before new initialization in tablet (#5419)
1 parent b01223a commit c72cf7b

File tree

30 files changed

+492
-105
lines changed

30 files changed

+492
-105
lines changed

ydb/core/tx/columnshard/columnshard__init.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
223223
Self->SharingSessionsManager = local;
224224
}
225225

226+
Self->ProgressTxController->StartOperators();
227+
226228
Self->UpdateInsertTableCounters();
227229
Self->UpdateIndexCounters();
228230
Self->UpdateResourceMetrics(ctx, {});

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
7272
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval(TSettings::DefaultStatsReportInterval))
7373
, InFlightReadsTracker(StoragesManager)
7474
, TablesManager(StoragesManager, info->TabletID)
75+
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
7576
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
7677
, InsertTable(std::make_unique<NOlap::TInsertTable>())
7778
, SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>())

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
#include "data_sharing/common/transactions/tx_extension.h"
2929
#include "data_sharing/modification/events/change_owning.h"
3030

31+
#include "subscriber/abstract/manager/manager.h"
32+
3133
#include <ydb/core/base/tablet_pipecache.h>
3234
#include <ydb/core/tablet/tablet_counters.h>
3335
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
@@ -84,10 +86,11 @@ class TGeneralCompactColumnEngineChanges;
8486

8587
namespace NKikimr::NColumnShard {
8688

87-
89+
class TTxFinishAsyncTransaction;
8890
class TTxInsertTableCleanup;
8991
class TTxRemoveSharedBlobs;
9092
class TOperationsManager;
93+
class TWaitEraseTablesTxSubscriber;
9194

9295
extern bool gAllowLogBatchingDefaultValue;
9396

@@ -150,6 +153,8 @@ class TColumnShard
150153
friend class TTxApplyNormalizer;
151154
friend class TTxMonitoring;
152155
friend class TTxRemoveSharedBlobs;
156+
friend class TTxFinishAsyncTransaction;
157+
friend class TWaitEraseTablesTxSubscriber;
153158

154159
friend class NOlap::TCleanupPortionsColumnEngineChanges;
155160
friend class NOlap::TCleanupTablesColumnEngineChanges;
@@ -490,6 +495,7 @@ class TColumnShard
490495

491496
TInFlightReadsTracker InFlightReadsTracker;
492497
TTablesManager TablesManager;
498+
std::shared_ptr<NSubscriber::TManager> Subscribers;
493499
std::shared_ptr<TTiersManager> Tiers;
494500
std::unique_ptr<TTabletCountersBase> TabletCountersPtr;
495501
TTabletCountersBase* TabletCounters;

ydb/core/tx/columnshard/data_sharing/common/context/context.cpp

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ NKikimrColumnShardDataSharingProto::TTransferContext TTransferContext::Serialize
1212
for (auto&& i : SourceTabletIds) {
1313
result.AddSourceTabletIds((ui64)i);
1414
}
15-
if (SnapshotBarrier) {
16-
SnapshotBarrier->SerializeToProto(*result.MutableSnapshotBarrier());
17-
}
15+
GetSnapshotBarrierVerified().SerializeToProto(*result.MutableSnapshotBarrier());
1816
result.SetMoving(Moving);
1917
return result;
2018
}
@@ -59,7 +57,7 @@ bool TTransferContext::IsEqualTo(const TTransferContext& context) const {
5957
}
6058

6159
TString TTransferContext::DebugString() const {
62-
return TStringBuilder() << "{from=" << (ui64)DestinationTabletId << ";moving=" << Moving << ";snapshot=" << (SnapshotBarrier ? SnapshotBarrier->DebugString() : "NO") << "}";
60+
return TStringBuilder() << "{from=" << (ui64)DestinationTabletId << ";moving=" << Moving << ";snapshot=" << SnapshotBarrier.DebugString() << "}";
6361
}
6462

6563
TTransferContext::TTransferContext(const TTabletId destination, const THashSet<TTabletId>& sources, const TSnapshot& snapshotBarrier, const bool moving, const std::optional<ui64> txId)
@@ -74,13 +72,8 @@ TTransferContext::TTransferContext(const TTabletId destination, const THashSet<T
7472
}
7573

7674
const NKikimr::NOlap::TSnapshot& TTransferContext::GetSnapshotBarrierVerified() const {
77-
AFL_VERIFY(!!SnapshotBarrier);
78-
return *SnapshotBarrier;
79-
}
80-
81-
void TTransferContext::SetSnapshotBarrier(const TSnapshot& snapshot) {
82-
AFL_VERIFY(!SnapshotBarrier || *SnapshotBarrier == snapshot);
83-
SnapshotBarrier = snapshot;
75+
AFL_VERIFY(SnapshotBarrier.Valid());
76+
return SnapshotBarrier;
8477
}
8578

8679
}

ydb/core/tx/columnshard/data_sharing/common/context/context.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class TTransferContext {
1515
YDB_READONLY(TTabletId, DestinationTabletId, (TTabletId)0);
1616
YDB_READONLY_DEF(THashSet<TTabletId>, SourceTabletIds);
1717
YDB_READONLY(bool, Moving, false);
18-
std::optional<TSnapshot> SnapshotBarrier;
18+
TSnapshot SnapshotBarrier = TSnapshot::Zero();
1919
YDB_READONLY_DEF(std::optional<ui64>, TxId);
2020
public:
2121
TTransferContext() = default;
@@ -24,8 +24,6 @@ class TTransferContext {
2424

2525
const TSnapshot& GetSnapshotBarrierVerified() const;
2626

27-
void SetSnapshotBarrier(const TSnapshot& snapshot);
28-
2927
TTransferContext(const TTabletId destination, const THashSet<TTabletId>& sources, const TSnapshot& snapshotBarrier, const bool moving, const std::optional<ui64> txId = {});
3028
NKikimrColumnShardDataSharingProto::TTransferContext SerializeToProto() const;
3129
TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TTransferContext& proto);

ydb/core/tx/columnshard/data_sharing/destination/session/destination.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,6 @@ class TDestinationSession: public TCommonSession {
9090
public:
9191
bool TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionInfo& portion);
9292

93-
void SetBarrierSnapshot(const TSnapshot& value) {
94-
TransferContext.SetSnapshotBarrier(value);
95-
}
96-
9793
TSourceCursorForDestination& GetCursorVerified(const TTabletId& tabletId) {
9894
auto it = Cursors.find(tabletId);
9995
AFL_VERIFY(it != Cursors.end());

ydb/core/tx/columnshard/engines/changes/cleanup_tables.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
44
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
55
#include <ydb/core/tx/columnshard/columnshard_schema.h>
6+
#include <ydb/core/tx/columnshard/subscriber/events/tables_erased/event.h>
67
#include <util/string/join.h>
78

89
namespace NKikimr::NOlap {
@@ -25,6 +26,7 @@ void TCleanupTablesColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TCo
2526
for (auto&& t : TablesToDrop) {
2627
self->TablesManager.TryFinalizeDropPathOnComplete(t);
2728
}
29+
self->Subscribers->OnEvent(std::make_shared<NColumnShard::NSubscriber::TEventTablesErased>(TablesToDrop));
2830
}
2931

3032
void TCleanupTablesColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#include "event.h"
2+
3+
namespace NKikimr::NColumnShard::NSubscriber {
4+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#pragma once
2+
#include <ydb/library/accessor/accessor.h>
3+
#include <util/generic/string.h>
4+
5+
namespace NKikimr::NColumnShard::NSubscriber {
6+
7+
enum class EEventType {
8+
Undefined,
9+
TablesErased
10+
};
11+
12+
class ISubscriptionEvent {
13+
private:
14+
YDB_READONLY(EEventType, Type, EEventType::Undefined);
15+
virtual TString DoDebugString() const {
16+
return "";
17+
}
18+
public:
19+
virtual ~ISubscriptionEvent() = default;
20+
21+
ISubscriptionEvent(const EEventType type)
22+
: Type(type)
23+
{
24+
25+
}
26+
27+
TString DebugString() const {
28+
return DoDebugString();
29+
}
30+
};
31+
32+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
event.cpp
5+
)
6+
7+
PEERDIR(
8+
)
9+
10+
GENERATE_ENUM_SERIALIZATION(event.h)
11+
12+
END()

0 commit comments

Comments
 (0)