Skip to content

Forward cache bugfix index pages queue verify #3134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 213 additions & 6 deletions ydb/core/tablet_flat/flat_executor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,32 +297,38 @@ class TDummyScan : public TActor<TDummyScan>, public NTable::IScan {
if (seq && Abort == EAbort::None)
return EScan::Final;

lead.To(Scheme->Tags(), { }, NTable::ESeek::Lower);
lead.To(Scheme->Tags(), LeadKey, NTable::ESeek::Lower);
if (LeadKey) {
ExpectedRowId = LeadKey[0].AsValue<ui64>();
}
return EScan::Feed;
}

EScan Feed(TArrayRef<const TCell> key, const TRow &) noexcept override
{
UNIT_ASSERT_VALUES_EQUAL(key[0].AsValue<ui64>(), ExpectedRowId);
Y_ABORT_UNLESS(key[0].AsValue<ui64>() == ExpectedRowId);
++ExpectedRowId;
++StoredRows;
return EScan::Feed;
}

TAutoPtr<IDestructable> Finish(EAbort abort) noexcept override
{
UNIT_ASSERT_VALUES_EQUAL((int)Abort, (int)abort);
Y_ABORT_UNLESS((int)Abort == (int)abort);

auto ctx = ActorContext();
if (abort == EAbort::None) {
UNIT_ASSERT_VALUES_EQUAL(ExpectedRows, StoredRows);
Y_ABORT_UNLESS(ExpectedRows == StoredRows);
}

Die(ctx);

return new TDummyResult(StoredRows, ExpectedRows);
}

public:
TArrayRef<const TCell> LeadKey;

private:
TActorId Tablet;
IDriver *Driver = nullptr;
Expand Down Expand Up @@ -369,6 +375,9 @@ struct TEvTestFlatTablet {
NTable::EAbort Abort;
const TRowVersion ReadVersion;
const ui32 ExpectRows = 0;

std::optional<std::pair<ui64, ui64>> ReadAhead;
TArrayRef<const TCell> LeadKey;
};
struct TEvStartQueuedScan : public TEventLocal<TEvStartQueuedScan, EvStartQueuedScan> {};
struct TEvMakeScanSnapshot : public TEventLocal<TEvMakeScanSnapshot, EvMakeScanSnapshot> {};
Expand Down Expand Up @@ -442,13 +451,17 @@ class TTestFlatTablet : public TActor<TTestFlatTablet>, public TTabletExecutedFl
auto abort = ev->Get()->Abort;
auto rows = abort != NTable::EAbort::None ? 0 : ev->Get()->ExpectRows;
Scan = new TDummyScan(SelfId(), postpone, abort, rows);
Scan->LeadKey = ev->Get()->LeadKey;
TScanOptions options;
if (snap) {
Y_ABORT_UNLESS(ev->Get()->ReadVersion.IsMax(), "Cannot combine multiple snapshot techniques");
options.SetSnapshotId(snap);
} else if (!ev->Get()->ReadVersion.IsMax()) {
options.SetSnapshotRowVersion(ev->Get()->ReadVersion);
}
if (auto readAhead = ev->Get()->ReadAhead) {
options.SetReadAhead(readAhead->first, readAhead->second);
}
ScanTaskId = Executor()->QueueScan(TRowsModel::TableId, Scan, ScanCookie, options);
}

Expand Down Expand Up @@ -5020,7 +5033,14 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {
}
};

Y_UNIT_TEST(TestPrechargeAndSeek_FlatIndex) {
// Test helper: pushes a {1, 1, 1} stat to the memory observer and then pumps
// the runtime until one NSharedCache::EvMem event has been observed.
// NOTE(review): presumably this shrinks the shared page cache to (almost)
// nothing so that subsequent scans must load pages — confirm against
// the memory observer / shared cache implementation.
void ZeroSharedCache(TMyEnvBase &env) {
    env.Env.GetMemObserver()->NotifyStat({1, 1, 1});

    // Block until the notification has been delivered as a single EvMem event.
    TDispatchOptions dispatch;
    dispatch.FinalEvents.emplace_back(NSharedCache::EvMem, 1);
    env->DispatchEvents(dispatch);
}

Y_UNIT_TEST(PrechargeAndSeek_FlatIndex) {
TMyEnvBase env;
TRowsModel rows;

Expand Down Expand Up @@ -5065,7 +5085,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {
env.SendSync(new TEvents::TEvPoison, false, true);
}

Y_UNIT_TEST(TestPrechargeAndSeek_BTreeIndex) {
Y_UNIT_TEST(PrechargeAndSeek_BTreeIndex) {
TMyEnvBase env;
TRowsModel rows;

Expand Down Expand Up @@ -5110,6 +5130,193 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {
env.SendSync(new TEvents::TEvPoison, false, true);
}

// Scans a compacted table with only the B-tree index enabled, using several
// read-ahead settings and several starting (lead) keys. The test has no
// explicit assertions of its own: the scan actor aborts the process if the
// number or order of rows differs from what was requested.
Y_UNIT_TEST(Scan_BTreeIndex) {
    TMyEnvBase env;
    TRowsModel rows;
    const ui32 rowsCount = 1024;

    // B-tree index on, flat index off.
    auto &appData = env->GetAppData();
    appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
    appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);

    env->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG);

    env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
        return new TTestFlatTablet(env.Edge, tablet, info);
    });
    env.WaitForWakeUp();
    ZeroSharedCache(env);

    env.SendSync(rows.MakeScheme(new TCompactionPolicy(), false));

    env.SendSync(rows.MakeRows(rowsCount, 10*1024));

    // Compact so that the scans below read from a compacted part.
    env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
    env.WaitFor<NFake::TEvCompacted>();

    { // no read ahead
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
        queueScan->ReadAhead = {1, 1};
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
    }

    { // small read ahead
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
        queueScan->ReadAhead = {5*10*1024, 10*10*1024};
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
    }

    { // infinite read ahead
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
        queueScan->ReadAhead = {Max<ui64>(), Max<ui64>()};
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
    }

    // Start the scan from a series of lead keys; the last iteration starts
    // past the end of the table and must yield no rows.
    for (ui64 leadKey = 1; ; leadKey += rowsCount / 10) {
        // A scan from leadKey == 1 expects all rowsCount rows, so a scan from
        // leadKey expects rowsCount - leadKey + 1 rows. The bound is inclusive:
        // starting exactly at the last key still yields one row.
        const ui64 expectedRowsCount = leadKey <= rowsCount ? rowsCount - leadKey + 1 : 0;
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(expectedRowsCount);
        queueScan->ReadAhead = {5*10*1024, 10*10*1024};
        // LeadKey is a non-owning view, so leadKey_ has to stay in scope until
        // the scan has finished (i.e. until after the WaitFor below).
        TVector<TCell> leadKey_ = {TCell::Make(leadKey)};
        queueScan->LeadKey = leadKey_;
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
        if (!expectedRowsCount) {
            break;
        }
    }

    // If we didn't crash, then assume the test succeeded
    env.SendSync(new TEvents::TEvPoison, false, true);
}

// Same scan scenarios as the plain B-tree scan test, but the rows are written
// twice at different row versions and every scan reads at TRowVersion(2, 0).
// NOTE(review): presumably this read version falls between the two writes so
// that the scan has to go through historical row versions — confirm.
// The test has no explicit assertions of its own: the scan actor aborts the
// process if the number or order of rows differs from what was requested.
Y_UNIT_TEST(Scan_History_BTreeIndex) {
    TMyEnvBase env;
    TRowsModel rows;
    const ui32 rowsCount = 1024;

    // B-tree index on, flat index off.
    auto &appData = env->GetAppData();
    appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
    appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);

    env->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG);

    env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
        return new TTestFlatTablet(env.Edge, tablet, info);
    });
    env.WaitForWakeUp();
    ZeroSharedCache(env);

    env.SendSync(rows.MakeScheme(new TCompactionPolicy(), false));

    // Write the same row range twice, at versions (1, 10) and (2, 20).
    env.SendSync(rows.RowTo(1).VersionTo(TRowVersion(1, 10)).MakeRows(rowsCount, 10*1024));
    env.SendSync(rows.RowTo(1).VersionTo(TRowVersion(2, 20)).MakeRows(rowsCount, 10*1024));

    // Compact so that the scans below read from a compacted part.
    env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
    env.WaitFor<NFake::TEvCompacted>();

    { // no read ahead
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount, TRowVersion(2, 0));
        queueScan->ReadAhead = {1, 1};
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
    }

    { // small read ahead
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount, TRowVersion(2, 0));
        queueScan->ReadAhead = {5*10*1024, 10*10*1024};
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
    }

    { // infinite read ahead
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount, TRowVersion(2, 0));
        queueScan->ReadAhead = {Max<ui64>(), Max<ui64>()};
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
    }

    // Start the scan from a series of lead keys; the last iteration starts
    // past the end of the table and must yield no rows.
    for (ui64 leadKey = 1; ; leadKey += rowsCount / 10) {
        // A scan from leadKey == 1 expects all rowsCount rows, so a scan from
        // leadKey expects rowsCount - leadKey + 1 rows. The bound is inclusive:
        // starting exactly at the last key still yields one row.
        const ui64 expectedRowsCount = leadKey <= rowsCount ? rowsCount - leadKey + 1 : 0;
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(expectedRowsCount, TRowVersion(2, 0));
        queueScan->ReadAhead = {5*10*1024, 10*10*1024};
        // LeadKey is a non-owning view, so leadKey_ has to stay in scope until
        // the scan has finished (i.e. until after the WaitFor below).
        TVector<TCell> leadKey_ = {TCell::Make(leadKey)};
        queueScan->LeadKey = leadKey_;
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
        if (!expectedRowsCount) {
            break;
        }
    }

    // If we didn't crash, then assume the test succeeded
    env.SendSync(new TEvents::TEvPoison, false, true);
}

// Same scan scenarios as the plain B-tree scan test, but the scheme is created
// with the second MakeScheme argument set to true.
// NOTE(review): presumably that flag places columns into several column
// groups, so the scan has to load pages from more than one group — confirm
// against TRowsModel::MakeScheme.
// The test has no explicit assertions of its own: the scan actor aborts the
// process if the number or order of rows differs from what was requested.
Y_UNIT_TEST(Scan_Groups_BTreeIndex) {
    TMyEnvBase env;
    TRowsModel rows;
    const ui32 rowsCount = 1024;

    // B-tree index on, flat index off.
    auto &appData = env->GetAppData();
    appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
    appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);

    env->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG);

    env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
        return new TTestFlatTablet(env.Edge, tablet, info);
    });
    env.WaitForWakeUp();
    ZeroSharedCache(env);

    env.SendSync(rows.MakeScheme(new TCompactionPolicy(), true));

    env.SendSync(rows.MakeRows(rowsCount, 10*1024));

    // Compact so that the scans below read from a compacted part.
    env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
    env.WaitFor<NFake::TEvCompacted>();

    { // no read ahead
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
        queueScan->ReadAhead = {1, 1};
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
    }

    { // small read ahead
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
        queueScan->ReadAhead = {5*10*1024, 10*10*1024};
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
    }

    { // infinite read ahead
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
        queueScan->ReadAhead = {Max<ui64>(), Max<ui64>()};
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
    }

    // Start the scan from a series of lead keys; the last iteration starts
    // past the end of the table and must yield no rows.
    for (ui64 leadKey = 1; ; leadKey += rowsCount / 10) {
        // A scan from leadKey == 1 expects all rowsCount rows, so a scan from
        // leadKey expects rowsCount - leadKey + 1 rows. The bound is inclusive:
        // starting exactly at the last key still yields one row.
        const ui64 expectedRowsCount = leadKey <= rowsCount ? rowsCount - leadKey + 1 : 0;
        auto queueScan = new TEvTestFlatTablet::TEvQueueScan(expectedRowsCount);
        queueScan->ReadAhead = {5*10*1024, 10*10*1024};
        // LeadKey is a non-owning view, so leadKey_ has to stay in scope until
        // the scan has finished (i.e. until after the WaitFor below).
        TVector<TCell> leadKey_ = {TCell::Make(leadKey)};
        queueScan->LeadKey = leadKey_;
        env.SendAsync(std::move(queueScan));
        env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
        if (!expectedRowsCount) {
            break;
        }
    }

    // If we didn't crash, then assume the test succeeded
    env.SendSync(new TEvents::TEvPoison, false, true);
}

}

Y_UNIT_TEST_SUITE(TFlatTableExecutorStickyPages) {
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tablet_flat/flat_fwd_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ namespace NFwd {

/*_ Outline blobs materialization conf */

ui32 Edge = Max<ui32>(); /* Outlined blob materialization edge */
ui64 Tablet = 0; /* Use Edge only for this tablet if set */
TVector<ui32> Keys; /* Always materalize this tag values */
ui32 Edge = Max<ui32>(); /* Outlined blob materialization edge */
ui64 Tablet = 0; /* Use Edge only for this tablet if set */
TVector<ui32> Keys; /* Always materialize these tag values */

/*_ Misc features configuation */
/*_ Misc features configuration */

bool Trace = false; /* Track seen blobs used by reference */
bool Trace = false; /* Track seen blobs used by reference */
};

}
Expand Down
21 changes: 11 additions & 10 deletions ydb/core/tablet_flat/flat_fwd_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ namespace NFwd {
TAutoPtr<TFetch> Fetch;
};

struct TEnv: public IPages {
struct TEnv : public IPages {
using TSlot = ui32;
using TSlotVec = TSmallVec<TSlot>;

Expand Down Expand Up @@ -200,15 +200,15 @@ namespace NFwd {
return stat += q->Stat;
};

return
std::accumulate(Queues.begin(), Queues.end(), Total, aggr);
return std::accumulate(Queues.begin(), Queues.end(), Total, aggr);
}

TAutoPtr<TFetch> GrabFetches() noexcept
{
while (auto *q = Queue ? Queue.PopFront() : nullptr) {
if (std::exchange(q->Grow, false))
if (std::exchange(q->Grow, false)) {
(*q)->Forward(q, Max(ui64(1), Conf.AheadHi));
}

if (auto req = std::move(q->Fetch)) {
Y_ABORT_UNLESS(req->Pages, "Shouldn't sent an empty requests");
Expand Down Expand Up @@ -287,7 +287,8 @@ namespace NFwd {
if ((q.Grow = got.Grow) || bool(q.Fetch)) {
Queue.PushBack(&q);
} else if (got.Need && got.Page == nullptr) {
Y_ABORT("Cache line head don't want to do fetch but should");
// temporary hack for index pages as they are always stored in group 0
Y_ABORT_UNLESS(!Queue.Empty(), "Cache line head don't want to do fetch but should");
}

return { got.Need, got.Page };
Expand Down Expand Up @@ -322,7 +323,7 @@ namespace NFwd {
{
auto it = Parts.find(part);
Y_ABORT_UNLESS(it != Parts.end(),
"NFwd cache tyring to access part outside of subset");
"NFwd cache trying to access part outside of subset");
Y_ABORT_UNLESS(room < it->second.size(),
"NFwd cache trying to access room out of bounds");
Y_ABORT_UNLESS(it->second[room] != Max<ui16>(),
Expand Down Expand Up @@ -352,15 +353,15 @@ namespace NFwd {
}
}

TEgg MakeCache(const TPart *part, NPage::TGroupId groupId, TIntrusiveConstPtr<TSlices> bounds) noexcept
TEgg MakeCache(const TPart *part, NPage::TGroupId groupId, TIntrusiveConstPtr<TSlices> slices) noexcept
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Кстати оно в сканах называлось bounds, потому что я хотел подчеркнуть, что грузить что-либо за границами этих строк строго запрещено, там блобов может уже не существовать (шардированный компакшен умел компактить половину sst и удалять блобы от скомпакченных данных). Но в целом я не против переименования.

{
auto *partStore = dynamic_cast<const TPartStore*>(part);
auto *partStore = CheckedCast<const TPartStore*>(part);

Y_ABORT_UNLESS(groupId.Index < partStore->PageCollections.size(), "Got part without enough page collections");

auto& cache = partStore->PageCollections[groupId.Index];

auto* fwd = new NFwd::TCache(part, this, groupId, bounds);
auto* fwd = new NFwd::TCache(part, this, groupId, slices);
return { fwd, cache->PageCollection };
}

Expand Down Expand Up @@ -421,7 +422,7 @@ namespace NFwd {
TDeque<TSimpleEnv> IndexPages;
THashMap<const TPart*, TSlotVec> Parts;
THashSet<const TPart*> ColdParts;
// Wrapper for memable blobs
// Wrapper for memtable blobs
TAutoPtr<TMemTableHandler> MemTable;
// Waiting for read aheads
TIntrusiveList<TPageLoadingQueue> Queue;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_fwd_warmed.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "flat_fwd_conf.h"
#include "flat_fwd_sieve.h"
#include "flat_mem_snapshot.h"
#include "flat_mem_warm.h"
#include "flat_part_screen.h"
#include "flat_part_iface.h"
Expand Down
Loading