Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 68 additions & 28 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ module Simplex.Messaging.Agent
)
where

import Control.Applicative ((<|>))
import Control.Concurrent.STM (retry)
import Control.Logger.Simple
import Control.Monad
Expand Down Expand Up @@ -859,7 +860,7 @@ switchConnectionAsync' c corrId connId =
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSwitchStarted
enqueueCommand c corrId connId Nothing $ AClientCommand SWCH
let rqs' = updatedQs rq1 rqs
pure . connectionStats $ DuplexConnection cData rqs' sqs
connectionStats c $ DuplexConnection cData rqs' sqs
_ -> throwE $ CMD PROHIBITED "switchConnectionAsync: not duplex"

newConn :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AM (ConnId, (CreatedConnLink c, Maybe ClientServiceId))
Expand Down Expand Up @@ -1704,7 +1705,8 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
rq1' <- withStore' c $ \db -> setRcvSwitchStatus db rq1 $ Just RSSendingQUSE
let rqs' = updatedQs rq1' rqs
conn' = DuplexConnection cData rqs' sqs
notify . SWITCH QDRcv SPSecured $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDRcv SPSecured cStats
_ -> internalErr "ICQSecure: no switching queue found"
_ -> internalErr "ICQSecure: queue address not found in connection"
ICQDelete rId -> do
Expand All @@ -1727,7 +1729,8 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
ns <- asks ntfSupervisor
liftIO $ sendNtfSubCommand ns (NSCCreate, [connId])
let conn' = DuplexConnection cData (rq'' :| rqs') sqs
notify $ SWITCH QDRcv SPCompleted $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDRcv SPCompleted cStats
_ -> internalErr "ICQDelete: cannot delete the only queue in connection"
where
ack srv rId srvMsgId = do
Expand Down Expand Up @@ -2016,7 +2019,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
AM_QADD_ -> pure ()
AM_QKEY_ -> do
SomeConn _ conn <- withStore c (`getConn` connId)
notify . SWITCH QDSnd SPConfirmed $ connectionStats conn
cStats <- connectionStats c conn
notify $ SWITCH QDSnd SPConfirmed cStats
AM_QUSE_ -> pure ()
AM_QTEST_ -> withConnLock c connId "runSmpQueueMsgDelivery AM_QTEST_" $ do
withStore' c $ \db -> setSndQueueStatus db sq Active
Expand All @@ -2041,7 +2045,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
deleteConnSndQueue db connId sq'
let sqs'' = sq'' :| sqs'
conn' = DuplexConnection cData' rqs sqs''
notify . SWITCH QDSnd SPCompleted $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDSnd SPCompleted cStats
_ -> internalErr msgId "sent QTEST: there is only one queue in connection"
_ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue"
_ -> internalErr msgId "QTEST sent not in duplex connection"
Expand Down Expand Up @@ -2152,7 +2157,7 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq
void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))]
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSendingQADD
let rqs' = updatedQs rq1 rqs <> [rq'']
pure . connectionStats $ DuplexConnection cData rqs' sqs
connectionStats c $ DuplexConnection cData rqs' sqs

abortConnectionSwitch' :: AgentClient -> ConnId -> AM ConnectionStats
abortConnectionSwitch' c connId =
Expand All @@ -2172,7 +2177,7 @@ abortConnectionSwitch' c connId =
forM_ delRqs $ \RcvQueue {server, rcvId} -> enqueueCommand c "" connId (Just server) $ AInternalCommand $ ICDeleteRcvQueue rcvId
let rqs'' = updatedQs rq' rqs'
conn' = DuplexConnection cData rqs'' sqs
pure $ connectionStats conn'
connectionStats c conn'
_ -> throwE $ INTERNAL "won't delete all rcv queues in connection"
| otherwise -> throwE $ CMD PROHIBITED "abortConnectionSwitch: no rcv queues left"
_ -> throwE $ CMD PROHIBITED "abortConnectionSwitch: not allowed"
Expand All @@ -2195,7 +2200,7 @@ synchronizeRatchet' c connId pqSupport' force = withConnLock c connId "synchroni
setRatchetX3dhKeys db connId pk1 pk2 pKem
let cData'' = cData' {ratchetSyncState = RSStarted} :: ConnData
conn' = DuplexConnection cData'' rqs sqs
pure $ connectionStats conn'
connectionStats c conn'
| otherwise -> throwE $ CMD PROHIBITED "synchronizeRatchet: not allowed"
_ -> throwE $ CMD PROHIBITED "synchronizeRatchet: not duplex"

Expand Down Expand Up @@ -2363,34 +2368,62 @@ deleteConnections_ getConnections ntf waitDelivery c nm connIds = do
getConnectionServers' :: AgentClient -> ConnId -> AM ConnectionStats
getConnectionServers' c connId = do
SomeConn _ conn <- withStore c (`getConn` connId)
pure $ connectionStats conn
connectionStats c conn

getConnectionRatchetAdHash' :: AgentClient -> ConnId -> AM ByteString
getConnectionRatchetAdHash' c connId = do
CR.Ratchet {rcAD = Str rcAD} <- withStore c (`getRatchet` connId)
pure $ C.sha256Hash rcAD

connectionStats :: Connection c -> ConnectionStats
connectionStats = \case
RcvConnection cData rq ->
(stats cData) {rcvQueuesInfo = [rcvQueueInfo rq]}
SndConnection cData sq ->
(stats cData) {sndQueuesInfo = [sndQueueInfo sq]}
DuplexConnection cData rqs sqs ->
(stats cData) {rcvQueuesInfo = map rcvQueueInfo $ L.toList rqs, sndQueuesInfo = map sndQueueInfo $ L.toList sqs}
ContactConnection cData rq ->
(stats cData) {rcvQueuesInfo = [rcvQueueInfo rq]}
connectionStats :: AgentClient -> Connection c -> AM ConnectionStats
connectionStats c = \case
RcvConnection cData rq -> do
rcvQueuesInfo <- (: []) <$> rcvQueueInfo rq
pure (stats cData) {rcvQueuesInfo, subStatus = connSubStatus rcvQueuesInfo}
SndConnection cData sq -> do
pure (stats cData) {sndQueuesInfo = [sndQueueInfo sq]}
DuplexConnection cData rqs sqs -> do
rcvQueuesInfo <- mapM rcvQueueInfo (L.toList rqs)
pure
(stats cData)
{ rcvQueuesInfo,
sndQueuesInfo = map sndQueueInfo $ L.toList sqs,
subStatus = connSubStatus rcvQueuesInfo
}
ContactConnection cData rq -> do
rcvQueuesInfo <- (: []) <$> rcvQueueInfo rq
pure (stats cData) {rcvQueuesInfo, subStatus = connSubStatus rcvQueuesInfo}
NewConnection cData ->
stats cData
pure $ stats cData
where
stats :: ConnData -> ConnectionStats
stats ConnData {connAgentVersion, ratchetSyncState} =
ConnectionStats
{ connAgentVersion,
rcvQueuesInfo = [],
sndQueuesInfo = [],
ratchetSyncState,
ratchetSyncSupported = connAgentVersion >= ratchetSyncSMPAgentVersion
ratchetSyncSupported = connAgentVersion >= ratchetSyncSMPAgentVersion,
subStatus = Nothing
}
rcvQueueInfo :: RcvQueue -> AM RcvQueueInfo
rcvQueueInfo rq@RcvQueue {server, status, rcvSwchStatus} = do
subStatus <- atomically checkQueueSubStatus
pure $ RcvQueueInfo {rcvServer = server, status, rcvSwitchStatus = rcvSwchStatus, canAbortSwitch = canAbortRcvSwitch rq, subStatus}
where
checkQueueSubStatus :: STM SubscriptionStatus
checkQueueSubStatus =
ifM (hasActiveSubscription c rq) (pure SSActive) $
ifM (hasPendingSubscription c rq) (pure SSPending) $
maybe SSNoSub (SSRemoved . show) <$> hasRemovedSubscription c rq
sndQueueInfo :: SndQueue -> SndQueueInfo
sndQueueInfo SndQueue {server, status, sndSwchStatus} =
SndQueueInfo {sndServer = server, status, sndSwitchStatus = sndSwchStatus}
connSubStatus :: [RcvQueueInfo] -> Maybe SubscriptionStatus
connSubStatus rqs =
let isActive RcvQueueInfo {status} = status == Active
subStatus' RcvQueueInfo {subStatus} = subStatus
in minimum . L.map subStatus' <$> (L.nonEmpty (filter isActive rqs) <|> L.nonEmpty rqs)

-- | Change servers to be used for creating new queues.
-- This function will set all servers as enabled in case all passed servers are disabled.
Expand Down Expand Up @@ -2903,7 +2936,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
| rss `notElem` ([RSOk, RSStarted] :: [RatchetSyncState]) = do
let cData'' = (toConnData conn') {ratchetSyncState = RSOk} :: ConnData
conn'' = updateConnection cData'' conn'
notify . RSYNC RSOk Nothing $ connectionStats conn''
cStats <- connectionStats c conn''
notify $ RSYNC RSOk Nothing cStats
withStore' c $ \db -> setConnRatchetSync db connId RSOk
pure conn''
| otherwise = pure conn'
Expand Down Expand Up @@ -2933,7 +2967,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
when (rss `elem` ([RSOk, RSAllowed, RSRequired] :: [RatchetSyncState])) $ do
let cData'' = (toConnData conn') {ratchetSyncState = rss'} :: ConnData
conn'' = updateConnection cData'' connDuplex
notify . RSYNC rss' (Just e) $ connectionStats conn''
cStats <- connectionStats c conn''
notify $ RSYNC rss' (Just e) cStats
withStore' c $ \db -> setConnRatchetSync db connId rss'
Left e -> do
atomically $ incSMPServerStat c userId srv recvErrs
Expand Down Expand Up @@ -3188,7 +3223,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
sq1 <- withStore' c $ \db -> setSndSwitchStatus db sq $ Just SSSendingQKEY
let sqs'' = updatedQs sq1 sqs' <> [sq2]
conn' = DuplexConnection cData' rqs sqs''
notify . SWITCH QDSnd SPStarted $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDSnd SPStarted cStats
_ -> qError "QADD: won't delete all snd queues in connection"
_ -> qError "QADD: replaced queue address is not found in connection"
_ -> throwE $ AGENT A_VERSION
Expand All @@ -3207,7 +3243,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
let dhSecret = C.dh' dhPublicKey dhPrivKey
withStore' c $ \db -> setRcvQueueConfirmedE2E db rq' dhSecret $ min cVer cVer'
enqueueCommand c "" connId (Just smpServer) $ AInternalCommand $ ICQSecure rcvId senderKey
notify . SWITCH QDRcv SPConfirmed $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDRcv SPConfirmed cStats
| otherwise -> qError "QKEY: queue already secured"
_ -> qError "QKEY: queue address not found in connection"
where
Expand All @@ -3232,7 +3269,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
sq1' <- withStore' c $ \db -> setSndSwitchStatus db sq1 $ Just SSSendingQTEST
let sqs' = updatedQs sq1' sqs
conn' = DuplexConnection cData' rqs sqs'
notify . SWITCH QDSnd SPSecured $ connectionStats conn'
cStats <- connectionStats c conn'
notify $ SWITCH QDSnd SPSecured cStats
_ -> qError "QUSE: switching SndQueue not found in connection"
_ -> qError "QUSE: switched queue address not found in connection"

Expand Down Expand Up @@ -3308,12 +3346,14 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
notifyRatchetSyncError = do
let cData'' = cData' {ratchetSyncState = RSRequired} :: ConnData
conn'' = updateConnection cData'' conn'
notify $ RSYNC RSRequired (Just RATCHET_SYNC) (connectionStats conn'')
cStats <- connectionStats c conn''
notify $ RSYNC RSRequired (Just RATCHET_SYNC) cStats
notifyAgreed :: AM ()
notifyAgreed = do
let cData'' = cData' {ratchetSyncState = RSAgreed} :: ConnData
conn'' = updateConnection cData'' conn'
notify . RSYNC RSAgreed Nothing $ connectionStats conn''
cStats <- connectionStats c conn''
notify $ RSYNC RSAgreed Nothing cStats
recreateRatchet :: CR.Ratchet 'C.X448 -> AM ()
recreateRatchet rc = withStore' c $ \db -> do
setConnRatchetSync db connId RSAgreed
Expand Down
12 changes: 12 additions & 0 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ module Simplex.Messaging.Agent.Client
removeSubscription,
removeSubscriptions,
hasActiveSubscription,
hasPendingSubscription,
hasRemovedSubscription,
hasGetLock,
releaseGetLock,
activeClientSession,
Expand Down Expand Up @@ -1688,6 +1690,16 @@ hasActiveSubscription c rq = do
SS.hasActiveSub tSess (queueId rq) $ currentSubs c
{-# INLINE hasActiveSubscription #-}

hasPendingSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool
hasPendingSubscription c rq = do
tSess <- mkSMPTransportSession c rq
SS.hasPendingSub tSess (queueId rq) $ currentSubs c
{-# INLINE hasPendingSubscription #-}

hasRemovedSubscription :: SomeRcvQueue q => AgentClient -> q -> STM (Maybe SMPClientError)
hasRemovedSubscription c rq = do
TM.lookup (qUserId rq, qServer rq) (removedSubs c) $>>= TM.lookup (queueId rq)

removeSubscription :: SomeRcvQueue q => AgentClient -> SMPTransportSession -> ConnId -> q -> STM ()
removeSubscription c tSess connId rq = do
modifyTVar' (subscrConns c) $ S.delete connId
Expand Down
20 changes: 18 additions & 2 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ module Simplex.Messaging.Agent.Protocol
MsgMeta (..),
RcvQueueInfo (..),
SndQueueInfo (..),
SubscriptionStatus (..),
ConnectionStats (..),
SwitchPhase (..),
RcvSwitchStatus (..),
Expand Down Expand Up @@ -643,23 +644,34 @@ instance FromJSON RatchetSyncState where

data RcvQueueInfo = RcvQueueInfo
{ rcvServer :: SMPServer,
status :: QueueStatus,
rcvSwitchStatus :: Maybe RcvSwitchStatus,
canAbortSwitch :: Bool
canAbortSwitch :: Bool,
subStatus :: SubscriptionStatus
}
deriving (Eq, Show)

data SndQueueInfo = SndQueueInfo
{ sndServer :: SMPServer,
status :: QueueStatus,
sndSwitchStatus :: Maybe SndSwitchStatus
}
deriving (Eq, Show)

data SubscriptionStatus
= SSActive
| SSPending
| SSRemoved {subError :: String}
| SSNoSub
deriving (Eq, Ord, Show)

data ConnectionStats = ConnectionStats
{ connAgentVersion :: VersionSMPA,
rcvQueuesInfo :: [RcvQueueInfo],
sndQueuesInfo :: [SndQueueInfo],
ratchetSyncState :: RatchetSyncState,
ratchetSyncSupported :: Bool
ratchetSyncSupported :: Bool,
subStatus :: Maybe SubscriptionStatus
}
deriving (Eq, Show)

Expand Down Expand Up @@ -2000,6 +2012,10 @@ serializeCommand = \case
serializeBinary :: ByteString -> ByteString
serializeBinary body = bshow (B.length body) <> "\n" <> body

$(J.deriveJSON (enumJSON fstToLower) ''QueueStatus)

$(J.deriveJSON (sumTypeJSON $ dropPrefix "SS") ''SubscriptionStatus)

$(J.deriveJSON defaultJSON ''RcvQueueInfo)

$(J.deriveJSON defaultJSON ''SndQueueInfo)
Expand Down
8 changes: 0 additions & 8 deletions src/Simplex/Messaging/Agent/Store.hs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,6 @@ clientServiceId :: RcvQueue -> Maybe ClientServiceId
clientServiceId = fmap dbServiceId . clientService
{-# INLINE clientServiceId #-}

rcvQueueInfo :: RcvQueue -> RcvQueueInfo
rcvQueueInfo rq@RcvQueue {server, rcvSwchStatus} =
RcvQueueInfo {rcvServer = server, rcvSwitchStatus = rcvSwchStatus, canAbortSwitch = canAbortRcvSwitch rq}

rcvSMPQueueAddress :: RcvQueue -> SMPQueueAddress
rcvSMPQueueAddress RcvQueue {server, sndId, e2ePrivKey, queueMode} =
SMPQueueAddress server sndId (C.publicKey e2ePrivKey) queueMode
Expand Down Expand Up @@ -211,10 +207,6 @@ data StoredSndQueue (q :: DBStored) = SndQueue
}
deriving (Show)

sndQueueInfo :: SndQueue -> SndQueueInfo
sndQueueInfo SndQueue {server, sndSwchStatus} =
SndQueueInfo {sndServer = server, sndSwitchStatus = sndSwchStatus}

instance SMPQueue RcvQueue where
qServer RcvQueue {server} = server
{-# INLINE qServer #-}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ schemaMigrations =
("m20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies),
("m20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe)
("m20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe)
]

-- | The list of migrations in ascending order by date
Expand Down
Loading