@@ -13,16 +13,30 @@ enum : ui32 {
1313 ValueColumnId = 2 ,
1414};
1515
16+ using TRetriedCounters = TVector<ui32>;
17+ using namespace NSharedCache ;
18+
19+ void Increment (TRetriedCounters& retried, ui32 attempts) {
20+ if (attempts >= retried.size ()) {
21+ retried.resize (attempts + 1 );
22+ }
23+ retried.at (attempts)++;
24+ }
25+
1626struct TTxInitSchema : public ITransaction {
1727 bool Execute (TTransactionContext& txc, const TActorContext&) override {
1828 if (txc.DB .GetScheme ().GetTableInfo (TableId))
1929 return true ;
2030
31+ TCompactionPolicy policy;
32+ policy.MinBTreeIndexNodeSize = 128 ;
33+
2134 txc.DB .Alter ()
2235 .AddTable (" test" + ToString (ui32 (TableId)), TableId)
2336 .AddColumn (TableId, " key" , KeyColumnId, NScheme::TInt64::TypeId, false )
2437 .AddColumn (TableId, " value" , ValueColumnId, NScheme::TString::TypeId, false )
25- .AddColumnToKey (TableId, KeyColumnId);
38+ .AddColumnToKey (TableId, KeyColumnId)
39+ .SetCompactionPolicy (TableId, policy);
2640
2741 return true ;
2842 }
@@ -59,12 +73,18 @@ struct TTxWriteRow : public ITransaction {
5973
6074struct TTxReadRow : public ITransaction {
6175 i64 Key;
76+ TRetriedCounters& Retried;
77+ ui32 Attempts = 0 ;
6278
63- explicit TTxReadRow (i64 key)
79+ explicit TTxReadRow (i64 key, TRetriedCounters& retried )
6480 : Key(key)
81+ , Retried(retried)
6582 { }
6683
6784 bool Execute (TTransactionContext& txc, const TActorContext&) override {
85+ Increment (Retried, Attempts);
86+ Attempts++;
87+
6888 TVector<TRawTypeValue> rawKey;
6989 rawKey.emplace_back (&Key, sizeof (Key), NScheme::TTypeInfo (NScheme::TInt64::TypeId));
7090
@@ -99,6 +119,15 @@ void WaitEvent(TMyEnvBase& env, ui32 eventType, ui32 requiredCount = 1) {
99119 env->DispatchEvents (options);
100120}
101121
122+ void RestartAndClearCache (TMyEnvBase& env) {
123+ env.SendSync (new TEvents::TEvPoison, false , true );
124+ env->Send (MakeSharedPageCacheId (), TActorId{}, new NMemory::TEvConsumerLimit (0_MB));
125+ WaitEvent (env, NMemory::EvConsumerLimit);
126+ env->Send (MakeSharedPageCacheId (), TActorId{}, new NMemory::TEvConsumerLimit (8_MB));
127+ WaitEvent (env, NMemory::EvConsumerLimit);
128+ env.FireDummyTablet (ui32 (NFake::TDummy::EFlg::Comp));
129+ }
130+
102131Y_UNIT_TEST (Limits) {
103132 TMyEnvBase env;
104133 auto counters = MakeIntrusive<TSharedPageCacheCounters>(env->GetDynamicCounters ());
@@ -109,7 +138,7 @@ Y_UNIT_TEST(Limits) {
109138 env.FireDummyTablet (ui32 (NFake::TDummy::EFlg::Comp));
110139 env.SendSync (new NFake::TEvExecute{ new TTxInitSchema () });
111140
112- // write 300 rows, each ~100KB
141+ // write 300 rows, each ~100KB (~30MB)
113142 for (i64 key = 0 ; key < 300 ; ++key) {
114143 TString value (size_t (100 * 1024 ), char (' a' + key % 26 ));
115144 env.SendSync (new NFake::TEvExecute{ new TTxWriteRow (key, std::move (value)) });
@@ -120,8 +149,9 @@ Y_UNIT_TEST(Limits) {
120149 Cerr << " ...waiting until compacted" << Endl;
121150 env.WaitFor <NFake::TEvCompacted>();
122151
152+ TRetriedCounters retried;
123153 for (i64 key = 0 ; key < 100 ; ++key) {
124- env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key) });
154+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried ) });
125155 }
126156 LogCounters (counters);
127157 UNIT_ASSERT_VALUES_EQUAL (counters->LoadInFlyBytes ->Val (), 0 );
@@ -161,6 +191,199 @@ Y_UNIT_TEST(Limits) {
161191 UNIT_ASSERT_VALUES_EQUAL (counters->MemLimitBytes ->Val (), counters->ActiveLimitBytes ->Val ());
162192}
163193
194+ Y_UNIT_TEST (ThreeLeveledLRU) {
195+ TMyEnvBase env;
196+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(env->GetDynamicCounters ());
197+
198+ env.FireDummyTablet (ui32 (NFake::TDummy::EFlg::Comp));
199+ env.SendSync (new NFake::TEvExecute{ new TTxInitSchema () });
200+
201+ // write 100 rows, each ~100KB (~10MB)
202+ for (i64 key = 0 ; key < 100 ; ++key) {
203+ TString value (size_t (100 * 1024 ), char (' a' + key % 26 ));
204+ env.SendSync (new NFake::TEvExecute{ new TTxWriteRow (key, std::move (value)) });
205+ }
206+
207+ Cerr << " ...compacting" << Endl;
208+ env.SendSync (new NFake::TEvCompact (TableId));
209+ Cerr << " ...waiting until compacted" << Endl;
210+ env.WaitFor <NFake::TEvCompacted>();
211+
212+ TRetriedCounters retried;
213+ for (i64 key = 99 ; key >= 0 ; --key) {
214+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) });
215+ }
216+ LogCounters (counters);
217+ UNIT_ASSERT_DOUBLES_EQUAL (counters->ActiveBytes ->Val (), static_cast <i64 >(8_MB), static_cast <i64 >(1_MB / 3 ));
218+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{100 , 45 , 5 }));
219+
220+ RestartAndClearCache (env);
221+
222+ retried = {};
223+ for (i64 key = 0 ; key < 100 ; ++key) {
224+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) }, true );
225+ }
226+ LogCounters (counters);
227+ UNIT_ASSERT_DOUBLES_EQUAL (counters->ActiveBytes ->Val (), static_cast <i64 >(8_MB / 3 * 2 ), static_cast <i64 >(1_MB / 3 )); // 2 full layers (fresh & staging)
228+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{100 , 100 , 14 , 2 }));
229+
230+ retried = {};
231+ for (i64 key = 99 ; key >= 0 ; --key) {
232+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) }, true );
233+ }
234+ LogCounters (counters);
235+ UNIT_ASSERT_DOUBLES_EQUAL (counters->ActiveBytes ->Val (), static_cast <i64 >(8_MB), static_cast <i64 >(1_MB / 3 ));
236+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{100 , 44 , 6 }));
237+
238+ retried = {};
239+ for (i64 key = 99 ; key >= 0 ; --key) {
240+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) }, true );
241+ }
242+ LogCounters (counters);
243+ UNIT_ASSERT_DOUBLES_EQUAL (counters->ActiveBytes ->Val (), static_cast <i64 >(8_MB), static_cast <i64 >(1_MB / 3 ));
244+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{100 , 73 , 10 , 1 }));
245+
246+ RestartAndClearCache (env);
247+
248+ // read some key twice
249+ retried = {};
250+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (0 , retried) }, true );
251+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{1 , 1 , 1 , 1 }));
252+ retried = {};
253+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (0 , retried) }, true );
254+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{1 }));
255+
256+ // simulate scan
257+ retried = {};
258+ for (i64 key = 1 ; key < 100 ; ++key) {
259+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) }, true );
260+ }
261+ LogCounters (counters);
262+ UNIT_ASSERT_DOUBLES_EQUAL (counters->ActiveBytes ->Val (), static_cast <i64 >(8_MB / 3 * 2 ), static_cast <i64 >(1_MB / 3 )); // 2 full layers (fresh & staging)
263+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{99 , 99 , 13 , 1 }));
264+
265+ // read the key again
266+ retried = {};
267+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (0 , retried) }, true );
268+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{1 , 1 , 1 }));
269+ }
270+
271+ Y_UNIT_TEST (S3FIFO) {
272+ TMyEnvBase env;
273+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(env->GetDynamicCounters ());
274+
275+ env.FireDummyTablet (ui32 (NFake::TDummy::EFlg::Comp));
276+ env.SendSync (new NFake::TEvExecute{ new TTxInitSchema () });
277+
278+ env->Send (MakeSharedPageCacheId (), TActorId{}, new TEvReplacementPolicySwitch (NKikimrSharedCache::S3FIFO));
279+ WaitEvent (env, NSharedCache::EvReplacementPolicySwitch);
280+
281+ // write 100 rows, each ~100KB (~10MB)
282+ for (i64 key = 0 ; key < 100 ; ++key) {
283+ TString value (size_t (100 * 1024 ), char (' a' + key % 26 ));
284+ env.SendSync (new NFake::TEvExecute{ new TTxWriteRow (key, std::move (value)) });
285+ }
286+
287+ Cerr << " ...compacting" << Endl;
288+ env.SendSync (new NFake::TEvCompact (TableId));
289+ Cerr << " ...waiting until compacted" << Endl;
290+ env.WaitFor <NFake::TEvCompacted>();
291+
292+ TRetriedCounters retried;
293+ for (i64 key = 99 ; key >= 0 ; --key) {
294+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) });
295+ }
296+ LogCounters (counters);
297+ UNIT_ASSERT_DOUBLES_EQUAL (counters->ActiveBytes ->Val (), static_cast <i64 >(8_MB), static_cast <i64 >(1_MB / 3 ));
298+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{100 , 92 , 12 }));
299+
300+ RestartAndClearCache (env);
301+
302+ retried = {};
303+ for (i64 key = 0 ; key < 100 ; ++key) {
304+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) }, true );
305+ }
306+ LogCounters (counters);
307+ UNIT_ASSERT_DOUBLES_EQUAL (counters->ActiveBytes ->Val (), static_cast <i64 >(8_MB / 10 ), static_cast <i64 >(1_MB / 3 ));
308+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{100 , 100 , 14 , 2 }));
309+
310+ retried = {};
311+ for (i64 key = 99 ; key >= 0 ; --key) {
312+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) }, true );
313+ }
314+ LogCounters (counters);
315+ UNIT_ASSERT_DOUBLES_EQUAL (counters->ActiveBytes ->Val (), static_cast <i64 >(8_MB), static_cast <i64 >(1_MB / 3 ));
316+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{100 , 92 }));
317+
318+ retried = {};
319+ for (i64 key = 99 ; key >= 0 ; --key) {
320+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) }, true );
321+ }
322+ LogCounters (counters);
323+ UNIT_ASSERT_DOUBLES_EQUAL (counters->ActiveBytes ->Val (), static_cast <i64 >(8_MB), static_cast <i64 >(1_MB / 3 ));
324+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{100 , 28 }));
325+
326+ RestartAndClearCache (env);
327+
328+ // read some key twice
329+ retried = {};
330+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (0 , retried) }, true );
331+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{1 , 1 , 1 , 1 }));
332+ retried = {};
333+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (0 , retried) }, true );
334+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{1 }));
335+
336+ // simulate scan
337+ retried = {};
338+ for (i64 key = 1 ; key < 100 ; ++key) {
339+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) }, true );
340+ }
341+ LogCounters (counters);
342+ UNIT_ASSERT_DOUBLES_EQUAL (counters->ActiveBytes ->Val (), static_cast <i64 >(8_MB / 10 ), static_cast <i64 >(1_MB / 3 ));
343+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{99 , 99 , 13 , 1 }));
344+
345+ // read the key again
346+ retried = {};
347+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (0 , retried) }, true );
348+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{1 }));
349+ }
350+
351+ Y_UNIT_TEST (ReplacementPolicySwitch) {
352+ TMyEnvBase env;
353+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(env->GetDynamicCounters ());
354+
355+ env.FireDummyTablet (ui32 (NFake::TDummy::EFlg::Comp));
356+ env.SendSync (new NFake::TEvExecute{ new TTxInitSchema () });
357+
358+ // write 100 rows, each ~100KB (~10MB)
359+ for (i64 key = 0 ; key < 100 ; ++key) {
360+ TString value (size_t (100 * 1024 ), char (' a' + key % 26 ));
361+ env.SendSync (new NFake::TEvExecute{ new TTxWriteRow (key, std::move (value)) });
362+ }
363+
364+ Cerr << " ...compacting" << Endl;
365+ env.SendSync (new NFake::TEvCompact (TableId));
366+ Cerr << " ...waiting until compacted" << Endl;
367+ env.WaitFor <NFake::TEvCompacted>();
368+
369+ RestartAndClearCache (env);
370+
371+ TRetriedCounters retried = {};
372+ for (i64 key = 0 ; key < 3 ; ++key) {
373+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) }, true );
374+ }
375+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{3 , 3 , 1 , 1 }));
376+
377+ env->Send (MakeSharedPageCacheId (), TActorId{}, new TEvReplacementPolicySwitch (NKikimrSharedCache::S3FIFO));
378+ WaitEvent (env, NSharedCache::EvReplacementPolicySwitch);
379+
380+ retried = {};
381+ for (i64 key = 0 ; key < 3 ; ++key) {
382+ env.SendSync (new NFake::TEvExecute{ new TTxReadRow (key, retried) }, true );
383+ }
384+ UNIT_ASSERT_VALUES_EQUAL (retried, (TVector<ui32>{3 }));
385+ }
386+
164387} // Y_UNIT_TEST_SUITE(TSharedPageCache)
165388
166389} // namespace NTabletFlatExecutor
0 commit comments