Skip to content

Commit

Permalink
add debounce_scheduler() to configure debounce for scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
  • Loading branch information
aryan9600 committed Aug 4, 2023
1 parent fe94fee commit af1b5ae
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 74 deletions.
5 changes: 1 addition & 4 deletions examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ObjectMeta, Patch, PatchParams, Resource},
runtime::{
controller::{Action, Config, Controller},
controller::{Action, Controller},
watcher,
},
Client, CustomResource,
Expand Down Expand Up @@ -102,11 +102,8 @@ async fn main() -> Result<()> {
}
});

let mut config = Config::default();
config.debounce(Duration::from_secs(2));
Controller::new(cmgs, watcher::Config::default())
.owns(cms, watcher::Config::default())
.with_config(config)
.reconcile_all_on(reload_rx.map(|_| ()))
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(Data { client }))
Expand Down
90 changes: 44 additions & 46 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
store::{Store, Writer},
ObjectRef,
},
scheduler::{scheduler, ScheduleRequest},
scheduler::{debounced_scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
watcher::{self, metadata_watcher, watcher, DefaultBackoff},
};
Expand Down Expand Up @@ -234,7 +234,6 @@ impl Display for ReconcileReason {
}

const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
const SCHEDULER_DEBOUNCE_PERIOD: Duration = Duration::from_secs(1);

/// Apply a reconciler to an input stream, with a given retry policy
///
Expand All @@ -253,7 +252,7 @@ pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
debounce: Option<Duration>,
config: Config,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Resource + 'static,
Expand Down Expand Up @@ -293,42 +292,39 @@ where
)),
// all the Oks from the select gets passed through the scheduler stream, and are then executed
move |s| {
Runner::new(
scheduler(s, debounce.or(Some(SCHEDULER_DEBOUNCE_PERIOD))),
move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(obj, err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
Runner::new(debounced_scheduler(s, config.debounce), move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(obj, err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
}
},
)
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
})
.delay_tasks_until(async move {
tracing::debug!("applier runner held until store is ready");
let res = delay_store.wait_until_ready().await;
Expand Down Expand Up @@ -426,15 +422,15 @@ where
/// the behavior of the contorller.
#[derive(Clone, Debug, Default)]
pub struct Config {
/// The debounce time that allows for deduplication of events, preventing
/// unnecessary reconciliations. By default, it is set to 1 second, but users
/// should modify it according to the needs of their controller.
debounce: Option<Duration>,
debounce: Duration,
}

impl Config {
/// Sets the debounce period for the controller that allows for deduplication
/// of events, preventing unnecessary reconciliations. This is particularly useful
/// if the object is requeued instantly for a reconciliation by multiple streams.
pub fn debounce(&mut self, debounce: Duration) {
self.debounce = Some(debounce);
self.debounce = debounce;
}
}

Expand Down Expand Up @@ -1245,7 +1241,7 @@ where
self.reader,
StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
.take_until(future::select_all(self.graceful_shutdown_selector)),
self.config.debounce,
self.config,

Check warning on line 1244 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L1244

Added line #L1244 was not covered by tests
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
}
Expand All @@ -1260,7 +1256,7 @@ mod tests {
applier,
reflector::{self, ObjectRef},
watcher::{self, metadata_watcher, watcher, Event},
Controller,
Config, Controller,
};
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
Expand Down Expand Up @@ -1327,6 +1323,8 @@ mod tests {

let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
let (store_rx, mut store_tx) = reflector::store();
let mut config = Config::default();
config.debounce(Duration::from_millis(1));
let applier = applier(
|obj, _| {
Box::pin(async move {
Expand All @@ -1341,7 +1339,7 @@ mod tests {
Arc::new(()),
store_rx,
queue_rx.map(Result::<_, Infallible>::Ok),
Some(Duration::from_millis(1)),
config,
);
pin_mut!(applier);
for i in 0..items {
Expand Down
11 changes: 4 additions & 7 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ mod tests {
let mut count = 0;
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let mut runner = Box::pin(
// The debounce period needs to zero because otherwise the scheduler has a default
// debounce period of 1 ms, which will lead to the second request to be discarded.
Runner::new(scheduler(sched_rx, Some(Duration::ZERO)), |_| {
// The debounce period needs to zero because a debounce period > 0
// will lead to the second request to be discarded.
Runner::new(scheduler(sched_rx), |_| {
count += 1;
// Panic if this ref is already held, to simulate some unsafe action..
let mutex_ref = rc.borrow_mut();
Expand Down Expand Up @@ -205,7 +205,7 @@ mod tests {
// pause();
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let (result_tx, result_rx) = oneshot::channel();
let mut runner = Runner::new(scheduler(sched_rx, None), |msg: &u8| futures::future::ready(*msg));
let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
// Start a background task that starts listening /before/ we enqueue the message
// We can't just use Stream::poll_next(), since that bypasses the waker system
Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
Expand Down Expand Up @@ -244,7 +244,6 @@ mod tests {
run_at: Instant::now(),
}])
.chain(stream::pending()),
None,
),
|msg| {
assert!(*is_ready.lock().unwrap());
Expand Down Expand Up @@ -281,7 +280,6 @@ mod tests {
},
])
.chain(stream::pending()),
None,
),
|msg| {
assert!(*is_ready.lock().unwrap());
Expand Down Expand Up @@ -317,7 +315,6 @@ mod tests {
run_at: Instant::now(),
}])
.chain(stream::pending()),
None,
),
|()| {
panic!("run_msg should never be invoked if readiness gate fails");
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub mod utils;
pub mod wait;
pub mod watcher;

pub use controller::{applier, Controller};
pub use controller::{applier, Config, Controller};
pub use finalizer::finalizer;
pub use reflector::reflector;
pub use scheduler::scheduler;
Expand Down
40 changes: 24 additions & 16 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ pub struct Scheduler<T, R> {
/// Incoming queue of scheduling requests.
#[pin]
requests: Fuse<R>,
/// Debounce time to allow for deduplication of requests.
/// Debounce time to allow for deduplication of requests. It is added to the request's
/// initial expiration time. If another request with the same message arrives before
/// the request expires, its added to the new request's expiration time. This allows
/// for a request to be emitted, if the scheduler is "uninterrupted" for the configured
/// debounce period. Its primary purpose to deduplicate requests that expire instantly.
debounce: Duration,
}

Expand Down Expand Up @@ -209,18 +213,29 @@ where
/// is ready for it).
///
/// The [`Scheduler`] terminates as soon as `requests` does.
pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(
pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(requests: S) -> Scheduler<T, S> {
Scheduler::new(requests, Duration::ZERO)
}

/// Stream transformer that delays and deduplicates [`Stream`] items.
///
/// The debounce period lets the scheduler deduplicate requests that ask to be
/// emitted instantly, by making sure we wait for the configured period of time
/// to receive an uninterrupted request before actually emitting it.
///
/// For more info, see [`scheduler()`].
pub fn debounced_scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(
requests: S,
debounce: Option<Duration>,
debounce: Duration,
) -> Scheduler<T, S> {
Scheduler::new(requests, debounce.unwrap_or(Duration::ZERO))
Scheduler::new(requests, debounce)
}

#[cfg(test)]
mod tests {
use crate::utils::KubeRuntimeStreamExt;

use super::{scheduler, ScheduleRequest};
use super::{debounced_scheduler, scheduler, ScheduleRequest};
use derivative::Derivative;
use futures::{channel::mpsc, future, pin_mut, poll, stream, FutureExt, SinkExt, StreamExt};
use std::task::Poll;
Expand Down Expand Up @@ -248,7 +263,6 @@ mod tests {
run_at: Instant::now(),
}])
.on_complete(sleep(Duration::from_secs(4))),
None,
));
assert!(!scheduler.contains_pending(&1));
assert!(poll!(scheduler.as_mut().hold_unless(|_| false).next()).is_pending());
Expand All @@ -265,7 +279,7 @@ mod tests {
async fn scheduler_should_not_reschedule_pending_items() {
pause();
let (mut tx, rx) = mpsc::unbounded::<ScheduleRequest<u8>>();
let mut scheduler = Box::pin(scheduler(rx, None));
let mut scheduler = Box::pin(scheduler(rx));
tx.send(ScheduleRequest {
message: 1,
run_at: Instant::now(),
Expand Down Expand Up @@ -306,7 +320,6 @@ mod tests {
},
])
.on_complete(sleep(Duration::from_secs(2))),
None,
));
assert_eq!(
scheduler.as_mut().hold_unless(|x| *x != 1).next().await.unwrap(),
Expand All @@ -329,7 +342,6 @@ mod tests {
},
])
.on_complete(sleep(Duration::from_secs(5))),
None,
);
pin_mut!(scheduler);
assert!(poll!(scheduler.next()).is_pending());
Expand Down Expand Up @@ -357,7 +369,6 @@ mod tests {
},
])
.on_complete(sleep(Duration::from_secs(5))),
None,
);
pin_mut!(scheduler);
assert!(poll!(scheduler.next()).is_pending());
Expand All @@ -382,7 +393,6 @@ mod tests {
},
])
.on_complete(sleep(Duration::from_secs(5))),
None,
);
pin_mut!(scheduler);
assert!(poll!(scheduler.next()).is_pending());
Expand All @@ -396,7 +406,7 @@ mod tests {
async fn scheduler_dedupe_should_allow_rescheduling_emitted_item() {
pause();
let (mut schedule_tx, schedule_rx) = mpsc::unbounded();
let mut scheduler = scheduler(schedule_rx, None);
let mut scheduler = scheduler(schedule_rx);
schedule_tx
.send(ScheduleRequest {
message: (),
Expand Down Expand Up @@ -438,7 +448,6 @@ mod tests {
},
])
.on_complete(sleep(Duration::from_secs(5))),
None,
);
assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![2]);
}
Expand All @@ -460,7 +469,6 @@ mod tests {
},
])
.on_complete(sleep(Duration::from_secs(5))),
None,
);
assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![1]);
}
Expand All @@ -471,7 +479,7 @@ mod tests {

let now = Instant::now();
let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
let mut scheduler = scheduler(sched_rx, Some(Duration::from_secs(2)));
let mut scheduler = debounced_scheduler(sched_rx, Duration::from_secs(2));

sched_tx
.send(ScheduleRequest {
Expand All @@ -491,7 +499,7 @@ mod tests {

let now = Instant::now();
let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
let mut scheduler = scheduler(sched_rx, Some(Duration::from_secs(2)));
let mut scheduler = debounced_scheduler(sched_rx, Duration::from_secs(2));

sched_tx
.send(ScheduleRequest {
Expand Down

0 comments on commit af1b5ae

Please sign in to comment.