Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1551,10 +1551,10 @@ sendTSessionBatches statCmd toRQ action c qs =
where
agentError = second . first $ protocolClientError SMP $ clientServer smp

sendBatch :: (SMPClient -> NonEmpty (SMP.RcvPrivateAuthKey, SMP.RecipientId) -> IO (NonEmpty (Either SMPClientError ()))) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses RcvQueue SMPClientError ())
sendBatch :: (SMPClient -> NonEmpty (SMP.RecipientId, SMP.RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError ()))) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses RcvQueue SMPClientError ())
sendBatch smpCmdFunc smp qs = L.zip qs <$> smpCmdFunc smp (L.map queueCreds qs)
where
queueCreds RcvQueue {rcvPrivateKey, rcvId} = (rcvPrivateKey, rcvId)
queueCreds RcvQueue {rcvPrivateKey, rcvId} = (rcvId, rcvPrivateKey)

addSubscription :: AgentClient -> SessionId -> RcvQueue -> STM ()
addSubscription c sessId rq@RcvQueue {connId} = do
Expand Down Expand Up @@ -1715,12 +1715,12 @@ enableQueuesNtfs = sendTSessionBatches "NKEY" eqnrRq enableQueues_
where
enableQueues_ :: SMPClient -> NonEmpty EnableQueueNtfReq -> IO (NonEmpty (EnableQueueNtfReq, Either (ProtocolClientError ErrorType) (SMP.NotifierId, RcvNtfPublicDhKey)))
enableQueues_ smp qs' = L.zip qs' <$> enableSMPQueuesNtfs smp (L.map queueCreds qs')
queueCreds :: EnableQueueNtfReq -> (SMP.RcvPrivateAuthKey, SMP.RecipientId, SMP.NtfPublicAuthKey, SMP.RcvNtfPublicDhKey)
queueCreds :: EnableQueueNtfReq -> (SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.NtfPublicAuthKey, SMP.RcvNtfPublicDhKey)
queueCreds EnableQueueNtfReq {eqnrRq, eqnrAuthKeyPair, eqnrRcvKeyPair} =
let RcvQueue {rcvPrivateKey, rcvId} = eqnrRq
(ntfPublicKey, _) = eqnrAuthKeyPair
(rcvNtfPubDhKey, _) = eqnrRcvKeyPair
in (rcvPrivateKey, rcvId, ntfPublicKey, rcvNtfPubDhKey)
in (rcvId, rcvPrivateKey, ntfPublicKey, rcvNtfPubDhKey)

disableQueueNotifications :: AgentClient -> RcvQueue -> AM ()
disableQueueNotifications c rq@RcvQueue {rcvId, rcvPrivateKey} =
Expand All @@ -1734,8 +1734,8 @@ disableQueuesNtfs = sendTSessionBatches "NDEL" snd disableQueues_
where
disableQueues_ :: SMPClient -> NonEmpty DisableQueueNtfReq -> IO (NonEmpty (DisableQueueNtfReq, Either (ProtocolClientError ErrorType) ()))
disableQueues_ smp qs' = L.zip qs' <$> disableSMPQueuesNtfs smp (L.map queueCreds qs')
queueCreds :: DisableQueueNtfReq -> (SMP.RcvPrivateAuthKey, SMP.RecipientId)
queueCreds (_, RcvQueue {rcvPrivateKey, rcvId}) = (rcvPrivateKey, rcvId)
queueCreds :: DisableQueueNtfReq -> (SMP.RecipientId, SMP.RcvPrivateAuthKey)
queueCreds (_, RcvQueue {rcvPrivateKey, rcvId}) = (rcvId, rcvPrivateKey)

sendAck :: AgentClient -> RcvQueue -> MsgId -> AM ()
sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId =
Expand Down
28 changes: 14 additions & 14 deletions src/Simplex/Messaging/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ smpClientStub g sessionId thVersion thAuth = do
type SMPClient = ProtocolClient SMPVersion ErrorType BrokerMsg

-- | Type for client command data
type ClientCommand msg = (Maybe C.APrivateAuthKey, EntityId, ProtoCommand msg)
type ClientCommand msg = (EntityId, Maybe C.APrivateAuthKey, ProtoCommand msg)

-- | Type synonym for transmission from SPM servers.
-- Batch response is presented as a single `ServerTransmissionBatch` tuple.
Expand Down Expand Up @@ -765,17 +765,17 @@ subscribeSMPQueue c rpKey rId = do
r -> throwE $ unexpectedResponse r

-- | Subscribe to multiple SMP queues batching commands if supported.
subscribeSMPQueues :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> IO (NonEmpty (Either SMPClientError ()))
subscribeSMPQueues :: SMPClient -> NonEmpty (RecipientId, RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError ()))
subscribeSMPQueues c qs = do
liftIO $ enablePings c
sendProtocolCommands c cs >>= mapM (processSUBResponse c)
where
cs = L.map (\(rpKey, rId) -> (Just rpKey, rId, Cmd SRecipient SUB)) qs
cs = L.map (\(rId, rpKey) -> (rId, Just rpKey, Cmd SRecipient SUB)) qs

streamSubscribeSMPQueues :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> ([(RecipientId, Either SMPClientError ())] -> IO ()) -> IO ()
streamSubscribeSMPQueues :: SMPClient -> NonEmpty (RecipientId, RcvPrivateAuthKey) -> ([(RecipientId, Either SMPClientError ())] -> IO ()) -> IO ()
streamSubscribeSMPQueues c qs cb = streamProtocolCommands c cs $ mapM process >=> cb
where
cs = L.map (\(rpKey, rId) -> (Just rpKey, rId, Cmd SRecipient SUB)) qs
cs = L.map (\(rId, rpKey) -> (rId, Just rpKey, Cmd SRecipient SUB)) qs
process r@(Response rId _) = (rId,) <$> processSUBResponse c r

processSUBResponse :: SMPClient -> Response ErrorType BrokerMsg -> IO (Either SMPClientError ())
Expand Down Expand Up @@ -813,7 +813,7 @@ subscribeSMPQueueNotifications c npKey nId = do
{-# INLINE subscribeSMPQueueNotifications #-}

-- | Subscribe to multiple SMP queues notifications batching commands if supported.
subscribeSMPQueuesNtfs :: SMPClient -> NonEmpty (NtfPrivateAuthKey, NotifierId) -> IO (NonEmpty (Either SMPClientError ()))
subscribeSMPQueuesNtfs :: SMPClient -> NonEmpty (NotifierId, NtfPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError ()))
subscribeSMPQueuesNtfs c qs = do
liftIO $ enablePings c
okSMPCommands NSUB c qs
Expand Down Expand Up @@ -887,10 +887,10 @@ enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey =
r -> throwE $ unexpectedResponse r

-- | Enable notifications for the multiple queues for push notifications server.
enableSMPQueuesNtfs :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId, NtfPublicAuthKey, RcvNtfPublicDhKey) -> IO (NonEmpty (Either SMPClientError (NotifierId, RcvNtfPublicDhKey)))
enableSMPQueuesNtfs :: SMPClient -> NonEmpty (RecipientId, RcvPrivateAuthKey, NtfPublicAuthKey, RcvNtfPublicDhKey) -> IO (NonEmpty (Either SMPClientError (NotifierId, RcvNtfPublicDhKey)))
enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands c cs
where
cs = L.map (\(rpKey, rId, notifierKey, rcvNtfPublicDhKey) -> (Just rpKey, rId, Cmd SRecipient $ NKEY notifierKey rcvNtfPublicDhKey)) qs
cs = L.map (\(rId, rpKey, notifierKey, rcvNtfPublicDhKey) -> (rId, Just rpKey, Cmd SRecipient $ NKEY notifierKey rcvNtfPublicDhKey)) qs
process (Response _ r) = case r of
Right (NID nId rcvNtfSrvPublicDhKey) -> Right (nId, rcvNtfSrvPublicDhKey)
Right r' -> Left $ unexpectedResponse r'
Expand All @@ -904,7 +904,7 @@ disableSMPQueueNotifications = okSMPCommand NDEL
{-# INLINE disableSMPQueueNotifications #-}

-- | Disable notifications for multiple queues for push notifications server.
disableSMPQueuesNtfs :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> IO (NonEmpty (Either SMPClientError ()))
disableSMPQueuesNtfs :: SMPClient -> NonEmpty (RecipientId, RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError ()))
disableSMPQueuesNtfs = okSMPCommands NDEL
{-# INLINE disableSMPQueuesNtfs #-}

Expand Down Expand Up @@ -946,7 +946,7 @@ deleteSMPQueue = okSMPCommand DEL
{-# INLINE deleteSMPQueue #-}

-- | Delete multiple SMP queues batching commands if supported.
deleteSMPQueues :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> IO (NonEmpty (Either SMPClientError ()))
deleteSMPQueues :: SMPClient -> NonEmpty (RecipientId, RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError ()))
deleteSMPQueues = okSMPCommands DEL
{-# INLINE deleteSMPQueues #-}

Expand Down Expand Up @@ -1120,11 +1120,11 @@ okSMPCommand cmd c pKey qId =
OK -> return ()
r -> throwE $ unexpectedResponse r

okSMPCommands :: PartyI p => Command p -> SMPClient -> NonEmpty (C.APrivateAuthKey, QueueId) -> IO (NonEmpty (Either SMPClientError ()))
okSMPCommands :: PartyI p => Command p -> SMPClient -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO (NonEmpty (Either SMPClientError ()))
okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs
where
aCmd = Cmd sParty cmd
cs = L.map (\(pKey, qId) -> (Just pKey, qId, aCmd)) qs
cs = L.map (\(qId, pKey) -> (qId, Just pKey, aCmd)) qs
process (Response _ r) = case r of
Right OK -> Right ()
Right r' -> Left $ unexpectedResponse r'
Expand Down Expand Up @@ -1187,7 +1187,7 @@ sendProtocolCommand c = sendProtocolCommand_ c Nothing Nothing
-- Please note: if nonce is passed it is also used as a correlation ID
sendProtocolCommand_ :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.CbNonce -> Maybe Int -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} nonce_ tOut pKey entId cmd =
ExceptT $ uncurry sendRecv =<< mkTransmission_ c nonce_ (pKey, entId, cmd)
ExceptT $ uncurry sendRecv =<< mkTransmission_ c nonce_ (entId, pKey, cmd)
where
-- two separate "atomically" needed to avoid blocking
sendRecv :: Either TransportError SentRawTransmission -> Request err msg -> IO (Either (ProtocolClientError err) msg)
Expand Down Expand Up @@ -1225,7 +1225,7 @@ mkTransmission :: Protocol v err msg => ProtocolClient v err msg -> ClientComman
mkTransmission c = mkTransmission_ c Nothing

mkTransmission_ :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.CbNonce -> ClientCommand msg -> IO (PCTransmission err msg)
mkTransmission_ ProtocolClient {thParams, client_ = PClient {clientCorrId, sentCommands}} nonce_ (pKey_, entityId, command) = do
mkTransmission_ ProtocolClient {thParams, client_ = PClient {clientCorrId, sentCommands}} nonce_ (entityId, pKey_, command) = do
nonce@(C.CbNonce corrId) <- maybe (atomically $ C.randomCbNonce clientCorrId) pure nonce_
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams (CorrId corrId, entityId, command)
auth = authTransmission (thAuth thParams) pKey_ nonce tForAuth
Expand Down
Loading
Loading