66namespace NKikimr {
77namespace NDataShard {
88
9+ static constexpr size_t SmallReadSetCacheLimit = 8 ;
10+
911void TOutReadSets::UpdateMonCounter () const {
1012 Self->SetCounter (COUNTER_OUT_READSETS_IN_FLIGHT, CurrentReadSets.size ());
1113}
@@ -14,7 +16,7 @@ bool TOutReadSets::LoadReadSets(NIceDb::TNiceDb& db) {
1416 using Schema = TDataShard::Schema;
1517
1618 CurrentReadSets.clear (); // For idempotency
17- CurrentReadSetInfos .clear ();
19+ CurrentReadSetKeys .clear ();
1820
1921 // TODO[serxa]: this should be Range but it is not working right now
2022 auto rowset = db.Table <Schema::OutReadSets>().GreaterOrEqual (0 ).Select <
@@ -23,7 +25,8 @@ bool TOutReadSets::LoadReadSets(NIceDb::TNiceDb& db) {
2325 Schema::OutReadSets::TxId,
2426 Schema::OutReadSets::Origin,
2527 Schema::OutReadSets::From,
26- Schema::OutReadSets::To>();
28+ Schema::OutReadSets::To,
29+ Schema::OutReadSets::Body>();
2730 if (!rowset.IsReady ())
2831 return false ;
2932 while (!rowset.EndOfSet ()) {
@@ -33,19 +36,22 @@ bool TOutReadSets::LoadReadSets(NIceDb::TNiceDb& db) {
3336 ui64 origin = rowset.GetValue <Schema::OutReadSets::Origin>();
3437 ui64 source = rowset.GetValue <Schema::OutReadSets::From>();
3538 ui64 target = rowset.GetValue <Schema::OutReadSets::To>();
39+ TString body = rowset.GetValue <Schema::OutReadSets::Body>();
3640
3741 TReadSetInfo rsInfo;
3842 rsInfo.TxId = txId;
3943 rsInfo.Step = step;
4044 rsInfo.Origin = origin;
4145 rsInfo.From = source;
4246 rsInfo.To = target;
47+ // Cache it regardless of size, since we're going to send it soon
48+ rsInfo.Body = std::move (body);
4349
4450 Y_ABORT_UNLESS (!CurrentReadSets.contains (seqNo));
45- Y_ABORT_UNLESS (!CurrentReadSetInfos .contains (rsInfo));
51+ Y_ABORT_UNLESS (!CurrentReadSetKeys .contains (rsInfo));
4652
47- CurrentReadSets[seqNo ] = rsInfo ;
48- CurrentReadSetInfos[rsInfo ] = seqNo ;
53+ CurrentReadSetKeys[rsInfo ] = seqNo ;
54+ CurrentReadSets[seqNo ] = std::move (rsInfo) ;
4955
5056 if (!rowset.Next ())
5157 return false ;
@@ -59,15 +65,13 @@ void TOutReadSets::SaveReadSet(NIceDb::TNiceDb& db, ui64 seqNo, ui64 step, const
5965 using Schema = TDataShard::Schema;
6066
6167 Y_ABORT_UNLESS (!CurrentReadSets.contains (seqNo));
62- Y_ABORT_UNLESS (!CurrentReadSetInfos .contains (rsKey));
68+ Y_ABORT_UNLESS (!CurrentReadSetKeys .contains (rsKey));
6369
6470 TReadSetInfo rsInfo (rsKey);
6571 rsInfo.Step = step;
66-
67- CurrentReadSetInfos[rsKey] = seqNo;
68- CurrentReadSets[seqNo] = rsInfo;
69-
70- UpdateMonCounter ();
72+ if (body.size () <= SmallReadSetCacheLimit) {
73+ rsInfo.Body = body;
74+ }
7175
7276 db.Table <Schema::OutReadSets>().Key (seqNo).Update (
7377 NIceDb::TUpdate<Schema::OutReadSets::Step>(rsInfo.Step ),
@@ -76,6 +80,11 @@ void TOutReadSets::SaveReadSet(NIceDb::TNiceDb& db, ui64 seqNo, ui64 step, const
7680 NIceDb::TUpdate<Schema::OutReadSets::From>(rsInfo.From ),
7781 NIceDb::TUpdate<Schema::OutReadSets::To>(rsInfo.To ),
7882 NIceDb::TUpdate<Schema::OutReadSets::Body>(body));
83+
84+ CurrentReadSetKeys[rsKey] = seqNo;
85+ CurrentReadSets[seqNo] = std::move (rsInfo);
86+
87+ UpdateMonCounter ();
7988}
8089
8190void TOutReadSets::RemoveReadSet (NIceDb::TNiceDb& db, ui64 seqNo) {
@@ -85,7 +94,7 @@ void TOutReadSets::RemoveReadSet(NIceDb::TNiceDb& db, ui64 seqNo) {
8594
8695 auto it = CurrentReadSets.find (seqNo);
8796 if (it != CurrentReadSets.end ()) {
88- CurrentReadSetInfos .erase (it->second );
97+ CurrentReadSetKeys .erase (it->second );
8998 CurrentReadSets.erase (it);
9099 }
91100}
@@ -97,14 +106,19 @@ TReadSetInfo TOutReadSets::ReplaceReadSet(NIceDb::TNiceDb& db, ui64 seqNo, const
97106 if (it != CurrentReadSets.end ()) {
98107 db.Table <Schema::OutReadSets>().Key (seqNo).Update (
99108 NIceDb::TUpdate<Schema::OutReadSets::Body>(body));
109+ if (body.size () <= SmallReadSetCacheLimit) {
110+ it->second .Body = body;
111+ } else {
112+ it->second .Body .reset ();
113+ }
100114 return it->second ;
101115 } else {
102116 return TReadSetInfo ();
103117 }
104118}
105119
106120void TOutReadSets::AckForDeletedDestination (ui64 tabletId, ui64 seqNo, const TActorContext &ctx) {
107- const TReadSetKey* rsInfo = CurrentReadSets.FindPtr (seqNo);
121+ const TReadSetKey* rsInfo = CurrentReadSets.FindPtr (seqNo);
108122
109123 if (!rsInfo) {
110124 LOG_DEBUG (ctx, NKikimrServices::TX_DATASHARD,
@@ -136,14 +150,18 @@ void TOutReadSets::SaveAck(const TActorContext &ctx, TAutoPtr<TEvTxProcessing::T
136150 Self->TabletID (), sender, dest, consumer, txId);
137151
138152 ReadSetAcks.emplace_back (ev.Release ());
139- AckedSeqno.insert (seqno);
140153
141154 if (CurrentReadSets.contains (seqno)) {
142- TReadSetKey rsInfo (txId, Self->TabletID (), sender, dest);
143- Y_ABORT_UNLESS (CurrentReadSetInfos[rsInfo ] == seqno);
155+ TReadSetKey rsKey (txId, Self->TabletID (), sender, dest);
156+ Y_ABORT_UNLESS (CurrentReadSetKeys[rsKey ] == seqno);
144157
158+ CurrentReadSetKeys.erase (rsKey);
145159 CurrentReadSets.erase (seqno);
146- CurrentReadSetInfos.erase (rsInfo);
160+ }
161+
162+ // We don't need to resend this readset anymore
163+ if (auto it = Self->PersistentTablets .find (dest); it != Self->PersistentTablets .end ()) {
164+ it->second .OutReadSets .erase (seqno);
147165 }
148166}
149167
@@ -162,15 +180,8 @@ void TOutReadSets::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx) {
162180 Self->TabletID (), sender, dest, consumer, seqno, txId);
163181
164182 RemoveReadSet (db, seqno);
165-
166- if (auto it = Self->PersistentTablets .find (ev.Record .GetTabletDest ());
167- it != Self->PersistentTablets .end ())
168- {
169- it->second .OutReadSets .erase (seqno);
170- }
171183 }
172184 ReadSetAcks.clear ();
173- AckedSeqno.clear ();
174185
175186 UpdateMonCounter ();
176187}
@@ -210,28 +221,44 @@ void TOutReadSets::ReleaseOnHoldReadSets(const std::vector<ui64>& seqNos, const
210221
211222bool TOutReadSets::ResendRS (NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx, ui64 seqNo) {
212223 using Schema = TDataShard::Schema;
213-
214224 NIceDb::TNiceDb db (txc.DB );
215- if (AckedSeqno.contains (seqNo)) {
225+
226+ auto * info = CurrentReadSets.FindPtr (seqNo);
227+ if (!info) {
216228 // Do not resend if we've already got ACK back, but not applied it to DB
217229 // Also, it is a good place to actually apply ACK(s)
218-
219230 txc.DB .NoMoreReadsForTx ();
220231 Cleanup (db, ctx);
221232 return true ;
222233 }
223234
224- auto rowset = db.Table <Schema::OutReadSets>().Key (seqNo).Select ();
225- if (!rowset.IsReady ())
226- return false ;
227- if (!rowset.IsValid ())
228- return true ;
229-
230- ui64 step = rowset.GetValue <Schema::OutReadSets::Step>();
231- ui64 txId = rowset.GetValue <Schema::OutReadSets::TxId>();
232- ui64 from = rowset.GetValue <Schema::OutReadSets::From>();
233- ui64 to = rowset.GetValue <Schema::OutReadSets::To>();
234- TString body = rowset.GetValue <Schema::OutReadSets::Body>();
235+ ui64 step = info->Step ;
236+ ui64 txId = info->TxId ;
237+ ui64 from = info->From ;
238+ ui64 to = info->To ;
239+ TString body;
240+
241+ if (info->Body ) {
242+ // We have readset body cached
243+ if (info->Body ->size () <= SmallReadSetCacheLimit) {
244+ body = *info->Body ;
245+ } else {
246+ // Don't keep it in memory while in transit
247+ body = std::move (*info->Body );
248+ info->Body .reset ();
249+ }
250+ } else {
251+ auto rowset = db.Table <Schema::OutReadSets>().Key (seqNo).Select ();
252+ if (!rowset.IsReady ())
253+ return false ;
254+ if (!rowset.IsValid ())
255+ return true ;
256+ body = rowset.GetValue <Schema::OutReadSets::Body>();
257+ if (body.size () <= SmallReadSetCacheLimit) {
258+ // Cache small readset body
259+ info->Body = body;
260+ }
261+ }
235262
236263 txc.DB .NoMoreReadsForTx ();
237264
0 commit comments