Skip to content

Commit

Permalink
fix(sdk): Mark tracked users as dirty when the SS connection is reset.
Browse files Browse the repository at this point in the history
There is a non-negligible difference MSC3575 and MSC4186 in how the
`e2ee` extension works. When the client sends a request with no `pos`:

* MSC3575 returns all device lists updates since the last request
  from the device that asked for device lists (this works similarly to
  to-device message handling),

* MSC4186 returns no device lists updates, as it only returns changes
  since the provided `pos` (which is `null` in this case); this is in
  line with sync v2.

Therefore, with MSC4186, the device list cache must be marked as to be
re-downloaded if the `since` token is `None`, otherwise it's easy to
miss device lists updates that happened between the previous request and
the new “initial” request.
  • Loading branch information
Hywan committed Sep 10, 2024
1 parent d8b2014 commit c0bb278
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 8 deletions.
10 changes: 10 additions & 0 deletions crates/matrix-sdk/src/sliding_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,14 @@ pub enum Error {
/// The original `JoinError`.
error: JoinError,
},

/// No Olm machine.
#[cfg(feature = "e2e-encryption")]
#[error("The Olm machine is missing")]
NoOlmMachine,

/// An error occurred during a E2EE operation.
#[cfg(feature = "e2e-encryption")]
#[error(transparent)]
CryptoStoreError(#[from] matrix_sdk_base::crypto::CryptoStoreError),
}
178 changes: 170 additions & 8 deletions crates/matrix-sdk/src/sliding_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,27 @@ impl SlidingSync {

Span::current().record("pos", &pos);

// There is a non-negligible difference MSC3575 and MSC4186 in how
// the `e2ee` extension works. When the client sends a request with
// no `pos`:
//
// * MSC3575 returns all device lists updates since the last request from the
// device that asked for device lists (this works similarly to to-device
// message handling),
// * MSC4186 returns no device lists updates, as it only returns changes since
// the provided `pos` (which is `null` in this case); this is in line with
// sync v2.
//
// Therefore, with MSC4186, the device list cache must be marked as to be
// re-downloaded if the `since` token is `None`, otherwise it's easy to miss
// device lists updates that happened between the previous request and the new
// “initial” request.
if pos.is_none() && self.inner.version.is_native() && self.is_e2ee_enabled() {
let olm_machine = self.inner.client.olm_machine().await;
let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
olm_machine.mark_all_tracked_users_as_dirty().await?;
}

// Configure the timeout.
//
// The `timeout` query is necessary when all lists require it. Please see
Expand Down Expand Up @@ -843,15 +864,9 @@ enum SlidingSyncInternalMessage {

#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
/// Get a copy of the `pos` value.
pub fn pos(&self) -> Option<String> {
let position_lock = self.inner.position.blocking_lock();
position_lock.pos.clone()
}

/// Set a new value for `pos`.
pub fn set_pos(&self, new_pos: String) {
let mut position_lock = self.inner.position.blocking_lock();
pub async fn set_pos(&self, new_pos: String) {
let mut position_lock = self.inner.position.lock().await;
position_lock.pos = Some(new_pos);
}

Expand Down Expand Up @@ -1662,6 +1677,153 @@ mod tests {
Ok(())
}

// With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
// all the tracked users by the `OlmMachine` must be marked as dirty, i.e.
// `/key/query` requests must be sent. See the code to see the details.
//
// This test is asserting that.
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
use matrix_sdk_base::crypto::{IncomingResponse, OutgoingRequests};
use matrix_sdk_test::ruma_response_from_json;
use ruma::user_id;

let server = MockServer::start().await;
let client = logged_in_client(Some(server.uri())).await;

let alice = user_id!("@alice:localhost");
let bob = user_id!("@bob:localhost");
let me = user_id!("@example:localhost");

// Track and mark users are not dirty, so that we can check they are “dirty”
// after that. Dirty here means that a `/key/query` must be sent.
{
let olm_machine = client.olm_machine().await;
let olm_machine = olm_machine.as_ref().unwrap();

olm_machine.update_tracked_users([alice, bob]).await?;

// Assert requests.
let outgoing_requests = olm_machine.outgoing_requests().await?;

assert_eq!(outgoing_requests.len(), 2);
assert_matches!(outgoing_requests[0].request(), OutgoingRequests::KeysUpload(_));
assert_matches!(outgoing_requests[1].request(), OutgoingRequests::KeysQuery(_));

// Fake responses.
olm_machine
.mark_request_as_sent(
outgoing_requests[0].request_id(),
IncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
"one_time_key_counts": {}
}))),
)
.await?;

olm_machine
.mark_request_as_sent(
outgoing_requests[1].request_id(),
IncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
"device_keys": {
alice: {},
bob: {},
}
}))),
)
.await?;

// Once more.
let outgoing_requests = olm_machine.outgoing_requests().await?;

assert_eq!(outgoing_requests.len(), 1);
assert_matches!(outgoing_requests[0].request(), OutgoingRequests::KeysQuery(_));

olm_machine
.mark_request_as_sent(
outgoing_requests[0].request_id(),
IncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
"device_keys": {
me: {},
}
}))),
)
.await?;

// No more.
let outgoing_requests = olm_machine.outgoing_requests().await?;

assert!(outgoing_requests.is_empty());
}

let sync = client
.sliding_sync("test-slidingsync")?
.add_list(SlidingSyncList::builder("new_list"))
.with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
.build()
.await?;

// First request: no `pos`.
let txn_id = TransactionId::new();
let (_request, _, _) = sync
.generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
.await?;

// Now, tracked users must be dirty.
{
let olm_machine = client.olm_machine().await;
let olm_machine = olm_machine.as_ref().unwrap();

// Assert requests.
let outgoing_requests = olm_machine.outgoing_requests().await?;

assert_eq!(outgoing_requests.len(), 1);
assert_matches!(
outgoing_requests[0].request(),
OutgoingRequests::KeysQuery(request) => {
assert!(request.device_keys.contains_key(alice));
assert!(request.device_keys.contains_key(bob));
assert!(request.device_keys.contains_key(me));
}
);

// Fake responses.
olm_machine
.mark_request_as_sent(
outgoing_requests[0].request_id(),
IncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
"device_keys": {
alice: {},
bob: {},
me: {},
}
}))),
)
.await?;
}

// Second request: with a `pos` this time.
sync.set_pos("chocolat".to_owned()).await;

let txn_id = TransactionId::new();
let (_request, _, _) = sync
.generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
.await?;

// Tracked users are not marked as dirty.
{
let olm_machine = client.olm_machine().await;
let olm_machine = olm_machine.as_ref().unwrap();

// Assert requests.
let outgoing_requests = olm_machine.outgoing_requests().await?;

assert!(outgoing_requests.is_empty());
}

Ok(())
}

#[async_test]
async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
let server = MockServer::start().await;
Expand Down

0 comments on commit c0bb278

Please sign in to comment.