Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 8 additions & 24 deletions crates/bifrost/src/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{LogId, Lsn, Record};
use restate_types::storage::StorageEncode;

use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy, PreferenceToken};
use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy, PreferenceControl};
use crate::loglet::AppendError;
use crate::loglet_wrapper::LogletWrapper;
use crate::{BifrostAdmin, Error, InputRecord, Result};
Expand All @@ -40,40 +40,26 @@ pub struct Appender<T> {
#[debug(skip)]
bifrost_inner: Arc<BifrostInner>,
#[debug(skip)]
preference_token: Option<PreferenceToken>,
pub(super) preference: PreferenceControl,
arena: BytesMut,
_phantom: PhantomData<T>,
}

impl<T> Clone for Appender<T> {
fn clone(&self) -> Self {
Self {
log_id: self.log_id,
config: self.config.clone(),
error_recovery_strategy: self.error_recovery_strategy,
loglet_cache: self.loglet_cache.clone(),
bifrost_inner: self.bifrost_inner.clone(),
preference_token: self.preference_token.clone(),
arena: BytesMut::default(),
_phantom: PhantomData,
}
}
}

impl<T: StorageEncode> Appender<T> {
pub(crate) fn new(
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
bifrost_inner: Arc<BifrostInner>,
) -> Self {
let config = Configuration::live();
let preference = bifrost_inner.control_preference(log_id);
Self {
log_id,
config,
error_recovery_strategy,
loglet_cache: Default::default(),
bifrost_inner,
preference_token: None,
preference,
arena: BytesMut::default(),
_phantom: PhantomData,
}
Expand All @@ -85,15 +71,13 @@ impl<T: StorageEncode> Appender<T> {
}

/// Marks this node as a preferred writer for the underlying log
pub fn mark_as_preferred(&mut self) {
if self.preference_token.is_none() {
self.preference_token = Some(self.bifrost_inner.acquire_preference_token(self.log_id));
}
pub fn mark_as_preferred(&self) {
self.preference.mark_as_preferred();
}

/// Removes the preference about this node being the preferred writer for the log
pub fn forget_preference(&mut self) {
self.preference_token.take();
pub fn forget_preference(&self) {
self.preference.forget_preference();
}

/// Appends a single record to the log.
Expand Down
51 changes: 12 additions & 39 deletions crates/bifrost/src/background_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use restate_core::{ShutdownError, TaskCenter, TaskHandle, cancellation_token};
use restate_types::logs::Record;
use restate_types::storage::StorageEncode;

use crate::bifrost::PreferenceControl;
use crate::error::EnqueueError;
use crate::{Appender, InputRecord, Result};

Expand Down Expand Up @@ -57,6 +58,7 @@ impl<T: StorageEncode> BackgroundAppender<T> {
pub fn start(self, name: &'static str) -> Result<AppenderHandle<T>, ShutdownError> {
let (tx, rx) = tokio::sync::mpsc::channel(self.queue_capacity);
let record_size_limit = self.appender.record_size_limit();
let preference = self.appender.preference.clone();

let handle = TaskCenter::spawn_unmanaged_child(
restate_core::TaskKind::BifrostAppender,
Expand All @@ -70,6 +72,7 @@ impl<T: StorageEncode> BackgroundAppender<T> {
arena: BytesMut::default(),
tx,
record_size_limit,
preference,
_phantom: std::marker::PhantomData,
}),
})
Expand Down Expand Up @@ -172,12 +175,6 @@ impl<T: StorageEncode> BackgroundAppender<T> {
AppendOperation::Canary(tx) => {
notif_buffer.push(tx);
}
AppendOperation::MarkAsPreferred => {
appender.mark_as_preferred();
}
AppendOperation::ForgetPreference => {
appender.forget_preference();
}
}
}

Expand Down Expand Up @@ -274,6 +271,10 @@ pub struct LogSender<T> {
arena: BytesMut,
tx: tokio::sync::mpsc::Sender<AppendOperation>,
record_size_limit: NonZeroUsize,
/// Controls the recovery preference of the underlying appender.
///
/// This is shared between the handle and the appender.
preference: PreferenceControl,
_phantom: std::marker::PhantomData<T>,
}

Expand All @@ -283,6 +284,7 @@ impl<T> Clone for LogSender<T> {
arena: BytesMut::default(),
tx: self.tx.clone(),
record_size_limit: self.record_size_limit,
preference: self.preference.clone(),
_phantom: std::marker::PhantomData,
}
}
Expand Down Expand Up @@ -439,31 +441,13 @@ impl<T: StorageEncode> LogSender<T> {
}

/// Marks this node as a preferred writer for the underlying log
pub async fn mark_as_preferred(&self) -> Result<(), EnqueueError<()>> {
if self
.tx
.send(AppendOperation::MarkAsPreferred)
.await
.is_err()
{
return Err(EnqueueError::Closed(()));
};

Ok(())
pub fn mark_as_preferred(&self) {
self.preference.mark_as_preferred();
}

/// Removes the preference about this node being the preferred writer for the log
pub async fn forget_preference(&self) -> Result<(), EnqueueError<()>> {
if self
.tx
.send(AppendOperation::ForgetPreference)
.await
.is_err()
{
return Err(EnqueueError::Closed(()));
};

Ok(())
pub fn forget_preference(&self) {
self.preference.forget_preference();
}
}

Expand All @@ -489,10 +473,6 @@ enum AppendOperation {
// A message denoting a request to be notified when it's processed by the appender.
// It's used to check if previously enqueued appends have been committed or not
Canary(oneshot::Sender<()>),
/// Let's bifrost know that this node is the preferred writer of this log
MarkAsPreferred,
/// Let's bifrost know that this node might not be the preferred writer of this log
ForgetPreference,
}

impl AppendOperation {
Expand All @@ -501,8 +481,6 @@ impl AppendOperation {
AppendOperation::Enqueue(record) => record.estimated_encode_size(),
AppendOperation::EnqueueWithNotification(record, _) => record.estimated_encode_size(),
AppendOperation::Canary(_) => 0,
AppendOperation::MarkAsPreferred => 0,
AppendOperation::ForgetPreference => 0,
}
}
}
Expand Down Expand Up @@ -573,8 +551,6 @@ mod tests {
// Test that control operations have zero cost
let (tx, _rx) = oneshot::channel();
assert_eq!(AppendOperation::Canary(tx).cost_in_bytes(), 0);
assert_eq!(AppendOperation::MarkAsPreferred.cost_in_bytes(), 0);
assert_eq!(AppendOperation::ForgetPreference.cost_in_bytes(), 0);
}

#[test]
Expand Down Expand Up @@ -640,8 +616,5 @@ mod tests {
let (tx, _rx) = oneshot::channel();
batch.push(AppendOperation::Canary(tx));
assert_eq!(batch.bytes_accumulated, expected_total);

batch.push(AppendOperation::MarkAsPreferred);
assert_eq!(batch.bytes_accumulated, expected_total);
}
}
42 changes: 39 additions & 3 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, Ordering};

use enum_map::EnumMap;
use parking_lot::Mutex;
use tokio::time::Instant;
use tracing::debug;
use tracing::{info, instrument, warn};
Expand Down Expand Up @@ -498,12 +499,14 @@ impl BifrostInner {
}
}

/// Acquire a token to tell bifrost that this node is the intended primary writer for this log.
/// Enables acquiring and forgetting a recovery preference token for a given log.
///
/// An acquired preference token tells bifrost that this node is the intended primary writer for this log.
///
/// The preference can be kept for as long as the token is not dropped. Multiple tokens can be
/// taken for the same log. Preference is only lost after the last token is dropped.
pub fn acquire_preference_token(&self, log_id: LogId) -> PreferenceToken {
PreferenceToken::new(self.watchdog.clone(), log_id)
pub fn control_preference(&self, log_id: LogId) -> PreferenceControl {
PreferenceControl::new(self.watchdog.clone(), log_id)
}

/// Adds a new log if it doesn't exist.
Expand Down Expand Up @@ -718,6 +721,39 @@ impl Drop for PreferenceToken {
}
}

/// Shared/cloneable handle to acquire/release preference token for a given log.
#[derive(Clone)]
pub struct PreferenceControl(Arc<Inner>);

struct Inner {
log_id: LogId,
watchdog: WatchdogSender,
current_preference: Mutex<Option<PreferenceToken>>,
}

impl PreferenceControl {
fn new(watchdog: WatchdogSender, log_id: LogId) -> Self {
Self(Arc::new(Inner {
log_id,
watchdog,
current_preference: Mutex::new(None),
}))
}

/// Marks this node as a preferred writer for the underlying log
pub fn mark_as_preferred(&self) {
let mut guard = self.0.current_preference.lock();
if guard.is_none() {
*guard = Some(PreferenceToken::new(self.0.watchdog.clone(), self.0.log_id));
}
}

/// Removes the preference about this node being the preferred writer for the log
pub fn forget_preference(&self) {
self.0.current_preference.lock().take();
}
}

#[cfg(all(test, feature = "local-loglet"))]
mod tests {
use super::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ impl<T: TransportConnect> SequencerAppender<T> {
match stored.status {
Status::Unknown => {
warn!(peer = %node_id, "Store failed on peer. Unknown error!");
self.nodeset_status
.merge(node_id, PerNodeStatus::failed_with_status(stored.status));
self.graylist.insert(node_id);
}
Status::Ok => {
Expand All @@ -448,19 +450,27 @@ impl<T: TransportConnect> SequencerAppender<T> {
Status::Sealed | Status::Sealing => {
self.checker
.set_attribute(node_id, NodeAttributes::sealed());
self.nodeset_status
.merge(node_id, PerNodeStatus::failed_with_status(stored.status));
self.graylist.insert(node_id);
}
Status::Dropped => {
// Overloaded, or request expired
debug!(peer = %node_id, status=?stored.status, "Store failed on peer. Peer is load shedding");
self.nodeset_status
.merge(node_id, PerNodeStatus::failed_with_status(stored.status));
self.graylist.insert(node_id);
}
Status::Disabled => {
debug!(peer = %node_id, status=?stored.status, "Store failed on peer. Peer's log-store is disabled");
self.nodeset_status
.merge(node_id, PerNodeStatus::failed_with_status(stored.status));
self.graylist.insert(node_id);
}
Status::SequencerMismatch | Status::Malformed | Status::OutOfBounds => {
warn!(peer = %node_id, status=?stored.status, "Store failed on peer due to unexpected error, please check logs of the peer to investigate");
self.nodeset_status
.merge(node_id, PerNodeStatus::failed_with_status(stored.status));
self.graylist.insert(node_id);
}
}
Expand All @@ -487,13 +497,21 @@ impl<T: TransportConnect> SequencerAppender<T> {
}
}

#[derive(Debug, derive_more::Display)]
enum Failure {
#[display("{_0}")]
Status(Status),
#[display("{_0}")]
RpcError(RpcError),
}

#[derive(Default, Debug)]
enum PerNodeStatus {
#[default]
NotAttempted,
Failed {
attempts: usize,
last_err: RpcError,
last_err: Failure,
},
Committed,
Sealed,
Expand All @@ -513,10 +531,17 @@ impl Display for PerNodeStatus {
}

impl PerNodeStatus {
fn failed(err: RpcError) -> Self {
pub fn failed(err: RpcError) -> Self {
Self::Failed {
attempts: 1,
last_err: Failure::RpcError(err),
}
}

pub fn failed_with_status(status: Status) -> Self {
Self::Failed {
attempts: 1,
last_err: err,
last_err: Failure::Status(status),
}
}
}
Expand Down
22 changes: 13 additions & 9 deletions crates/bifrost/src/watchdog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,11 @@ impl Watchdog {
let mut config = Configuration::live();

loop {
let shutdown_requested = TaskCenter::is_shutdown_requested();

if self.in_flight_trim.is_none()
&& !self.pending_trims.is_empty()
&& !TaskCenter::is_shutdown_requested()
&& !shutdown_requested
{
let trims = self.pending_trims.drain().collect();
match self.spawn_trim(trims) {
Expand All @@ -238,6 +240,12 @@ impl Watchdog {
}
}

// Do not run log improvement if the system is shutting down
// (watchdog will continue to run until it's asked to shutdown).
let disable_auto_improvement = config.live_load().bifrost.disable_auto_improvement
|| shutdown_requested
|| !TaskCenter::is_my_node_alive();

tokio::select! {
biased;
_ = &mut shutdown => {
Expand All @@ -247,14 +255,10 @@ impl Watchdog {
Some(cmd) = self.inbound.recv() => {
self.handle_command(cmd)
}
_tick = improvement_interval.tick() => {
if !config.live_load().bifrost.disable_auto_improvement
&& TaskCenter::is_my_node_alive()
&& !TaskCenter::is_shutdown_requested() {
// check if we have logs to improve
let logs = logs.live_load();
self.improve_logs(logs);
}
_tick = improvement_interval.tick(), if !disable_auto_improvement => {
// check if we have logs to improve
let logs = logs.live_load();
self.improve_logs(logs);
}
Some(_) = OptionFuture::from(self.in_flight_trim.as_mut()) => {
self.in_flight_trim = None;
Expand Down
Loading
Loading