Skip to content

Commit

Permalink
feat: append versionstamp to key (#40)
Browse files Browse the repository at this point in the history
This adds a new `M_SET_SUFFIX_VERSIONSTAMPED_KEY` mutation type that is
same as `M_SET` except that the hex-encoded versionstamp of the current
atomic operation is appended to the key.
  • Loading branch information
losfair authored Dec 5, 2023
1 parent 801b32a commit e667027
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions proto/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ impl TryFrom<pb::AtomicWrite> for AtomicWrite {
.ok_or(ConvertError::DecodeError)?;
MutationKind::Max(value)
}
(pb::MutationType::MSetSuffixVersionstampedKey, Some(value)) => {
let value = decode_value(value.data, value.encoding as i64)
.ok_or(ConvertError::DecodeError)?;
MutationKind::SetSuffixVersionstampedKey(value)
}
_ => return Err(ConvertError::InvalidMutationKind),
};
let expire_at = match mutation.expire_at_ms {
Expand Down
2 changes: 2 additions & 0 deletions proto/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ pub enum MutationKind {
Sum(KvValue),
Min(KvValue),
Max(KvValue),
SetSuffixVersionstampedKey(KvValue),
}

impl MutationKind {
Expand All @@ -332,6 +333,7 @@ impl MutationKind {
MutationKind::Sum(value) => Some(value),
MutationKind::Min(value) => Some(value),
MutationKind::Max(value) => Some(value),
MutationKind::SetSuffixVersionstampedKey(value) => Some(value),
MutationKind::Delete => None,
}
}
Expand Down
2 changes: 2 additions & 0 deletions proto/schema/datapath.proto
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ enum MutationType {
M_MAX = 4;
// Max the stored value with the new value. Both values must be LE64 encoded.
M_MIN = 5;
// Set the value, with the versionstamp appended to the end of the key as a string.
M_SET_SUFFIX_VERSIONSTAMPED_KEY = 9;
}

// The encoding of a value.
Expand Down
8 changes: 8 additions & 0 deletions remote/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,14 @@ impl<P: RemotePermissions> Database for Remote<P> {
expire_at_ms,
});
}
denokv_proto::MutationKind::SetSuffixVersionstampedKey(value) => {
mutations.push(pb::Mutation {
key: mutation.key,
value: Some(encode_value_to_pb(value)),
mutation_type: pb::MutationType::MSetSuffixVersionstampedKey as _,
expire_at_ms,
});
}
}
}

Expand Down
1 change: 1 addition & 0 deletions sqlite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-trait.workspace = true
chrono.workspace = true
denokv_proto.workspace = true
futures.workspace = true
hex.workspace = true
log.workspace = true
num-bigint.workspace = true
rand.workspace = true
Expand Down
28 changes: 26 additions & 2 deletions sqlite/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ impl SqliteBackend {
let version: i64 = tx
.prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
.query_row([incrementer_count], |row| row.get(0))?;
let new_versionstamp = version_to_versionstamp(version);

for mutation in &write.mutations {
match &mutation.kind {
Expand Down Expand Up @@ -326,6 +327,31 @@ impl SqliteBackend {
a.max(b)
})?;
}
MutationKind::SetSuffixVersionstampedKey(value) => {
let mut versionstamp_suffix = [0u8; 22];
versionstamp_suffix[0] = 0x02;
hex::encode_to_slice(
new_versionstamp,
&mut versionstamp_suffix[1..21],
)
.unwrap();

let key = [&mutation.key[..], &versionstamp_suffix[..]].concat();

let (value, encoding) = encode_value(value);
let changed =
tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![
key,
value,
&encoding,
&version,
mutation
.expire_at
.map(|time| time.timestamp_millis())
.unwrap_or(-1i64)
])?;
assert_eq!(changed, 1)
}
}
}

Expand Down Expand Up @@ -355,8 +381,6 @@ impl SqliteBackend {
assert_eq!(changed, 1)
}

let new_versionstamp = version_to_versionstamp(version);

Ok((
has_enqueues,
Some(CommitResult {
Expand Down

0 comments on commit e667027

Please sign in to comment.