|
1 |
| -# Copyright 2015, 2016 OpenMarket Ltd |
| 1 | +# Copyright 2015-2021 The Matrix.org Foundation C.I.C. |
2 | 2 | #
|
3 | 3 | # Licensed under the Apache License, Version 2.0 (the "License");
|
4 | 4 | # you may not use this file except in compliance with the License.
|
|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
14 | 14 |
|
| 15 | +from typing import Dict, Iterable, List, Optional, Tuple |
15 | 16 | from unittest.mock import Mock
|
16 | 17 |
|
17 | 18 | from twisted.internet import defer
|
18 | 19 |
|
| 20 | +import synapse.rest.admin |
| 21 | +import synapse.storage |
| 22 | +from synapse.appservice import ApplicationService |
19 | 23 | from synapse.handlers.appservice import ApplicationServicesHandler
|
20 |
| -from synapse.types import RoomStreamToken |
| 24 | +from synapse.rest.client import login, receipts, room, sendtodevice |
| 25 | +from synapse.types import JsonDict, RoomStreamToken |
| 26 | +from synapse.util import json_encoder |
| 27 | +from synapse.util.stringutils import random_string |
21 | 28 |
|
| 29 | +from tests import unittest |
22 | 30 | from tests.test_utils import make_awaitable
|
23 | 31 | from tests.utils import MockClock
|
24 | 32 |
|
25 |
| -from .. import unittest |
26 |
| - |
27 | 33 |
|
28 | 34 | class AppServiceHandlerTestCase(unittest.TestCase):
|
29 | 35 | """Tests the ApplicationServicesHandler."""
|
@@ -253,20 +259,24 @@ async def get_3pe_protocol(service, unusedProtocol):
|
253 | 259 | },
|
254 | 260 | )
|
255 | 261 |
|
256 |
| - def test_notify_interested_services_ephemeral(self): |
| 262 | + def test_notify_interested_services_ephemeral_read_receipt(self): |
257 | 263 | """
|
258 |
| - Test sending ephemeral events to the appservice handler are scheduled |
| 264 | + Test sending read receipts to the appservice handler are scheduled |
259 | 265 | to be pushed out to interested appservices, and that the stream ID is
|
260 | 266 | updated accordingly.
|
261 | 267 | """
|
| 268 | + # Create an application service that is guaranteed to be interested in |
| 269 | + # any new events |
262 | 270 | interested_service = self._mkservice(is_interested=True)
|
263 | 271 | services = [interested_service]
|
264 |
| - |
265 | 272 | self.mock_store.get_app_services.return_value = services
|
| 273 | + |
| 274 | + # State that this application service has received up until stream ID 579 |
266 | 275 | self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
|
267 | 276 | 579
|
268 | 277 | )
|
269 | 278 |
|
| 279 | + # Set up a dummy event that should be sent to the application service |
270 | 280 | event = Mock(event_id="event_1")
|
271 | 281 | self.event_source.sources.receipt.get_new_events_as.return_value = (
|
272 | 282 | make_awaitable(([event], None))
|
@@ -321,3 +331,272 @@ def _mkservice_alias(self, is_interested_in_alias):
|
321 | 331 | service.token = "mock_service_token"
|
322 | 332 | service.url = "mock_service_url"
|
323 | 333 | return service
|
| 334 | + |
| 335 | + |
| 336 | +class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase): |
| 337 | + servlets = [ |
| 338 | + synapse.rest.admin.register_servlets_for_client_rest_resource, |
| 339 | + login.register_servlets, |
| 340 | + room.register_servlets, |
| 341 | + sendtodevice.register_servlets, |
| 342 | + receipts.register_servlets, |
| 343 | + ] |
| 344 | + |
| 345 | + def prepare(self, reactor, clock, hs): |
| 346 | + # Mock the application service scheduler so that we can track any |
| 347 | + # outgoing transactions |
| 348 | + self.mock_scheduler = Mock() |
| 349 | + self.mock_scheduler.submit_ephemeral_events_for_as = Mock() |
| 350 | + |
| 351 | + hs.get_application_service_handler().scheduler = self.mock_scheduler |
| 352 | + |
| 353 | + self.device1 = "device1" |
| 354 | + self.user1 = self.register_user("user1", "password") |
| 355 | + self.token1 = self.login("user1", "password", self.device1) |
| 356 | + |
| 357 | + self.device2 = "device2" |
| 358 | + self.user2 = self.register_user("user2", "password") |
| 359 | + self.token2 = self.login("user2", "password", self.device2) |
| 360 | + |
| 361 | + @unittest.override_config( |
| 362 | + {"experimental_features": {"msc2409_to_device_messages_enabled": True}} |
| 363 | + ) |
| 364 | + def test_application_services_receive_local_to_device(self): |
| 365 | + """ |
| 366 | + Test that when a user sends a to-device message to another user, and |
| 367 | + that is in an application service's user namespace, that application |
| 368 | + service will receive it. |
| 369 | + """ |
| 370 | + ( |
| 371 | + interested_services, |
| 372 | + _, |
| 373 | + ) = self._register_interested_and_uninterested_application_services() |
| 374 | + interested_service = interested_services[0] |
| 375 | + |
| 376 | + # Have user1 send a to-device message to user2 |
| 377 | + message_content = {"some_key": "some really interesting value"} |
| 378 | + chan = self.make_request( |
| 379 | + "PUT", |
| 380 | + "/_matrix/client/r0/sendToDevice/m.room_key_request/3", |
| 381 | + content={"messages": {self.user2: {self.device2: message_content}}}, |
| 382 | + access_token=self.token1, |
| 383 | + ) |
| 384 | + self.assertEqual(chan.code, 200, chan.result) |
| 385 | + |
| 386 | + # Have user2 send a to-device message to user1 |
| 387 | + chan = self.make_request( |
| 388 | + "PUT", |
| 389 | + "/_matrix/client/r0/sendToDevice/m.room_key_request/4", |
| 390 | + content={"messages": {self.user1: {self.device1: message_content}}}, |
| 391 | + access_token=self.token2, |
| 392 | + ) |
| 393 | + self.assertEqual(chan.code, 200, chan.result) |
| 394 | + |
| 395 | + # Check if our application service - that is interested in user2 - received |
| 396 | + # the to-device message as part of an AS transaction. |
| 397 | + # Only the user1 -> user2 to-device message should have been forwarded to the AS. |
| 398 | + # |
| 399 | + # The uninterested application service should not have been notified at all. |
| 400 | + self.assertEqual( |
| 401 | + 1, self.mock_scheduler.submit_ephemeral_events_for_as.call_count |
| 402 | + ) |
| 403 | + service, events = self.mock_scheduler.submit_ephemeral_events_for_as.call_args[ |
| 404 | + 0 |
| 405 | + ] |
| 406 | + |
| 407 | + # Assert that this was the same to-device message that user1 sent |
| 408 | + self.assertEqual(service, interested_service) |
| 409 | + self.assertEqual(events[0]["type"], "m.room_key_request") |
| 410 | + self.assertEqual(events[0]["sender"], self.user1) |
| 411 | + |
| 412 | + # Additional fields 'to_user_id' and 'to_device_id' specifically for |
| 413 | + # to-device messages via the AS API |
| 414 | + self.assertEqual(events[0]["to_user_id"], self.user2) |
| 415 | + self.assertEqual(events[0]["to_device_id"], self.device2) |
| 416 | + self.assertEqual(events[0]["content"], message_content) |
| 417 | + |
| 418 | + @unittest.override_config( |
| 419 | + {"experimental_features": {"msc2409_to_device_messages_enabled": True}} |
| 420 | + ) |
| 421 | + def test_application_services_receive_bursts_of_to_device(self): |
| 422 | + """ |
| 423 | + Test that when a user sends >100 to-device messages at once, any |
| 424 | + interested AS's will receive them in separate transactions. |
| 425 | + """ |
| 426 | + ( |
| 427 | + interested_services, |
| 428 | + _, |
| 429 | + ) = self._register_interested_and_uninterested_application_services( |
| 430 | + interested_count=2, |
| 431 | + uninterested_count=2, |
| 432 | + ) |
| 433 | + |
| 434 | + to_device_message_content = { |
| 435 | + "some key": "some interesting value", |
| 436 | + } |
| 437 | + |
| 438 | + # We need to send a large burst of to-device messages. We also would like to |
| 439 | + # include them all in the same application service transaction so that we can |
| 440 | + # test large transactions. |
| 441 | + # |
| 442 | + # To do this, we can send a single to-device message to many user devices at |
| 443 | + # once. However, registering 100+ devices will make this test slow. |
| 444 | + # |
| 445 | + # So instead, let's insert some to-device messages directly into the inbox |
| 446 | + # intended for some dummy devices. |
| 447 | + # |
| 448 | + # We insert number_of_messages - 1, as user2 already has one device. We'll then |
| 449 | + # send a final to-device message to the real device, which will also kick off |
| 450 | + # an AS transaction (as just inserting messages into the DB won't). |
| 451 | + number_of_messages = 150 |
| 452 | + messages = { |
| 453 | + self.user2: { |
| 454 | + f"device_{num}": to_device_message_content |
| 455 | + for num in range(number_of_messages - 1) |
| 456 | + } |
| 457 | + } |
| 458 | + # Seed the device_inbox table with to-device messages intended for user2 |
| 459 | + next_device_inbox_stream_id = self.get_success( |
| 460 | + self.hs.get_datastore()._device_inbox_id_gen.get_next().__aenter__() |
| 461 | + ) |
| 462 | + self.get_success( |
| 463 | + self.hs.get_datastore().db_pool.simple_insert_many( |
| 464 | + desc="test_application_services_receive_burst_of_to_device", |
| 465 | + table="device_inbox", |
| 466 | + values=[ |
| 467 | + { |
| 468 | + "user_id": user_id, |
| 469 | + "device_id": device_id, |
| 470 | + "stream_id": next_device_inbox_stream_id, |
| 471 | + "message_json": json_encoder.encode(message_json), |
| 472 | + "instance_name": "master", |
| 473 | + } |
| 474 | + for user_id, messages_by_device in messages.items() |
| 475 | + for device_id, message_json in messages_by_device.items() |
| 476 | + ], |
| 477 | + ) |
| 478 | + ) |
| 479 | + |
| 480 | + # Now have user1 send a final to-device message to user2. All unsent |
| 481 | + # to-device messages should be sent to any application services |
| 482 | + # interested in user2. |
| 483 | + chan = self.make_request( |
| 484 | + "PUT", |
| 485 | + "/_matrix/client/r0/sendToDevice/m.room_key_request/4", |
| 486 | + content={ |
| 487 | + "messages": {self.user2: {self.device2: to_device_message_content}} |
| 488 | + }, |
| 489 | + access_token=self.token1, |
| 490 | + ) |
| 491 | + self.assertEqual(chan.code, 200, chan.result) |
| 492 | + |
| 493 | + # Count the total number of to-device messages that were sent out per-service. |
| 494 | + # Ensure that we only sent to-device messages to interested services, and that |
| 495 | + # each interested service received the full count of to-device messages. |
| 496 | + service_id_to_message_count: Dict[str, int] = {} |
| 497 | + self.assertGreater( |
| 498 | + self.mock_scheduler.submit_ephemeral_events_for_as.call_count, 0 |
| 499 | + ) |
| 500 | + for call in self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list: |
| 501 | + service, events = call[0] |
| 502 | + |
| 503 | + # Check that this was made to an interested service |
| 504 | + self.assertIn(service, interested_services) |
| 505 | + |
| 506 | + # Add to the count of messages for this application service |
| 507 | + service_id_to_message_count.setdefault(service.id, 0) |
| 508 | + service_id_to_message_count[service.id] += len(events) |
| 509 | + |
| 510 | + # Assert that each interested service received the full count of messages |
| 511 | + for count in service_id_to_message_count.values(): |
| 512 | + self.assertEqual(count, number_of_messages) |
| 513 | + |
| 514 | + def _register_interested_and_uninterested_application_services( |
| 515 | + self, |
| 516 | + interested_count: int = 1, |
| 517 | + uninterested_count: int = 1, |
| 518 | + ) -> Tuple[List[ApplicationService], List[ApplicationService]]: |
| 519 | + """ |
| 520 | + Create application services with and without exclusive interest |
| 521 | + in user2. |
| 522 | +
|
| 523 | + Args: |
| 524 | + interested_count: The number of application services to create |
| 525 | + and register with exclusive interest. |
| 526 | + uninterested_count: The number of application services to create |
| 527 | + and register without any interest. |
| 528 | +
|
| 529 | + Returns: |
| 530 | + A two-tuple containing: |
| 531 | + * Interested application services |
| 532 | + * Uninterested application services |
| 533 | + """ |
| 534 | + # Create an application service with exclusive interest in user2 |
| 535 | + interested_services = [] |
| 536 | + uninterested_services = [] |
| 537 | + for _ in range(interested_count): |
| 538 | + interested_service = self._make_application_service( |
| 539 | + namespaces={ |
| 540 | + ApplicationService.NS_USERS: [ |
| 541 | + { |
| 542 | + "regex": "@user2:.+", |
| 543 | + "exclusive": True, |
| 544 | + } |
| 545 | + ], |
| 546 | + }, |
| 547 | + ) |
| 548 | + interested_services.append(interested_service) |
| 549 | + |
| 550 | + for _ in range(uninterested_count): |
| 551 | + uninterested_services.append(self._make_application_service()) |
| 552 | + |
| 553 | + # Register this application service, along with another, uninterested one |
| 554 | + services = [ |
| 555 | + *uninterested_services, |
| 556 | + *interested_services, |
| 557 | + ] |
| 558 | + self.hs.get_datastore().get_app_services = Mock(return_value=services) |
| 559 | + |
| 560 | + return interested_services, uninterested_services |
| 561 | + |
| 562 | + def _make_application_service( |
| 563 | + self, |
| 564 | + namespaces: Optional[Dict[str, Iterable[Dict]]] = None, |
| 565 | + ) -> ApplicationService: |
| 566 | + return ApplicationService( |
| 567 | + token=None, |
| 568 | + hostname="example.com", |
| 569 | + id=random_string(10), |
| 570 | + sender="@as:example.com", |
| 571 | + rate_limited=False, |
| 572 | + namespaces=namespaces, |
| 573 | + supports_ephemeral=True, |
| 574 | + ) |
| 575 | + |
| 576 | + def _event_id_from_read_receipt(self, read_receipt_dict: JsonDict): |
| 577 | + """ |
| 578 | + Extracts the first event ID from a read receipt. Read receipt dictionaries |
| 579 | + are in the form: |
| 580 | +
|
| 581 | + { |
| 582 | + 'type': 'm.receipt', |
| 583 | + 'room_id': '!PEzCqHyycBVxqMKIjI:test', |
| 584 | + 'content': { |
| 585 | + '$DETIeTEH651c1N7sP_j-YZiaQqCaayHhYwmhZDVWDY8': { # We want this |
| 586 | + 'm.read': { |
| 587 | + '@user1:test': { |
| 588 | + 'ts': 1300, |
| 589 | + 'hidden': False |
| 590 | + } |
| 591 | + } |
| 592 | + } |
| 593 | + } |
| 594 | + } |
| 595 | +
|
| 596 | + Args: |
| 597 | + read_receipt_dict: The dictionary returned from a POST read receipt call. |
| 598 | +
|
| 599 | + Returns: |
| 600 | + The (first) event ID the read receipt refers to. |
| 601 | + """ |
| 602 | + return list(read_receipt_dict["content"].keys())[0] |
0 commit comments