Skip to content

Commit 500dcee

Browse files
committed
[PartitionStore] Separate concrete keys from key builder
Better type separation between keys and key prefix builders leads to removal of unnecessary "expect()" calls, increased efficiency (Option adds 8 bytes on each field on the key), and safer code. You can only deserialize full keys, and you can serialize key builders and keys alike.
1 parent da4f72d commit 500dcee

File tree

19 files changed

+406
-388
lines changed

19 files changed

+406
-388
lines changed

crates/partition-store/src/deduplication_table/mod.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,24 @@ define_table_key!(
2525
DeduplicationKey(partition_id: PaddedPartitionId, producer_id: ProducerId)
2626
);
2727

28+
#[inline]
29+
fn create_key(
30+
partition_id: impl Into<PaddedPartitionId>,
31+
producer_id: ProducerId,
32+
) -> DeduplicationKey {
33+
DeduplicationKey {
34+
partition_id: partition_id.into(),
35+
producer_id,
36+
}
37+
}
38+
2839
fn get_dedup_sequence_number<S: StorageAccess>(
2940
storage: &mut S,
3041
partition_id: PartitionId,
3142
producer_id: &ProducerId,
3243
) -> Result<Option<DedupSequenceNumber>> {
3344
let _x = RocksDbPerfGuard::new("get-dedup-seq");
34-
let key = DeduplicationKey::default()
35-
.partition_id(partition_id.into())
36-
.producer_id(producer_id.clone());
45+
let key = create_key(partition_id, producer_id.clone());
3746

3847
storage.get_value_proto(key)
3948
}
@@ -62,9 +71,7 @@ impl WriteDeduplicationTable for PartitionStoreTransaction<'_> {
6271
producer_id: ProducerId,
6372
dedup_sequence_number: &DedupSequenceNumber,
6473
) -> Result<()> {
65-
let key = DeduplicationKey::default()
66-
.partition_id(self.partition_id().into())
67-
.producer_id(producer_id);
74+
let key = create_key(self.partition_id(), producer_id);
6875

6976
self.put_kv_proto(key, dedup_sequence_number)
7077
}

crates/partition-store/src/durable_lsn_tracking.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,8 @@ fn extract_partition_applied_lsn(
157157
}
158158

159159
let fsm_key = PartitionStateMachineKey::deserialize_from(&mut key)?;
160-
if fsm_key.state_id == Some(fsm_variable::APPLIED_LSN)
161-
&& let Some(padded_partition_id) = fsm_key.partition_id
162-
{
163-
let partition_id = PartitionId::from(padded_partition_id);
160+
if fsm_key.state_id == fsm_variable::APPLIED_LSN {
161+
let partition_id = PartitionId::from(fsm_key.partition_id);
164162
let applied_lsn = SequenceNumber::decode(&mut value).map(u64::from)?.into();
165163
return Ok(Some((partition_id, applied_lsn)));
166164
}
@@ -184,8 +182,8 @@ mod tests {
184182
let lsn = Lsn::from(12345);
185183

186184
let applied_lsn_key = PartitionStateMachineKey {
187-
partition_id: Some(partition_id.into()),
188-
state_id: Some(fsm_variable::APPLIED_LSN),
185+
partition_id: partition_id.into(),
186+
state_id: fsm_variable::APPLIED_LSN,
189187
};
190188

191189
let mut key_buf = BytesMut::new();
@@ -224,8 +222,8 @@ mod tests {
224222
let lsn = Lsn::from(12345);
225223

226224
let other_fsm_key = PartitionStateMachineKey {
227-
partition_id: Some(partition_id.into()),
228-
state_id: Some(fsm_variable::INBOX_SEQ_NUMBER),
225+
partition_id: partition_id.into(),
226+
state_id: fsm_variable::INBOX_SEQ_NUMBER,
229227
};
230228

231229
let mut key_buf = BytesMut::new();

crates/partition-store/src/fsm_table/mod.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,17 @@ define_table_key!(
2929
PartitionStateMachineKey(partition_id: PaddedPartitionId, state_id: u64)
3030
);
3131

32+
#[inline]
33+
fn create_key(
34+
partition_id: impl Into<PaddedPartitionId>,
35+
state_id: u64,
36+
) -> PartitionStateMachineKey {
37+
PartitionStateMachineKey {
38+
partition_id: partition_id.into(),
39+
state_id,
40+
}
41+
}
42+
3243
pub(crate) mod fsm_variable {
3344
pub(crate) const INBOX_SEQ_NUMBER: u64 = 0;
3445
pub(crate) const OUTBOX_SEQ_NUMBER: u64 = 1;
@@ -52,10 +63,7 @@ fn get<T: PartitionStoreProtobufValue, S: StorageAccess>(
5263
where
5364
<<T as PartitionStoreProtobufValue>::ProtobufType as TryInto<T>>::Error: Into<anyhow::Error>,
5465
{
55-
let key = PartitionStateMachineKey::default()
56-
.partition_id(partition_id.into())
57-
.state_id(state_id);
58-
storage.get_value_proto(key)
66+
storage.get_value_proto(create_key(partition_id, state_id))
5967
}
6068

6169
/// Forces a read from persistent storage, bypassing memtables and block cache.
@@ -67,10 +75,7 @@ fn get_durable<T: PartitionStoreProtobufValue, S: StorageAccess>(
6775
where
6876
<<T as PartitionStoreProtobufValue>::ProtobufType as TryInto<T>>::Error: Into<anyhow::Error>,
6977
{
70-
let key = PartitionStateMachineKey::default()
71-
.partition_id(partition_id.into())
72-
.state_id(state_id);
73-
storage.get_durable_value(key)
78+
storage.get_durable_value(create_key(partition_id, state_id))
7479
}
7580

7681
fn put<S: StorageAccess>(
@@ -79,9 +84,10 @@ fn put<S: StorageAccess>(
7984
state_id: u64,
8085
state_value: &(impl PartitionStoreProtobufValue + Clone + 'static),
8186
) -> Result<()> {
82-
let key = PartitionStateMachineKey::default()
83-
.partition_id(partition_id.into())
84-
.state_id(state_id);
87+
let key = PartitionStateMachineKey {
88+
partition_id: partition_id.into(),
89+
state_id,
90+
};
8591
storage.put_kv_proto(key, state_value)
8692
}
8793

@@ -149,9 +155,7 @@ impl ReadFsmTable for PartitionStore {
149155
}
150156

151157
async fn get_schema(&mut self) -> Result<Option<Schema>> {
152-
let key = PartitionStateMachineKey::default()
153-
.partition_id(self.partition_id().into())
154-
.state_id(fsm_variable::SERVICES_SCHEMA_METADATA);
158+
let key = create_key(self.partition_id(), fsm_variable::SERVICES_SCHEMA_METADATA);
155159
self.get_value_storage_codec(key)
156160
}
157161
}
@@ -203,9 +207,7 @@ impl WriteFsmTable for PartitionStoreTransaction<'_> {
203207
}
204208

205209
fn put_schema(&mut self, schema: &Schema) -> Result<()> {
206-
let key = PartitionStateMachineKey::default()
207-
.partition_id(self.partition_id().into())
208-
.state_id(fsm_variable::SERVICES_SCHEMA_METADATA);
210+
let key = create_key(self.partition_id(), fsm_variable::SERVICES_SCHEMA_METADATA);
209211
self.put_kv_storage_codec(key, schema)
210212
}
211213
}

crates/partition-store/src/idempotency_table/mod.rs

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,21 @@ define_table_key!(
3737
)
3838
);
3939

40+
#[inline]
4041
fn create_key(idempotency_id: &IdempotencyId) -> IdempotencyKey {
41-
IdempotencyKey::default()
42-
.partition_key(idempotency_id.partition_key())
43-
.service_name(idempotency_id.service_name.clone())
44-
.service_key(
45-
idempotency_id
46-
.service_key
47-
.as_ref()
48-
.cloned()
49-
.unwrap_or_default()
50-
.into_bytes(),
51-
)
52-
.service_handler(idempotency_id.service_handler.clone())
53-
.idempotency_key(idempotency_id.idempotency_key.clone())
42+
IdempotencyKey {
43+
partition_key: idempotency_id.partition_key(),
44+
service_name: idempotency_id.service_name.clone(),
45+
service_key: idempotency_id
46+
.service_key
47+
.as_ref()
48+
.cloned()
49+
.unwrap_or_default()
50+
.into_bytes(),
51+
52+
service_handler: idempotency_id.service_handler.clone(),
53+
idempotency_key: idempotency_id.idempotency_key.clone(),
54+
}
5455
}
5556

5657
fn get_idempotency_metadata<S: StorageAccess>(
@@ -103,17 +104,13 @@ impl ScanIdempotencyTable for PartitionStore {
103104
let idempotency_metadata = break_on_err(IdempotencyMetadata::decode(&mut value))?;
104105

105106
let idempotency_id = IdempotencyId::new(
106-
break_on_err(key.service_name_ok_or())?.clone(),
107-
break_on_err(
108-
key.service_key
109-
.clone()
110-
.map(|b| {
111-
ByteString::try_from(b).map_err(|e| StorageError::Generic(e.into()))
112-
})
113-
.transpose(),
114-
)?,
115-
break_on_err(key.service_handler_ok_or())?.clone(),
116-
break_on_err(key.idempotency_key_ok_or())?.clone(),
107+
key.service_name,
108+
Some(break_on_err(
109+
ByteString::try_from(key.service_key)
110+
.map_err(|e| StorageError::Generic(e.into())),
111+
)?),
112+
key.service_handler,
113+
key.idempotency_key,
117114
);
118115

119116
f((idempotency_id, idempotency_metadata)).map_break(Ok)

crates/partition-store/src/inbox_table/mod.rs

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use std::io::Cursor;
1211
use std::ops::ControlFlow;
1312

1413
use bytestring::ByteString;
@@ -46,7 +45,7 @@ fn peek_inbox<S: StorageAccess>(
4645
storage: &mut S,
4746
service_id: &ServiceId,
4847
) -> Result<Option<SequenceNumberInboxEntry>> {
49-
let key = InboxKey::default()
48+
let key = InboxKey::builder()
5049
.partition_key(service_id.partition_key())
5150
.service_name(service_id.service_name.clone())
5251
.service_key(service_id.key.clone());
@@ -67,7 +66,7 @@ fn inbox<S: StorageAccess>(
6766
storage: &mut S,
6867
service_id: &ServiceId,
6968
) -> Result<impl Stream<Item = Result<SequenceNumberInboxEntry>> + Send> {
70-
let key = InboxKey::default()
69+
let key = InboxKey::builder()
7170
.partition_key(service_id.partition_key())
7271
.service_name(service_id.service_name.clone())
7372
.service_key(service_id.key.clone());
@@ -113,10 +112,13 @@ impl ScanInboxTable for PartitionStore {
113112
TableScan::FullScanPartitionKeyRange::<InboxKey>(range),
114113
move |(mut key, mut value)| {
115114
let key = break_on_err(InboxKey::deserialize_from(&mut key))?;
116-
let sequence_number = *break_on_err(key.sequence_number_ok_or())?;
117115
let inbox_entry = break_on_err(InboxEntry::decode(&mut value))?;
118116

119-
f(SequenceNumberInboxEntry::new(sequence_number, inbox_entry)).map_break(Ok)
117+
f(SequenceNumberInboxEntry::new(
118+
key.sequence_number,
119+
inbox_entry,
120+
))
121+
.map_break(Ok)
120122
},
121123
)
122124
.map_err(|_| StorageError::OperationalError)
@@ -150,11 +152,12 @@ impl WriteInboxTable for PartitionStoreTransaction<'_> {
150152
let service_id = inbox_entry.service_id();
151153
self.assert_partition_key(service_id)?;
152154

153-
let key = InboxKey::default()
154-
.partition_key(service_id.partition_key())
155-
.service_name(service_id.service_name.clone())
156-
.service_key(service_id.key.clone())
157-
.sequence_number(inbox_sequence_number);
155+
let key = InboxKey {
156+
partition_key: service_id.partition_key(),
157+
service_name: service_id.service_name.clone(),
158+
service_key: service_id.key.clone(),
159+
sequence_number: inbox_sequence_number,
160+
};
158161

159162
self.put_kv_proto(key, inbox_entry)
160163
}
@@ -185,50 +188,48 @@ fn delete_inbox_entry(
185188
service_id: &ServiceId,
186189
sequence_number: u64,
187190
) -> Result<()> {
188-
let key = InboxKey::default()
189-
.partition_key(service_id.partition_key())
190-
.service_name(service_id.service_name.clone())
191-
.service_key(service_id.key.clone())
192-
.sequence_number(sequence_number);
191+
let key = InboxKey {
192+
partition_key: service_id.partition_key(),
193+
service_name: service_id.service_name.clone(),
194+
service_key: service_id.key.clone(),
195+
sequence_number,
196+
};
193197

194198
txn.delete_key(&key)
195199
}
196200

197-
fn decode_inbox_key_value(k: &[u8], mut v: &[u8]) -> Result<SequenceNumberInboxEntry> {
198-
let key = InboxKey::deserialize_from(&mut Cursor::new(k))?;
199-
let sequence_number = *key.sequence_number_ok_or()?;
200-
201+
fn decode_inbox_key_value(mut k: &[u8], mut v: &[u8]) -> Result<SequenceNumberInboxEntry> {
202+
let key = InboxKey::deserialize_from(&mut k)?;
201203
let inbox_entry = InboxEntry::decode(&mut v)?;
202204

203-
Ok(SequenceNumberInboxEntry::new(sequence_number, inbox_entry))
205+
Ok(SequenceNumberInboxEntry::new(
206+
key.sequence_number,
207+
inbox_entry,
208+
))
204209
}
205210

206211
#[cfg(test)]
207212
mod tests {
208213
use crate::inbox_table::InboxKey;
209-
use crate::keys::TableKey;
210-
use bytes::{Bytes, BytesMut};
214+
use crate::keys::TableKeyPrefix;
215+
use bytes::Bytes;
211216
use restate_types::identifiers::{ServiceId, WithPartitionKey};
212217

213218
fn message_key(service_id: &ServiceId, sequence_number: u64) -> Bytes {
214219
let key = InboxKey {
215-
partition_key: Some(service_id.partition_key()),
216-
service_name: Some(service_id.service_name.clone()),
217-
service_key: Some(service_id.key.clone()),
218-
sequence_number: Some(sequence_number),
220+
partition_key: service_id.partition_key(),
221+
service_name: service_id.service_name.clone(),
222+
service_key: service_id.key.clone(),
223+
sequence_number,
219224
};
220-
let mut buf = BytesMut::new();
221-
key.serialize_to(&mut buf);
222-
buf.freeze()
225+
key.serialize().freeze()
223226
}
224227

225228
fn inbox_key(service_id: &ServiceId) -> Bytes {
226-
let key = InboxKey {
227-
partition_key: Some(service_id.partition_key()),
228-
service_name: Some(service_id.service_name.clone()),
229-
service_key: Some(service_id.key.clone()),
230-
sequence_number: None,
231-
};
229+
let key = InboxKey::builder()
230+
.partition_key(service_id.partition_key())
231+
.service_name(service_id.service_name.clone())
232+
.service_key(service_id.key.clone());
232233

233234
key.serialize().freeze()
234235
}

0 commit comments

Comments
 (0)