Skip to content

Commit 64b9fda

Browse files
authored
Merge 8eea118 into 0380229
2 parents 0380229 + 8eea118 commit 64b9fda

File tree

5 files changed

+318
-107
lines changed

5 files changed

+318
-107
lines changed

ydb/core/tablet_flat/flat_executor_ut.cpp

Lines changed: 213 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,32 +297,38 @@ class TDummyScan : public TActor<TDummyScan>, public NTable::IScan {
297297
if (seq && Abort == EAbort::None)
298298
return EScan::Final;
299299

300-
lead.To(Scheme->Tags(), { }, NTable::ESeek::Lower);
300+
lead.To(Scheme->Tags(), LeadKey, NTable::ESeek::Lower);
301+
if (LeadKey) {
302+
ExpectedRowId = LeadKey[0].AsValue<ui64>();
303+
}
301304
return EScan::Feed;
302305
}
303306

304307
EScan Feed(TArrayRef<const TCell> key, const TRow &) noexcept override
305308
{
306-
UNIT_ASSERT_VALUES_EQUAL(key[0].AsValue<ui64>(), ExpectedRowId);
309+
Y_ABORT_UNLESS(key[0].AsValue<ui64>() == ExpectedRowId);
307310
++ExpectedRowId;
308311
++StoredRows;
309312
return EScan::Feed;
310313
}
311314

312315
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept override
313316
{
314-
UNIT_ASSERT_VALUES_EQUAL((int)Abort, (int)abort);
317+
Y_ABORT_UNLESS((int)Abort == (int)abort);
315318

316319
auto ctx = ActorContext();
317320
if (abort == EAbort::None) {
318-
UNIT_ASSERT_VALUES_EQUAL(ExpectedRows, StoredRows);
321+
Y_ABORT_UNLESS(ExpectedRows == StoredRows);
319322
}
320323

321324
Die(ctx);
322325

323326
return new TDummyResult(StoredRows, ExpectedRows);
324327
}
325328

329+
public:
330+
TArrayRef<const TCell> LeadKey;
331+
326332
private:
327333
TActorId Tablet;
328334
IDriver *Driver = nullptr;
@@ -369,6 +375,9 @@ struct TEvTestFlatTablet {
369375
NTable::EAbort Abort;
370376
const TRowVersion ReadVersion;
371377
const ui32 ExpectRows = 0;
378+
379+
std::optional<std::pair<ui64, ui64>> ReadAhead;
380+
TArrayRef<const TCell> LeadKey;
372381
};
373382
struct TEvStartQueuedScan : public TEventLocal<TEvStartQueuedScan, EvStartQueuedScan> {};
374383
struct TEvMakeScanSnapshot : public TEventLocal<TEvMakeScanSnapshot, EvMakeScanSnapshot> {};
@@ -442,13 +451,17 @@ class TTestFlatTablet : public TActor<TTestFlatTablet>, public TTabletExecutedFl
442451
auto abort = ev->Get()->Abort;
443452
auto rows = abort != NTable::EAbort::None ? 0 : ev->Get()->ExpectRows;
444453
Scan = new TDummyScan(SelfId(), postpone, abort, rows);
454+
Scan->LeadKey = ev->Get()->LeadKey;
445455
TScanOptions options;
446456
if (snap) {
447457
Y_ABORT_UNLESS(ev->Get()->ReadVersion.IsMax(), "Cannot combine multiple snapshot techniques");
448458
options.SetSnapshotId(snap);
449459
} else if (!ev->Get()->ReadVersion.IsMax()) {
450460
options.SetSnapshotRowVersion(ev->Get()->ReadVersion);
451461
}
462+
if (auto readAhead = ev->Get()->ReadAhead) {
463+
options.SetReadAhead(readAhead->first, readAhead->second);
464+
}
452465
ScanTaskId = Executor()->QueueScan(TRowsModel::TableId, Scan, ScanCookie, options);
453466
}
454467

@@ -5074,7 +5087,14 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {
50745087
}
50755088
};
50765089

5077-
Y_UNIT_TEST(TestPrechargeAndSeek_FlatIndex) {
5090+
void ZeroSharedCache(TMyEnvBase &env) {
5091+
env.Env.GetMemObserver()->NotifyStat({1, 1, 1});
5092+
TDispatchOptions options;
5093+
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(NSharedCache::EvMem, 1));
5094+
env->DispatchEvents(options);
5095+
}
5096+
5097+
Y_UNIT_TEST(PrechargeAndSeek_FlatIndex) {
50785098
TMyEnvBase env;
50795099
TRowsModel rows;
50805100

@@ -5119,7 +5139,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {
51195139
env.SendSync(new TEvents::TEvPoison, false, true);
51205140
}
51215141

5122-
Y_UNIT_TEST(TestPrechargeAndSeek_BTreeIndex) {
5142+
Y_UNIT_TEST(PrechargeAndSeek_BTreeIndex) {
51235143
TMyEnvBase env;
51245144
TRowsModel rows;
51255145

@@ -5164,6 +5184,193 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {
51645184
env.SendSync(new TEvents::TEvPoison, false, true);
51655185
}
51665186

5187+
Y_UNIT_TEST(Scan_BTreeIndex) {
5188+
TMyEnvBase env;
5189+
TRowsModel rows;
5190+
const ui32 rowsCount = 1024;
5191+
5192+
auto &appData = env->GetAppData();
5193+
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
5194+
appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);
5195+
5196+
env->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG);
5197+
5198+
env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
5199+
return new TTestFlatTablet(env.Edge, tablet, info);
5200+
});
5201+
env.WaitForWakeUp();
5202+
ZeroSharedCache(env);
5203+
5204+
env.SendSync(rows.MakeScheme(new TCompactionPolicy(), false));
5205+
5206+
env.SendSync(rows.MakeRows(rowsCount, 10*1024));
5207+
5208+
env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
5209+
env.WaitFor<NFake::TEvCompacted>();
5210+
5211+
{ // no read ahead
5212+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5213+
queueScan->ReadAhead = {1, 1};
5214+
env.SendAsync(std::move(queueScan));
5215+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5216+
}
5217+
5218+
{ // small read ahead
5219+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5220+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5221+
env.SendAsync(std::move(queueScan));
5222+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5223+
}
5224+
5225+
{ // infinite read ahead
5226+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5227+
queueScan->ReadAhead = {Max<ui64>(), Max<ui64>()};
5228+
env.SendAsync(std::move(queueScan));
5229+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5230+
}
5231+
5232+
for (ui64 leadKey = 1; ; leadKey += rowsCount / 10) {
5233+
ui64 expectedRowsCount = rowsCount > leadKey ? rowsCount - leadKey + 1 : 0;
5234+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(expectedRowsCount);
5235+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5236+
TVector<TCell> leadKey_ = {TCell::Make(leadKey)};
5237+
queueScan->LeadKey = leadKey_;
5238+
env.SendAsync(std::move(queueScan));
5239+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5240+
if (!expectedRowsCount) {
5241+
break;
5242+
}
5243+
}
5244+
5245+
// If we didn't crash, then assume the test succeeded
5246+
env.SendSync(new TEvents::TEvPoison, false, true);
5247+
}
5248+
5249+
Y_UNIT_TEST(Scan_History_BTreeIndex) {
5250+
TMyEnvBase env;
5251+
TRowsModel rows;
5252+
const ui32 rowsCount = 1024;
5253+
5254+
auto &appData = env->GetAppData();
5255+
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
5256+
appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);
5257+
5258+
env->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG);
5259+
5260+
env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
5261+
return new TTestFlatTablet(env.Edge, tablet, info);
5262+
});
5263+
env.WaitForWakeUp();
5264+
ZeroSharedCache(env);
5265+
5266+
env.SendSync(rows.MakeScheme(new TCompactionPolicy(), false));
5267+
5268+
env.SendSync(rows.RowTo(1).VersionTo(TRowVersion(1, 10)).MakeRows(rowsCount, 10*1024));
5269+
env.SendSync(rows.RowTo(1).VersionTo(TRowVersion(2, 20)).MakeRows(rowsCount, 10*1024));
5270+
5271+
env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
5272+
env.WaitFor<NFake::TEvCompacted>();
5273+
5274+
{ // no read ahead
5275+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount, TRowVersion(2, 0));
5276+
queueScan->ReadAhead = {1, 1};
5277+
env.SendAsync(std::move(queueScan));
5278+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5279+
}
5280+
5281+
{ // small read ahead
5282+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount, TRowVersion(2, 0));
5283+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5284+
env.SendAsync(std::move(queueScan));
5285+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5286+
}
5287+
5288+
{ // infinite read ahead
5289+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount, TRowVersion(2, 0));
5290+
queueScan->ReadAhead = {Max<ui64>(), Max<ui64>()};
5291+
env.SendAsync(std::move(queueScan));
5292+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5293+
}
5294+
5295+
for (ui64 leadKey = 1; ; leadKey += rowsCount / 10) {
5296+
ui64 expectedRowsCount = rowsCount > leadKey ? rowsCount - leadKey + 1 : 0;
5297+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(expectedRowsCount, TRowVersion(2, 0));
5298+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5299+
TVector<TCell> leadKey_ = {TCell::Make(leadKey)};
5300+
queueScan->LeadKey = leadKey_;
5301+
env.SendAsync(std::move(queueScan));
5302+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5303+
if (!expectedRowsCount) {
5304+
break;
5305+
}
5306+
}
5307+
5308+
// If we didn't crash, then assume the test succeeded
5309+
env.SendSync(new TEvents::TEvPoison, false, true);
5310+
}
5311+
5312+
Y_UNIT_TEST(Scan_Groups_BTreeIndex) {
5313+
TMyEnvBase env;
5314+
TRowsModel rows;
5315+
const ui32 rowsCount = 1024;
5316+
5317+
auto &appData = env->GetAppData();
5318+
appData.FeatureFlags.SetEnableLocalDBBtreeIndex(true);
5319+
appData.FeatureFlags.SetEnableLocalDBFlatIndex(false);
5320+
5321+
env->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG);
5322+
5323+
env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
5324+
return new TTestFlatTablet(env.Edge, tablet, info);
5325+
});
5326+
env.WaitForWakeUp();
5327+
ZeroSharedCache(env);
5328+
5329+
env.SendSync(rows.MakeScheme(new TCompactionPolicy(), true));
5330+
5331+
env.SendSync(rows.MakeRows(rowsCount, 10*1024));
5332+
5333+
env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
5334+
env.WaitFor<NFake::TEvCompacted>();
5335+
5336+
{ // no read ahead
5337+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5338+
queueScan->ReadAhead = {1, 1};
5339+
env.SendAsync(std::move(queueScan));
5340+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5341+
}
5342+
5343+
{ // small read ahead
5344+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5345+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5346+
env.SendAsync(std::move(queueScan));
5347+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5348+
}
5349+
5350+
{ // infinite read ahead
5351+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(rowsCount);
5352+
queueScan->ReadAhead = {Max<ui64>(), Max<ui64>()};
5353+
env.SendAsync(std::move(queueScan));
5354+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5355+
}
5356+
5357+
for (ui64 leadKey = 1; ; leadKey += rowsCount / 10) {
5358+
ui64 expectedRowsCount = rowsCount > leadKey ? rowsCount - leadKey + 1 : 0;
5359+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(expectedRowsCount);
5360+
queueScan->ReadAhead = {5*10*1024, 10*10*1024};
5361+
TVector<TCell> leadKey_ = {TCell::Make(leadKey)};
5362+
queueScan->LeadKey = leadKey_;
5363+
env.SendAsync(std::move(queueScan));
5364+
env.WaitFor<TEvTestFlatTablet::TEvScanFinished>();
5365+
if (!expectedRowsCount) {
5366+
break;
5367+
}
5368+
}
5369+
5370+
// If we didn't crash, then assume the test succeeded
5371+
env.SendSync(new TEvents::TEvPoison, false, true);
5372+
}
5373+
51675374
}
51685375

51695376
Y_UNIT_TEST_SUITE(TFlatTableExecutorStickyPages) {

ydb/core/tablet_flat/flat_fwd_conf.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ namespace NFwd {
1616

1717
/*_ Outline blobs materialization conf */
1818

19-
ui32 Edge = Max<ui32>(); /* Outlined blob materialization edge */
20-
ui64 Tablet = 0; /* Use Edge only for this tablet if set */
21-
TVector<ui32> Keys; /* Always materalize this tag values */
19+
ui32 Edge = Max<ui32>(); /* Outlined blob materialization edge */
20+
ui64 Tablet = 0; /* Use Edge only for this tablet if set */
21+
TVector<ui32> Keys; /* Always materialize these tag values */
2222

23-
/*_ Misc features configuation */
23+
/*_ Misc features configuration */
2424

25-
bool Trace = false; /* Track seen blobs used by reference */
25+
bool Trace = false; /* Track seen blobs used by reference */
2626
};
2727

2828
}

ydb/core/tablet_flat/flat_fwd_env.h

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ namespace NFwd {
6464
TAutoPtr<TFetch> Fetch;
6565
};
6666

67-
struct TEnv: public IPages {
67+
struct TEnv : public IPages {
6868
using TSlot = ui32;
6969
using TSlotVec = TSmallVec<TSlot>;
7070

@@ -200,15 +200,15 @@ namespace NFwd {
200200
return stat += q->Stat;
201201
};
202202

203-
return
204-
std::accumulate(Queues.begin(), Queues.end(), Total, aggr);
203+
return std::accumulate(Queues.begin(), Queues.end(), Total, aggr);
205204
}
206205

207206
TAutoPtr<TFetch> GrabFetches() noexcept
208207
{
209208
while (auto *q = Queue ? Queue.PopFront() : nullptr) {
210-
if (std::exchange(q->Grow, false))
209+
if (std::exchange(q->Grow, false)) {
211210
(*q)->Forward(q, Max(ui64(1), Conf.AheadHi));
211+
}
212212

213213
if (auto req = std::move(q->Fetch)) {
214214
Y_ABORT_UNLESS(req->Pages, "Shouldn't sent an empty requests");
@@ -287,7 +287,8 @@ namespace NFwd {
287287
if ((q.Grow = got.Grow) || bool(q.Fetch)) {
288288
Queue.PushBack(&q);
289289
} else if (got.Need && got.Page == nullptr) {
290-
Y_ABORT("Cache line head don't want to do fetch but should");
290+
// temporary hack for index pages as they are always stored in group 0
291+
Y_ABORT_UNLESS(!Queue.Empty(), "Cache line head don't want to do fetch but should");
291292
}
292293

293294
return { got.Need, got.Page };
@@ -322,7 +323,7 @@ namespace NFwd {
322323
{
323324
auto it = Parts.find(part);
324325
Y_ABORT_UNLESS(it != Parts.end(),
325-
"NFwd cache tyring to access part outside of subset");
326+
"NFwd cache trying to access part outside of subset");
326327
Y_ABORT_UNLESS(room < it->second.size(),
327328
"NFwd cache trying to access room out of bounds");
328329
Y_ABORT_UNLESS(it->second[room] != Max<ui16>(),
@@ -352,15 +353,15 @@ namespace NFwd {
352353
}
353354
}
354355

355-
TEgg MakeCache(const TPart *part, NPage::TGroupId groupId, TIntrusiveConstPtr<TSlices> bounds) noexcept
356+
TEgg MakeCache(const TPart *part, NPage::TGroupId groupId, TIntrusiveConstPtr<TSlices> slices) noexcept
356357
{
357-
auto *partStore = dynamic_cast<const TPartStore*>(part);
358+
auto *partStore = CheckedCast<const TPartStore*>(part);
358359

359360
Y_ABORT_UNLESS(groupId.Index < partStore->PageCollections.size(), "Got part without enough page collections");
360361

361362
auto& cache = partStore->PageCollections[groupId.Index];
362363

363-
auto* fwd = new NFwd::TCache(part, this, groupId, bounds);
364+
auto* fwd = new NFwd::TCache(part, this, groupId, slices);
364365
return { fwd, cache->PageCollection };
365366
}
366367

@@ -421,7 +422,7 @@ namespace NFwd {
421422
TDeque<TSimpleEnv> IndexPages;
422423
THashMap<const TPart*, TSlotVec> Parts;
423424
THashSet<const TPart*> ColdParts;
424-
// Wrapper for memable blobs
425+
// Wrapper for memtable blobs
425426
TAutoPtr<TMemTableHandler> MemTable;
426427
// Waiting for read aheads
427428
TIntrusiveList<TPageLoadingQueue> Queue;

ydb/core/tablet_flat/flat_fwd_warmed.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#pragma once
22

3-
#include "flat_fwd_conf.h"
43
#include "flat_fwd_sieve.h"
4+
#include "flat_mem_snapshot.h"
55
#include "flat_mem_warm.h"
66
#include "flat_part_screen.h"
77
#include "flat_part_iface.h"

0 commit comments

Comments
 (0)