Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get back to Recovering syncing when we haven't sync for a while #3995

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions crates/matrix-sdk-ui/src/room_list_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ pub struct RoomListService {
///
/// `RoomListService` is a simple state-machine.
state: SharedObservable<State>,

/// State machine used to transition between the states.
state_machine: StateMachine,
}

impl RoomListService {
Expand Down Expand Up @@ -172,7 +175,12 @@ impl RoomListService {
// Eagerly subscribe the event cache to sync responses.
client.event_cache().subscribe()?;

Ok(Self { client, sliding_sync, state: SharedObservable::new(State::Init) })
Ok(Self {
client,
sliding_sync,
state: SharedObservable::new(State::Init),
state_machine: StateMachine::new(),
})
}

/// Start to sync the room list.
Expand Down Expand Up @@ -207,8 +215,9 @@ impl RoomListService {
loop {
debug!("Run a sync iteration");

let current_state = self.state.get();
// Calculate the next state, and run the associated actions.
let next_state = self.state.get().next(&self.sliding_sync).await?;
let next_state = self.state_machine.next(current_state, &self.sliding_sync).await?;

// Do the sync.
match sync.next().await {
Expand Down
122 changes: 99 additions & 23 deletions crates/matrix-sdk-ui/src/room_list_service/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@

//! States and actions for the `RoomList` state machine.

use std::future::ready;
use std::{
future::ready,
time::{Duration, Instant},
};

use matrix_sdk::{sliding_sync::Range, SlidingSync, SlidingSyncMode};

use super::Error;

pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";

/// The state of the [`super::RoomList`]' state machine.
/// The state of the [`super::RoomList`].
#[derive(Clone, Debug, PartialEq)]
pub enum State {
/// That's the first initial state.
Expand All @@ -46,21 +49,49 @@ pub enum State {
Terminated { from: Box<State> },
}

impl State {
const DEFAULT_DELAY_BEFORE_RECOVER: Duration = Duration::from_secs(1800);

/// The state machine used to transition between the [`State`]s.
#[derive(Clone, Debug, PartialEq)]
pub struct StateMachine {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move State inside StateMachine please?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that was my first try, and then I got lost in how to deal with observing the State and ownership trouble. I can try again but the change will probably be quite more invasive.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you allow me to try please?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me push my current state. I am not far, but I broke a sync indicator test so I think I messed up something regarding observable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done !

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oupsss it seems to break a lot of complement crypto tests.

Copy link
Author

@MatMaul MatMaul Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was not the case before my last refactor.

last_sync_date: Instant,
delay_before_recover: Duration,
}

impl StateMachine {
pub(super) fn new() -> Self {
StateMachine {
last_sync_date: Instant::now(),
delay_before_recover: DEFAULT_DELAY_BEFORE_RECOVER,
}
}

/// Transition to the next state, and execute the associated transition's
/// [`Actions`].
pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<Self, Error> {
pub(super) async fn next(
&self,
current: State,
sliding_sync: &SlidingSync,
) -> Result<State, Error> {
use State::*;

let next_state = match self {
let next_state = match current {
Init => SettingUp,

SettingUp | Recovering => {
set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
Running
}

Running => Running,
Running => {
// We haven't sync for a while so we should go back to recovering
if self.last_sync_date.elapsed() > self.delay_before_recover {
set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
Recovering
} else {
Running
}
}

Error { from: previous_state } | Terminated { from: previous_state } => {
match previous_state.as_ref() {
Expand Down Expand Up @@ -130,91 +161,104 @@ mod tests {
let room_list = new_room_list().await?;
let sliding_sync = room_list.sliding_sync();

let state_machine = StateMachine::new();

// First state.
let state = State::Init;

// Hypothetical error.
{
let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?;
let state = state_machine
.next(State::Error { from: Box::new(state.clone()) }, sliding_sync)
.await?;

// Back to the previous state.
assert_eq!(state, State::Init);
}

// Hypothetical termination.
{
let state =
State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
let state = state_machine
.next(State::Terminated { from: Box::new(state.clone()) }, sliding_sync)
.await?;

// Back to the previous state.
assert_eq!(state, State::Init);
}

// Next state.
let state = state.next(sliding_sync).await?;
let state = state_machine.next(state, sliding_sync).await?;
assert_eq!(state, State::SettingUp);

// Hypothetical error.
{
let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?;
let state = state_machine
.next(State::Error { from: Box::new(state.clone()) }, sliding_sync)
.await?;

// Back to the previous state.
assert_eq!(state, State::SettingUp);
}

// Hypothetical termination.
{
let state =
State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
let state = state_machine
.next(State::Terminated { from: Box::new(state.clone()) }, sliding_sync)
.await?;

// Back to the previous state.
assert_eq!(state, State::SettingUp);
}

// Next state.
let state = state.next(sliding_sync).await?;
let state = state_machine.next(state, sliding_sync).await?;
assert_eq!(state, State::Running);

// Hypothetical error.
{
let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?;
let state = state_machine
.next(State::Error { from: Box::new(state.clone()) }, sliding_sync)
.await?;

// Jump to the **recovering** state!
assert_eq!(state, State::Recovering);

let state = state.next(sliding_sync).await?;
let state = state_machine.next(state, sliding_sync).await?;

// Now, back to the previous state.
assert_eq!(state, State::Running);
}

// Hypothetical termination.
{
let state =
State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
let state = state_machine
.next(State::Terminated { from: Box::new(state.clone()) }, sliding_sync)
.await?;

// Jump to the **recovering** state!
assert_eq!(state, State::Recovering);

let state = state.next(sliding_sync).await?;
let state = state_machine.next(state, sliding_sync).await?;

// Now, back to the previous state.
assert_eq!(state, State::Running);
}

// Hypothetical error when recovering.
{
let state =
State::Error { from: Box::new(State::Recovering) }.next(sliding_sync).await?;
let state = state_machine
.next(State::Error { from: Box::new(State::Recovering) }, sliding_sync)
.await?;

// Back to the previous state.
assert_eq!(state, State::Recovering);
}

// Hypothetical termination when recovering.
{
let state =
State::Terminated { from: Box::new(State::Recovering) }.next(sliding_sync).await?;
let state = state_machine
.next(State::Terminated { from: Box::new(State::Recovering) }, sliding_sync)
.await?;

// Back to the previous state.
assert_eq!(state, State::Recovering);
Expand All @@ -223,6 +267,38 @@ mod tests {
Ok(())
}

#[async_test]
async fn test_recover_state_after_delay() -> Result<(), Error> {
let room_list = new_room_list().await?;
let sliding_sync = room_list.sliding_sync();

let mut state_machine = StateMachine::new();
state_machine.delay_before_recover = Duration::from_millis(50);

let state = State::Init;

let state = state_machine.next(state, sliding_sync).await?;
assert_eq!(state, State::SettingUp);

let state = state_machine.next(state, sliding_sync).await?;
assert_eq!(state, State::Running);

// We haven't reach `delay_before_recover` yet so should still be running
let state = state_machine.next(state, sliding_sync).await?;
assert_eq!(state, State::Running);

tokio::time::sleep(Duration::from_millis(100)).await;

// `delay_before_recover` reached, time to recover
let state = state_machine.next(state, sliding_sync).await?;
assert_eq!(state, State::Recovering);

let state = state_machine.next(state, sliding_sync).await?;
assert_eq!(state, State::Running);

Ok(())
}

#[async_test]
async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
{
Expand Down
Loading