diff --git a/testing/matrix-sdk-integration-testing/src/tests.rs b/testing/matrix-sdk-integration-testing/src/tests.rs index eeadb405db1..dbb4c9354ae 100644 --- a/testing/matrix-sdk-integration-testing/src/tests.rs +++ b/testing/matrix-sdk-integration-testing/src/tests.rs @@ -1,6 +1,7 @@ mod auth; mod e2ee; mod invitations; +mod nse; mod reactions; mod redaction; mod repeated_join; diff --git a/testing/matrix-sdk-integration-testing/src/tests/nse.rs b/testing/matrix-sdk-integration-testing/src/tests/nse.rs new file mode 100644 index 00000000000..41227a4353e --- /dev/null +++ b/testing/matrix-sdk-integration-testing/src/tests/nse.rs @@ -0,0 +1,468 @@ +use std::{ + path::Path, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; + +use anyhow::Result; +use assign::assign; +use futures::Future; +use matrix_sdk::{ + encryption::EncryptionSettings, + ruma::{ + api::client::room::create_room::v3::Request as CreateRoomRequest, + events::{ + room::{ + encrypted::{OriginalSyncRoomEncryptedEvent, RoomEncryptedEventContent}, + encryption::RoomEncryptionEventContent, + message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent}, + }, + room_key::ToDeviceRoomKeyEvent, + AnyMessageLikeEventContent, AnySyncTimelineEvent, AnyTimelineEvent, + OriginalSyncMessageLikeEvent, + }, + serde::Raw, + EventEncryptionAlgorithm, OwnedEventId, OwnedRoomId, RoomId, + }, + Client, Room, +}; +use matrix_sdk_ui::{ + notification_client::{NotificationClient, NotificationEvent, NotificationProcessSetup}, + sync_service::SyncService, +}; +use serde_json::json; +use tempfile::tempdir; +use tracing::{info, instrument, span, Level}; + +use crate::helpers::{SyncTokenAwareClient, TestClientBuilder}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_multiple_clients_share_crypto_state() -> Result<()> { + // This failed before https://github.com/matrix-org/matrix-rust-sdk/pull/3338 + // because, even though the cross-process lock was working, the SessionStore + // inside the crypto store was caching the Olm sessions, meaning we didn't + // actually get the new one even when we called regenerate_olm. + + // Given two alice clients with the same DB and user ID: + // alice_main is a normal client, and alice_nse is a NotificationClient + let alice_sqlite_dir = tempdir()?; + let alice_main = + ClientWrapper::new("alice", Some(alice_sqlite_dir.path()), Some("alice_main".to_owned())) + .await; + let alice_nse = + NotificationClientWrapper::notification_duplicate_of(&alice_main, alice_sqlite_dir.path()) + .await; + // Sanity: alice's clients have the same device_id + assert_eq!(alice_main.client.device_id(), alice_nse.inner_client.device_id()); + + // And given a normal client for bob + let bob = ClientWrapper::new("bob", None, None).await; + + info!("alice's device: {}", alice_main.client.device_id().unwrap()); + info!("bob's device: {}", bob.client.device_id().unwrap()); + + // And given they are both in an encrypted room together + let room_id = create_encrypted_room(&alice_main, &bob).await; + + // And given both alices have an Olm session with bob (because they received a + // message from him) + { + let _span = span!(Level::INFO, "msg1_from_bob").entered(); + + let msg1 = bob.send(&room_id, "msg1_from_bob").await; + alice_nse.nse_wait_until_received(&room_id, &msg1).await; + alice_main.wait_until_received(&msg1).await; + + info!("alice_nse received msg1 from bob"); + } + + // When the alice_main process sends a message + { + let _span = span!(Level::INFO, "msg2_from_alice").entered(); + + let msg2 = alice_main.send(&room_id, "msg2_from_alice").await; + bob.wait_until_received(&msg2).await; + + info!("bob received msg2 from alice_main"); + } + + // Then the NSE process can still receive messages from bob. + // (This means that the NSE process must have notice that its Olm machine was + // out-of-date and refreshed it from the DB.) + { + let _span = span!(Level::INFO, "msg3_from_bob").entered(); + + let msg3 = bob.send(&room_id, "msg3_from_bob").await; + alice_nse.nse_wait_until_received(&room_id, &msg3).await; + + info!("alice_nse received msg3 from bob"); + } + + Ok(()) +} + +/// A wrapper around a SyncTokenAwareClient that sets up a SyncService and +/// decrypts events if their keys arrive late. +/// +/// Received, decrypted events are stored in the `events` Vec. +struct ClientWrapper { + /// The client we are wrapping. + pub client: SyncTokenAwareClient, + + /// A sync service that is started whenever we wait_until_... something. + sync_service: SyncService, + + /// The received events and their plain text content. + events: Arc>>, +} + +type EncryptedEventInfo = (Raw, OwnedRoomId); + +impl ClientWrapper { + /// Create a new ClientWrapper. + /// + /// If sqlite_dir contains a path, that path is used for the sqlite DB. + /// Otherwise, a random path is used. + /// + /// The contained SyncService always has a cross-process lock. If + /// app_identifier is supplied, it is used to identify this client's + /// process. If not, the default name is used. + async fn new( + username: &str, + sqlite_dir: Option<&Path>, + app_identifier: Option, + ) -> Self { + let builder = TestClientBuilder::new(username).randomize_username(); + + let builder = if let Some(sqlite_dir) = sqlite_dir { + builder.use_sqlite_dir(sqlite_dir) + } else { + builder.use_sqlite() + }; + + let events = Arc::new(Mutex::new(Vec::new())); + let encrypted_events: Arc>> = + Arc::new(Mutex::new(Vec::new())); + + let inner_client = builder + .encryption_settings(encryption_settings()) + .build() + .await + .expect("Failed to create client"); + + let client = SyncTokenAwareClient::new(inner_client.clone()); + + let sync_service = SyncService::builder(inner_client) + .with_cross_process_lock(app_identifier) + .build() + .await + .expect("Failed to create sync service"); + + // Collect decrypted messages as they arrive and put them in events + let events_clone = events.clone(); + client.add_event_handler(|ev: OriginalSyncRoomMessageEvent, _: Client| async move { + let content = match ev.content.msgtype { + MessageType::Text(text) => text.body, + _ => panic!("Unexpected message type"), + }; + + events_clone.lock().unwrap().push((ev.event_id.clone(), content)) + }); + + // Collect encrypted events and put them in encrypted_events + let encrypted_events_clone = encrypted_events.clone(); + client.add_event_handler( + |ev: Raw, room: Room| async move { + encrypted_events_clone.lock().unwrap().push((ev, room.room_id().to_owned())); + }, + ); + + // Notice incoming keys and attempt to decrypt stored encrypted events + let encrypted_events_clone2 = encrypted_events.clone(); + let events_clone2 = events.clone(); + client.add_event_handler(|_ev: ToDeviceRoomKeyEvent, client: Client| async move { + // Whenever we received any room key, attempt to decrypt all existing encrypted + // events. This could be more efficient, but it does the job. + let evts = encrypted_events_clone2.lock().unwrap().clone(); + for (event, room_id) in evts.iter() { + if let Some((event_id, content)) = decrypt_event(&client, room_id, event).await { + // If we did decrypt an event, remember it in our list of events we've seen + events_clone2.lock().unwrap().push((event_id, content)); + } + } + }); + + Self { client, sync_service, events } + } + + /// Create an encrypted room and invite the supplied people. + async fn create_room(&self, invite: &[&ClientWrapper]) -> OwnedRoomId { + let invite = invite.iter().map(|cw| cw.client.user_id().unwrap().to_owned()).collect(); + + let request = assign!(CreateRoomRequest::new(), { + invite, + is_direct: true, + }); + + let room = self.client.create_room(request).await.expect("Failed to create room"); + self.enable_encryption(&room, 1).await; + room.room_id().to_owned() + } + + /// Send an m.room.encryption event in the supplied room to make it + /// encrypted. + async fn enable_encryption(&self, room: &Room, rotation_period_msgs: usize) { + // Adapted from crates/matrix-sdk/src/room/mod.rs enable_encryption + if !room.is_encrypted().await.expect("Failed to check encrypted") { + let content: RoomEncryptionEventContent = serde_json::from_value(json!({ + "algorithm": EventEncryptionAlgorithm::MegolmV1AesSha2, + "rotation_period_msgs": rotation_period_msgs, + })) + .expect("Failed parsing encryption JSON"); + + room.send_state_event(content).await.expect("Failed to send state event"); + + // Give the sync loop time to run, to be fairly sure the encryption event is + // received + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + + /// Join the room with the supplied ID. + async fn join(&self, room_id: &RoomId) { + let room = self.wait_until_room_exists(room_id).await; + room.join().await.expect("Unable to join room") + } + + /// Send a text message in the supplied room and return the event ID and + /// contents. + async fn send(&self, room_id: &RoomId, message: &str) -> (OwnedEventId, String) { + let room = self.wait_until_room_exists(room_id).await; + + ( + room.send(RoomMessageEventContent::text_plain(message.to_owned())) + .await + .expect("Sending message failed") + .event_id + .to_owned(), + message.to_owned(), + ) + } + + /// Return true if the room with the supplied ID is encrypted. + async fn room_is_encrypted(&self, room_id: &RoomId) -> bool { + self.wait_until_room_exists(room_id) + .await + .is_encrypted() + .await + .expect("Failed to check encrypted") + } + + /// Wait (syncing if needed) until the room with supplied ID exists, or time + /// out. + async fn wait_until_room_exists(&self, room_id: &RoomId) -> Room { + self.sync_until(|| async { self.client.get_room(room_id) }) + .await + .unwrap_or_else(|| panic!("Timed out waiting for room {room_id} to exist")) + } + + /// Wait (syncing if needed) until the user appears in the supplied room, or + /// time out. + async fn wait_until_user_in_room(&self, room_id: &RoomId, other: &ClientWrapper) { + let room = self.wait_until_room_exists(room_id).await; + let user_id = other.client.user_id().unwrap(); + + self.sync_until(|| async { + room.get_member_no_sync(user_id).await.expect("get_member failed") + }) + .await + .unwrap_or_else(|| panic!("Timed out waiting for user {user_id} to be in room {room_id}")); + } + + /// Wait (syncing if needed) until the event with this ID appears, or time + /// out. + #[instrument(skip(self))] + async fn wait_until_received(&self, event_info: &(OwnedEventId, String)) { + self.sync_until(|| async { + self.events.lock().unwrap().contains(event_info).then_some(()) + }) + .await + .unwrap_or_else(|| { + panic!( + "Timed out waiting for event ({}, {}) to be received. Events: {:?}", + event_info.0, + event_info.1, + self.events.lock().unwrap() + ) + }); + } + + /// Start syncing and then wait until the supplied function returns Some, or + /// time out. + /// + /// Returns the returned value, or None if we timed out. + async fn sync_until(&self, f: F) -> Option + where + F: Fn() -> R, + R: Future>, + { + self.sync_service.start().await; + + // Repeatedly call f until it returns Some + let end_time = Instant::now() + timeout(); + while Instant::now() < end_time { + if let Some(ans) = f().await { + // We found what we were looking for + self.sync_service.stop().await.expect("Failed to stop sync service"); + return Some(ans); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // We timed out + self.sync_service.stop().await.expect("Failed to stop sync service"); + None + } +} + +/// A wrapper around a NotificationClient that can wait until a given event has +/// been received and decrypted. +/// +/// Received, decrypted events are stored in the `events` Vec. +struct NotificationClientWrapper { + /// The raw client we are wrapping. + pub inner_client: Client, + + /// The NotificationClient we are wrapping. + pub notif_client: NotificationClient, + + /// The received events and their plain text content. + events: Arc>>, +} + +impl NotificationClientWrapper { + /// Create a NotificationClientWrapper whose Client is a duplicate of + /// `other`'s. This means it has the same user ID and device ID because + /// it is created using [`Client::restore_session`]. + async fn notification_duplicate_of(other: &ClientWrapper, sqlite_dir: &Path) -> Self { + let events = Arc::new(Mutex::new(Vec::new())); + + let inner_client = TestClientBuilder::new(other.client.user_id().unwrap()) + .use_sqlite() + .encryption_settings(encryption_settings()) + .use_sqlite_dir(sqlite_dir) + .duplicate(&other.client) + .await + .expect("Failed to duplicate client"); + + let notif_client = NotificationClient::new( + inner_client.clone(), + NotificationProcessSetup::MultipleProcesses, + ) + .await + .expect("Failed to build NotificationClient"); + + Self { inner_client, notif_client, events } + } + + /// Wait (using [`NotificationClient::get_notification`]) until the event + /// with this ID appears, or time out. + #[instrument(skip(self))] + async fn nse_wait_until_received(&self, room_id: &RoomId, event_info: &(OwnedEventId, String)) { + if self.events.lock().unwrap().contains(event_info) { + // The event is already here - we are done. + return; + } + + // Wait until this event can be got via get_notification + let end_time = Instant::now() + timeout(); + while Instant::now() < end_time { + let item = self + .notif_client + .get_notification(room_id, &event_info.0) + .await + .expect("Failed to get_notification"); + + if let Some(item) = item { + if let NotificationEvent::Timeline(AnySyncTimelineEvent::MessageLike(e)) = + item.event + { + if let AnyMessageLikeEventContent::RoomMessage(c) = + e.original_content().expect("Empty original content") + { + self.events + .lock() + .unwrap() + .push((event_info.0.clone(), c.body().to_owned())); + return; + } + } + }; + } + + panic!( + "Timed out waiting for event ({}, {}) to be received via NotificationClient. Events: {:?}", + event_info.0, + event_info.1, + self.events.lock().unwrap() + ); + } +} + +/// Attempt to decrypt an event and return its ID and text body if we succeed. +/// Otherwise, return None. +/// +/// Only works for room messages that contain text - anything else is ignored +/// and we return None. +async fn decrypt_event( + client: &Client, + room_id: &RoomId, + event: &Raw>, +) -> Option<(OwnedEventId, String)> { + let Ok(decrypted) = client.get_room(room_id).unwrap().decrypt_event(event).await else { + return None; + }; + + let Ok(deserialized) = decrypted.event.deserialize() else { return None }; + + let AnyTimelineEvent::MessageLike(message) = &deserialized else { return None }; + + let Some(AnyMessageLikeEventContent::RoomMessage(content)) = message.original_content() else { + return None; + }; + + let MessageType::Text(text) = content.msgtype else { return None }; + + let event_id = deserialized.event_id().to_owned(); + info!("Successfully decrypted event {event_id}"); + + Some((event_id, text.body)) +} + +/// The standard settings for an encrypted room. +fn encryption_settings() -> EncryptionSettings { + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() } +} + +/// How long to wait before giving up on an operation. +fn timeout() -> Duration { + Duration::from_secs(10) +} + +/// Create an encrypted room as `alice_main` and join `bob` to it. +async fn create_encrypted_room(alice_main: &ClientWrapper, bob: &ClientWrapper) -> OwnedRoomId { + let room_id = alice_main.create_room(&[bob]).await; + + info!("alice_main has created and enabled encryption in the room"); + + bob.join(&room_id).await; + + alice_main.wait_until_user_in_room(&room_id, bob).await; + + // Sanity: the room is encrypted + assert!(bob.room_is_encrypted(&room_id).await); + assert!(alice_main.room_is_encrypted(&room_id).await); + + info!("alice_main and bob are both aware of each other in the e2ee room"); + room_id +}