Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 88 additions & 86 deletions crates/ember-persistence/src/aof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,42 @@ pub enum AofRecord {
}

impl AofRecord {
/// Returns the on-disk tag byte for this record variant.
fn tag(&self) -> u8 {
match self {
AofRecord::Set { .. } => TAG_SET,
AofRecord::Del { .. } => TAG_DEL,
AofRecord::Expire { .. } => TAG_EXPIRE,
AofRecord::LPush { .. } => TAG_LPUSH,
AofRecord::RPush { .. } => TAG_RPUSH,
AofRecord::LPop { .. } => TAG_LPOP,
AofRecord::RPop { .. } => TAG_RPOP,
AofRecord::ZAdd { .. } => TAG_ZADD,
AofRecord::ZRem { .. } => TAG_ZREM,
AofRecord::Persist { .. } => TAG_PERSIST,
AofRecord::Pexpire { .. } => TAG_PEXPIRE,
AofRecord::Incr { .. } => TAG_INCR,
AofRecord::Decr { .. } => TAG_DECR,
AofRecord::HSet { .. } => TAG_HSET,
AofRecord::HDel { .. } => TAG_HDEL,
AofRecord::HIncrBy { .. } => TAG_HINCRBY,
AofRecord::SAdd { .. } => TAG_SADD,
AofRecord::SRem { .. } => TAG_SREM,
AofRecord::IncrBy { .. } => TAG_INCRBY,
AofRecord::DecrBy { .. } => TAG_DECRBY,
AofRecord::Append { .. } => TAG_APPEND,
AofRecord::Rename { .. } => TAG_RENAME,
#[cfg(feature = "vector")]
AofRecord::VAdd { .. } => TAG_VADD,
#[cfg(feature = "vector")]
AofRecord::VRem { .. } => TAG_VREM,
#[cfg(feature = "protobuf")]
AofRecord::ProtoSet { .. } => TAG_PROTO_SET,
#[cfg(feature = "protobuf")]
AofRecord::ProtoRegister { .. } => TAG_PROTO_REGISTER,
}
}

/// Estimates the serialized size of this record in bytes.
///
/// Used as a capacity hint for `to_bytes()` to avoid intermediate
Expand Down Expand Up @@ -291,143 +327,112 @@ impl AofRecord {
/// Serializes this record into a byte vector (tag + payload, no CRC).
fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
let mut buf = Vec::with_capacity(self.estimated_size());
format::write_u8(&mut buf, self.tag())?;

match self {
// key-only: tag + key
AofRecord::Del { key }
| AofRecord::LPop { key }
| AofRecord::RPop { key }
| AofRecord::Persist { key }
| AofRecord::Incr { key }
| AofRecord::Decr { key } => {
format::write_bytes(&mut buf, key.as_bytes())?;
}

// key + bytes value + expire
AofRecord::Set {
key,
value,
expire_ms,
} => {
format::write_u8(&mut buf, TAG_SET)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_bytes(&mut buf, value)?;
format::write_i64(&mut buf, *expire_ms)?;
}
AofRecord::Del { key } => {
format::write_u8(&mut buf, TAG_DEL)?;
format::write_bytes(&mut buf, key.as_bytes())?;
}

// key + i64
AofRecord::Expire { key, seconds } => {
format::write_u8(&mut buf, TAG_EXPIRE)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_i64(&mut buf, *seconds as i64)?;
}
AofRecord::LPush { key, values } => {
format::write_u8(&mut buf, TAG_LPUSH)?;
AofRecord::Pexpire { key, milliseconds } => {
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, values.len())?;
for v in values {
format::write_bytes(&mut buf, v)?;
}
format::write_i64(&mut buf, *milliseconds as i64)?;
}
AofRecord::IncrBy { key, delta }
| AofRecord::DecrBy { key, delta } => {
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_i64(&mut buf, *delta)?;
}
AofRecord::RPush { key, values } => {
format::write_u8(&mut buf, TAG_RPUSH)?;

// key + byte list
AofRecord::LPush { key, values }
| AofRecord::RPush { key, values } => {
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, values.len())?;
for v in values {
format::write_bytes(&mut buf, v)?;
}
}
AofRecord::LPop { key } => {
format::write_u8(&mut buf, TAG_LPOP)?;

// key + string list
AofRecord::ZRem { key, members }
| AofRecord::SAdd { key, members }
| AofRecord::SRem { key, members } => {
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, members.len())?;
for member in members {
format::write_bytes(&mut buf, member.as_bytes())?;
}
}
AofRecord::RPop { key } => {
format::write_u8(&mut buf, TAG_RPOP)?;
AofRecord::HDel { key, fields } => {
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, fields.len())?;
for field in fields {
format::write_bytes(&mut buf, field.as_bytes())?;
}
}

// key + scored members
AofRecord::ZAdd { key, members } => {
format::write_u8(&mut buf, TAG_ZADD)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, members.len())?;
for (score, member) in members {
format::write_f64(&mut buf, *score)?;
format::write_bytes(&mut buf, member.as_bytes())?;
}
}
AofRecord::ZRem { key, members } => {
format::write_u8(&mut buf, TAG_ZREM)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, members.len())?;
for member in members {
format::write_bytes(&mut buf, member.as_bytes())?;
}
}
AofRecord::Persist { key } => {
format::write_u8(&mut buf, TAG_PERSIST)?;
format::write_bytes(&mut buf, key.as_bytes())?;
}
AofRecord::Pexpire { key, milliseconds } => {
format::write_u8(&mut buf, TAG_PEXPIRE)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_i64(&mut buf, *milliseconds as i64)?;
}
AofRecord::Incr { key } => {
format::write_u8(&mut buf, TAG_INCR)?;
format::write_bytes(&mut buf, key.as_bytes())?;
}
AofRecord::Decr { key } => {
format::write_u8(&mut buf, TAG_DECR)?;
format::write_bytes(&mut buf, key.as_bytes())?;
}

// key + field-value pairs
AofRecord::HSet { key, fields } => {
format::write_u8(&mut buf, TAG_HSET)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, fields.len())?;
for (field, value) in fields {
format::write_bytes(&mut buf, field.as_bytes())?;
format::write_bytes(&mut buf, value)?;
}
}
AofRecord::HDel { key, fields } => {
format::write_u8(&mut buf, TAG_HDEL)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, fields.len())?;
for field in fields {
format::write_bytes(&mut buf, field.as_bytes())?;
}
}

// key + field + delta
AofRecord::HIncrBy { key, field, delta } => {
format::write_u8(&mut buf, TAG_HINCRBY)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_bytes(&mut buf, field.as_bytes())?;
format::write_i64(&mut buf, *delta)?;
}
AofRecord::SAdd { key, members } => {
format::write_u8(&mut buf, TAG_SADD)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, members.len())?;
for member in members {
format::write_bytes(&mut buf, member.as_bytes())?;
}
}
AofRecord::SRem { key, members } => {
format::write_u8(&mut buf, TAG_SREM)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_len(&mut buf, members.len())?;
for member in members {
format::write_bytes(&mut buf, member.as_bytes())?;
}
}
AofRecord::IncrBy { key, delta } => {
format::write_u8(&mut buf, TAG_INCRBY)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_i64(&mut buf, *delta)?;
}
AofRecord::DecrBy { key, delta } => {
format::write_u8(&mut buf, TAG_DECRBY)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_i64(&mut buf, *delta)?;
}

// key + bytes value (no expire)
AofRecord::Append { key, value } => {
format::write_u8(&mut buf, TAG_APPEND)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_bytes(&mut buf, value)?;
}

// key + newkey
AofRecord::Rename { key, newkey } => {
format::write_u8(&mut buf, TAG_RENAME)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_bytes(&mut buf, newkey.as_bytes())?;
}

#[cfg(feature = "vector")]
AofRecord::VAdd {
key,
Expand All @@ -438,7 +443,6 @@ impl AofRecord {
connectivity,
expansion_add,
} => {
format::write_u8(&mut buf, TAG_VADD)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_bytes(&mut buf, element.as_bytes())?;
format::write_len(&mut buf, vector.len())?;
Expand All @@ -452,26 +456,24 @@ impl AofRecord {
}
#[cfg(feature = "vector")]
AofRecord::VRem { key, element } => {
format::write_u8(&mut buf, TAG_VREM)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_bytes(&mut buf, element.as_bytes())?;
}

#[cfg(feature = "protobuf")]
AofRecord::ProtoSet {
key,
type_name,
data,
expire_ms,
} => {
format::write_u8(&mut buf, TAG_PROTO_SET)?;
format::write_bytes(&mut buf, key.as_bytes())?;
format::write_bytes(&mut buf, type_name.as_bytes())?;
format::write_bytes(&mut buf, data)?;
format::write_i64(&mut buf, *expire_ms)?;
}
#[cfg(feature = "protobuf")]
AofRecord::ProtoRegister { name, descriptor } => {
format::write_u8(&mut buf, TAG_PROTO_REGISTER)?;
format::write_bytes(&mut buf, name.as_bytes())?;
format::write_bytes(&mut buf, descriptor)?;
}
Expand Down
57 changes: 17 additions & 40 deletions crates/ember-persistence/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ const TYPE_VECTOR: u8 = 6;
#[cfg(feature = "protobuf")]
const TYPE_PROTO: u8 = 5;

/// Reads a UTF-8 string from a length-prefixed byte field.
#[cfg(feature = "encryption")]
fn read_snap_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
let bytes = format::read_bytes(r)?;
/// Converts raw bytes to a UTF-8 string, returning a descriptive error
/// on invalid data. `field` names the field for the error message
/// (e.g. "key", "member", "hash field").
fn parse_utf8(bytes: Vec<u8>, field: &str) -> Result<String, FormatError> {
String::from_utf8(bytes).map_err(|_| {
FormatError::Io(io::Error::new(
io::ErrorKind::InvalidData,
Expand All @@ -53,6 +53,13 @@ fn read_snap_string(r: &mut impl io::Read, field: &str) -> Result<String, Format
})
}

/// Reads a UTF-8 string from a length-prefixed byte field.
#[cfg(feature = "encryption")]
fn read_snap_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
let bytes = format::read_bytes(r)?;
parse_utf8(bytes, field)
}

/// Parses a type-tagged SnapValue from a reader (v2+ format).
///
/// Used by `read_encrypted_entry` to parse the `[type_tag][payload]`
Expand Down Expand Up @@ -608,12 +615,7 @@ impl SnapshotReader {
format::write_f64(&mut buf, score)?;
let member_bytes = format::read_bytes(&mut self.reader)?;
format::write_bytes(&mut buf, &member_bytes)?;
let member = String::from_utf8(member_bytes).map_err(|_| {
FormatError::Io(io::Error::new(
io::ErrorKind::InvalidData,
"member is not valid utf-8",
))
})?;
let member = parse_utf8(member_bytes, "member")?;
members.push((score, member));
}
SnapValue::SortedSet(members)
Expand All @@ -626,12 +628,7 @@ impl SnapshotReader {
for _ in 0..count {
let field_bytes = format::read_bytes(&mut self.reader)?;
format::write_bytes(&mut buf, &field_bytes)?;
let field = String::from_utf8(field_bytes).map_err(|_| {
FormatError::Io(io::Error::new(
io::ErrorKind::InvalidData,
"hash field is not valid utf-8",
))
})?;
let field = parse_utf8(field_bytes, "hash field")?;
let value_bytes = format::read_bytes(&mut self.reader)?;
format::write_bytes(&mut buf, &value_bytes)?;
map.insert(field, Bytes::from(value_bytes));
Expand All @@ -646,12 +643,7 @@ impl SnapshotReader {
for _ in 0..count {
let member_bytes = format::read_bytes(&mut self.reader)?;
format::write_bytes(&mut buf, &member_bytes)?;
let member = String::from_utf8(member_bytes).map_err(|_| {
FormatError::Io(io::Error::new(
io::ErrorKind::InvalidData,
"set member is not valid utf-8",
))
})?;
let member = parse_utf8(member_bytes, "set member")?;
set.insert(member);
}
SnapValue::Set(set)
Expand Down Expand Up @@ -696,12 +688,7 @@ impl SnapshotReader {
for _ in 0..count {
let name_bytes = format::read_bytes(&mut self.reader)?;
format::write_bytes(&mut buf, &name_bytes)?;
let name = String::from_utf8(name_bytes).map_err(|_| {
FormatError::Io(io::Error::new(
io::ErrorKind::InvalidData,
"vector element name is not valid utf-8",
))
})?;
let name = parse_utf8(name_bytes, "vector element name")?;
let mut vector = Vec::with_capacity(dim as usize);
for _ in 0..dim {
let v = format::read_f32(&mut self.reader)?;
Expand All @@ -723,12 +710,7 @@ impl SnapshotReader {
TYPE_PROTO => {
let type_name_bytes = format::read_bytes(&mut self.reader)?;
format::write_bytes(&mut buf, &type_name_bytes)?;
let type_name = String::from_utf8(type_name_bytes).map_err(|_| {
FormatError::Io(io::Error::new(
io::ErrorKind::InvalidData,
"proto type_name is not valid utf-8",
))
})?;
let type_name = parse_utf8(type_name_bytes, "proto type_name")?;
let data = format::read_bytes(&mut self.reader)?;
format::write_bytes(&mut buf, &data)?;
SnapValue::Proto {
Expand All @@ -746,12 +728,7 @@ impl SnapshotReader {
format::write_i64(&mut buf, expire_ms)?;
self.hasher.update(&buf);

let key = String::from_utf8(key_bytes).map_err(|_| {
FormatError::Io(io::Error::new(
io::ErrorKind::InvalidData,
"key is not valid utf-8",
))
})?;
let key = parse_utf8(key_bytes, "key")?;

self.read_so_far += 1;
Ok(Some(SnapEntry {
Expand Down
Loading