Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions crates/ember-core/src/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,16 @@ impl Keyspace {
/// Measures entry size before and after a mutation, adjusting the
/// memory tracker for the difference. Touches the entry afterwards.
fn track_size<T>(&mut self, key: &str, f: impl FnOnce(&mut Entry) -> T) -> T {
let entry = self.entries.get_mut(key).expect("caller verified key exists");
let entry = self
.entries
.get_mut(key)
.expect("caller verified key exists");
let old_size = memory::entry_size(key, &entry.value);
let result = f(entry);
let entry = self.entries.get(key).expect("mutation should not remove key");
let entry = self
.entries
.get(key)
.expect("mutation should not remove key");
let new_size = memory::entry_size(key, &entry.value);
self.memory.adjust(old_size, new_size);
result
Expand Down Expand Up @@ -1128,7 +1134,12 @@ impl Keyspace {
.iter()
.map(|v| memory::VECDEQUE_ELEMENT_OVERHEAD + v.len())
.sum();
self.reserve_memory(is_new, key, memory::VECDEQUE_BASE_OVERHEAD, element_increase)?;
self.reserve_memory(
is_new,
key,
memory::VECDEQUE_BASE_OVERHEAD,
element_increase,
)?;

if is_new {
self.insert_empty(key, Value::List(VecDeque::new()));
Expand Down Expand Up @@ -1201,8 +1212,7 @@ impl Keyspace {
) -> Result<ZAddResult, WriteError> {
self.remove_if_expired(key);

let is_new =
self.ensure_collection_type(key, |v| matches!(v, Value::SortedSet(_)))?;
let is_new = self.ensure_collection_type(key, |v| matches!(v, Value::SortedSet(_)))?;

// worst-case estimate: assume all members are new
let member_increase: usize = members
Expand Down Expand Up @@ -1260,7 +1270,9 @@ impl Keyspace {
return Ok(vec![]);
}

let Some(entry) = self.entries.get(key) else { return Ok(vec![]) };
let Some(entry) = self.entries.get(key) else {
return Ok(vec![]);
};
if !matches!(entry.value, Value::SortedSet(_)) {
return Err(WrongType);
}
Expand Down
6 changes: 2 additions & 4 deletions crates/ember-persistence/src/aof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,15 +360,13 @@ impl AofRecord {
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_i64(&mut buf, *milliseconds as i64)?;
}
AofRecord::IncrBy { key, delta }
| AofRecord::DecrBy { key, delta } => {
AofRecord::IncrBy { key, delta } | AofRecord::DecrBy { key, delta } => {
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_i64(&mut buf, *delta)?;
}

// key + byte list
AofRecord::LPush { key, values }
| AofRecord::RPush { key, values } => {
AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, values.len())?;
for v in values {
Expand Down
13 changes: 4 additions & 9 deletions crates/ember-server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,9 +1249,7 @@ async fn resolve_response(
fn resolve_shard_response(resp: ShardResponse, tag: ResponseTag) -> Frame {
match tag {
// Value(Some(String)) → Bulk, Value(None) → Null
ResponseTag::Get
| ResponseTag::PopResult
| ResponseTag::HGetResult => match resp {
ResponseTag::Get | ResponseTag::PopResult | ResponseTag::HGetResult => match resp {
ShardResponse::Value(Some(Value::String(data))) => Frame::Bulk(data),
ShardResponse::Value(None) => Frame::Null,
ShardResponse::WrongType => wrongtype_error(),
Expand All @@ -1273,8 +1271,7 @@ fn resolve_shard_response(resp: ShardResponse, tag: ResponseTag) -> Frame {
},

// Bool → Integer(0/1), with WrongType
ResponseTag::HExistsResult
| ResponseTag::SIsMemberResult => match resp {
ResponseTag::HExistsResult | ResponseTag::SIsMemberResult => match resp {
ShardResponse::Bool(b) => Frame::Integer(i64::from(b)),
ShardResponse::WrongType => wrongtype_error(),
other => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Expand Down Expand Up @@ -1310,8 +1307,7 @@ fn resolve_shard_response(resp: ShardResponse, tag: ResponseTag) -> Frame {
},

// Len → Integer, with WrongType + OOM
ResponseTag::LenResultOom
| ResponseTag::HSetResult => match resp {
ResponseTag::LenResultOom | ResponseTag::HSetResult => match resp {
ShardResponse::Len(n) => Frame::Integer(n as i64),
ShardResponse::WrongType => wrongtype_error(),
ShardResponse::OutOfMemory => oom_error(),
Expand All @@ -1327,8 +1323,7 @@ fn resolve_shard_response(resp: ShardResponse, tag: ResponseTag) -> Frame {
},

// Array of Bytes → Array of Bulk
ResponseTag::ArrayResult
| ResponseTag::HValsResult => match resp {
ResponseTag::ArrayResult | ResponseTag::HValsResult => match resp {
ShardResponse::Array(items) => {
Frame::Array(items.into_iter().map(Frame::Bulk).collect())
}
Expand Down
Loading
Loading