Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/ember-cluster/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ distributed coordination for [ember](https://github.com/kacy/ember). provides cl
- **SWIM gossip** — failure detection with configurable probe intervals and suspicion timeouts
- **raft consensus** — cluster configuration changes via [openraft](https://github.com/datafuselabs/openraft)
- **live migration** — slot resharding without downtime, MOVED/ASK redirects
- **authentication** — HMAC-SHA256 signing and constant-time verification for cluster transport messages
- **automatic failover** — replica election state machine; first replica to reach quorum is promoted via raft

## modules

Expand All @@ -21,6 +23,9 @@ distributed coordination for [ember](https://github.com/kacy/ember). provides cl
| `migration` | migration state machine, batch streaming, key tracking |
| `message` | binary wire format for gossip messages |
| `error` | cluster-specific error types with MOVED/ASK support |
| `auth` | HMAC-SHA256 cluster transport authentication |
| `election` | replica failover election state machine |
| `raft_transport` | raft network transport over TCP |

## usage

Expand Down Expand Up @@ -70,7 +75,7 @@ the following CLUSTER commands are supported at the protocol layer:

| crate | what it does |
|-------|-------------|
| [emberkv-core](../ember-core) | storage engine, keyspace, sharding |
| [ember-core](../ember-core) | storage engine, keyspace, sharding |
| [ember-protocol](../ember-protocol) | RESP3 parsing and command dispatch |
| [ember-persistence](../ember-persistence) | AOF, snapshots, and crash recovery |
| [ember-server](../ember-server) | TCP server and connection handling |
Expand Down
3 changes: 2 additions & 1 deletion crates/ember-core/src/dropper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ mod tests {
Entry {
value: Value::String(Bytes::from(format!("val-{i}"))),
expires_at_ms: 0,
last_access_ms: 0,
cached_value_size: 0,
last_access_secs: 0,
},
);
}
Expand Down
8 changes: 5 additions & 3 deletions crates/ember-core/src/keyspace/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl Keyspace {
return Err(WrongType);
}

let old_entry_size = memory::entry_size(key, &entry.value);
let old_entry_size = entry.entry_size(key);
let mut removed = Vec::new();
let mut removed_bytes: usize = 0;
let is_empty = if let Value::Hash(ref mut map) = entry.value {
Expand Down Expand Up @@ -189,7 +189,7 @@ impl Keyspace {
let Some(entry) = self.entries.get_mut(key) else {
return Err(IncrError::WrongType);
};
let old_entry_size = memory::entry_size(key, &entry.value);
let old_entry_size = entry.entry_size(key);

let Value::Hash(ref mut map) = entry.value else {
return Err(IncrError::WrongType);
Expand All @@ -205,7 +205,9 @@ impl Keyspace {
map.insert(field.to_owned(), Bytes::from(new_val.to_string()));
entry.touch();

let new_entry_size = memory::entry_size(key, &entry.value);
let new_value_size = memory::value_size(&entry.value);
entry.cached_value_size = new_value_size;
let new_entry_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD;
self.memory.adjust(old_entry_size, new_entry_size);

Ok(new_val)
Expand Down
1 change: 1 addition & 0 deletions crates/ember-core/src/keyspace/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl Keyspace {
}
let len = deque.len();
entry.touch();
entry.cached_value_size += element_increase;

// apply the known delta — no need to rescan the entire list
self.memory.grow_by(element_increase);
Expand Down
73 changes: 56 additions & 17 deletions crates/ember-core/src/keyspace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,23 +229,33 @@ pub enum SetResult {
/// A single entry in the keyspace: a value plus optional expiration
/// and last access time for LRU approximation.
///
/// Memory optimized: uses u64 timestamps instead of Option<Instant>.
/// Saves 8 bytes per entry (24 bytes down from 32 for metadata).
/// Field order is chosen for cache-line packing: `value` and
/// `expires_at_ms` (the hot-path read fields) sit at the front so
/// they share the first L1 cache line with the HashMap key pointer.
/// `cached_value_size` is warm (used on writes). `last_access_secs`
/// is cold (only used during eviction sampling).
#[derive(Debug, Clone)]
pub(crate) struct Entry {
pub(crate) value: Value,
/// Monotonic expiry timestamp in ms. 0 = no expiry.
pub(crate) expires_at_ms: u64,
/// Monotonic last access timestamp in ms (for LRU).
pub(crate) last_access_ms: u64,
/// Cached result of `memory::value_size(&self.value)`. Updated on
/// every mutation so that memory accounting is O(1) instead of
/// walking entire collections.
pub(crate) cached_value_size: usize,
/// Monotonic last access time in seconds since process start (for LRU).
/// Using u32 saves 4 bytes per entry; wraps at ~136 years.
pub(crate) last_access_secs: u32,
}

impl Entry {
fn new(value: Value, ttl: Option<Duration>) -> Self {
let cached_value_size = memory::value_size(&value);
Self {
value,
expires_at_ms: time::expiry_from_duration(ttl),
last_access_ms: time::now_ms(),
cached_value_size,
last_access_secs: time::now_secs(),
}
}

Expand All @@ -256,7 +266,13 @@ impl Entry {

/// Marks this entry as accessed right now.
fn touch(&mut self) {
self.last_access_ms = time::now_ms();
self.last_access_secs = time::now_secs();
}

/// Returns the full estimated memory footprint of this entry
/// (key + value + overhead) using the cached value size.
fn entry_size(&self, key: &str) -> usize {
key.len() + self.cached_value_size + memory::ENTRY_OVERHEAD
}
}

Expand Down Expand Up @@ -356,7 +372,8 @@ impl Keyspace {
/// set, hash, or set). If the collection is now empty, removes the key
/// entirely and subtracts `old_size` from the memory tracker. Otherwise
/// subtracts `removed_bytes` (the byte cost of the removed element(s))
/// without rescanning the remaining collection.
/// without rescanning the remaining collection, and updates the cached
/// value size.
fn cleanup_after_remove(
&mut self,
key: &str,
Expand All @@ -371,6 +388,9 @@ impl Keyspace {
self.memory.remove_with_size(old_size);
} else {
self.memory.shrink_by(removed_bytes);
if let Some(entry) = self.entries.get_mut(key) {
entry.cached_value_size = entry.cached_value_size.saturating_sub(removed_bytes);
}
}
}

Expand Down Expand Up @@ -420,13 +440,20 @@ impl Keyspace {
}

/// Measures entry size before and after a mutation, adjusting the
/// memory tracker for the difference. Touches the entry afterwards.
/// memory tracker for the difference.
///
/// Uses `cached_value_size` for the pre-mutation size (O(1)) and
/// recomputes after the mutation to update the cache. This halves
/// the cost of memory tracking for large collections.
fn track_size<T>(&mut self, key: &str, f: impl FnOnce(&mut Entry) -> T) -> Option<T> {
let entry = self.entries.get_mut(key)?;
let old_size = memory::entry_size(key, &entry.value);
let old_size = entry.entry_size(key);
let result = f(entry);
let entry = self.entries.get(key)?;
let new_size = memory::entry_size(key, &entry.value);
// re-lookup after mutation (f consumed the borrow)
let entry = self.entries.get_mut(key)?;
let new_value_size = memory::value_size(&entry.value);
entry.cached_value_size = new_value_size;
let new_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD;
self.memory.adjust(old_size, new_size);
Some(result)
}
Expand Down Expand Up @@ -458,10 +485,10 @@ impl Keyspace {
let mut rng = rand::rng();

// reservoir sample k=1 from the iterator, tracking the oldest
// entry by last_access_ms. this replaces choose_multiple() which
// entry by last_access_secs. this replaces choose_multiple() which
// allocates a Vec internally.
let mut best_key: Option<&str> = None;
let mut best_access = u64::MAX;
let mut best_access = u32::MAX;
let mut seen = 0usize;

for (key, entry) in &self.entries {
Expand All @@ -470,15 +497,15 @@ impl Keyspace {
// enough candidates
seen += 1;
if seen <= EVICTION_SAMPLE_SIZE {
if entry.last_access_ms < best_access {
best_access = entry.last_access_ms;
if entry.last_access_secs < best_access {
best_access = entry.last_access_secs;
best_key = Some(&**key);
}
} else {
use rand::Rng;
let j = rng.random_range(0..seen);
if j < EVICTION_SAMPLE_SIZE && entry.last_access_ms < best_access {
best_access = entry.last_access_ms;
if j < EVICTION_SAMPLE_SIZE && entry.last_access_secs < best_access {
best_access = entry.last_access_secs;
best_key = Some(&**key);
}
}
Expand Down Expand Up @@ -913,6 +940,18 @@ impl Keyspace {
removed
}

/// Removes a key that is already known to be expired. Used by
/// fused lookup paths that check expiry inline via `get_mut()` and
/// need a second probe only on the rare expired path.
fn remove_expired_entry(&mut self, key: &str) {
if let Some(entry) = self.entries.remove(key) {
self.memory.remove(key, &entry.value);
self.decrement_expiry_if_set(&entry);
self.expired_total += 1;
self.defer_drop(entry.value);
}
}

/// Checks if a key is expired and removes it if so. Returns `true`
/// if the key was removed (or didn't exist).
fn remove_if_expired(&mut self, key: &str) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion crates/ember-core/src/keyspace/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Keyspace {
let old_size = self
.entries
.get(&key)
.map(|e| memory::entry_size(&key, &e.value))
.map(|e| e.entry_size(&key))
.unwrap_or(0);
let net_increase = new_size.saturating_sub(old_size);

Expand Down
2 changes: 1 addition & 1 deletion crates/ember-core/src/keyspace/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Keyspace {
return Err(WrongType);
}

let old_entry_size = memory::entry_size(key, &entry.value);
let old_entry_size = entry.entry_size(key);

let mut removed = 0;
let mut removed_bytes: usize = 0;
Expand Down
69 changes: 41 additions & 28 deletions crates/ember-core/src/keyspace/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,54 @@ impl Keyspace {
/// Returns `Err(WrongType)` if the key holds a non-string value.
/// Expired keys are removed lazily on access. Successful reads update
/// the entry's last access time for LRU tracking.
///
/// Uses a single hash probe on the common (non-expired) path.
/// The expired path (rare) does a second probe to remove.
pub fn get(&mut self, key: &str) -> Result<Option<Value>, WrongType> {
if self.remove_if_expired(key) {
return Ok(None);
}
match self.entries.get_mut(key) {
Some(e) => match &e.value {
Value::String(_) => {
e.touch();
// Value::String wraps Bytes — clone is a cheap refcount increment.
Ok(Some(e.value.clone()))
}
_ => Err(WrongType),
},
None => Ok(None),
let expired = match self.entries.get_mut(key) {
Some(e) if e.is_expired() => true,
Some(e) => {
return match &e.value {
Value::String(_) => {
e.touch();
Ok(Some(e.value.clone()))
}
_ => Err(WrongType),
};
}
None => return Ok(None),
};
if expired {
self.remove_expired_entry(key);
}
Ok(None)
}

/// Retrieves the raw `Bytes` for a string key, avoiding the `Value`
/// enum wrapper. `Bytes::clone()` is a cheap refcount increment.
///
/// Returns `Err(WrongType)` if the key holds a non-string value.
///
/// Uses a single hash probe on the common (non-expired) path.
pub fn get_string(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
if self.remove_if_expired(key) {
return Ok(None);
}
match self.entries.get_mut(key) {
Some(e) => match &e.value {
Value::String(b) => {
let data = b.clone(); // Bytes::clone is a refcount bump
e.touch();
Ok(Some(data))
}
_ => Err(WrongType),
},
None => Ok(None),
let expired = match self.entries.get_mut(key) {
Some(e) if e.is_expired() => true,
Some(e) => {
return match &e.value {
Value::String(b) => {
let data = b.clone();
e.touch();
Ok(Some(data))
}
_ => Err(WrongType),
};
}
None => return Ok(None),
};
if expired {
self.remove_expired_entry(key);
}
Ok(None)
}

/// Returns the type name of the value at `key`, or "none" if missing.
Expand Down Expand Up @@ -77,12 +89,13 @@ impl Keyspace {
let new_size = memory::entry_size(&key, &new_value);

// single lookup: check existence, gather old size and expiry state.
// treat expired entries as non-existent.
// treat expired entries as non-existent. uses cached_value_size
// for O(1) size lookup instead of walking the value.
let old_info = self.entries.get(key.as_str()).and_then(|e| {
if e.is_expired() {
None
} else {
Some((memory::entry_size(&key, &e.value), e.expires_at_ms != 0))
Some((e.entry_size(&key), e.expires_at_ms != 0))
}
});

Expand Down
Loading
Loading