Skip to content

Fix compacted pages offload race #15377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Mar 11, 2025
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_boot_bundle.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ namespace NBoot {
void TryFinalize()
{
if (!LeftReads) {
for (auto req : Loader->Run(false)) {
for (auto req : Loader->Run({.PreloadIndex = true, .PreloadData = false})) {
LeftReads += Logic->LoadPages(this, req);
}
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ bool TExecutor::PrepareExternalPart(TPendingPartSwitch &partSwitch, TPendingPart
}

if (auto* stage = bundle.GetStage<TPendingPartSwitch::TLoaderStage>()) {
if (auto fetch = stage->Loader.Run(PreloadTablesData.contains(partSwitch.TableId))) {
if (auto fetch = stage->Loader.Run({.PreloadIndex = true, .PreloadData = PreloadTablesData.contains(partSwitch.TableId)})) {
Y_ABORT_UNLESS(fetch.size() == 1, "Cannot handle loads from more than one page collection");

for (auto req : fetch) {
Expand Down Expand Up @@ -2827,9 +2827,9 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
void TExecutor::Handle(NSharedCache::TEvUpdated::TPtr &ev) {
const auto *msg = ev->Get();

for (auto &kv : msg->Actions) {
for (auto &kv : msg->DroppedPages) {
if (auto *info = PrivatePageCache->Info(kv.first)) {
for (ui32 pageId : kv.second.Dropped) {
for (ui32 pageId : kv.second) {
PrivatePageCache->DropSharedBody(info, pageId);
}
}
Expand Down
45 changes: 27 additions & 18 deletions ydb/core/tablet_flat/flat_executor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6285,13 +6285,22 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
env->Send(MakeSharedPageCacheId(), TActorId{}, new NMemory::TEvConsumerLimit(0));
}

// Common test setup: raises shared cache and executor logging to TRACE and,
// when bTreeIndex is provided, selects exactly one local DB index kind
// (true -> b-tree index, false -> flat index). Without a value the feature
// flags are left untouched.
void SetupEnvironment(TMyEnvBase &env, std::optional<bool> bTreeIndex = {}) {
    env->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NActors::NLog::PRI_TRACE);
    env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_TRACE);

    if (!bTreeIndex.has_value()) {
        // caller did not ask for a particular index kind
        return;
    }

    const bool useBTree = bTreeIndex.value();
    auto &featureFlags = env->GetAppData().FeatureFlags;
    featureFlags.SetEnableLocalDBBtreeIndex(useBTree);
    featureFlags.SetEnableLocalDBFlatIndex(!useBTree);
}

Y_UNIT_TEST(TestNonSticky_FlatIndex) {
TMyEnvBase env;
TRowsModel rows;

auto &appData = env->GetAppData();
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(false);
appData.FeatureFlags.SetEnableLocalDBFlatIndex(true);
SetupEnvironment(env, false);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);
Expand Down Expand Up @@ -6326,9 +6335,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
TMyEnvBase env;
TRowsModel rows;

auto &appData = env->GetAppData();
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);
SetupEnvironment(env, true);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);
Expand Down Expand Up @@ -6363,6 +6370,8 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
TMyEnvBase env;
TRowsModel rows;

SetupEnvironment(env);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);

Expand Down Expand Up @@ -6395,9 +6404,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
TMyEnvBase env;
TRowsModel rows;

auto &appData = env->GetAppData();
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(false);
appData.FeatureFlags.SetEnableLocalDBFlatIndex(true);
SetupEnvironment(env, false);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);
Expand Down Expand Up @@ -6432,9 +6439,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
TMyEnvBase env;
TRowsModel rows;

auto &appData = env->GetAppData();
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);
SetupEnvironment(env, true);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);
Expand Down Expand Up @@ -6469,6 +6474,8 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
TMyEnvBase env;
TRowsModel rows;

SetupEnvironment(env);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);

Expand Down Expand Up @@ -6501,9 +6508,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
TMyEnvBase env;
TRowsModel rows;

auto &appData = env->GetAppData();
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(false);
appData.FeatureFlags.SetEnableLocalDBFlatIndex(true);
SetupEnvironment(env, false);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);
Expand Down Expand Up @@ -6540,9 +6545,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
TMyEnvBase env;
TRowsModel rows;

auto &appData = env->GetAppData();
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);
SetupEnvironment(env, true);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);
Expand Down Expand Up @@ -6579,6 +6582,8 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
TMyEnvBase env;
TRowsModel rows;

SetupEnvironment(env);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);

Expand Down Expand Up @@ -6612,6 +6617,8 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
TMyEnvBase env;
TRowsModel rows;

SetupEnvironment(env);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);

Expand Down Expand Up @@ -6647,6 +6654,8 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_StickyPages) {
TMyEnvBase env;
TRowsModel rows;

SetupEnvironment(env);

env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
ZeroSharedCache(env);

Expand Down
18 changes: 15 additions & 3 deletions ydb/core/tablet_flat/flat_ops_compact.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,21 @@ namespace NTabletFlatExecutor {
{ },
std::move(result.Overlay));

auto fetch = loader.Run(false);

Y_ABORT_UNLESS(!fetch, "Just compacted part needs to load some pages");
// do not preload index as it may be already offloaded
auto fetch = loader.Run({.PreloadIndex = false, .PreloadData = false});

if (Y_UNLIKELY(fetch)) {
TStringBuilder error;
error << "Just compacted part needs to load pages";
for (auto collection : fetch) {
error << " " << collection->PageCollection->Label().ToString() << ": [ ";
for (auto pageId : collection->Pages) {
error << pageId << " " << (NTable::NPage::EPage)collection->PageCollection->Page(pageId).Type << " ";
}
error << "]";
}
Y_ABORT_S(error);
}

auto& res = prod->Results.emplace_back();
res.Part = loader.Result();
Expand Down
16 changes: 9 additions & 7 deletions ydb/core/tablet_flat/flat_part_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void TLoader::StageParseMeta() noexcept
}
}

TAutoPtr<NPageCollection::TFetch> TLoader::StageCreatePartView() noexcept
TAutoPtr<NPageCollection::TFetch> TLoader::StageCreatePartView(bool preloadIndex) noexcept
{
Y_ABORT_UNLESS(!PartView, "PartView already initialized in CreatePartView stage");
Y_ABORT_UNLESS(Packs && Packs.front());
Expand All @@ -161,12 +161,14 @@ TAutoPtr<NPageCollection::TFetch> TLoader::StageCreatePartView() noexcept
};

if (BTreeGroupIndexes) {
// Note: preload root nodes only because we don't want to have multiple restarts here
for (const auto& meta : BTreeGroupIndexes) {
if (meta.LevelCount) getPage(meta.GetPageId());
}
for (const auto& meta : BTreeHistoricIndexes) {
if (meta.LevelCount) getPage(meta.GetPageId());
if (preloadIndex) {
// Note: preload root nodes only because we don't want to have multiple restarts here
for (const auto& meta : BTreeGroupIndexes) {
if (meta.LevelCount) getPage(meta.GetPageId());
}
for (const auto& meta : BTreeHistoricIndexes) {
if (meta.LevelCount) getPage(meta.GetPageId());
}
}
} else if (FlatGroupIndexes) {
for (auto indexPageId : FlatGroupIndexes) {
Expand Down
18 changes: 14 additions & 4 deletions ydb/core/tablet_flat/flat_part_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ namespace NTable {
THashSet<TPageId> NeedPages;
};

// Options that control which optional pages TLoader::Run requests.
struct TRunOptions {
    // Whether optional index pages should be loaded while the part view is created
    //
    // Affects only the b-tree index, as the flat index is kept sticky
    bool PreloadIndex{true};

    // Whether all data pages from the main group should be loaded
    bool PreloadData{false};
};

TLoader(TPartComponents ou)
: TLoader(TPartStore::Construct(std::move(ou.PageCollectionComponents)),
std::move(ou.Legacy),
Expand All @@ -128,7 +138,7 @@ namespace NTable {
TEpoch epoch = NTable::TEpoch::Max());
~TLoader();

TVector<TAutoPtr<NPageCollection::TFetch>> Run(bool preloadData)
TVector<TAutoPtr<NPageCollection::TFetch>> Run(TRunOptions options)
{
while (Stage < EStage::Result) {
TAutoPtr<NPageCollection::TFetch> fetch;
Expand All @@ -138,7 +148,7 @@ namespace NTable {
StageParseMeta();
break;
case EStage::PartView:
fetch = StageCreatePartView();
fetch = StageCreatePartView(options.PreloadIndex);
break;
case EStage::Slice:
fetch = StageSliceBounds();
Expand All @@ -147,7 +157,7 @@ namespace NTable {
StageDeltas();
break;
case EStage::PreloadData:
if (preloadData) {
if (options.PreloadData) {
fetch = StagePreloadData();
}
break;
Expand Down Expand Up @@ -241,7 +251,7 @@ namespace NTable {
}

void StageParseMeta() noexcept;
TAutoPtr<NPageCollection::TFetch> StageCreatePartView() noexcept;
TAutoPtr<NPageCollection::TFetch> StageCreatePartView(bool preloadIndex) noexcept;
TAutoPtr<NPageCollection::TFetch> StageSliceBounds() noexcept;
void StageDeltas() noexcept;
TAutoPtr<NPageCollection::TFetch> StagePreloadData() noexcept;
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tablet_flat/flat_sausagecache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ void TPrivatePageCache::RegisterPageCollection(TIntrusivePtr<TInfo> info) {
Stats.TotalPinnedBody += page->Size;

TryUnload(page);
// notify shared cache that we have a page handle
ToTouchShared[page->Info->Id].insert(page->Id);
Y_DEBUG_ABORT_UNLESS(!page->IsUnnecessary());
}

Expand Down Expand Up @@ -285,6 +287,7 @@ const TSharedData* TPrivatePageCache::Lookup(TPageId pageId, TInfo *info) {
}

if (page->Empty()) {
Y_DEBUG_ABORT_UNLESS(info->GetPageType(page->Id) != EPage::FlatIndex, "Flat index pages should have been sticked and preloaded");
ToLoad.PushBack(page);
Stats.CurrentCacheMisses++;
}
Expand Down
17 changes: 12 additions & 5 deletions ydb/core/tablet_flat/flat_sausagecache.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,19 @@ class TPrivatePageCache {
return Info->IsSticky(Id);
}

void Fill(TSharedPageRef shared) {
SharedBody = std::move(shared);
void Fill(TSharedPageRef sharedBody) {
SharedBody = std::move(sharedBody);
LoadState = LoadStateLoaded;
PinnedBody = TPinnedPageRef(SharedBody).GetData();
}

// Stores the shared page handle on this page without pinning its data.
// Unlike Fill(), the page is left in the LoadStateNo state with an empty
// PinnedBody, so GetPinnedBody() returns nullptr until the page is filled.
void ProvideSharedBody(TSharedPageRef sharedBody) {
SharedBody = std::move(sharedBody);
// NOTE(review): presumably drops the "in use" mark from the stored handle so
// the shared cache is free to offload the page - confirm against TSharedPageRef
SharedBody.UnUse();
LoadState = LoadStateNo;
PinnedBody = { };
}

// Returns a pointer to the pinned page data, or nullptr when the page
// is not in the loaded state.
const TSharedData* GetPinnedBody() const noexcept {
    if (LoadState != LoadStateLoaded) {
        return nullptr;
    }
    return &PinnedBody;
}
Expand Down Expand Up @@ -118,11 +125,11 @@ class TPrivatePageCache {
}

// Note: this method is only called during a page collection creation
void Fill(TPageId pageId, TSharedPageRef page, bool sticky) noexcept {
void Fill(TPageId pageId, TSharedPageRef sharedBody, bool sticky) noexcept {
if (sticky) {
AddSticky(pageId, page);
AddSticky(pageId, sharedBody);
}
EnsurePage(pageId)->Fill(std::move(page));
EnsurePage(pageId)->ProvideSharedBody(std::move(sharedBody));
}

void AddSticky(TPageId pageId, TSharedPageRef page) noexcept {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_scan_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ namespace NOps {
}

void RunLoader() {
for (auto req : Loader->Run(false)) {
for (auto req : Loader->Run({.PreloadIndex = false, .PreloadData = false})) {
Send(Owner, new TEvPrivate::TEvLoadPages(std::move(req)));
++ReadsLeft;
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/tablet_flat/shared_cache_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ namespace NKikimr::NSharedCache {
};

struct TEvUpdated : public TEventLocal<TEvUpdated, EvUpdated> {
struct TActions {
THashSet<ui32> Dropped;
};

THashMap<TLogoBlobID, TActions> Actions;
THashMap<TLogoBlobID, THashSet<TPageId>> DroppedPages;
};
}

Expand Down
Loading
Loading