Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/partition-store/src/deduplication_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn get_dedup_sequence_number<S: StorageAccess>(
.partition_id(partition_id.into())
.producer_id(producer_id.clone());

storage.get_value(key)
storage.get_value_proto(key)
}

impl ReadDeduplicationTable for PartitionStore {
Expand Down Expand Up @@ -66,6 +66,6 @@ impl WriteDeduplicationTable for PartitionStoreTransaction<'_> {
.partition_id(self.partition_id().into())
.producer_id(producer_id);

self.put_kv(key, dedup_sequence_number)
self.put_kv_proto(key, dedup_sequence_number)
}
}
32 changes: 25 additions & 7 deletions crates/partition-store/src/fsm_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use restate_types::SemanticRestateVersion;
use restate_types::identifiers::PartitionId;
use restate_types::logs::Lsn;
use restate_types::message::MessageIndex;
use restate_types::schema::Schema;

use crate::TableKind::PartitionStateMachine;
use crate::keys::{KeyKind, define_table_key};
Expand All @@ -37,7 +38,10 @@ pub(crate) mod fsm_variable {
pub(crate) const PARTITION_DURABILITY: u64 = 4;

/// Schema versions are represented as a strictly monotonically increasing number.
pub(crate) const SCHEMA_VERSION: u64 = 5;
/// This represent the partition storage schema version, not the user services schema.
pub(crate) const STORAGE_VERSION: u64 = 5;

pub(crate) const SERVICES_SCHEMA_METADATA: u64 = 6;
}

fn get<T: PartitionStoreProtobufValue, S: StorageAccess>(
Expand All @@ -51,7 +55,7 @@ where
let key = PartitionStateMachineKey::default()
.partition_id(partition_id.into())
.state_id(state_id);
storage.get_value(key)
storage.get_value_proto(key)
}

/// Forces a read from persistent storage, bypassing memtables and block cache.
Expand All @@ -78,7 +82,7 @@ fn put<S: StorageAccess>(
let key = PartitionStateMachineKey::default()
.partition_id(partition_id.into())
.state_id(state_id);
storage.put_kv(key, state_value)
storage.put_kv_proto(key, state_value)
}

pub async fn get_locally_durable_lsn(partition_store: &mut PartitionStore) -> Result<Option<Lsn>> {
Expand All @@ -90,23 +94,23 @@ pub async fn get_locally_durable_lsn(partition_store: &mut PartitionStore) -> Re
.map(|opt| opt.map(|seq_number| Lsn::from(u64::from(seq_number))))
}

pub(crate) async fn get_schema_version<S: StorageAccess>(
pub(crate) async fn get_storage_version<S: StorageAccess>(
storage: &mut S,
partition_id: PartitionId,
) -> Result<u16> {
get::<SequenceNumber, _>(storage, partition_id, fsm_variable::SCHEMA_VERSION)
get::<SequenceNumber, _>(storage, partition_id, fsm_variable::STORAGE_VERSION)
.map(|opt| opt.map(|s| s.0 as u16).unwrap_or_default())
}

pub(crate) async fn put_schema_version<S: StorageAccess>(
pub(crate) async fn put_storage_version<S: StorageAccess>(
storage: &mut S,
partition_id: PartitionId,
last_executed_migration: u16,
) -> Result<()> {
put(
storage,
partition_id,
fsm_variable::SCHEMA_VERSION,
fsm_variable::STORAGE_VERSION,
&SequenceNumber::from(last_executed_migration as u64),
)
}
Expand Down Expand Up @@ -143,6 +147,13 @@ impl ReadFsmTable for PartitionStore {
fsm_variable::PARTITION_DURABILITY,
)
}

async fn get_schema(&mut self) -> Result<Option<Schema>> {
let key = PartitionStateMachineKey::default()
.partition_id(self.partition_id().into())
.state_id(fsm_variable::SERVICES_SCHEMA_METADATA);
self.get_value_storage_codec(key)
}
}

impl WriteFsmTable for PartitionStoreTransaction<'_> {
Expand Down Expand Up @@ -190,4 +201,11 @@ impl WriteFsmTable for PartitionStoreTransaction<'_> {
durability,
)
}

fn put_schema(&mut self, schema: &Schema) -> Result<()> {
let key = PartitionStateMachineKey::default()
.partition_id(self.partition_id().into())
.state_id(fsm_variable::SERVICES_SCHEMA_METADATA);
self.put_kv_storage_codec(key, schema)
}
}
4 changes: 2 additions & 2 deletions crates/partition-store/src/idempotency_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ fn get_idempotency_metadata<S: StorageAccess>(
storage: &mut S,
idempotency_id: &IdempotencyId,
) -> Result<Option<IdempotencyMetadata>> {
storage.get_value(create_key(idempotency_id))
storage.get_value_proto(create_key(idempotency_id))
}

fn put_idempotency_metadata<S: StorageAccess>(
storage: &mut S,
idempotency_id: &IdempotencyId,
metadata: &IdempotencyMetadata,
) -> Result<()> {
storage.put_kv(create_key(idempotency_id), metadata)
storage.put_kv_proto(create_key(idempotency_id), metadata)
}

fn delete_idempotency_metadata<S: StorageAccess>(
Expand Down
2 changes: 1 addition & 1 deletion crates/partition-store/src/inbox_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl WriteInboxTable for PartitionStoreTransaction<'_> {
.service_key(service_id.key.clone())
.sequence_number(inbox_sequence_number);

self.put_kv(key, inbox_entry)
self.put_kv_proto(key, inbox_entry)
}

fn delete_inbox_entry(&mut self, service_id: &ServiceId, sequence_number: u64) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions crates/partition-store/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fn put_invocation_status<S: StorageAccess>(
) -> Result<()> {
match status {
InvocationStatus::Free => storage.delete_key(&create_invocation_status_key(invocation_id)),
_ => storage.put_kv(create_invocation_status_key(invocation_id), status),
_ => storage.put_kv_proto(create_invocation_status_key(invocation_id), status),
}
}

Expand All @@ -86,7 +86,7 @@ fn get_invocation_status<S: StorageAccess>(
let _x = RocksDbPerfGuard::new("get-invocation-status");

storage
.get_value::<_, InvocationStatus>(create_invocation_status_key(invocation_id))
.get_value_proto::<_, InvocationStatus>(create_invocation_status_key(invocation_id))
.map(|value| {
if let Some(invocation_status) = value {
invocation_status
Expand Down
2 changes: 1 addition & 1 deletion crates/partition-store/src/journal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn put_journal_event<S: StorageAccess>(
lsn: u64,
) -> Result<()> {
let (event_ty, event_value) = event.event.into_inner();
storage.put_kv(
storage.put_kv_proto(
write_journal_event_key(
invocation_id,
event_ty as u8,
Expand Down
4 changes: 2 additions & 2 deletions crates/partition-store/src/journal_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn put_journal_entry<S: StorageAccess>(
) -> Result<()> {
let key = write_journal_entry_key(invocation_id, journal_index);

storage.put_kv(key, journal_entry)
storage.put_kv_proto(key, journal_entry)
}

fn get_journal_entry<S: StorageAccess>(
Expand All @@ -66,7 +66,7 @@ fn get_journal_entry<S: StorageAccess>(
) -> Result<Option<JournalEntry>> {
let key = write_journal_entry_key(invocation_id, journal_index);

storage.get_value(key)
storage.get_value_proto(key)
}

fn get_journal<S: StorageAccess>(
Expand Down
16 changes: 9 additions & 7 deletions crates/partition-store/src/journal_table_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn put_journal_entry<S: StorageAccess>(
related_completion_ids: &[CompletionId],
) -> Result<()> {
if let RawEntry::Notification(notification) = &journal_entry.inner {
storage.put_kv(
storage.put_kv_proto(
JournalNotificationIdToNotificationIndexKey::default()
.partition_key(invocation_id.partition_key())
.invocation_uuid(invocation_id.invocation_uuid())
Expand All @@ -90,7 +90,7 @@ fn put_journal_entry<S: StorageAccess>(
)?;
} else if let RawEntry::Command(_) = &journal_entry.inner {
for completion_id in related_completion_ids {
storage.put_kv(
storage.put_kv_proto(
JournalCompletionIdToCommandIndexKey::default()
.partition_key(invocation_id.partition_key())
.invocation_uuid(invocation_id.invocation_uuid())
Expand All @@ -100,7 +100,7 @@ fn put_journal_entry<S: StorageAccess>(
}
}

storage.put_kv(
storage.put_kv_proto(
write_journal_entry_key(invocation_id, journal_index),
&StoredEntry(journal_entry.clone()),
)
Expand All @@ -112,7 +112,7 @@ fn get_journal_entry<S: StorageAccess>(
journal_index: u32,
) -> Result<Option<StoredRawEntry>> {
let key = write_journal_entry_key(invocation_id, journal_index);
let opt: Option<StoredEntry> = storage.get_value(key)?;
let opt: Option<StoredEntry> = storage.get_value_proto(key)?;
Ok(opt.map(|e| e.0))
}

Expand Down Expand Up @@ -251,15 +251,15 @@ fn get_command_by_completion_id<S: StorageAccess>(
.partition_key(invocation_id.partition_key())
.invocation_uuid(invocation_id.invocation_uuid())
.completion_id(completion_id);
let opt: Option<JournalEntryIndex> = storage.get_value(completion_id_to_command_index)?;
let opt: Option<JournalEntryIndex> = storage.get_value_proto(completion_id_to_command_index)?;
if opt.is_none() {
return Ok(None);
}

// Now access the entry
let journal_index = opt.unwrap().0;
let key = write_journal_entry_key(&invocation_id, journal_index);
let opt: Option<StoredEntry> = storage.get_value(key)?;
let opt: Option<StoredEntry> = storage.get_value_proto(key)?;
if opt.is_none() {
return Ok(None);
}
Expand Down Expand Up @@ -287,7 +287,9 @@ fn has_completion<S: StorageAccess>(
.partition_key(invocation_id.partition_key())
.invocation_uuid(invocation_id.invocation_uuid())
.notification_id(NotificationId::CompletionId(completion_id));
Ok(storage.get_value::<_, JournalEntryIndex>(key)?.is_some())
Ok(storage
.get_value_proto::<_, JournalEntryIndex>(key)?
.is_some())
}

impl ReadJournalTable for PartitionStore {
Expand Down
4 changes: 2 additions & 2 deletions crates/partition-store/src/outbox_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn add_message<S: StorageAccess>(
.partition_id(partition_id.into())
.message_index(message_index);

storage.put_kv(key, outbox_message)
storage.put_kv_proto(key, outbox_message)
}

fn get_outbox_head_seq_number<S: StorageAccess>(
Expand Down Expand Up @@ -103,7 +103,7 @@ fn get_outbox_message<S: StorageAccess>(
.partition_id(partition_id.into())
.message_index(sequence_number);

storage.get_value(outbox_key)
storage.get_value_proto(outbox_key)
}

fn truncate_outbox<S: StorageAccess>(
Expand Down
64 changes: 42 additions & 22 deletions crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId, WithPart
use restate_types::logs::Lsn;
use restate_types::partitions::Partition;
use restate_types::storage::StorageCodec;
use restate_types::storage::StorageDecode;
use restate_types::storage::StorageEncode;

use crate::fsm_table::{get_locally_durable_lsn, get_schema_version, put_schema_version};
use crate::fsm_table::{get_locally_durable_lsn, get_storage_version, put_storage_version};
use crate::keys::KeyKind;
use crate::keys::TableKey;
use crate::migrations::{LATEST_VERSION, SchemaVersion};
Expand Down Expand Up @@ -679,15 +681,15 @@ impl PartitionStore {

pub async fn verify_and_run_migrations(&mut self) -> Result<()> {
let mut schema_version: SchemaVersion =
get_schema_version(self, self.partition_id()).await?.into();
get_storage_version(self, self.partition_id()).await?.into();
if schema_version != LATEST_VERSION {
// We need to run some migrations!
debug!(
"Running storage migration from {:?} to {:?}",
schema_version, LATEST_VERSION
);
schema_version = schema_version.run_all_migrations(self).await?;
put_schema_version(self, self.partition_id(), schema_version as u16).await?;
put_storage_version(self, self.partition_id(), schema_version as u16).await?;
}

Ok(())
Expand Down Expand Up @@ -1071,7 +1073,19 @@ pub(crate) trait StorageAccess {
}

#[inline]
fn put_kv<K: TableKey, V: PartitionStoreProtobufValue + Clone + 'static>(
fn put_kv_proto<K: TableKey, V: PartitionStoreProtobufValue + Clone + 'static>(
&mut self,
key: K,
value: &V,
) -> Result<()> {
self.put_kv_storage_codec(
key,
&ProtobufStorageWrapper::<V::ProtobufType>(value.clone().into()),
)
}

#[inline]
fn put_kv_storage_codec<K: TableKey, V: StorageEncode + 'static>(
&mut self,
key: K,
value: &V,
Expand All @@ -1081,11 +1095,7 @@ pub(crate) trait StorageAccess {
let key_buffer = key_buffer.split();

let value_buffer = self.cleared_value_buffer_mut(0);
StorageCodec::encode(
&ProtobufStorageWrapper::<V::ProtobufType>(value.clone().into()),
value_buffer,
)
.map_err(|e| StorageError::Generic(e.into()))?;
StorageCodec::encode(value, value_buffer).map_err(|e| StorageError::Generic(e.into()))?;
let value_buffer = value_buffer.split();

self.put_cf(K::TABLE, key_buffer, value_buffer)
Expand All @@ -1101,29 +1111,39 @@ pub(crate) trait StorageAccess {
}

#[inline]
fn get_value<K, V>(&mut self, key: K) -> Result<Option<V>>
fn get_value_proto<K, V>(&mut self, key: K) -> Result<Option<V>>
where
K: TableKey,
V: PartitionStoreProtobufValue,
<<V as PartitionStoreProtobufValue>::ProtobufType as TryInto<V>>::Error:
Into<anyhow::Error>,
{
let value: Option<ProtobufStorageWrapper<V::ProtobufType>> =
self.get_value_storage_codec(key)?;

value
.map(|v| v.0.try_into())
.transpose()
.map_err(|err| StorageError::Conversion(err.into()))
}

#[inline]
fn get_value_storage_codec<K, V>(&mut self, key: K) -> Result<Option<V>>
where
K: TableKey,
V: StorageDecode,
{
let mut buf = self.cleared_key_buffer_mut(key.serialized_length());
key.serialize_to(&mut buf);
let buf = buf.split();

match self.get(K::TABLE, &buf) {
Ok(value) => {
let slice = value.as_ref().map(|v| v.as_ref());

if let Some(mut slice) = slice {
Ok(Some(V::decode(&mut slice)?))
} else {
Ok(None)
}
}
Err(err) => Err(err),
}
self.get(K::TABLE, &buf)?
.map(|value| {
let mut slice = value.as_ref();
StorageCodec::decode(&mut slice)
})
.transpose()
.map_err(|err| StorageError::Generic(err.into()))
}

/// Forces a read from persistent storage, bypassing memtables and block cache.
Expand Down
4 changes: 2 additions & 2 deletions crates/partition-store/src/promise_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn get_promise<S: StorageAccess>(
key: &ByteString,
) -> Result<Option<Promise>> {
let _x = RocksDbPerfGuard::new("get-promise");
storage.get_value(create_key(service_id, key))
storage.get_value_proto(create_key(service_id, key))
}

fn put_promise<S: StorageAccess>(
Expand All @@ -62,7 +62,7 @@ fn put_promise<S: StorageAccess>(
key: &ByteString,
metadata: &Promise,
) -> Result<()> {
storage.put_kv(create_key(service_id, key), metadata)
storage.put_kv_proto(create_key(service_id, key), metadata)
}

fn delete_all_promises<S: StorageAccess>(storage: &mut S, service_id: &ServiceId) -> Result<()> {
Expand Down
Loading
Loading