Skip to content

Commit 12f2c27

Browse files
committed
Add partition-store schema metadata accessors
## Summary - add a dedicated metadata slot and read/write helpers for storing service schemas in the partition-store FSM table - extend the storage-api FSM table traits with schema accessors so callers can persist schema metadata
1 parent 40d3eb0 commit 12f2c27

File tree

3 files changed

+60
-17
lines changed

3 files changed

+60
-17
lines changed

crates/partition-store/src/fsm_table/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use restate_types::SemanticRestateVersion;
1717
use restate_types::identifiers::PartitionId;
1818
use restate_types::logs::Lsn;
1919
use restate_types::message::MessageIndex;
20+
use restate_types::schema::Schema;
2021

2122
use crate::TableKind::PartitionStateMachine;
2223
use crate::keys::{KeyKind, define_table_key};
@@ -37,7 +38,10 @@ pub(crate) mod fsm_variable {
3738
pub(crate) const PARTITION_DURABILITY: u64 = 4;
3839

3940
/// Schema versions are represented as a strictly monotonically increasing number.
41+
/// This represent the partition storage schema version, not the user services schema.
4042
pub(crate) const SCHEMA_VERSION: u64 = 5;
43+
44+
pub(crate) const SERVICES_SCHEMA_METADATA: u64 = 6;
4145
}
4246

4347
fn get<T: PartitionStoreProtobufValue, S: StorageAccess>(
@@ -143,6 +147,13 @@ impl ReadFsmTable for PartitionStore {
143147
fsm_variable::PARTITION_DURABILITY,
144148
)
145149
}
150+
151+
async fn get_schema(&mut self) -> Result<Option<Schema>> {
152+
let key = PartitionStateMachineKey::default()
153+
.partition_id(self.partition_id().into())
154+
.state_id(fsm_variable::SERVICES_SCHEMA_METADATA);
155+
self.get_value_storage_codec(key)
156+
}
146157
}
147158

148159
impl WriteFsmTable for PartitionStoreTransaction<'_> {
@@ -190,4 +201,11 @@ impl WriteFsmTable for PartitionStoreTransaction<'_> {
190201
durability,
191202
)
192203
}
204+
205+
fn put_schema(&mut self, schema: &Schema) -> Result<()> {
206+
let key = PartitionStateMachineKey::default()
207+
.partition_id(self.partition_id().into())
208+
.state_id(fsm_variable::SERVICES_SCHEMA_METADATA);
209+
self.put_kv_storage_codec(key, schema)
210+
}
193211
}

crates/partition-store/src/partition_store.rs

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId, WithPart
3838
use restate_types::logs::Lsn;
3939
use restate_types::partitions::Partition;
4040
use restate_types::storage::StorageCodec;
41+
use restate_types::storage::StorageDecode;
42+
use restate_types::storage::StorageEncode;
4143

4244
use crate::fsm_table::{get_locally_durable_lsn, get_schema_version, put_schema_version};
4345
use crate::keys::KeyKind;
@@ -1075,17 +1077,25 @@ pub(crate) trait StorageAccess {
10751077
&mut self,
10761078
key: K,
10771079
value: &V,
1080+
) -> Result<()> {
1081+
self.put_kv_storage_codec(
1082+
key,
1083+
&ProtobufStorageWrapper::<V::ProtobufType>(value.clone().into()),
1084+
)
1085+
}
1086+
1087+
#[inline]
1088+
fn put_kv_storage_codec<K: TableKey, V: StorageEncode + 'static>(
1089+
&mut self,
1090+
key: K,
1091+
value: &V,
10781092
) -> Result<()> {
10791093
let key_buffer = self.cleared_key_buffer_mut(key.serialized_length());
10801094
key.serialize_to(key_buffer);
10811095
let key_buffer = key_buffer.split();
10821096

10831097
let value_buffer = self.cleared_value_buffer_mut(0);
1084-
StorageCodec::encode(
1085-
&ProtobufStorageWrapper::<V::ProtobufType>(value.clone().into()),
1086-
value_buffer,
1087-
)
1088-
.map_err(|e| StorageError::Generic(e.into()))?;
1098+
StorageCodec::encode(value, value_buffer).map_err(|e| StorageError::Generic(e.into()))?;
10891099
let value_buffer = value_buffer.split();
10901100

10911101
self.put_cf(K::TABLE, key_buffer, value_buffer)
@@ -1107,23 +1117,33 @@ pub(crate) trait StorageAccess {
11071117
V: PartitionStoreProtobufValue,
11081118
<<V as PartitionStoreProtobufValue>::ProtobufType as TryInto<V>>::Error:
11091119
Into<anyhow::Error>,
1120+
{
1121+
let value: Option<ProtobufStorageWrapper<V::ProtobufType>> =
1122+
self.get_value_storage_codec(key)?;
1123+
1124+
value
1125+
.map(|v| v.0.try_into())
1126+
.transpose()
1127+
.map_err(|err| StorageError::Conversion(err.into()))
1128+
}
1129+
1130+
#[inline]
1131+
fn get_value_storage_codec<K, V>(&mut self, key: K) -> Result<Option<V>>
1132+
where
1133+
K: TableKey,
1134+
V: StorageDecode,
11101135
{
11111136
let mut buf = self.cleared_key_buffer_mut(key.serialized_length());
11121137
key.serialize_to(&mut buf);
11131138
let buf = buf.split();
11141139

1115-
match self.get(K::TABLE, &buf) {
1116-
Ok(value) => {
1117-
let slice = value.as_ref().map(|v| v.as_ref());
1118-
1119-
if let Some(mut slice) = slice {
1120-
Ok(Some(V::decode(&mut slice)?))
1121-
} else {
1122-
Ok(None)
1123-
}
1124-
}
1125-
Err(err) => Err(err),
1126-
}
1140+
self.get(K::TABLE, &buf)?
1141+
.map(|value| {
1142+
let mut slice = value.as_ref();
1143+
StorageCodec::decode(&mut slice)
1144+
})
1145+
.transpose()
1146+
.map_err(|err| StorageError::Generic(err.into()))
11271147
}
11281148

11291149
/// Forces a read from persistent storage, bypassing memtables and block cache.

crates/storage-api/src/fsm_table/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::future::Future;
1313
use restate_types::SemanticRestateVersion;
1414
use restate_types::logs::Lsn;
1515
use restate_types::message::MessageIndex;
16+
use restate_types::schema::Schema;
1617
use restate_types::time::MillisSinceEpoch;
1718

1819
use crate::Result;
@@ -32,6 +33,8 @@ pub trait ReadFsmTable {
3233
fn get_partition_durability(
3334
&mut self,
3435
) -> impl Future<Output = Result<Option<PartitionDurability>>> + Send + '_;
36+
37+
fn get_schema(&mut self) -> impl Future<Output = Result<Option<Schema>>> + Send + '_;
3538
}
3639

3740
pub trait WriteFsmTable {
@@ -44,6 +47,8 @@ pub trait WriteFsmTable {
4447
fn put_min_restate_version(&mut self, version: &SemanticRestateVersion) -> Result<()>;
4548

4649
fn put_partition_durability(&mut self, durability: &PartitionDurability) -> Result<()>;
50+
51+
fn put_schema(&mut self, schema: &Schema) -> Result<()>;
4752
}
4853

4954
#[derive(Debug, Clone, Copy, derive_more::From, derive_more::Into)]

0 commit comments

Comments
 (0)