From 6a29424915464b6e7f1fb163421f92c8d2ad30dd Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Fri, 26 Jul 2024 10:30:19 +0100 Subject: [PATCH] [Bifrost] Basic BifrostAdmin interface It now hosts `trim` and introduces the ability to seal and extend the chain with a pre-determined loglet params --- .../admin/src/cluster_controller/service.rs | 85 +++++--- crates/bifrost/Cargo.toml | 4 +- crates/bifrost/src/bifrost.rs | 46 ++-- crates/bifrost/src/bifrost_admin.rs | 205 ++++++++++++++++++ crates/bifrost/src/error.rs | 18 +- crates/bifrost/src/lib.rs | 2 + crates/bifrost/src/read_stream.rs | 84 +++---- crates/core/src/metadata/mod.rs | 2 +- crates/core/src/metadata_store/mod.rs | 8 + crates/node/src/roles/admin.rs | 8 +- crates/types/src/config/bifrost.rs | 8 + crates/types/src/logs/metadata.rs | 1 + 12 files changed, 373 insertions(+), 98 deletions(-) create mode 100644 crates/bifrost/src/bifrost_admin.rs diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 6b14c6af93..098ee8cf00 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -15,21 +15,23 @@ use std::sync::Arc; use codederror::CodedError; use futures::future::OptionFuture; use futures::{Stream, StreamExt}; +use restate_core::metadata_store::MetadataStoreClient; use tokio::sync::{mpsc, oneshot}; use tokio::time; use tokio::time::{Instant, Interval, MissedTickBehavior}; use tracing::{debug, warn}; use restate_types::config::{AdminOptions, Configuration}; -use restate_types::live::LiveLoad; +use restate_types::live::Live; use restate_types::net::cluster_controller::{Action, AttachRequest, AttachResponse, RunPartition}; use restate_types::net::RequestId; use restate_types::partition_table::{FixedPartitionTable, KeyRange}; -use restate_bifrost::Bifrost; +use restate_bifrost::{Bifrost, BifrostAdmin}; use restate_core::network::{MessageRouterBuilder, NetworkSender}; use restate_core::{ - cancellation_watcher, Metadata, ShutdownError, TargetVersion, TaskCenter, TaskKind, + cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter, + TaskKind, }; use restate_types::cluster::cluster_state::RunMode; use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState}; @@ -58,7 +60,9 @@ pub struct Service { command_tx: mpsc::Sender, command_rx: mpsc::Receiver, - configuration: Box + Send + Sync>, + configuration: Live, + metadata_writer: MetadataWriter, + metadata_store_client: MetadataStoreClient, heartbeat_interval: time::Interval, log_trim_interval: Option, log_trim_threshold: Lsn, @@ -69,7 +73,9 @@ where N: NetworkSender + 'static, { pub fn new( - mut configuration: impl LiveLoad + Send + Sync + 'static, + configuration: Live, + metadata_writer: MetadataWriter, + metadata_store_client: MetadataStoreClient, task_center: TaskCenter, metadata: Metadata, networking: N, @@ -85,13 +91,15 @@ where router_builder, ); - let options = configuration.live_load(); + let options = &configuration.pinned().admin; let heartbeat_interval = Self::create_heartbeat_interval(options); let (log_trim_interval, log_trim_threshold) = Self::create_log_trim_interval(options); Service { - configuration: Box::new(configuration), + configuration, + metadata_writer, + metadata_store_client, task_center, metadata, networking, @@ -189,6 +197,8 @@ where ) -> anyhow::Result<()> { // Make sure we have partition table before starting let _ = self.metadata.wait_for_partition_table(Version::MIN).await?; + let bifrost_admin = + BifrostAdmin::new(&bifrost, &self.metadata_writer, &self.metadata_store_client); let mut shutdown = std::pin::pin!(cancellation_watcher()); let mut config_watcher = Configuration::watcher(); @@ -216,21 +226,25 @@ where let _ = self.cluster_state_refresher.schedule_refresh(); }, _ = OptionFuture::from(self.log_trim_interval.as_mut().map(|interval| interval.tick())) => { - let result = self.trim_logs(&bifrost).await; + let result = self.trim_logs(bifrost_admin).await; if let Err(err) = result { warn!("Could not trim the logs. This can lead to increased disk usage: {err}"); } } Some(cmd) = self.command_rx.recv() => { - self.on_cluster_cmd(cmd, &bifrost).await; + self.on_cluster_cmd(cmd, bifrost_admin).await; } Some(message) = self.incoming_messages.next() => { let (from, message) = message.split(); self.on_attach_request(from, message)?; } _ = config_watcher.changed() => { - self.on_config_update(); + debug!("Updating the cluster controller settings."); + let options = &self.configuration.live_load().admin; + + self.heartbeat_interval = Self::create_heartbeat_interval(options); + (self.log_trim_interval, self.log_trim_threshold) = Self::create_log_trim_interval(options); } _ = &mut shutdown => { return Ok(()); @@ -239,15 +253,10 @@ where } } - fn on_config_update(&mut self) { - debug!("Updating the cluster controller settings."); - let options = self.configuration.live_load(); - - self.heartbeat_interval = Self::create_heartbeat_interval(options); - (self.log_trim_interval, self.log_trim_threshold) = Self::create_log_trim_interval(options); - } - - async fn trim_logs(&self, bifrost: &Bifrost) -> Result<(), restate_bifrost::Error> { + async fn trim_logs( + &self, + bifrost_admin: BifrostAdmin<'_>, + ) -> Result<(), restate_bifrost::Error> { let cluster_state = self.cluster_state_refresher.get_cluster_state(); let mut persisted_lsns_per_partition: BTreeMap< @@ -283,20 +292,24 @@ where let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID); let log_id = LogId::from(partition_id); // trim point is before the oldest record - let current_trim_point = bifrost.get_trim_point(log_id).await?; + let current_trim_point = bifrost_admin.get_trim_point(log_id).await?; if min_persisted_lsn >= current_trim_point + self.log_trim_threshold { debug!( "Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'" ); - bifrost.trim(log_id, min_persisted_lsn).await? + bifrost_admin.trim(log_id, min_persisted_lsn).await? } } Ok(()) } - async fn on_cluster_cmd(&self, command: ClusterControllerCommand, bifrost: &Bifrost) { + async fn on_cluster_cmd( + &self, + command: ClusterControllerCommand, + bifrost_admin: BifrostAdmin<'_>, + ) { match command { ClusterControllerCommand::GetClusterState(tx) => { let _ = tx.send(self.cluster_state_refresher.get_cluster_state()); @@ -307,14 +320,14 @@ where response_tx, } => { debug!("Manual trim log '{log_id}' until (inclusive) lsn='{trim_point}'"); - let result = bifrost.trim(log_id, trim_point).await; + let result = bifrost_admin.trim(log_id, trim_point).await; let _ = response_tx.send(result.map_err(Into::into)); } } } fn on_attach_request( - &mut self, + &self, from: GenerationalNodeId, request: AttachRequest, ) -> Result<(), ShutdownError> { @@ -412,9 +425,9 @@ mod tests { use restate_core::network::{MessageHandler, NetworkSender}; use restate_core::{MockNetworkSender, TaskKind, TestCoreEnvBuilder}; use restate_types::cluster::cluster_state::PartitionProcessorStatus; - use restate_types::config::AdminOptions; + use restate_types::config::{AdminOptions, Configuration}; use restate_types::identifiers::PartitionId; - use restate_types::live::Constant; + use restate_types::live::Live; use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber}; use restate_types::net::partition_processor_manager::{ GetProcessorsState, ProcessorsStateResponse, @@ -432,7 +445,9 @@ mod tests { let mut builder = TestCoreEnvBuilder::new_with_mock_network(); let svc = Service::new( - Constant::new(AdminOptions::default()), + Live::from_value(Configuration::default()), + builder.metadata_writer.clone(), + builder.metadata_store_client.clone(), builder.tc.clone(), builder.metadata.clone(), builder.network_sender.clone(), @@ -518,9 +533,15 @@ mod tests { admin_options.log_trim_threshold = 5; let interval_duration = Duration::from_secs(10); admin_options.log_trim_interval = Some(interval_duration.into()); + let config = Configuration { + admin: admin_options, + ..Default::default() + }; let svc = Service::new( - Constant::new(admin_options), + Live::from_value(config), + builder.metadata_writer.clone(), + builder.metadata_store_client.clone(), builder.tc.clone(), builder.metadata.clone(), builder.network_sender.clone(), @@ -608,9 +629,15 @@ mod tests { admin_options.log_trim_threshold = 0; let interval_duration = Duration::from_secs(10); admin_options.log_trim_interval = Some(interval_duration.into()); + let config = Configuration { + admin: admin_options, + ..Default::default() + }; let svc = Service::new( - Constant::new(admin_options), + Live::from_value(config), + builder.metadata_writer.clone(), + builder.metadata_store_client.clone(), builder.tc.clone(), builder.metadata.clone(), builder.network_sender.clone(), diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index 0ba1bc66a3..38f39ef8b8 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -10,14 +10,14 @@ publish = false [features] default = [] options_schema = ["dep:schemars"] -replicated-loglet = ["restate-types/replicated-loglet", "restate-metadata-store"] +replicated-loglet = ["restate-types/replicated-loglet"] test-util = [] [dependencies] restate-core = { workspace = true } restate-rocksdb = { workspace = true } restate-types = { workspace = true } -restate-metadata-store = { workspace = true, optional = true } +restate-metadata-store = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 4a47581771..50bbaf58c4 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -39,7 +39,7 @@ use crate::{ /// Bifrost handle is relatively cheap to clone. #[derive(Clone)] pub struct Bifrost { - inner: Arc, + pub(crate) inner: Arc, } impl Bifrost { @@ -163,18 +163,6 @@ impl Bifrost { self.inner.get_trim_point(log_id).await } - /// Trim the log prefix up to and including the `trim_point`. - /// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to - /// trim all records of the log. - /// - /// Note that trim does not promise that the log will be trimmed immediately and atomically, - /// but they are promise to trim prefixes, earlier segments in the log will be trimmed - /// before later segments and the log will not be in an inconsistent state at any point. - #[instrument(level = "debug", skip(self), err)] - pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<(), Error> { - self.inner.trim(log_id, trim_point).await - } - /// The version of the currently loaded logs metadata pub fn version(&self) -> Version { self.inner.metadata.logs_version() @@ -383,7 +371,7 @@ impl BifrostInner { Ok(trim_point.unwrap_or(Lsn::INVALID)) } - async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<(), Error> { + pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<(), Error> { self.fail_if_shutting_down()?; let log_metadata = self.metadata.logs(); @@ -437,7 +425,7 @@ impl BifrostInner { .ok_or_else(|| Error::Disabled(kind.to_string())) } - async fn writeable_loglet(&self, log_id: LogId) -> Result { + pub async fn writeable_loglet(&self, log_id: LogId) -> Result { let log_metadata = self.metadata.logs(); let tail_segment = log_metadata .chain(&log_id) @@ -446,11 +434,7 @@ impl BifrostInner { self.get_loglet(log_id, tail_segment).await } - pub(crate) async fn find_loglet_for_lsn( - &self, - log_id: LogId, - lsn: Lsn, - ) -> Result { + pub async fn find_loglet_for_lsn(&self, log_id: LogId, lsn: Lsn) -> Result { let log_metadata = self.metadata.logs(); let maybe_segment = log_metadata .chain(&log_id) @@ -463,7 +447,7 @@ impl BifrostInner { } } - async fn get_loglet( + pub async fn get_loglet( &self, log_id: LogId, segment: Segment<'_>, @@ -494,7 +478,7 @@ mod tests { use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Versioned; - use crate::{Record, TrimGap}; + use crate::{BifrostAdmin, Record, TrimGap}; use restate_core::{metadata, TestCoreEnv}; use restate_core::{task_center, TestCoreEnvBuilder}; use restate_rocksdb::RocksDbManager; @@ -614,6 +598,11 @@ mod tests { let log_id = LogId::from(0); let bifrost = Bifrost::init_local(metadata()).await; + let bifrost_admin = BifrostAdmin::new( + &bifrost, + &node_env.metadata_writer, + &node_env.metadata_store_client, + ); assert_eq!( Lsn::OLDEST, @@ -630,7 +619,7 @@ mod tests { bifrost.append(log_id, Payload::default()).await?; } - bifrost.trim(log_id, Lsn::from(5)).await?; + bifrost_admin.trim(log_id, Lsn::from(5)).await?; let tail = bifrost .find_tail(log_id, FindTailAttributes::default()) @@ -665,7 +654,7 @@ mod tests { } // trimming beyond the release point will fall back to the release point - bifrost.trim(log_id, Lsn::MAX).await?; + bifrost_admin.trim(log_id, Lsn::MAX).await?; assert_eq!( Lsn::from(11), @@ -712,6 +701,11 @@ mod tests { let tc = node_env.tc; tc.run_in_scope("test", None, async { let bifrost = Bifrost::init_in_memory(metadata()).await; + let bifrost_admin = BifrostAdmin::new( + &bifrost, + &node_env.metadata_writer, + &node_env.metadata_store_client, + ); // Lsns [1..5] for i in 1..=5 { @@ -736,7 +730,9 @@ mod tests { .await?; // seal the segment - segment_1.seal().await?; + bifrost_admin + .seal(LOG_ID, segment_1.segment_index()) + .await?; // sealed, tail is what we expect assert_that!( diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs new file mode 100644 index 0000000000..7c6fc4e62d --- /dev/null +++ b/crates/bifrost/src/bifrost_admin.rs @@ -0,0 +1,205 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::ops::Deref; + +use tracing::{info, instrument}; + +use restate_core::{MetadataKind, MetadataWriter}; +use restate_metadata_store::MetadataStoreClient; +use restate_types::config::Configuration; +use restate_types::logs::builder::BuilderError; +use restate_types::logs::metadata::{LogletParams, Logs, ProviderKind, SegmentIndex}; +use restate_types::logs::{LogId, Lsn}; +use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; +use restate_types::Version; + +use crate::error::AdminError; +use crate::loglet::LogletBase; +use crate::{Bifrost, Error, Result, TailState}; + +/// Bifrost's Admin API +#[derive(Clone, Copy)] +pub struct BifrostAdmin<'a> { + bifrost: &'a Bifrost, + metadata_writer: &'a MetadataWriter, + metadata_store_client: &'a MetadataStoreClient, +} + +impl<'a> AsRef for BifrostAdmin<'a> { + fn as_ref(&self) -> &Bifrost { + self.bifrost + } +} + +impl<'a> Deref for BifrostAdmin<'a> { + type Target = Bifrost; + fn deref(&self) -> &Self::Target { + self.bifrost + } +} + +impl<'a> BifrostAdmin<'a> { + pub fn new( + bifrost: &'a Bifrost, + metadata_writer: &'a MetadataWriter, + metadata_store_client: &'a MetadataStoreClient, + ) -> Self { + Self { + bifrost, + metadata_writer, + metadata_store_client, + } + } + /// Trim the log prefix up to and including the `trim_point`. + /// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to + /// trim all records of the log. + /// + /// Note that trim does not promise that the log will be trimmed immediately and atomically, + /// but they are promise to trim prefixes, earlier segments in the log will be trimmed + /// before later segments and the log will not be in an inconsistent state at any point. + #[instrument(level = "debug", skip(self), err)] + pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<()> { + self.bifrost.inner.trim(log_id, trim_point).await + } + + /// Seals a loglet under a set of conditions. + /// + /// The loglet will be sealed if and only if the following is true: + /// - log metadata is at least at version `min_version`. If not, this will wait for this + /// version to be synced (set to `Version::MIN` to ignore this step) + /// - if segment_index is set, the tail loglet must match segment_index. + /// + /// This will continue to retry sealing for seal retryable errors automatically. + #[instrument(level = "debug", skip(self), err)] + pub async fn seal_and_extend_chain( + &self, + log_id: LogId, + segment_index: Option, + min_version: Version, + provider: ProviderKind, + params: LogletParams, + ) -> Result<()> { + let _ = self + .bifrost + .inner + .metadata + .wait_for_version(MetadataKind::Logs, min_version) + .await?; + + let segment_index = segment_index + .or_else(|| { + self.bifrost + .inner + .metadata + .logs() + .chain(&log_id) + .map(|c| c.tail_index()) + }) + .ok_or(Error::UnknownLogId(log_id))?; + + let tail = self.seal(log_id, segment_index).await?; + assert!(tail.is_sealed()); + + self.add_segment_with_params(log_id, segment_index, tail.offset(), provider, params) + .await?; + Ok(()) + } + + #[instrument(level = "debug", skip(self), err)] + pub(crate) async fn seal( + &self, + log_id: LogId, + segment_index: SegmentIndex, + ) -> Result { + // first find the tail segment for this log. + let loglet = self.bifrost.inner.writeable_loglet(log_id).await?; + + if segment_index != loglet.segment_index() { + // Not the same segment. Bail! + return Err(AdminError::SegmentMismatch { + expected: segment_index, + found: loglet.segment_index(), + } + .into()); + } + + while let Err(e) = loglet.seal().await { + match e { + crate::loglet::OperationError::Shutdown(e) => return Err(e.into()), + crate::loglet::OperationError::Other(e) if e.retryable() => { + // sleep and retry later. + info!( + log_id = %log_id, + segment = %segment_index, + loglet = ?loglet, + ?e, + "Seal operation failed. Retrying later..." + ); + tokio::time::sleep(Configuration::pinned().bifrost.seal_retry_interval.into()) + .await; + } + crate::loglet::OperationError::Other(e) => { + // give up. + return Err(Error::LogletError(e)); + } + } + } + + Ok(loglet.find_tail().await?) + } + + /// Adds a segment to the end of the chain + /// + /// The loglet must be sealed first. This operations assumes that the loglet with + /// `last_segment_index` has been sealed prior to this call. + #[instrument(level = "debug", skip(self), err)] + async fn add_segment_with_params( + &self, + log_id: LogId, + last_segment_index: SegmentIndex, + base_lsn: Lsn, + provider: ProviderKind, + params: LogletParams, + ) -> Result<()> { + let logs = self + .metadata_store_client + .read_modify_write(BIFROST_CONFIG_KEY.clone(), move |logs: Option| { + let logs = logs.ok_or(Error::UnknownLogId(log_id))?; + + let mut builder = logs.into_builder(); + let mut chain_builder = + builder.chain(&log_id).ok_or(Error::UnknownLogId(log_id))?; + + if chain_builder.tail().index() != last_segment_index { + // tail is not what we expected. + return Err(Error::from(AdminError::SegmentMismatch { + expected: last_segment_index, + found: chain_builder.tail().index(), + })); + } + + match chain_builder.append_segment(base_lsn, provider, params.clone()) { + Err(e) => match e { + BuilderError::SegmentConflict(lsn) => { + Err(Error::from(AdminError::SegmentConflict(lsn))) + } + _ => unreachable!("the log must exist at this point"), + }, + Ok(_) => Ok(builder.build()), + } + }) + .await + .map_err(|e| e.transpose())?; + + self.metadata_writer.update(logs).await?; + Ok(()) + } +} diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index 98b95986b2..7bc03799dd 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -8,9 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_core::{ShutdownError, SyncError}; use std::sync::Arc; +use restate_core::{ShutdownError, SyncError}; +use restate_types::logs::metadata::SegmentIndex; use restate_types::logs::{LogId, Lsn}; use crate::loglet::{LogletError, OperationError}; @@ -37,6 +38,21 @@ pub enum Error { Disabled(String), #[error("read() at {0} failed waiting on reconfiguration of log {1}")] ReadFailureDuringReconfiguration(LogId, Lsn), + #[error(transparent)] + AdminError(#[from] AdminError), + #[error(transparent)] + MetadataStoreError(#[from] restate_core::metadata_store::ReadWriteError), +} + +#[derive(Debug, thiserror::Error)] +pub enum AdminError { + #[error("segment conflicts with existing segment with base_lsn={0}")] + SegmentConflict(Lsn), + #[error("segment index found in metadata does not match expected {expected}!={found}")] + SegmentMismatch { + expected: SegmentIndex, + found: SegmentIndex, + }, } impl From for Error { diff --git a/crates/bifrost/src/lib.rs b/crates/bifrost/src/lib.rs index c62d4b866d..bec7eab887 100644 --- a/crates/bifrost/src/lib.rs +++ b/crates/bifrost/src/lib.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. mod bifrost; +mod bifrost_admin; mod error; pub mod loglet; mod loglet_wrapper; @@ -20,6 +21,7 @@ mod types; mod watchdog; pub use bifrost::Bifrost; +pub use bifrost_admin::BifrostAdmin; pub use error::{Error, Result}; pub use read_stream::LogReadStream; pub use record::*; diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index cc25e14eb1..a45fc3d6c8 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -411,7 +411,9 @@ mod tests { use std::sync::atomic::AtomicUsize; use crate::loglet::LogletBase; - use crate::{setup_panic_handler, BifrostService, FindTailAttributes, Record, TrimGap}; + use crate::{ + setup_panic_handler, BifrostAdmin, BifrostService, FindTailAttributes, Record, TrimGap, + }; use super::*; use bytes::Bytes; @@ -542,6 +544,12 @@ mod tests { let svc = BifrostService::new(task_center(), metadata()).enable_local_loglet(&config); let bifrost = svc.handle(); + + let bifrost_admin = BifrostAdmin::new( + &bifrost, + &node_env.metadata_writer, + &node_env.metadata_store_client, + ); svc.start().await.expect("loglet must start"); assert_eq!(Lsn::INVALID, bifrost.get_trim_point(log_id).await?); @@ -553,7 +561,7 @@ mod tests { } // [1..5] trimmed. trim_point = 5 - bifrost.trim(log_id, Lsn::from(5)).await?; + bifrost_admin.trim(log_id, Lsn::from(5)).await?; assert_eq!( Lsn::from(11), @@ -595,7 +603,7 @@ mod tests { .await? .offset(); // trimming beyond the release point will fall back to the release point - bifrost.trim(log_id, Lsn::from(u64::MAX)).await?; + bifrost_admin.trim(log_id, Lsn::from(u64::MAX)).await?; let trim_point = bifrost.get_trim_point(log_id).await?; assert_eq!(Lsn::from(10), bifrost.get_trim_point(log_id).await?); // trim point becomes the point before the next slot available for writes (aka. the @@ -845,6 +853,11 @@ mod tests { .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); + let bifrost_admin = BifrostAdmin::new( + &bifrost, + &node_env.metadata_writer, + &node_env.metadata_store_client, + ); svc.start().await.expect("loglet must start"); let tail = bifrost @@ -862,50 +875,47 @@ mod tests { assert_eq!(Lsn::from(i), lsn); } - // manually seal the loglet, create a new in-memory loglet at base_lsn=11 - let raw_loglet = bifrost - .inner() - .find_loglet_for_lsn(LOG_ID, Lsn::new(5)) + // seal the loglet and extend with an in-memory one + let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory); + bifrost_admin + .seal_and_extend_chain( + LOG_ID, + None, + Version::MIN, + ProviderKind::InMemory, + new_segment_params, + ) .await?; - raw_loglet.seal().await?; let tail = bifrost .find_tail(LOG_ID, FindTailAttributes::default()) .await?; - assert!(tail.is_sealed()); + assert!(!tail.is_sealed()); assert_eq!(Lsn::from(11), tail.offset()); - // perform manual reconfiguration (can be replaced with bifrost reconfiguration API - // when it's implemented) - let old_version = bifrost.inner().metadata.logs_version(); - let mut builder = bifrost.inner().metadata.logs().clone().into_builder(); - let mut chain_builder = builder.chain(&LOG_ID).unwrap(); - assert_eq!(1, chain_builder.num_segments()); - let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory); - chain_builder.append_segment( - Lsn::new(11), - ProviderKind::InMemory, - new_segment_params, - )?; - let new_metadata = builder.build(); - let new_version = new_metadata.version(); - assert_eq!(new_version, old_version.next()); - node_env - .metadata_store_client - .put( - BIFROST_CONFIG_KEY.clone(), - new_metadata, - restate_metadata_store::Precondition::MatchesVersion(old_version), - ) - .await?; - // make sure we have updated metadata. - bifrost - .inner() - .metadata - .sync(MetadataKind::Logs, TargetVersion::Latest) + // validate that we have 2 segments now + assert_eq!( + 2, + node_env + .metadata + .logs() + .chain(&LOG_ID) + .unwrap() + .num_segments() + ); + + // validate that the first segment is sealed + let segment_1_loglet = bifrost + .inner + .find_loglet_for_lsn(LOG_ID, Lsn::from(1)) .await?; + assert_that!( + segment_1_loglet.find_tail().await?, + pat!(TailState::Sealed(eq(Lsn::from(11)))) + ); + // append 5 more records into the new loglet. for i in 11..=15 { let lsn = bifrost diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs index 23f74f8bf1..c586d9e7e5 100644 --- a/crates/core/src/metadata/mod.rs +++ b/crates/core/src/metadata/mod.rs @@ -170,7 +170,7 @@ impl Metadata { Live::from(self.inner.logs.clone()) } - // Returns when the metadata kind is at the provided version (or newer) + /// Returns when the metadata kind is at the provided version (or newer) pub async fn wait_for_version( &self, metadata_kind: MetadataKind, diff --git a/crates/core/src/metadata_store/mod.rs b/crates/core/src/metadata_store/mod.rs index 9221cc5d89..a9880923e2 100644 --- a/crates/core/src/metadata_store/mod.rs +++ b/crates/core/src/metadata_store/mod.rs @@ -118,6 +118,8 @@ impl MetadataStoreClient { MetadataStoreClient::new(InMemoryMetadataStore::default()) } + /// Gets the value and its current version for the given key. If key-value pair is not present, + /// then return [`None`]. pub async fn get( &self, key: ByteString, @@ -140,10 +142,14 @@ impl MetadataStoreClient { } } + /// Gets the current version for the given key. If key-value pair is not present, then return + /// [`None`]. pub async fn get_version(&self, key: ByteString) -> Result, ReadError> { self.inner.get_version(key).await } + /// Puts the versioned value under the given key following the provided precondition. If the + /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. pub async fn put( &self, key: ByteString, @@ -167,6 +173,8 @@ impl MetadataStoreClient { .await } + /// Deletes the key-value pair for the given key following the provided precondition. If the + /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. pub async fn delete( &self, key: ByteString, diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index f600a822d7..d6d7988b20 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -68,14 +68,16 @@ impl AdminRole { let service_discovery = ServiceDiscovery::new(retry_policy, client); let admin = AdminService::new( - metadata_writer, - metadata_store_client, + metadata_writer.clone(), + metadata_store_client.clone(), config.ingress.clone(), service_discovery, ); let controller = restate_admin::cluster_controller::Service::new( - updateable_config.clone().map(|c| &c.admin), + updateable_config.clone(), + metadata_writer, + metadata_store_client, task_center, metadata, networking, diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs index 0046150214..b8bcb87916 100644 --- a/crates/types/src/config/bifrost.rs +++ b/crates/types/src/config/bifrost.rs @@ -49,6 +49,13 @@ pub struct BifrostOptions { /// Retry policy to use when bifrost waits for reconfiguration to complete during /// read operations pub read_retry_policy: RetryPolicy, + + /// # Seal retry interval + /// + /// Interval to wait between retries of loglet seal failures + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub seal_retry_interval: humantime::Duration, } impl BifrostOptions { @@ -71,6 +78,7 @@ impl Default for BifrostOptions { Some(50), Some(Duration::from_secs(1)), ), + seal_retry_interval: Duration::from_secs(2).into(), } } } diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index d81aea26c4..e3d09a8cfe 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -35,6 +35,7 @@ use crate::{flexbuffers_storage_encode_decode, Version, Versioned}; Serialize, Deserialize, derive_more::From, + derive_more::Display, )] #[repr(transparent)] #[serde(transparent)]