Skip to content

Commit

Permalink
feat(server): refactor allow preempt on journal record (dragonflydb#4393
Browse files Browse the repository at this point in the history
)

* feat server: refactor allow preempt on journal record

Signed-off-by: adi_holden <adi@dragonflydb.io>
  • Loading branch information
adiholden authored Jan 2, 2025
1 parent 7a68528 commit a3ef239
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 118 deletions.
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ void WriteFlushSlotsToJournal(const SlotRanges& slot_ranges) {
// TODO: Break slot migration upon FLUSHSLOTS
journal->RecordEntry(/* txid= */ 0, journal::Op::COMMAND, /* dbid= */ 0,
/* shard_cnt= */ shard_set->size(), nullopt,
Payload("DFLYCLUSTER", args_view), false);
Payload("DFLYCLUSTER", args_view));
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
}
Expand Down
23 changes: 14 additions & 9 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e
if (db_slice_->WillBlockOnJournalWrite()) {
return res;
}
// Disable flushing of journal changes to prevent preemption in GarbageCollect.
journal::JournalFlushGuard journal_flush_guard(db_slice_->shard_owner()->journal());

// bool should_print = (eb.key_hash % 128) == 0;

Expand Down Expand Up @@ -172,6 +174,8 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e
unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) {
if (!can_evict_ || db_slice_->WillBlockOnJournalWrite())
return 0;
// Disable flushing of journal changes to prevent preemption in Evict.
journal::JournalFlushGuard journal_flush_guard(db_slice_->shard_owner()->journal());

constexpr size_t kNumStashBuckets = ABSL_ARRAYSIZE(eb.probes.by_type.stash_buckets);

Expand All @@ -195,7 +199,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT

// log the evicted keys to journal.
if (auto journal = db_slice_->shard_owner()->journal(); journal) {
RecordExpiry(cntx_.db_index, key, false);
RecordExpiry(cntx_.db_index, key);
}
db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table);

Expand Down Expand Up @@ -453,7 +457,7 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
}

if (res.it->second.HasExpire()) { // check expiry state
res = ExpireIfNeeded(cntx, res.it, true);
res = ExpireIfNeeded(cntx, res.it);
if (!IsValid(res.it)) {
return OpStatus::KEY_NOTFOUND;
}
Expand Down Expand Up @@ -1077,12 +1081,11 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size
}

DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, Iterator it) const {
auto res = ExpireIfNeeded(cntx, it.GetInnerIt(), false);
auto res = ExpireIfNeeded(cntx, it.GetInnerIt());
return {.it = Iterator::FromPrime(res.it), .exp_it = ExpIterator::FromPrime(res.exp_it)};
}

DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it,
bool preempts) const {
DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) const {
if (!it->second.HasExpire()) {
LOG(ERROR) << "Invalid call to ExpireIfNeeded";
return {it, ExpireIterator{}};
Expand Down Expand Up @@ -1112,7 +1115,7 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato

// Replicate expiry
if (auto journal = owner_->journal(); journal) {
RecordExpiry(cntx.db_index, key, preempts);
RecordExpiry(cntx.db_index, key);
}

if (expired_keys_events_recording_)
Expand All @@ -1136,6 +1139,8 @@ void DbSlice::ExpireAllIfNeeded() {
// We hold no locks to any of the keys so we should Wait() here such that
// we don't preempt in ExpireIfNeeded
block_counter_.Wait();
// Disable flushing of journal changes to prevent preemption during the traversal.
journal::JournalFlushGuard journal_flush_guard(owner_->journal());

for (DbIndex db_index = 0; db_index < db_arr_.size(); db_index++) {
if (!db_arr_[db_index])
Expand All @@ -1148,7 +1153,7 @@ void DbSlice::ExpireAllIfNeeded() {
LOG(ERROR) << "Expire entry " << exp_it->first.ToString() << " not found in prime table";
return;
}
ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it, false);
ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it);
};

ExpireTable::Cursor cursor;
Expand Down Expand Up @@ -1212,7 +1217,7 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx
auto prime_it = db.prime.Find(it->first);
CHECK(!prime_it.is_done());
result.deleted_bytes += prime_it->first.MallocUsed() + prime_it->second.MallocUsed();
ExpireIfNeeded(cntx, prime_it, false);
ExpireIfNeeded(cntx, prime_it);
++result.deleted;
} else {
result.survivor_ttl_sum += ttl;
Expand Down Expand Up @@ -1280,7 +1285,7 @@ pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s
// fiber preemption could happen in this phase.
for (string_view key : keys_to_journal) {
if (auto journal = owner_->journal(); journal)
RecordExpiry(db_ind, key, false);
RecordExpiry(db_ind, key);

if (expired_keys_events_recording_)
db_table->expired_keys_events_.emplace_back(key);
Expand Down
2 changes: 1 addition & 1 deletion src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ class DbSlice {
ExpireIterator exp_it;
};

PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it, bool preempts = false) const;
PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it) const;

OpResult<AddOrFindResult> AddOrFindInternal(const Context& cntx, std::string_view key);

Expand Down
2 changes: 1 addition & 1 deletion src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
// We don't want any writes to the journal after we send the `PING`,
// and expirations could ruin that.
namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false);
shard->journal()->RecordEntry(0, journal::Op::PING, 0, 0, nullopt, {}, true);
shard->journal()->RecordEntry(0, journal::Op::PING, 0, 0, nullopt, {});

const FlowInfo* flow = &replica->flows[shard->shard_id()];
while (flow->last_acked_lsn < shard->journal()->GetLsn()) {
Expand Down
88 changes: 41 additions & 47 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
extern "C" {
#include "redis/zmalloc.h"
}

#include "server/engine_shard_set.h"
#include "server/journal/journal.h"
#include "server/namespaces.h"
#include "server/search/doc_index.h"
#include "server/server_state.h"
Expand Down Expand Up @@ -756,62 +756,56 @@ void EngineShard::Heartbeat() {
}

void EngineShard::RetireExpiredAndEvict() {
{
FiberAtomicGuard guard;
// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());
constexpr double kTtlDeleteLimit = 200;

uint32_t traversed = GetMovingSum6(TTL_TRAVERSE);
uint32_t deleted = GetMovingSum6(TTL_DELETE);
unsigned ttl_delete_target = 5;

if (deleted > 10) {
// deleted should be <= traversed.
// hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit).
// The higher ttl_delete_target the more likely we have lots of expired items that need
// to be deleted.
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
}
// Disable flushing of journal changes to prevent preemption.
journal::JournalFlushGuard journal_flush_guard(journal_);

DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();
// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());
constexpr double kTtlDeleteLimit = 200;

size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;
uint32_t traversed = GetMovingSum6(TTL_TRAVERSE);
uint32_t deleted = GetMovingSum6(TTL_DELETE);
unsigned ttl_delete_target = 5;

for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
if (!db_slice.IsDbValid(i))
continue;
if (deleted > 10) {
// deleted should be <= traversed.
// hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit).
// The higher ttl_delete_target the more likely we have lots of expired items that need
// to be deleted.
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
}

db_cntx.db_index = i;
auto [pt, expt] = db_slice.GetTables(i);
if (expt->size() > pt->size() / 4) {
DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target);
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();

eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes));
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
counter_[TTL_DELETE].IncBy(stats.deleted);
}
size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;

if (eviction_goal) {
uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
auto [evicted_items, evicted_bytes] =
db_slice.FreeMemWithEvictionStep(i, starting_segment_id, eviction_goal);
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
if (!db_slice.IsDbValid(i))
continue;

DVLOG(2) << "Heartbeat eviction: Expected to evict " << eviction_goal
<< " bytes. Actually evicted " << evicted_items << " items, " << evicted_bytes
<< " bytes. Max eviction per heartbeat: "
<< GetFlag(FLAGS_max_eviction_per_heartbeat);
db_cntx.db_index = i;
auto [pt, expt] = db_slice.GetTables(i);
if (expt->size() > pt->size() / 4) {
DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target);

eviction_goal -= std::min(eviction_goal, evicted_bytes);
}
eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes));
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
counter_[TTL_DELETE].IncBy(stats.deleted);
}
}

// Journal entries for expired entries are not written to the socket in the loop above.
// Trigger write to socket when loop finishes.
if (auto journal = EngineShard::tlocal()->journal(); journal) {
TriggerJournalWriteToSink();
if (eviction_goal) {
uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
auto [evicted_items, evicted_bytes] =
db_slice.FreeMemWithEvictionStep(i, starting_segment_id, eviction_goal);

DVLOG(2) << "Heartbeat eviction: Expected to evict " << eviction_goal
<< " bytes. Actually evicted " << evicted_items << " items, " << evicted_bytes
<< " bytes. Max eviction per heartbeat: "
<< GetFlag(FLAGS_max_eviction_per_heartbeat);

eviction_goal -= std::min(eviction_goal, evicted_bytes);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,8 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
// we enter the callback in a timing when journaling will not cause preemptions. Otherwise,
// the bucket might change as we Traverse and yield.
db_slice.BlockingCounter()->Wait();

// Disable flushing of journal changes to prevent preemption during the traversal.
journal::JournalFlushGuard journal_flush_guard(op_args.shard->journal());
util::FiberAtomicGuard guard;
unsigned cnt = 0;

Expand All @@ -630,6 +631,7 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
cur = prime_table->Traverse(
cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); });
} while (cur && cnt < scan_opts.limit);

VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value();
*cursor = cur.value();
}
Expand Down
8 changes: 6 additions & 2 deletions src/server/journal/journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ LSN Journal::GetLsn() const {
}

void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt,
std::optional<cluster::SlotId> slot, Entry::Payload payload, bool await) {
journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)}, await);
std::optional<cluster::SlotId> slot, Entry::Payload payload) {
journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)});
}

// Sets the journal flush mode by forwarding `allow_flush` unchanged to
// journal_slice.SetFlushMode(). This wrapper adds no logic of its own.
// NOTE(review): the exact effect of toggling the mode lives in the slice
// implementation, which is not visible here — presumably `false` defers
// flushing and `true` re-enables it; confirm against JournalSlice.
void Journal::SetFlushMode(bool allow_flush) {
journal_slice.SetFlushMode(allow_flush);
}

} // namespace journal
Expand Down
29 changes: 27 additions & 2 deletions src/server/journal/journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
//

#pragma once

#include "server/journal/types.h"
#include "util/fibers/detail/fiber_interface.h"
#include "util/proactor_pool.h"

namespace dfly {
Expand Down Expand Up @@ -35,11 +35,36 @@ class Journal {
LSN GetLsn() const;

void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt,
std::optional<cluster::SlotId> slot, Entry::Payload payload, bool await);
std::optional<cluster::SlotId> slot, Entry::Payload payload);

void SetFlushMode(bool allow_flush);

private:
mutable util::fb2::Mutex state_mu_;
};

// Scoped guard that turns journal flushing off for its lifetime and keeps the
// calling fiber inside an atomic section for the same span.
//
// Construction: calls SetFlushMode(false) on the journal (when one is given),
// then enters a fiber atomic section.
// Destruction: leaves the atomic section first, then calls SetFlushMode(true).
//
// A null journal is accepted; in that case only the atomic section is
// entered and left.
//
// NOTE(review): the destructor re-enables flushing unconditionally, so if two
// guards were ever nested on the same journal the inner one would switch
// flushing back on while the outer one is still alive — confirm that guards
// are never nested.
class JournalFlushGuard {
 public:
  // The guard owns a pending "restore" action, so it must not be copied.
  JournalFlushGuard(const JournalFlushGuard&) = delete;
  JournalFlushGuard& operator=(const JournalFlushGuard&) = delete;

  explicit JournalFlushGuard(Journal* journal) : journal_(journal) {
    ApplyFlushMode(false);
    util::fb2::detail::EnterFiberAtomicSection();
  }

  ~JournalFlushGuard() {
    // Order matters: the atomic section is closed before flushing is allowed
    // again (presumably because re-enabling may flush and preempt — verify).
    util::fb2::detail::LeaveFiberAtomicSection();
    ApplyFlushMode(true);  // Restore the state on destruction
  }

 private:
  // Forwards the mode to the journal; does nothing when no journal was given.
  void ApplyFlushMode(bool allow_flush) {
    if (journal_ != nullptr) {
      journal_->SetFlushMode(allow_flush);
    }
  }

  Journal* journal_;
};

} // namespace journal
} // namespace dfly
Loading

0 comments on commit a3ef239

Please sign in to comment.