Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/partition-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ rand = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }

[[bench]]
name = "basic_benchmark"
Expand Down
18 changes: 9 additions & 9 deletions crates/partition-store/src/partition_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ use restate_types::partitions::{CfName, Partition};

use crate::durable_lsn_tracking::{AppliedLsnCollectorFactory, DurableLsnEventListener};
use crate::memory::MemoryBudget;
use crate::snapshots::LocalPartitionSnapshot;
use crate::snapshots::{ArchivedLsn, LocalPartitionSnapshot};

type SmartString = smartstring::SmartString<smartstring::LazyCompact>;

#[derive(Clone)]
pub struct PartitionDb {
meta: Arc<Partition>,
durable_lsn: watch::Sender<Option<Lsn>>,
archived_lsn: watch::Sender<Option<Lsn>>,
archived_lsn: watch::Sender<Option<ArchivedLsn>>,
// Note: Rust will drop the fields in the order they are declared in the struct.
// It's crucial to keep the column family and the database in this exact order.
cf: PartitionBoundCfHandle,
Expand All @@ -48,7 +48,7 @@ pub struct PartitionDb {
impl PartitionDb {
pub(crate) fn new(
meta: Arc<Partition>,
archived_lsn: watch::Sender<Option<Lsn>>,
archived_lsn: watch::Sender<Option<ArchivedLsn>>,
rocksdb: Arc<RocksDb>,
cf: Arc<BoundColumnFamily<'_>>,
) -> Self {
Expand Down Expand Up @@ -94,10 +94,10 @@ impl PartitionDb {
.await
}

pub(crate) fn note_archived_lsn(&self, lsn: Lsn) -> bool {
pub(crate) fn note_archived_lsn(&self, archived_lsn: ArchivedLsn) -> bool {
self.archived_lsn.send_if_modified(|current| {
if current.is_none_or(|c| lsn > c) {
*current = Some(lsn);
if current.as_mut().is_none_or(|c| &archived_lsn > c) {
*current = Some(archived_lsn);
true
} else {
false
Expand All @@ -106,11 +106,11 @@ impl PartitionDb {
}

/// The last (locally) known archived LSN for this partition
pub fn get_archived_lsn(&self) -> Option<Lsn> {
pub fn get_archived_lsn(&self) -> Option<ArchivedLsn> {
*self.archived_lsn.borrow()
}

pub fn watch_archived_lsn(&self) -> watch::Receiver<Option<Lsn>> {
pub fn watch_archived_lsn(&self) -> watch::Receiver<Option<ArchivedLsn>> {
self.archived_lsn.subscribe()
}

Expand Down Expand Up @@ -171,7 +171,7 @@ impl PartitionBoundCfHandle {

pub(crate) struct PartitionCell {
meta: Arc<Partition>,
archived_lsn: watch::Sender<Option<Lsn>>,
archived_lsn: watch::Sender<Option<ArchivedLsn>>,
durable_lsn: RwLock<Option<watch::Sender<Option<Lsn>>>>,
pub(crate) inner: AsyncRwLock<State>,
}
Expand Down
7 changes: 5 additions & 2 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use restate_types::partitions::Partition;
use crate::SnapshotError;
use crate::memory::MemoryController;
use crate::partition_db::{AllDataCf, PartitionCell, PartitionDb, RocksConfigurator};
use crate::snapshots::{LocalPartitionSnapshot, Snapshots};
use crate::snapshots::{ArchivedLsn, LocalPartitionSnapshot, Snapshots};
use crate::{BuildError, OpenError, PartitionStore, SnapshotErrorKind};

const PARTITION_CF_PREFIX: &str = "data-";
Expand Down Expand Up @@ -183,7 +183,10 @@ impl PartitionStoreManager {
self.snapshots.is_repository_configured()
}

pub async fn refresh_latest_archived_lsn(&self, partition_id: PartitionId) -> Option<Lsn> {
pub async fn refresh_latest_archived_lsn(
&self,
partition_id: PartitionId,
) -> Option<ArchivedLsn> {
let db = self.get_partition_db(partition_id).await?;
self.snapshots.refresh_latest_archived_lsn(db).await
}
Expand Down
16 changes: 10 additions & 6 deletions crates/partition-store/src/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use std::sync::Arc;
use crate::{PartitionDb, PartitionStore, SnapshotError, SnapshotErrorKind};

pub use self::metadata::*;
pub use self::repository::SnapshotRepository;
pub use self::repository::{ArchivedLsn, SnapshotRepository};
pub use self::snapshot_task::*;

use tokio::sync::Semaphore;
use tracing::{debug, instrument, warn};

use restate_types::config::Configuration;
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::logs::{Lsn, SequenceNumber};
use restate_types::logs::Lsn;

#[derive(Clone)]
pub struct Snapshots {
Expand Down Expand Up @@ -85,7 +85,7 @@ impl Snapshots {
})
}

pub async fn refresh_latest_archived_lsn(&self, db: PartitionDb) -> Option<Lsn> {
pub async fn refresh_latest_archived_lsn(&self, db: PartitionDb) -> Option<ArchivedLsn> {
let Some(repository) = &self.repository else {
return None;
};
Expand All @@ -94,10 +94,14 @@ impl Snapshots {
let archived_lsn = repository
.get_latest_archived_lsn(partition_id)
.await
.inspect(|lsn| debug!(?partition_id, "Latest archived LSN: {}", lsn))
.inspect_err(|err| warn!(?partition_id, "Unable to get latest archived LSN: {}", err))
.inspect_err(|err| {
warn!(
?partition_id,
"Unable to get archived LSN from snapshot repository: {}", err
)
})
.ok()
.unwrap_or(Lsn::INVALID);
.unwrap_or(ArchivedLsn::None);
db.note_archived_lsn(archived_lsn);
Some(archived_lsn)
}
Expand Down
Loading
Loading