Skip to content

Commit 47fc644

Browse files
authored
Merge 5ebad3d into b38dea6
2 parents b38dea6 + 5ebad3d commit 47fc644

File tree

5 files changed

+315
-104
lines changed

5 files changed

+315
-104
lines changed

ydb/core/tablet_flat/flat_executor_ut.cpp

Lines changed: 213 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,32 +297,38 @@ class TDummyScan : public TActor<TDummyScan>, public NTable::IScan {
297297
if (seq && Abort == EAbort::None)
298298
return EScan::Final;
299299

300-
lead.To(Scheme->Tags(), { }, NTable::ESeek::Lower);
300+
lead.To(Scheme->Tags(), LeadKey, NTable::ESeek::Lower);
301+
if (LeadKey) {
302+
ExpectedRowId = LeadKey[0].AsValue<ui64>();
303+
}
301304
return EScan::Feed;
302305
}
303306

304307
EScan Feed(TArrayRef<const TCell> key, const TRow &) noexcept override
305308
{
306-
UNIT_ASSERT_VALUES_EQUAL(key[0].AsValue<ui64>(), ExpectedRowId);
309+
Y_ABORT_UNLESS(key[0].AsValue<ui64>() == ExpectedRowId);
307310
++ExpectedRowId;
308311
++StoredRows;
309312
return EScan::Feed;
310313
}
311314

312315
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept override
313316
{
314-
UNIT_ASSERT_VALUES_EQUAL((int)Abort, (int)abort);
317+
Y_ABORT_UNLESS((int)Abort == (int)abort);
315318

316319
auto ctx = ActorContext();
317320
if (abort == EAbort::None) {
318-
UNIT_ASSERT_VALUES_EQUAL(ExpectedRows, StoredRows);
321+
Y_ABORT_UNLESS(ExpectedRows == StoredRows);
319322
}
320323

321324
Die(ctx);
322325

323326
return new TDummyResult(StoredRows, ExpectedRows);
324327
}
325328

329+
public:
330+
TArrayRef<const TCell> LeadKey;
331+
326332
private:
327333
TActorId Tablet;
328334
IDriver *Driver = nullptr;
@@ -369,6 +375,9 @@ struct TEvTestFlatTablet {
369375
NTable::EAbort Abort;
370376
const TRowVersion ReadVersion;
371377
const ui32 ExpectRows = 0;
378+
379+
std::optional<std::pair<ui64, ui64>> ReadAhead;
380+
TArrayRef<const TCell> LeadKey;
372381
};
373382
struct TEvStartQueuedScan : public TEventLocal<TEvStartQueuedScan, EvStartQueuedScan> {};
374383
struct TEvMakeScanSnapshot : public TEventLocal<TEvMakeScanSnapshot, EvMakeScanSnapshot> {};
@@ -442,13 +451,17 @@ class TTestFlatTablet : public TActor<TTestFlatTablet>, public TTabletExecutedFl
442451
auto abort = ev->Get()->Abort;
443452
auto rows = abort != NTable::EAbort::None ? 0 : ev->Get()->ExpectRows;
444453
Scan = new TDummyScan(SelfId(), postpone, abort, rows);
454+
Scan->LeadKey = ev->Get()->LeadKey;
445455
TScanOptions options;
446456
if (snap) {
447457
Y_ABORT_UNLESS(ev->Get()->ReadVersion.IsMax(), "Cannot combine multiple snapshot techniques");
448458
options.SetSnapshotId(snap);
449459
} else if (!ev->Get()->ReadVersion.IsMax()) {
450460
options.SetSnapshotRowVersion(ev->Get()->ReadVersion);
451461
}
462+
if (auto readAhead = ev->Get()->ReadAhead) {
463+
options.SetReadAhead(readAhead->first, readAhead->second);
464+
}
452465
ScanTaskId = Executor()->QueueScan(TRowsModel::TableId, Scan, ScanCookie, options);
453466
}
454467

@@ -5020,7 +5033,14 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {
50205033
}
50215034
};
50225035

5023-
Y_UNIT_TEST(TestPrechargeAndSeek_FlatIndex) {
5036+
void ZeroSharedCache(TMyEnvBase &env) {
5037+
env.Env.GetMemObserver()->NotifyStat({1, 1, 1});
5038+
TDispatchOptions options;
5039+
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(NSharedCache::EvMem, 1));
5040+
env->DispatchEvents(options);
5041+
}
5042+
5043+
Y_UNIT_TEST(PrechargeAndSeek_FlatIndex) {
50245044
TMyEnvBase env;
50255045
TRowsModel rows;
50265046

@@ -5065,7 +5085,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {
50655085
env.SendSync(new TEvents::TEvPoison, false, true);
50665086
}
50675087

5068-
Y_UNIT_TEST(TestPrechargeAndSeek_BTreeIndex) {
5088+
Y_UNIT_TEST(PrechargeAndSeek_BTreeIndex) {
50695089
TMyEnvBase env;
50705090
TRowsModel rows;
50715091

@@ -5110,6 +5130,193 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {
51105130
env.SendSync(new TEvents::TEvPoison, false, true);
51115131
}
51125132

5133+
Y_UNIT_TEST(Scan_BTreeIndex) {
5134+
TMyEnvBase env;
5135+
TRowsModel rows;
5136+
const ui32 rowsCount = 1024;
5137+
5138+
auto &appData = env->GetAppData();
5139+
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
5140+
appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);
5141+
5142+
env->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG);
5143+
5144+
env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
5145+
return new TTestFlatTablet(env.Edge, tablet, info);
5146+
});
5147+
env.WaitForWakeUp();
5148+
ZeroSharedCache(env);
5149+
5150+
env.SendSync(rows.MakeScheme(new TCompactionPolicy(), false));
5151+
5152+
env.SendSync(rows.MakeRows(rowsCount, 10*1024));
5153+
5154+
env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
5155+
env.WaitFor<NFake::TEvCompacted>();
5156+
5157+
{ // no read ahead
5158+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5159+
queueScan->ReadAhead = {1, 1};
5160+
env.SendAsync(std::move(queueScan));
5161+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5162+
}
5163+
5164+
{ // small read ahead
5165+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5166+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5167+
env.SendAsync(std::move(queueScan));
5168+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5169+
}
5170+
5171+
{ // infinite read ahead
5172+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5173+
queueScan->ReadAhead = {Max<ui64>(), Max<ui64>()};
5174+
env.SendAsync(std::move(queueScan));
5175+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5176+
}
5177+
5178+
for (ui64 leadKey = 1; ; leadKey += rowsCount / 10) {
5179+
ui64 expectedRowsCount = rowsCount > leadKey ? rowsCount - leadKey + 1 : 0;
5180+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(expectedRowsCount);
5181+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5182+
TVector<TCell> leadKey_ = {TCell::Make(leadKey)};
5183+
queueScan->LeadKey = leadKey_;
5184+
env.SendAsync(std::move(queueScan));
5185+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5186+
if (!expectedRowsCount) {
5187+
break;
5188+
}
5189+
}
5190+
5191+
// If we didn't crash, then assume the test succeeded
5192+
env.SendSync(new TEvents::TEvPoison, false, true);
5193+
}
5194+
5195+
Y_UNIT_TEST(Scan_History_BTreeIndex) {
5196+
TMyEnvBase env;
5197+
TRowsModel rows;
5198+
const ui32 rowsCount = 1024;
5199+
5200+
auto &appData = env->GetAppData();
5201+
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
5202+
appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);
5203+
5204+
env->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG);
5205+
5206+
env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
5207+
return new TTestFlatTablet(env.Edge, tablet, info);
5208+
});
5209+
env.WaitForWakeUp();
5210+
ZeroSharedCache(env);
5211+
5212+
env.SendSync(rows.MakeScheme(new TCompactionPolicy(), false));
5213+
5214+
env.SendSync(rows.RowTo(1).VersionTo(TRowVersion(1, 10)).MakeRows(rowsCount, 10*1024));
5215+
env.SendSync(rows.RowTo(1).VersionTo(TRowVersion(2, 20)).MakeRows(rowsCount, 10*1024));
5216+
5217+
env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
5218+
env.WaitFor<NFake::TEvCompacted>();
5219+
5220+
{ // no read ahead
5221+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount, TRowVersion(2, 0));
5222+
queueScan->ReadAhead = {1, 1};
5223+
env.SendAsync(std::move(queueScan));
5224+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5225+
}
5226+
5227+
{ // small read ahead
5228+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount, TRowVersion(2, 0));
5229+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5230+
env.SendAsync(std::move(queueScan));
5231+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5232+
}
5233+
5234+
{ // infinite read ahead
5235+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount, TRowVersion(2, 0));
5236+
queueScan->ReadAhead = {Max<ui64>(), Max<ui64>()};
5237+
env.SendAsync(std::move(queueScan));
5238+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5239+
}
5240+
5241+
for (ui64 leadKey = 1; ; leadKey += rowsCount / 10) {
5242+
ui64 expectedRowsCount = rowsCount > leadKey ? rowsCount - leadKey + 1 : 0;
5243+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(expectedRowsCount, TRowVersion(2, 0));
5244+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5245+
TVector<TCell> leadKey_ = {TCell::Make(leadKey)};
5246+
queueScan->LeadKey = leadKey_;
5247+
env.SendAsync(std::move(queueScan));
5248+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5249+
if (!expectedRowsCount) {
5250+
break;
5251+
}
5252+
}
5253+
5254+
// If we didn't crash, then assume the test succeeded
5255+
env.SendSync(new TEvents::TEvPoison, false, true);
5256+
}
5257+
5258+
Y_UNIT_TEST(Scan_Groups_BTreeIndex) {
5259+
TMyEnvBase env;
5260+
TRowsModel rows;
5261+
const ui32 rowsCount = 1024;
5262+
5263+
auto &appData = env->GetAppData();
5264+
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
5265+
appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);
5266+
5267+
env->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG);
5268+
5269+
env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
5270+
return new TTestFlatTablet(env.Edge, tablet, info);
5271+
});
5272+
env.WaitForWakeUp();
5273+
ZeroSharedCache(env);
5274+
5275+
env.SendSync(rows.MakeScheme(new TCompactionPolicy(), true));
5276+
5277+
env.SendSync(rows.MakeRows(rowsCount, 10*1024));
5278+
5279+
env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
5280+
env.WaitFor<NFake::TEvCompacted>();
5281+
5282+
{ // no read ahead
5283+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5284+
queueScan->ReadAhead = {1, 1};
5285+
env.SendAsync(std::move(queueScan));
5286+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5287+
}
5288+
5289+
{ // small read ahead
5290+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5291+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5292+
env.SendAsync(std::move(queueScan));
5293+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5294+
}
5295+
5296+
{ // infinite read ahead
5297+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5298+
queueScan->ReadAhead = {Max<ui64>(), Max<ui64>()};
5299+
env.SendAsync(std::move(queueScan));
5300+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5301+
}
5302+
5303+
for (ui64 leadKey = 1; ; leadKey += rowsCount / 10) {
5304+
ui64 expectedRowsCount = rowsCount > leadKey ? rowsCount - leadKey + 1 : 0;
5305+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(expectedRowsCount);
5306+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5307+
TVector<TCell> leadKey_ = {TCell::Make(leadKey)};
5308+
queueScan->LeadKey = leadKey_;
5309+
env.SendAsync(std::move(queueScan));
5310+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5311+
if (!expectedRowsCount) {
5312+
break;
5313+
}
5314+
}
5315+
5316+
// If we didn't crash, then assume the test succeeded
5317+
env.SendSync(new TEvents::TEvPoison, false, true);
5318+
}
5319+
51135320
}
51145321

51155322
Y_UNIT_TEST_SUITE(TFlatTableExecutorStickyPages) {

ydb/core/tablet_flat/flat_fwd_conf.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ namespace NFwd {
1818

1919
ui32 Edge = Max<ui32>(); /* Outlined blob materialization edge */
2020
ui64 Tablet = 0; /* Use Edge only for this tablet if set */
21-
TVector<ui32> Keys; /* Always materalize this tag values */
21+
TVector<ui32> Keys; /* Always materialize this tag values */
2222

23-
/*_ Misc features configuation */
23+
/*_ Misc features configuration */
2424

2525
bool Trace = false; /* Track seen blobs used by reference */
2626
};

ydb/core/tablet_flat/flat_fwd_env.h

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ namespace NFwd {
6464
TAutoPtr<TFetch> Fetch;
6565
};
6666

67-
struct TEnv: public IPages {
67+
struct TEnv : public IPages {
6868
using TSlot = ui32;
6969
using TSlotVec = TSmallVec<TSlot>;
7070

@@ -200,15 +200,15 @@ namespace NFwd {
200200
return stat += q->Stat;
201201
};
202202

203-
return
204-
std::accumulate(Queues.begin(), Queues.end(), Total, aggr);
203+
return std::accumulate(Queues.begin(), Queues.end(), Total, aggr);
205204
}
206205

207206
TAutoPtr<TFetch> GrabFetches() noexcept
208207
{
209208
while (auto *q = Queue ? Queue.PopFront() : nullptr) {
210-
if (std::exchange(q->Grow, false))
209+
if (std::exchange(q->Grow, false)) {
211210
(*q)->Forward(q, Max(ui64(1), Conf.AheadHi));
211+
}
212212

213213
if (auto req = std::move(q->Fetch)) {
214214
Y_ABORT_UNLESS(req->Pages, "Shouldn't sent an empty requests");
@@ -287,7 +287,8 @@ namespace NFwd {
287287
if ((q.Grow = got.Grow) || bool(q.Fetch)) {
288288
Queue.PushBack(&q);
289289
} else if (got.Need && got.Page == nullptr) {
290-
Y_ABORT("Cache line head don't want to do fetch but should");
290+
// temporary hack for index pages as they are always stored in group 0
291+
Y_ABORT_UNLESS(!Queue.Empty(), "Cache line head don't want to do fetch but should");
291292
}
292293

293294
return { got.Need, got.Page };
@@ -322,7 +323,7 @@ namespace NFwd {
322323
{
323324
auto it = Parts.find(part);
324325
Y_ABORT_UNLESS(it != Parts.end(),
325-
"NFwd cache tyring to access part outside of subset");
326+
"NFwd cache trying to access part outside of subset");
326327
Y_ABORT_UNLESS(room < it->second.size(),
327328
"NFwd cache trying to access room out of bounds");
328329
Y_ABORT_UNLESS(it->second[room] != Max<ui16>(),
@@ -352,15 +353,15 @@ namespace NFwd {
352353
}
353354
}
354355

355-
TEgg MakeCache(const TPart *part, NPage::TGroupId groupId, TIntrusiveConstPtr<TSlices> bounds) noexcept
356+
TEgg MakeCache(const TPart *part, NPage::TGroupId groupId, TIntrusiveConstPtr<TSlices> slices) noexcept
356357
{
357-
auto *partStore = dynamic_cast<const TPartStore*>(part);
358+
auto *partStore = CheckedCast<const TPartStore*>(part);
358359

359360
Y_ABORT_UNLESS(groupId.Index < partStore->PageCollections.size(), "Got part without enough page collections");
360361

361362
auto& cache = partStore->PageCollections[groupId.Index];
362363

363-
auto* fwd = new NFwd::TCache(part, this, groupId, bounds);
364+
auto* fwd = new NFwd::TCache(part, this, groupId, slices);
364365
return { fwd, cache->PageCollection };
365366
}
366367

@@ -421,7 +422,7 @@ namespace NFwd {
421422
TDeque<TSimpleEnv> IndexPages;
422423
THashMap<const TPart*, TSlotVec> Parts;
423424
THashSet<const TPart*> ColdParts;
424-
// Wrapper for memable blobs
425+
// Wrapper for memtable blobs
425426
TAutoPtr<TMemTableHandler> MemTable;
426427
// Waiting for read aheads
427428
TIntrusiveList<TPageLoadingQueue> Queue;

ydb/core/tablet_flat/flat_fwd_warmed.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#pragma once
22

3-
#include "flat_fwd_conf.h"
43
#include "flat_fwd_sieve.h"
4+
#include "flat_mem_snapshot.h"
55
#include "flat_mem_warm.h"
66
#include "flat_part_screen.h"
77
#include "flat_part_iface.h"

0 commit comments

Comments
 (0)