Skip to content

Commit

Permalink
Add tick timeout when debouncing the events
Browse files Browse the repository at this point in the history
  • Loading branch information
liuchengxu committed May 19, 2024
1 parent 874a233 commit e814b2e
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 43 deletions.
2 changes: 0 additions & 2 deletions crates/cli/src/command/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use clap::Parser;
use maple_core::stdio_server::ConfigError;
use std::io::IsTerminal;
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::fmt::format::FmtSpan;

/// Starts a RPC service using stdio.
#[derive(Parser, Debug, Clone)]
Expand Down Expand Up @@ -69,7 +68,6 @@ impl Rpc {
}

let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_span_events(FmtSpan::FULL)
.with_max_level(max_level)
.with_env_filter(env_filter)
.with_line_number(true)
Expand Down
1 change: 0 additions & 1 deletion crates/maple_core/src/searcher/grep/stoppable_searcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,6 @@ pub async fn search(query: String, matcher: Matcher, search_context: SearchConte

let total_matched = search_info.total_matched.load(Ordering::SeqCst);
let total_processed = search_info.total_processed.load(Ordering::SeqCst);

progressor.on_finished(display_lines, total_matched, total_processed);

tracing::debug!(
Expand Down
13 changes: 13 additions & 0 deletions crates/maple_core/src/stdio_server/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ pub enum ProviderEvent {
Internal(InternalProviderEvent),
}

impl ProviderEvent {
pub fn is_same_type(&self, other: &Self) -> bool {
match self {
Self::OnMove(_) => matches!(other, Self::OnMove(_)),
Self::OnTyped(_) => matches!(other, Self::OnTyped(_)),
Self::RemoteSink(_) => matches!(other, Self::RemoteSink(_)),
Self::Exit => matches!(other, Self::Exit),
Self::Key(_) => matches!(other, Self::Key(_)),
Self::Internal(_) => matches!(other, Self::Internal(_)),
}
}
}

#[derive(Debug, Clone)]
pub enum InternalProviderEvent {
Initialize,
Expand Down
91 changes: 51 additions & 40 deletions crates/maple_core/src/stdio_server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::stdio_server::plugin::{ActionType, ClapPlugin, PluginId};
use crate::stdio_server::provider::{ClapProvider, Context, ProviderId};
use rpc::Params;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::ops::ControlFlow;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -50,15 +50,12 @@ impl DebounceTimer {

fn should_emit(&mut self) -> bool {
let now = std::time::Instant::now();
if self.last_emitted.is_none() {
if self.last_emitted.is_none()
|| now.duration_since(self.last_emitted.expect("Must be Some as checked"))
> self.debounce_period
{
self.last_emitted.replace(now);
return true;
} else {
let elapsed = now.duration_since(self.last_emitted.expect("Must be Some as checked"));
if elapsed > self.debounce_period {
self.last_emitted.replace(now);
return true;
}
}
false
}
Expand Down Expand Up @@ -89,38 +86,52 @@ impl ProviderSession {
let mut on_move_timer = DebounceTimer::new(Duration::from_millis(200));
let mut on_typed_timer = DebounceTimer::new(Duration::from_millis(debounce_delay));

let mut event_cache = Vec::new();

// TODO: this does not fully resolve the problem, find another solution.
// Text input from users could be overloaded in a short period of time, e.g., OnMove
// and OnTyped can be too frequent if user types too fast, in which case we observe
// the receiver side of unbounded channel may not be able to receive the events in time,
// leading to the annoying frozen UI on the vim side. The cause is probably because of
// the event processing logic on the receiver side are not running in separate tasks,
// the processing of incoming messages cannot keep up with the rate of message generation,
// rendering the receiver may hang for a while. One proper solution is to process each
// provider event in a separate task, that requires more effoets however, now we choose to
// debounce the stream to avoid overwhelming the system.
while let Some(event) = origin_provider_event_receiver.recv().await {
let should_emit = match &event {
ProviderEvent::OnMove(..) => on_move_timer.should_emit(),
ProviderEvent::OnTyped(..) => on_typed_timer.should_emit(),
_ => true,
};

// tracing::debug!(should_emit, "Recv origin event: {event:?}");

// Send event after debounce period
if should_emit {
if provider_is_busy.load(Ordering::SeqCst) {
tracing::debug!(
"=============== provider is busy, caching {event:?}, cache size: {}",
event_cache.len()
);
event_cache.push(event);
} else if debounced_provider_event_sender.send(event.clone()).is_err() {
tracing::debug!("Sending debounced event: {event:?}");
return;
let mut event_cache = VecDeque::with_capacity(2);

let mut tick_timeout = {
let mut interval = tokio::time::interval(Duration::from_millis(100));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval
};

// NOTE: The processing logic within the provider must not contain any blocking operation,
// otherwise the blocking operation taking too long can block the runtime, leading a
// frozen UI, one symptom is that the receiver failed to receive the debounded messages
// in time.

loop {
tokio::select! {
maybe_event = origin_provider_event_receiver.recv() => {
let Some(event) = maybe_event else {
continue;
};

// Params are unused at present, include the params when it's not the case
// in the future.
let should_emit = match &event {
ProviderEvent::OnMove(..) => on_move_timer.should_emit(),
ProviderEvent::OnTyped(..) => on_typed_timer.should_emit(),
_ => true,
};

// Send event after debounce period if the provider is not overloaded.
if should_emit {
if provider_is_busy.load(Ordering::SeqCst) {
if event_cache.iter().any(|e| event.is_same_type(e)) {
continue;
}
event_cache.push_back(event);
} else if debounced_provider_event_sender.send(event).is_err() {
return;
}
}
}
_ = tick_timeout.tick() => {
if let Some(event) = event_cache.pop_front() {
if debounced_provider_event_sender.send(event).is_err() {
return;
}
}
}
}
}
Expand Down

0 comments on commit e814b2e

Please sign in to comment.