Skip to content

Commit 095544a

Browse files
Merge 99e9d2e into e9a7b28
2 parents e9a7b28 + 99e9d2e commit 095544a

File tree

9 files changed

+216
-13
lines changed

9 files changed

+216
-13
lines changed

ydb/core/tx/columnshard/columnshard.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
6161
EnqueueBackgroundActivities();
6262
BackgroundSessionsManager->Start();
6363
ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup());
64+
ctx.Send(SelfId(), new TEvPrivate::TEvPingSnapshotsUsage());
6465
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
6566
AFL_VERIFY(!!StartInstant);
6667
Counters.GetCSCounters().Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant);
@@ -173,6 +174,14 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
173174
}
174175
}
175176

177+
void TColumnShard::Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& ev, const TActorContext& ctx) {
178+
if (auto writeTx =
179+
InFlightReadsTracker.Ping(this, NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(MaxReadStaleness))) {
180+
Execute(writeTx.release(), ctx);
181+
}
182+
ctx.Schedule(0.3 * MaxReadStaleness, new TEvPrivate::TEvPingSnapshotsUsage());
183+
}
184+
176185
void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) {
177186
if (ev->Get()->Manual) {
178187
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvPrivate::TEvPeriodicWakeup::MANUAL")("tablet_id", TabletID());
@@ -182,6 +191,10 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC
182191
SendWaitPlanStep(GetOutdatedStep());
183192

184193
SendPeriodicStats();
194+
if (auto writeTx =
195+
InFlightReadsTracker.Ping(this, NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(MaxReadStaleness))) {
196+
Execute(writeTx.release(), ctx);
197+
}
185198
ctx.Schedule(PeriodicWakeupActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
186199
}
187200
}

ydb/core/tx/columnshard/columnshard__init.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,14 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
225225
}
226226
Self->SharingSessionsManager = local;
227227
}
228+
{
229+
TMemoryProfileGuard g("TTxInit/TInFlightReadsTracker");
230+
TInFlightReadsTracker local(Self->StoragesManager);
231+
if (!local.LoadFromDatabase(txc.DB)) {
232+
return false;
233+
}
234+
Self->InFlightReadsTracker = std::move(local);
235+
}
228236

229237
Self->UpdateInsertTableCounters();
230238
Self->UpdateIndexCounters();

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
8585
, BackgroundController(Counters.GetBackgroundControllerCounters())
8686
, NormalizerController(StoragesManager, Counters.GetSubscribeCounters())
8787
, SysLocks(this)
88-
, MaxReadStaleness(TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetMaxReadStaleness_ms())) {
88+
, MaxReadStaleness(NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(
89+
TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetMaxReadStaleness_ms()))) {
8990
}
9091

9192
void TColumnShard::OnDetach(const TActorContext& ctx) {
@@ -187,8 +188,7 @@ ui64 TColumnShard::GetOutdatedStep() const {
187188
}
188189

189190
ui64 TColumnShard::GetMinReadStep() const {
190-
const TDuration maxReadStaleness = NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(MaxReadStaleness);
191-
ui64 delayMillisec = maxReadStaleness.MilliSeconds();
191+
ui64 delayMillisec = MaxReadStaleness.MilliSeconds();
192192
ui64 passedStep = GetOutdatedStep();
193193
ui64 minReadStep = (passedStep > delayMillisec ? passedStep - delayMillisec : 0);
194194
return minReadStep;
@@ -785,7 +785,7 @@ void TColumnShard::SetupCleanupPortions() {
785785
return;
786786
}
787787

788-
NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0};
788+
const auto cleanupSnapshot = InFlightReadsTracker.GetSnapshotToClean().value_or(NOlap::TSnapshot(GetMinReadStep(), 0));
789789

790790
auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(cleanupSnapshot, TablesManager.GetPathsToDrop(), DataLocksManager);
791791
if (!changes) {

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ class TColumnShard
217217
void Handle(TEvPrivate::TEvScanStats::TPtr &ev, const TActorContext &ctx);
218218
void Handle(TEvPrivate::TEvReadFinished::TPtr &ev, const TActorContext &ctx);
219219
void Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx);
220+
void Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& ev, const TActorContext& ctx);
221+
220222
void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx);
221223
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
222224
void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx);
@@ -361,6 +363,8 @@ class TColumnShard
361363
HFunc(TEvPrivate::TEvScanStats, Handle);
362364
HFunc(TEvPrivate::TEvReadFinished, Handle);
363365
HFunc(TEvPrivate::TEvPeriodicWakeup, Handle);
366+
HFunc(TEvPrivate::TEvPingSnapshotsUsage, Handle);
367+
364368
HFunc(NEvents::TDataEvents::TEvWrite, Handle);
365369
HFunc(TEvPrivate::TEvWriteDraft, Handle);
366370
HFunc(TEvPrivate::TEvGarbageCollectionFinished, Handle);

ydb/core/tx/columnshard/columnshard_private_events.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ struct TEvPrivate {
4646
EvExportSaveCursor,
4747

4848
EvTaskProcessedResult,
49+
EvPingSnapshotsUsage,
4950

5051
EvEnd
5152
};
@@ -158,7 +159,11 @@ struct TEvPrivate {
158159
bool Manual;
159160
};
160161

161-
class TEvWriteBlobsResult : public TEventLocal<TEvWriteBlobsResult, EvWriteBlobsResult> {
162+
struct TEvPingSnapshotsUsage: public TEventLocal<TEvPingSnapshotsUsage, EvPingSnapshotsUsage> {
163+
TEvPingSnapshotsUsage() = default;
164+
};
165+
166+
class TEvWriteBlobsResult: public TEventLocal<TEvWriteBlobsResult, EvWriteBlobsResult> {
162167
private:
163168
NColumnShard::TBlobPutResult::TPtr PutResult;
164169
NOlap::TWritingBuffer WritesBuffer;

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ struct Schema : NIceDb::Schema {
108108
TableVersionInfo = 11,
109109
SmallBlobs = 12,
110110
OneToOneEvictedBlobs = 13,
111-
BlobsToDeleteWT = 14
111+
BlobsToDeleteWT = 14,
112+
InFlightSnapshots = 15
112113
};
113114

114115
// Tablet tables
@@ -250,6 +251,14 @@ struct Schema : NIceDb::Schema {
250251
using TColumns = TableColumns<BlobId, TabletId>;
251252
};
252253

254+
struct InFlightSnapshots: Table<(ui32)ECommonTables::InFlightSnapshots> {
255+
struct PlanStep: Column<1, NScheme::NTypeIds::Uint64> {};
256+
struct TxId: Column<2, NScheme::NTypeIds::Uint64> {};
257+
258+
using TKey = TableKey<PlanStep, TxId>;
259+
using TColumns = TableColumns<PlanStep, TxId>;
260+
};
261+
253262
// Index tables
254263

255264
// InsertTable - common for all indices

ydb/core/tx/columnshard/engines/portions/portion_info.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -444,10 +444,7 @@ class TPortionInfo {
444444
return false;
445445
}
446446

447-
bool visible = (Meta.RecordSnapshotMin <= snapshot);
448-
if (visible && RemoveSnapshot.Valid()) {
449-
visible = snapshot < RemoveSnapshot;
450-
}
447+
const bool visible = (Meta.RecordSnapshotMax <= snapshot) && (!RemoveSnapshot.Valid() || snapshot < RemoveSnapshot);
451448

452449
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)("snapshot", snapshot.DebugString());
453450
return visible;

ydb/core/tx/columnshard/inflight_request_tracker.cpp

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
#include "inflight_request_tracker.h"
2+
#include "columnshard_impl.h"
3+
#include "columnshard_schema.h"
4+
#include "data_sharing/common/transactions/tx_extension.h"
25
#include "engines/column_engine.h"
36
#include "engines/reader/plain_reader/constructor/read_metadata.h"
47

@@ -14,6 +17,13 @@ void TInFlightReadsTracker::RemoveInFlightRequest(ui64 cookie, const NOlap::TVer
1417
if (!readMeta) {
1518
continue;
1619
}
20+
{
21+
auto it = SnapshotsLive.find(readMeta->GetRequestSnapshot());
22+
AFL_VERIFY(it != SnapshotsLive.end());
23+
if (it->second.DelRequest(cookie)) {
24+
SnapshotsLive.erase(it);
25+
}
26+
}
1727

1828
THashMap<TString, THashSet<NOlap::TUnifiedBlobId>> portionBlobIds;
1929
for (const auto& portion : readMeta->SelectInfo->PortionsOrderedPK) {
@@ -47,7 +57,8 @@ void TInFlightReadsTracker::RemoveInFlightRequest(ui64 cookie, const NOlap::TVer
4757
RequestsMeta.erase(cookie);
4858
}
4959

50-
TConclusionStatus TInFlightReadsTracker::AddToInFlightRequest(const ui64 cookie, NOlap::NReader::TReadMetadataBase::TConstPtr readMetaBase, const NOlap::TVersionedIndex* index) {
60+
TConclusionStatus TInFlightReadsTracker::AddToInFlightRequest(
61+
const ui64 cookie, NOlap::NReader::TReadMetadataBase::TConstPtr readMetaBase, const NOlap::TVersionedIndex* index) {
5162
RequestsMeta[cookie].push_back(readMetaBase);
5263

5364
auto readMeta = std::dynamic_pointer_cast<const NOlap::NReader::NPlain::TReadMetadata>(readMetaBase);
@@ -87,4 +98,79 @@ TConclusionStatus TInFlightReadsTracker::AddToInFlightRequest(const ui64 cookie,
8798
return TConclusionStatus::Success();
8899
}
89100

101+
namespace {
102+
class TTransactionSavePersistentSnapshots: public NOlap::NDataSharing::TExtendedTransactionBase<NColumnShard::TColumnShard> {
103+
private:
104+
using TBase = NOlap::NDataSharing::TExtendedTransactionBase<NColumnShard::TColumnShard>;
105+
const std::set<NOlap::TSnapshot> SaveSnapshots;
106+
const std::set<NOlap::TSnapshot> RemoveSnapshots;
107+
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override {
108+
using namespace NColumnShard;
109+
NIceDb::TNiceDb db(txc.DB);
110+
for (auto&& i : SaveSnapshots) {
111+
db.Table<Schema::InFlightSnapshots>().Key(i.GetPlanStep(), i.GetTxId()).Update();
112+
}
113+
for (auto&& i : RemoveSnapshots) {
114+
db.Table<Schema::InFlightSnapshots>().Key(i.GetPlanStep(), i.GetTxId()).Delete();
115+
}
116+
return true;
117+
}
118+
119+
virtual void DoComplete(const TActorContext& /*ctx*/) override {
120+
}
121+
122+
public:
123+
TTransactionSavePersistentSnapshots(
124+
NColumnShard::TColumnShard* self, std::set<NOlap::TSnapshot>&& saveSnapshots, std::set<NOlap::TSnapshot>&& removeSnapshots)
125+
: TBase(self)
126+
, SaveSnapshots(std::move(saveSnapshots))
127+
, RemoveSnapshots(std::move(removeSnapshots))
128+
{
129+
AFL_VERIFY(saveSnapshots.size() || removeSnapshots.size());
130+
}
131+
};
132+
} // namespace
133+
134+
std::unique_ptr<NTabletFlatExecutor::ITransaction> TInFlightReadsTracker::Ping(TColumnShard* self, const TDuration critDuration) {
135+
std::set<NOlap::TSnapshot> snapshotsToSave;
136+
std::set<NOlap::TSnapshot> snapshotsToFree;
137+
for (auto&& i : SnapshotsLive) {
138+
if (i.second.Ping(critDuration)) {
139+
if (i.second.GetIsLock()) {
140+
snapshotsToSave.emplace(i.first);
141+
} else {
142+
snapshotsToFree.emplace(i.first);
143+
}
144+
}
145+
}
146+
for (auto&& i : snapshotsToFree) {
147+
SnapshotsLive.erase(i);
148+
}
149+
if (snapshotsToFree.size() || snapshotsToSave.size()) {
150+
return std::make_unique<TTransactionSavePersistentSnapshots>(self, std::move(snapshotsToSave), std::move(snapshotsToFree));
151+
} else {
152+
return nullptr;
153+
}
154+
155+
}
156+
157+
bool TInFlightReadsTracker::LoadFromDatabase(NTable::TDatabase& tableDB) {
158+
NIceDb::TNiceDb db(tableDB);
159+
auto rowset = db.Table<Schema::InFlightSnapshots>().Select();
160+
if (!rowset.IsReady()) {
161+
return false;
162+
}
163+
164+
while (!rowset.EndOfSet()) {
165+
const NOlap::TSnapshot snapshot(
166+
rowset.GetValue<Schema::InFlightSnapshots::PlanStep>(), rowset.GetValue<Schema::InFlightSnapshots::TxId>());
167+
AFL_VERIFY(SnapshotsLive.emplace(snapshot, TSnapshotLiveInfo::BuildFromDatabase(snapshot)).second);
168+
169+
if (!rowset.Next()) {
170+
return false;
171+
}
172+
}
173+
return true;
174+
}
175+
90176
}

ydb/core/tx/columnshard/inflight_request_tracker.h

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,95 @@ class TVersionedIndex;
88
}
99

1010
namespace NKikimr::NColumnShard {
11-
11+
class TColumnShard;
1212
using NOlap::IBlobInUseTracker;
1313

14+
class TSnapshotLiveInfo {
15+
private:
16+
const NOlap::TSnapshot Snapshot;
17+
std::optional<TInstant> LastPingInstant;
18+
std::optional<TInstant> LastRequestFinishedInstant;
19+
THashSet<ui32> Requests;
20+
YDB_READONLY(bool, IsLock, false);
21+
22+
TSnapshotLiveInfo(const NOlap::TSnapshot& snapshot)
23+
: Snapshot(snapshot)
24+
{
25+
26+
}
27+
28+
public:
29+
void AddRequest(const ui32 cookie) {
30+
AFL_VERIFY(Requests.emplace(cookie).second);
31+
}
32+
33+
[[nodiscard]] bool DelRequest(const ui32 cookie) {
34+
AFL_VERIFY(Requests.erase(cookie));
35+
if (Requests.empty()) {
36+
LastRequestFinishedInstant = TInstant::Now();
37+
}
38+
if (!IsLock && Requests.empty()) {
39+
return true;
40+
}
41+
return false;
42+
}
43+
44+
static TSnapshotLiveInfo BuildFromRequest(const NOlap::TSnapshot& reqSnapshot) {
45+
return TSnapshotLiveInfo(reqSnapshot);
46+
}
47+
48+
static TSnapshotLiveInfo BuildFromDatabase(const NOlap::TSnapshot& reqSnapshot) {
49+
TSnapshotLiveInfo result(reqSnapshot);
50+
result.LastPingInstant = TInstant::Now();
51+
result.LastRequestFinishedInstant = result.LastPingInstant;
52+
result.IsLock = true;
53+
return result;
54+
}
55+
56+
bool Ping(const TDuration critDuration) {
57+
LastPingInstant = TInstant::Now();
58+
if (Requests.empty()) {
59+
AFL_VERIFY(LastRequestFinishedInstant);
60+
if (critDuration < *LastPingInstant - *LastRequestFinishedInstant && IsLock) {
61+
IsLock = false;
62+
return true;
63+
}
64+
} else {
65+
if (critDuration < *LastPingInstant - Snapshot.GetPlanInstant() && !IsLock) {
66+
IsLock = true;
67+
return true;
68+
}
69+
}
70+
return false;
71+
}
72+
};
73+
1474
class TInFlightReadsTracker {
75+
private:
76+
std::map<NOlap::TSnapshot, TSnapshotLiveInfo> SnapshotsLive;
77+
1578
public:
79+
std::optional<NOlap::TSnapshot> GetSnapshotToClean() const {
80+
if (SnapshotsLive.empty()) {
81+
return std::nullopt;
82+
} else {
83+
return SnapshotsLive.begin()->first;
84+
}
85+
}
86+
87+
bool LoadFromDatabase(NTable::TDatabase& db);
88+
89+
[[nodiscard]] std::unique_ptr<NTabletFlatExecutor::ITransaction> Ping(TColumnShard* self, const TDuration critDuration);
90+
1691
// Returns a unique cookie associated with this request
1792
[[nodiscard]] TConclusion<ui64> AddInFlightRequest(NOlap::NReader::TReadMetadataBase::TConstPtr readMeta, const NOlap::TVersionedIndex* index) {
1893
const ui64 cookie = NextCookie++;
94+
auto it = SnapshotsLive.find(readMeta->GetRequestSnapshot());
95+
if (it == SnapshotsLive.end()) {
96+
it = SnapshotsLive.emplace(readMeta->GetRequestSnapshot(), TSnapshotLiveInfo::BuildFromRequest(readMeta->GetRequestSnapshot())).first;
97+
} else {
98+
it->second.AddRequest(cookie);
99+
}
19100
auto status = AddToInFlightRequest(cookie, readMeta, index);
20101
if (!status) {
21102
return status;
@@ -47,7 +128,7 @@ class TInFlightReadsTracker {
47128

48129
private:
49130
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
50-
ui64 NextCookie{1};
131+
ui64 NextCookie = 1;
51132
THashMap<ui64, TList<NOlap::NReader::TReadMetadataBase::TConstPtr>> RequestsMeta;
52133
THashMap<ui64, ui64> PortionUseCount;
53134
NOlap::TSelectInfo::TStats SelectStatsDelta;

0 commit comments

Comments
 (0)