77#include < absl/strings/match.h>
88#include < absl/strings/str_cat.h>
99
10- #include < mutex>
11-
10+ #include " base/cycle_clock.h"
1211#include " base/flags.h"
1312#include " base/logging.h"
1413#include " core/heap_size.h"
@@ -27,12 +26,13 @@ using namespace std;
2726using namespace util ;
2827using namespace chrono_literals ;
2928
30- using facade::operator " " _MB;
3129using facade::operator " " _KB;
3230namespace {
3331thread_local absl::flat_hash_set<SliceSnapshot*> tl_slice_snapshots;
3432
35- constexpr size_t kMinBlobSize = 32_KB;
33+ // Controls the chunk size for pushing serialized data. The larger the chunk, the more CPU
34+ // it may require (especially with compression), and the less responsive the server may be.
35+ constexpr size_t kMinBlobSize = 8_KB;
3636
3737} // namespace
3838
@@ -98,7 +98,8 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) {
9898
9999 VLOG (1 ) << " DbSaver::Start - saving entries with version less than " << snapshot_version_;
100100
101- snapshot_fb_ = fb2::Fiber (" snapshot" , [this , stream_journal] {
101+ string fb_name = absl::StrCat (" SliceSnapshot-" , ProactorBase::me ()->GetPoolIndex ());
102+ snapshot_fb_ = fb2::Fiber (fb_name, [this , stream_journal] {
102103 this ->IterateBucketsFb (stream_journal);
103104 db_slice_->UnregisterOnChange (snapshot_version_);
104105 consumer_->Finalize ();
@@ -114,7 +115,7 @@ void SliceSnapshot::StartIncremental(LSN start_lsn) {
114115
115116// Called only for replication use-case.
116117void SliceSnapshot::FinalizeJournalStream (bool cancel) {
117- VLOG (1 ) << " Finalize Snapshot " ;
118+ VLOG (1 ) << " FinalizeJournalStream " ;
118119 DCHECK (db_slice_->shard_owner ()->IsMyThread ());
119120 if (!journal_cb_id_) { // Finalize only once.
120121 return ;
@@ -129,7 +130,8 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
129130
130131 journal->UnregisterOnChange (cb_id);
131132 if (!cancel) {
132- serializer_->SendJournalOffset (journal->GetLsn ());
133+ // always succeeds because serializer_ flushes to string.
134+ std::ignore = serializer_->SendJournalOffset (journal->GetLsn ());
133135 PushSerialized (true );
134136 }
135137}
@@ -147,27 +149,23 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
147149
148150// Serializes all the entries with version less than snapshot_version_.
149151void SliceSnapshot::IterateBucketsFb (bool send_full_sync_cut) {
150- {
151- auto fiber_name = absl::StrCat (" SliceSnapshot-" , ProactorBase::me ()->GetPoolIndex ());
152- ThisFiber::SetName (std::move (fiber_name));
153- }
154-
155152 PrimeTable::Cursor cursor;
156153 for (DbIndex db_indx = 0 ; db_indx < db_array_.size (); ++db_indx) {
157154 stats_.keys_total += db_slice_->DbSize (db_indx);
158155 }
159156
157+ const uint64_t kCyclesPerJiffy = base::CycleClock::Frequency () >> 16 ; // ~15usec.
158+
160159 for (DbIndex db_indx = 0 ; db_indx < db_array_.size (); ++db_indx) {
161160 if (!cntx_->IsRunning ())
162161 return ;
163162
164163 if (!db_array_[db_indx])
165164 continue ;
166165
167- uint64_t last_yield = 0 ;
168166 PrimeTable* pt = &db_array_[db_indx]->prime ;
169-
170167 VLOG (1 ) << " Start traversing " << pt->size () << " items for index " << db_indx;
168+
171169 do {
172170 if (!cntx_->IsRunning ()) {
173171 return ;
@@ -176,17 +174,13 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
176174 PrimeTable::Cursor next = pt->TraverseBuckets (
177175 cursor, [this , &db_indx](auto it) { return BucketSaveCb (db_indx, it); });
178176 cursor = next;
179- PushSerialized (false );
180-
181- if (stats_.loop_serialized >= last_yield + 100 ) {
182- DVLOG (2 ) << " Before sleep " << ThisFiber::GetName ();
183- ThisFiber::Yield ();
184- DVLOG (2 ) << " After sleep" ;
185177
186- last_yield = stats_.loop_serialized ;
187- // Push in case other fibers (writes commands that pushed previous values)
188- // filled the buffer.
189- PushSerialized (false );
178+ // If we do not flush the data, and have not preempted,
179+ // we may need to yield to other fibers to avoid grabbing CPU for too long.
180+ if (!PushSerialized (false )) {
181+ if (ThisFiber::GetRunningTimeCycles () > kCyclesPerJiffy ) {
182+ ThisFiber::Yield ();
183+ }
190184 }
191185 } while (cursor);
192186
@@ -214,7 +208,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
214208
215209 // The replica sends the LSN of the next entry it wants to receive.
216210 while (cntx_->IsRunning () && journal->IsLSNInBuffer (lsn)) {
217- serializer_->WriteJournalEntry (journal->GetEntry (lsn));
211+ std::ignore = serializer_->WriteJournalEntry (journal->GetEntry (lsn));
218212 PushSerialized (false );
219213 lsn++;
220214 }
@@ -231,10 +225,8 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
231225
232226 // GetLsn() is always the next lsn that we expect to create.
233227 if (journal->GetLsn () == lsn) {
234- {
235- FiberAtomicGuard fg;
236- serializer_->SendFullSyncCut ();
237- }
228+ std::ignore = serializer_->SendFullSyncCut ();
229+
238230 auto journal_cb = [this ](const journal::JournalItem& item, bool await) {
239231 OnJournalEntry (item, await);
240232 };
@@ -255,29 +247,22 @@ bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator i
255247
256248 ++stats_.savecb_calls ;
257249
258- auto check = [&](uint64_t v) {
259- if (v >= snapshot_version_) {
260- // either has been already serialized or added after snapshotting started.
261- DVLOG (3 ) << " Skipped " << it.segment_id () << " :" << it.bucket_id () << " at " << v;
262- ++stats_.skipped ;
263- return false ;
264- }
265- return true ;
266- };
267-
268- if (!check (it.GetVersion ())) {
250+ if (it.GetVersion () >= snapshot_version_) {
251+ // either has been already serialized or added after snapshotting started.
252+ DVLOG (3 ) << " Skipped " << it.segment_id () << " :" << it.bucket_id () << " at " << it.GetVersion ();
253+ ++stats_.skipped ;
269254 return false ;
270255 }
271256
272257 db_slice_->FlushChangeToEarlierCallbacks (db_index, DbSlice::Iterator::FromPrime (it),
273258 snapshot_version_);
274259
275- auto * blocking_counter = db_slice_->GetLatch ();
260+ auto * latch = db_slice_->GetLatch ();
276261
277262 // Locking this never preempts. We merely increment the underlying counter such that
278263 // if SerializeBucket preempts, Heartbeat() won't run because the blocking counter is not
279264 // zero.
280- std::lock_guard blocking_counter_guard (*blocking_counter );
265+ std::lock_guard latch_guard (*latch );
281266
282267 stats_.loop_serialized += SerializeBucket (db_index, it);
283268
@@ -324,7 +309,8 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
324309
325310size_t SliceSnapshot::FlushSerialized (SerializerBase::FlushState flush_state) {
326311 io::StringFile sfile;
327- serializer_->FlushToSink (&sfile, flush_state);
312+ error_code ec = serializer_->FlushToSink (&sfile, flush_state);
313+ CHECK (!ec); // always succeeds
328314
329315 size_t serialized = sfile.val .size ();
330316 if (serialized == 0 )
@@ -333,6 +319,8 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
333319 uint64_t id = rec_id_++;
334320 DVLOG (2 ) << " Pushing " << id;
335321
322+ uint64_t running_cycles = ThisFiber::GetRunningTimeCycles ();
323+
336324 fb2::NoOpLock lk;
337325
338326 // We create a critical section here that ensures that records are pushed in sequential order.
@@ -351,6 +339,12 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
351339
352340 VLOG (2 ) << " Pushed with Serialize() " << serialized;
353341
342+ // FlushToSink can be quite slow for large values or due to compression, therefore
343+ // we counter-balance CPU over-usage by forcing sleep.
344+ // We measure running_cycles before the preemption points, because they reset the counter.
345+ uint64_t sleep_usec = (running_cycles * 1000'000 / base::CycleClock::Frequency ()) / 2 ;
346+ ThisFiber::SleepFor (chrono::microseconds (std::min (sleep_usec, 2000ul )));
347+
354348 return serialized;
355349}
356350
@@ -419,19 +413,19 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
419413// value. This is guaranteed by the fact that OnJournalEntry runs always after OnDbChange, and
420414// no database switch can be performed between those two calls, because they are part of one
421415// transaction.
422- void SliceSnapshot::OnJournalEntry (const journal::JournalItem& item, bool await) {
423- // To enable journal flushing to sync after non auto journal command is executed we call
424- // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
425- // additional journal change to serialize, it simply invokes PushSerialized.
416+ // allow_flush is controlled by Journal::SetFlushMode
417+ // (usually it's true unless we are in the middle of a critical section that can not preempt).
418+ void SliceSnapshot::OnJournalEntry (const journal::JournalItem& item, bool allow_flush) {
426419 {
427- // We should release the lock after we preempt
428- std::lock_guard guard (big_value_mu_);
420+ // We grab the lock in case we are in the middle of serializing a bucket, so it serves as a
421+ // barrier here for atomic serialization.
422+ std::lock_guard barrier (big_value_mu_);
429423 if (item.opcode != journal::Op::NOOP) {
430- serializer_->WriteJournalEntry (item.data );
424+ std::ignore = serializer_->WriteJournalEntry (item.data );
431425 }
432426 }
433427
434- if (await ) {
428+ if (allow_flush ) {
435429 // This is the only place that flushes in streaming mode
436430 // once the iterate buckets fiber finished.
437431 PushSerialized (false );
0 commit comments