Skip to content

Commit

Permalink
RabbitMQ temporary clients (#4360)
Browse files Browse the repository at this point in the history
Implements "temporary" queues for listening to events not bound to a specific client, meant to be used by team management or similar services.

When GET /events is called without a client_id parameter, we create a new temporary queue and bind it to the user-notifications exchange with routing keys <user-id> and <user-id>.temporary.

When a notification is published to RabbitMQ to all clients of a user, nothing changes, and <user-id> is used as its routing key. When it is published to a list of clients, it is now also published with routing key <user-id>.temporary. Each notifications is only published once with the <user-id>.temporary routing key even if the user has multiple capable clients.

When the websocket is closed, the temporary queue is deleted.


---------

Co-authored-by: Sven Tennie <sven.tennie@wire.com>
  • Loading branch information
pcapriotti and supersven authored Dec 12, 2024
1 parent 97b59ab commit 27849c6
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 43 deletions.
1 change: 1 addition & 0 deletions changelog.d/1-api-changes/rabbitmq-temp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The `client_id` query parameter of the `GET /events` endpoint is now optional. When not provided, events are returned from a temporary queue that's not bound to any specific client. The queue is deleted when the websocket disconnects.
113 changes: 91 additions & 22 deletions integration/test/Test/Events.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import Control.Retry
import Data.ByteString.Conversion (toByteString')
import qualified Data.Text as Text
import Data.Timeout
import MLS.Util
import Network.AMQP.Extended
import Network.RabbitMqAdmin
import qualified Network.WebSockets as WS
Expand All @@ -36,7 +37,7 @@ testConsumeEventsOneWebSocket = do
client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
clientId <- objId client

runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
deliveryTag <- assertEvent ws $ \e -> do
e %. "type" `shouldMatch` "event"
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
Expand All @@ -61,6 +62,73 @@ testConsumeEventsOneWebSocket = do
resp.status `shouldMatchInt` 200
shouldBeEmpty $ resp.json %. "notifications"

testConsumeTempEvents :: (HasCallStack) => App ()
testConsumeTempEvents = do
alice <- randomUser OwnDomain def

client0 <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
runCodensity (createEventsWebSocket alice Nothing) $ \ws -> do
clientId <- objId client0

void $ assertEvent ws $ \e -> do
e %. "type" `shouldMatch` "event"
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` clientId

ackEvent ws e

runCodensity (createEventsWebSocket alice Nothing) $ \ws -> do
client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
clientId <- objId client

void $ assertEvent ws $ \e -> do
e %. "type" `shouldMatch` "event"
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` clientId

ackEvent ws e

assertNoEvent_ ws

testMLSTempEvents :: (HasCallStack) => App ()
testMLSTempEvents = do
[alice, bob] <- createAndConnectUsers [OwnDomain, OwnDomain]
clients@[alice1, _, _] <-
traverse
( createMLSClient
def
def
{ clientArgs =
def
{ acapabilities = Just ["consumable-notifications"]
}
}
)
[alice, bob, bob]

traverse_ (uploadNewKeyPackage def) clients
convId <- createNewGroup def alice1

runCodensity (createEventsWebSocket bob Nothing) $ \ws -> do
commit <- createAddCommit alice1 convId [bob]
void $ postMLSCommitBundle commit.sender (mkBundle commit) >>= getJSON 201

-- FUTUREWORK: we should not rely on events arriving in this particular order

void $ assertEvent ws $ \e -> do
e %. "type" `shouldMatch` "event"
e %. "data.event.payload.0.type" `shouldMatch` "conversation.member-join"
user <- assertOne =<< (e %. "data.event.payload.0.data.users" & asList)
user %. "qualified_id" `shouldMatch` (bob %. "qualified_id")
ackEvent ws e

void $ assertEvent ws $ \e -> do
e %. "type" `shouldMatch` "event"
e %. "data.event.payload.0.type" `shouldMatch` "conversation.mls-welcome"
ackEvent ws e

assertNoEvent_ ws

testConsumeEventsForDifferentUsers :: (HasCallStack) => App ()
testConsumeEventsForDifferentUsers = do
alice <- randomUser OwnDomain def
Expand All @@ -73,8 +141,8 @@ testConsumeEventsForDifferentUsers = do
bobClientId <- objId bobClient

lowerCodensity $ do
aliceWS <- createEventsWebSocket alice aliceClientId
bobWS <- createEventsWebSocket bob bobClientId
aliceWS <- createEventsWebSocket alice (Just aliceClientId)
bobWS <- createEventsWebSocket bob (Just bobClientId)
lift $ assertClientAdd aliceClientId aliceWS
lift $ assertClientAdd bobClientId bobWS
where
Expand Down Expand Up @@ -110,7 +178,7 @@ testConsumeEventsWhileHavingLegacyClients = do
oldNotif <- awaitMatch isUserClientAddNotif oldWS
oldNotif %. "payload.0.client.id" `shouldMatch` newClientId

runCodensity (createEventsWebSocket alice newClientId) $ \ws ->
runCodensity (createEventsWebSocket alice (Just newClientId)) $ \ws ->
assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` newClientId
Expand All @@ -127,20 +195,20 @@ testConsumeEventsAcks = do
client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
clientId <- objId client

runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` clientId

-- without ack, we receive the same event again
runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
deliveryTag <- assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` clientId
e %. "data.delivery_tag"
sendAck ws deliveryTag False

runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
assertNoEvent_ ws

testConsumeEventsMultipleAcks :: (HasCallStack) => App ()
Expand All @@ -152,7 +220,7 @@ testConsumeEventsMultipleAcks = do
handle <- randomHandle
putHandle alice handle >>= assertSuccess

runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` clientId
Expand All @@ -164,7 +232,7 @@ testConsumeEventsMultipleAcks = do

sendAck ws deliveryTag True

runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
assertNoEvent_ ws

testConsumeEventsAckNewEventWithoutAckingOldOne :: (HasCallStack) => App ()
Expand All @@ -176,7 +244,7 @@ testConsumeEventsAckNewEventWithoutAckingOldOne = do
handle <- randomHandle
putHandle alice handle >>= assertSuccess

runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` clientId
Expand All @@ -190,15 +258,15 @@ testConsumeEventsAckNewEventWithoutAckingOldOne = do
sendAck ws deliveryTagHandleAdd False

-- Expect client-add event to be delivered again.
runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
deliveryTagClientAdd <- assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` clientId
e %. "data.delivery_tag"

sendAck ws deliveryTagClientAdd False

runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
assertNoEvent_ ws

testEventsDeadLettered :: (HasCallStack) => App ()
Expand All @@ -218,7 +286,7 @@ testEventsDeadLettered = do
handle1 <- randomHandle
putHandle alice handle1 >>= assertSuccess

runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
assertEvent ws $ \e -> do
e %. "type" `shouldMatch` "notifications.missed"

Expand All @@ -245,7 +313,7 @@ testTransientEventsDoNotTriggerDeadLetters = do
clientId <- objId client

-- consume it
runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "type" `shouldMatch` "event"
Expand All @@ -260,7 +328,7 @@ testTransientEventsDoNotTriggerDeadLetters = do
-- Typing status is transient, currently no one is listening.
sendTypingStatus alice selfConvId "started" >>= assertSuccess

runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
assertNoEvent_ ws

testTransientEvents :: (HasCallStack) => App ()
Expand All @@ -273,7 +341,7 @@ testTransientEvents = do
-- indicators, so we don't have to create another conv.
selfConvId <- objQidObject alice

runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
consumeAllEvents ws
sendTypingStatus alice selfConvId "started" >>= assertSuccess
assertEvent ws $ \e -> do
Expand All @@ -293,7 +361,7 @@ testTransientEvents = do
-- We shouldn't see the stopped typing status because we were not connected to
-- the websocket when it was sent. The other events should still show up in
-- order.
runCodensity (createEventsWebSocket alice clientId) $ \ws -> do
runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
for_ [handle1, handle2] $ \handle ->
assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.update"
Expand Down Expand Up @@ -321,20 +389,21 @@ testChannelLimit = withModifiedBackend

lowerCodensity $ do
for_ clients $ \c -> do
ws <- createEventsWebSocket alice c
ws <- createEventsWebSocket alice (Just c)
lift $ assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` c

-- the first client fails to connect because the server runs out of channels
do
ws <- createEventsWebSocket alice client0
ws <- createEventsWebSocket alice (Just client0)
lift $ assertNoEvent_ ws

testChannelKilled :: (HasCallStack) => App ()
testChannelKilled = lowerCodensity $ do
pool <- lift $ asks (.resourcePool)
[backend] <- acquireResources 1 pool

domain <- startDynamicBackend backend mempty
alice <- lift $ randomUser domain def
[c1, c2] <-
Expand All @@ -345,7 +414,7 @@ testChannelKilled = lowerCodensity $ do
>>= (%. "id")
>>= asString

ws <- createEventsWebSocket alice c1
ws <- createEventsWebSocket alice (Just c1)
lift $ do
assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
Expand Down Expand Up @@ -374,7 +443,7 @@ data EventWebSocket = EventWebSocket
createEventsWebSocket ::
(HasCallStack, MakesValue uid) =>
uid ->
String ->
Maybe String ->
Codensity App EventWebSocket
createEventsWebSocket user cid = do
eventsChan <- liftIO newChan
Expand All @@ -388,7 +457,7 @@ createEventsWebSocket user cid = do

uid <- lift $ objId =<< objQidObject user
let HostPort caHost caPort = serviceHostPort serviceMap Cannon
path = "/v" <> show apiVersion <> "/events?client=" <> cid
path = "/v" <> show apiVersion <> "/events" <> maybe "" ("?client=" <>) cid
caHdrs = [(fromString "Z-User", toByteString' uid)]
app conn =
race_
Expand Down
22 changes: 20 additions & 2 deletions libs/wire-api/src/Wire/API/Notification.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ module Wire.API.Notification
userNotificationExchangeName,
userNotificationDlxName,
userNotificationDlqName,
RabbitMqClientId (..),
clientNotificationQueueName,
userRoutingKey,
temporaryRoutingKey,
clientRoutingKey,
)
where
Expand All @@ -49,6 +51,7 @@ import Control.Lens.Operators ((?~))
import Data.Aeson (FromJSON (..), ToJSON (..))
import Data.Aeson.Types qualified as Aeson
import Data.Bits
import Data.ByteString.Conversion
import Data.HashMap.Strict.InsOrd qualified as InsOrdHashMap
import Data.Id
import Data.Json.Util
Expand Down Expand Up @@ -188,12 +191,27 @@ userNotificationDlxName = "dead-user-notifications"
userNotificationDlqName :: Text
userNotificationDlqName = "dead-user-notifications"

clientNotificationQueueName :: UserId -> ClientId -> Text
data RabbitMqClientId
= RabbitMqClientId ClientId
| RabbitMqTempId Text

instance ToByteString RabbitMqClientId where
builder (RabbitMqClientId cid) = builder cid
builder (RabbitMqTempId temp) = builder temp

clientNotificationQueueName :: UserId -> RabbitMqClientId -> Text
clientNotificationQueueName uid cid =
"user-notifications." <> clientRoutingKey uid cid
"user-notifications." <> userRoutingKey uid <> "." <> rabbitMqClientToText cid

userRoutingKey :: UserId -> Text
userRoutingKey = idToText

clientRoutingKey :: UserId -> ClientId -> Text
clientRoutingKey uid cid = userRoutingKey uid <> "." <> clientToText cid

temporaryRoutingKey :: UserId -> Text
temporaryRoutingKey uid = userRoutingKey uid <> ".temporary"

rabbitMqClientToText :: RabbitMqClientId -> Text
rabbitMqClientToText (RabbitMqClientId cid) = clientToText cid
rabbitMqClientToText (RabbitMqTempId temp) = "temp-" <> temp
3 changes: 1 addition & 2 deletions libs/wire-api/src/Wire/API/Routes/Public/Cannon.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ type CannonAPI =
:> "events"
:> ZUser
:> QueryParam'
[ -- Make this optional in https://wearezeta.atlassian.net/browse/WPB-11173
Required,
[ Optional,
Strict,
Description "Client ID"
]
Expand Down
1 change: 1 addition & 0 deletions services/cannon/cannon.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ library
, lens >=4.4
, lens-family-core >=1.1
, metrics-wai >=0.4
, MonadRandom
, mwc-random >=0.13
, prometheus-client
, retry >=0.7
Expand Down
2 changes: 2 additions & 0 deletions services/cannon/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
, lens-family-core
, lib
, metrics-wai
, MonadRandom
, mwc-random
, prometheus-client
, QuickCheck
Expand Down Expand Up @@ -91,6 +92,7 @@ mkDerivation {
lens
lens-family-core
metrics-wai
MonadRandom
mwc-random
prometheus-client
retry
Expand Down
6 changes: 3 additions & 3 deletions services/cannon/src/Cannon/API/Public.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ streamData userId connId clientId con = do
e <- wsenv
liftIO $ wsapp (mkKey userId connId) clientId e con

consumeEvents :: UserId -> ClientId -> PendingConnection -> Cannon ()
consumeEvents userId clientId con = do
consumeEvents :: UserId -> Maybe ClientId -> PendingConnection -> Cannon ()
consumeEvents userId mClientId con = do
e <- wsenv
liftIO $ rabbitMQWebSocketApp userId clientId e con
liftIO $ rabbitMQWebSocketApp userId mClientId e con
12 changes: 9 additions & 3 deletions services/cannon/src/Cannon/RabbitMq.hs
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,12 @@ ackMessage chan deliveryTag multiple = do
inner <- readMVar chan.inner
Q.ackMsg inner deliveryTag multiple

createChannel :: (Ord key) => RabbitMqPool key -> Text -> key -> Codensity IO RabbitMqChannel
createChannel pool queue key = do
type QueueName = Text

type CreateQueue = Q.Channel -> Codensity IO ()

createChannel :: (Ord key) => RabbitMqPool key -> QueueName -> CreateQueue -> key -> Codensity IO RabbitMqChannel
createChannel pool queueName createQueue key = do
closedVar <- lift newEmptyMVar
inner <- lift newEmptyMVar
msgVar <- lift newEmptyMVar
Expand Down Expand Up @@ -263,9 +267,11 @@ createChannel pool queue key = do
if connSize > pool.opts.maxChannels
then pure True
else do
createQueue chan

liftIO $ Q.addChannelExceptionHandler chan handleException
putMVar inner chan
void $ liftIO $ Q.consumeMsgs chan queue Q.Ack $ \(message, envelope) -> do
void $ liftIO $ Q.consumeMsgs chan queueName Q.Ack $ \(message, envelope) -> do
putMVar msgVar (Just (message, envelope))
retry <- takeMVar closedVar
void $ takeMVar inner
Expand Down
Loading

0 comments on commit 27849c6

Please sign in to comment.