Skip to content

Commit

Permalink
[Bifrost] Basic BifrostAdmin interface
Browse files Browse the repository at this point in the history
It now hosts `trim` and introduces the ability to seal and extend the chain with pre-determined loglet params.
  • Loading branch information
AhmedSoliman committed Jul 26, 2024
1 parent a71f1d1 commit 6a29424
Show file tree
Hide file tree
Showing 12 changed files with 373 additions and 98 deletions.
85 changes: 56 additions & 29 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@ use std::sync::Arc;
use codederror::CodedError;
use futures::future::OptionFuture;
use futures::{Stream, StreamExt};
use restate_core::metadata_store::MetadataStoreClient;
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
use tracing::{debug, warn};

use restate_types::config::{AdminOptions, Configuration};
use restate_types::live::LiveLoad;
use restate_types::live::Live;
use restate_types::net::cluster_controller::{Action, AttachRequest, AttachResponse, RunPartition};
use restate_types::net::RequestId;
use restate_types::partition_table::{FixedPartitionTable, KeyRange};

use restate_bifrost::Bifrost;
use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::network::{MessageRouterBuilder, NetworkSender};
use restate_core::{
cancellation_watcher, Metadata, ShutdownError, TargetVersion, TaskCenter, TaskKind,
cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter,
TaskKind,
};
use restate_types::cluster::cluster_state::RunMode;
use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState};
Expand Down Expand Up @@ -58,7 +60,9 @@ pub struct Service<N> {
command_tx: mpsc::Sender<ClusterControllerCommand>,
command_rx: mpsc::Receiver<ClusterControllerCommand>,

configuration: Box<dyn LiveLoad<AdminOptions> + Send + Sync>,
configuration: Live<Configuration>,
metadata_writer: MetadataWriter,
metadata_store_client: MetadataStoreClient,
heartbeat_interval: time::Interval,
log_trim_interval: Option<time::Interval>,
log_trim_threshold: Lsn,
Expand All @@ -69,7 +73,9 @@ where
N: NetworkSender + 'static,
{
pub fn new(
mut configuration: impl LiveLoad<AdminOptions> + Send + Sync + 'static,
configuration: Live<Configuration>,
metadata_writer: MetadataWriter,
metadata_store_client: MetadataStoreClient,
task_center: TaskCenter,
metadata: Metadata,
networking: N,
Expand All @@ -85,13 +91,15 @@ where
router_builder,
);

let options = configuration.live_load();
let options = &configuration.pinned().admin;

let heartbeat_interval = Self::create_heartbeat_interval(options);
let (log_trim_interval, log_trim_threshold) = Self::create_log_trim_interval(options);

Service {
configuration: Box::new(configuration),
configuration,
metadata_writer,
metadata_store_client,
task_center,
metadata,
networking,
Expand Down Expand Up @@ -189,6 +197,8 @@ where
) -> anyhow::Result<()> {
// Make sure we have partition table before starting
let _ = self.metadata.wait_for_partition_table(Version::MIN).await?;
let bifrost_admin =
BifrostAdmin::new(&bifrost, &self.metadata_writer, &self.metadata_store_client);

let mut shutdown = std::pin::pin!(cancellation_watcher());
let mut config_watcher = Configuration::watcher();
Expand Down Expand Up @@ -216,21 +226,25 @@ where
let _ = self.cluster_state_refresher.schedule_refresh();
},
_ = OptionFuture::from(self.log_trim_interval.as_mut().map(|interval| interval.tick())) => {
let result = self.trim_logs(&bifrost).await;
let result = self.trim_logs(bifrost_admin).await;

if let Err(err) = result {
warn!("Could not trim the logs. This can lead to increased disk usage: {err}");
}
}
Some(cmd) = self.command_rx.recv() => {
self.on_cluster_cmd(cmd, &bifrost).await;
self.on_cluster_cmd(cmd, bifrost_admin).await;
}
Some(message) = self.incoming_messages.next() => {
let (from, message) = message.split();
self.on_attach_request(from, message)?;
}
_ = config_watcher.changed() => {
self.on_config_update();
debug!("Updating the cluster controller settings.");
let options = &self.configuration.live_load().admin;

self.heartbeat_interval = Self::create_heartbeat_interval(options);
(self.log_trim_interval, self.log_trim_threshold) = Self::create_log_trim_interval(options);
}
_ = &mut shutdown => {
return Ok(());
Expand All @@ -239,15 +253,10 @@ where
}
}

/// Re-applies the cluster controller settings from the current configuration.
///
/// Loads the latest options via `self.configuration.live_load()` and uses them to
/// rebuild the heartbeat interval as well as the log trim interval and threshold.
fn on_config_update(&mut self) {
debug!("Updating the cluster controller settings.");
let options = self.configuration.live_load();

// The previously stored interval/threshold values are replaced by newly created ones.
self.heartbeat_interval = Self::create_heartbeat_interval(options);
(self.log_trim_interval, self.log_trim_threshold) = Self::create_log_trim_interval(options);
}

async fn trim_logs(&self, bifrost: &Bifrost) -> Result<(), restate_bifrost::Error> {
async fn trim_logs(
&self,
bifrost_admin: BifrostAdmin<'_>,
) -> Result<(), restate_bifrost::Error> {
let cluster_state = self.cluster_state_refresher.get_cluster_state();

let mut persisted_lsns_per_partition: BTreeMap<
Expand Down Expand Up @@ -283,20 +292,24 @@ where
let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID);
let log_id = LogId::from(partition_id);
// trim point is before the oldest record
let current_trim_point = bifrost.get_trim_point(log_id).await?;
let current_trim_point = bifrost_admin.get_trim_point(log_id).await?;

if min_persisted_lsn >= current_trim_point + self.log_trim_threshold {
debug!(
"Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'"
);
bifrost.trim(log_id, min_persisted_lsn).await?
bifrost_admin.trim(log_id, min_persisted_lsn).await?
}
}

Ok(())
}

async fn on_cluster_cmd(&self, command: ClusterControllerCommand, bifrost: &Bifrost) {
async fn on_cluster_cmd(
&self,
command: ClusterControllerCommand,
bifrost_admin: BifrostAdmin<'_>,
) {
match command {
ClusterControllerCommand::GetClusterState(tx) => {
let _ = tx.send(self.cluster_state_refresher.get_cluster_state());
Expand All @@ -307,14 +320,14 @@ where
response_tx,
} => {
debug!("Manual trim log '{log_id}' until (inclusive) lsn='{trim_point}'");
let result = bifrost.trim(log_id, trim_point).await;
let result = bifrost_admin.trim(log_id, trim_point).await;
let _ = response_tx.send(result.map_err(Into::into));
}
}
}

fn on_attach_request(
&mut self,
&self,
from: GenerationalNodeId,
request: AttachRequest,
) -> Result<(), ShutdownError> {
Expand Down Expand Up @@ -412,9 +425,9 @@ mod tests {
use restate_core::network::{MessageHandler, NetworkSender};
use restate_core::{MockNetworkSender, TaskKind, TestCoreEnvBuilder};
use restate_types::cluster::cluster_state::PartitionProcessorStatus;
use restate_types::config::AdminOptions;
use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::PartitionId;
use restate_types::live::Constant;
use restate_types::live::Live;
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::net::partition_processor_manager::{
GetProcessorsState, ProcessorsStateResponse,
Expand All @@ -432,7 +445,9 @@ mod tests {
let mut builder = TestCoreEnvBuilder::new_with_mock_network();

let svc = Service::new(
Constant::new(AdminOptions::default()),
Live::from_value(Configuration::default()),
builder.metadata_writer.clone(),
builder.metadata_store_client.clone(),
builder.tc.clone(),
builder.metadata.clone(),
builder.network_sender.clone(),
Expand Down Expand Up @@ -518,9 +533,15 @@ mod tests {
admin_options.log_trim_threshold = 5;
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let config = Configuration {
admin: admin_options,
..Default::default()
};

let svc = Service::new(
Constant::new(admin_options),
Live::from_value(config),
builder.metadata_writer.clone(),
builder.metadata_store_client.clone(),
builder.tc.clone(),
builder.metadata.clone(),
builder.network_sender.clone(),
Expand Down Expand Up @@ -608,9 +629,15 @@ mod tests {
admin_options.log_trim_threshold = 0;
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let config = Configuration {
admin: admin_options,
..Default::default()
};

let svc = Service::new(
Constant::new(admin_options),
Live::from_value(config),
builder.metadata_writer.clone(),
builder.metadata_store_client.clone(),
builder.tc.clone(),
builder.metadata.clone(),
builder.network_sender.clone(),
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ publish = false
[features]
default = []
options_schema = ["dep:schemars"]
replicated-loglet = ["restate-types/replicated-loglet", "restate-metadata-store"]
replicated-loglet = ["restate-types/replicated-loglet"]
test-util = []

[dependencies]
restate-core = { workspace = true }
restate-rocksdb = { workspace = true }
restate-types = { workspace = true }
restate-metadata-store = { workspace = true, optional = true }
restate-metadata-store = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
46 changes: 21 additions & 25 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
/// Bifrost handle is relatively cheap to clone.
#[derive(Clone)]
pub struct Bifrost {
inner: Arc<BifrostInner>,
pub(crate) inner: Arc<BifrostInner>,
}

impl Bifrost {
Expand Down Expand Up @@ -163,18 +163,6 @@ impl Bifrost {
self.inner.get_trim_point(log_id).await
}

/// Trim the log prefix up to and including the `trim_point`.
/// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to
/// trim all records of the log.
///
/// Note that trim does not promise that the log will be trimmed immediately and atomically,
/// but it does promise to trim prefixes: earlier segments in the log will be trimmed
/// before later segments, and the log will not be in an inconsistent state at any point.
#[instrument(level = "debug", skip(self), err)]
pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<(), Error> {
self.inner.trim(log_id, trim_point).await
}

/// The version of the currently loaded logs metadata
pub fn version(&self) -> Version {
self.inner.metadata.logs_version()
Expand Down Expand Up @@ -383,7 +371,7 @@ impl BifrostInner {
Ok(trim_point.unwrap_or(Lsn::INVALID))
}

async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<(), Error> {
pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<(), Error> {
self.fail_if_shutting_down()?;

let log_metadata = self.metadata.logs();
Expand Down Expand Up @@ -437,7 +425,7 @@ impl BifrostInner {
.ok_or_else(|| Error::Disabled(kind.to_string()))
}

async fn writeable_loglet(&self, log_id: LogId) -> Result<LogletWrapper> {
pub async fn writeable_loglet(&self, log_id: LogId) -> Result<LogletWrapper> {
let log_metadata = self.metadata.logs();
let tail_segment = log_metadata
.chain(&log_id)
Expand All @@ -446,11 +434,7 @@ impl BifrostInner {
self.get_loglet(log_id, tail_segment).await
}

pub(crate) async fn find_loglet_for_lsn(
&self,
log_id: LogId,
lsn: Lsn,
) -> Result<LogletWrapper> {
pub async fn find_loglet_for_lsn(&self, log_id: LogId, lsn: Lsn) -> Result<LogletWrapper> {
let log_metadata = self.metadata.logs();
let maybe_segment = log_metadata
.chain(&log_id)
Expand All @@ -463,7 +447,7 @@ impl BifrostInner {
}
}

async fn get_loglet(
pub async fn get_loglet(
&self,
log_id: LogId,
segment: Segment<'_>,
Expand Down Expand Up @@ -494,7 +478,7 @@ mod tests {
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::Versioned;

use crate::{Record, TrimGap};
use crate::{BifrostAdmin, Record, TrimGap};
use restate_core::{metadata, TestCoreEnv};
use restate_core::{task_center, TestCoreEnvBuilder};
use restate_rocksdb::RocksDbManager;
Expand Down Expand Up @@ -614,6 +598,11 @@ mod tests {

let log_id = LogId::from(0);
let bifrost = Bifrost::init_local(metadata()).await;
let bifrost_admin = BifrostAdmin::new(
&bifrost,
&node_env.metadata_writer,
&node_env.metadata_store_client,
);

assert_eq!(
Lsn::OLDEST,
Expand All @@ -630,7 +619,7 @@ mod tests {
bifrost.append(log_id, Payload::default()).await?;
}

bifrost.trim(log_id, Lsn::from(5)).await?;
bifrost_admin.trim(log_id, Lsn::from(5)).await?;

let tail = bifrost
.find_tail(log_id, FindTailAttributes::default())
Expand Down Expand Up @@ -665,7 +654,7 @@ mod tests {
}

// trimming beyond the release point will fall back to the release point
bifrost.trim(log_id, Lsn::MAX).await?;
bifrost_admin.trim(log_id, Lsn::MAX).await?;

assert_eq!(
Lsn::from(11),
Expand Down Expand Up @@ -712,6 +701,11 @@ mod tests {
let tc = node_env.tc;
tc.run_in_scope("test", None, async {
let bifrost = Bifrost::init_in_memory(metadata()).await;
let bifrost_admin = BifrostAdmin::new(
&bifrost,
&node_env.metadata_writer,
&node_env.metadata_store_client,
);

// Lsns [1..5]
for i in 1..=5 {
Expand All @@ -736,7 +730,9 @@ mod tests {
.await?;

// seal the segment
segment_1.seal().await?;
bifrost_admin
.seal(LOG_ID, segment_1.segment_index())
.await?;

// sealed, tail is what we expect
assert_that!(
Expand Down
Loading

0 comments on commit 6a29424

Please sign in to comment.