Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions src/Simplex/Messaging/Client/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ smpSubscribeQueues party ca smp srv subs = do
pending <- maybe (pure M.empty) readTVar =<< TM.lookup srv (pendingSrvSubs ca)
let acc@(_, _, oks, notPending) = foldr (groupSub pending) (False, [], [], []) (L.zip subs rs)
unless (null oks) $ addSubscriptions ca srv party oks
unless (null notPending) $ removePendingSubs ca srv party notPending
unless (null notPending) $ removePendingSubs ca srv party $ S.fromList notPending
pure acc
sessId = sessionId $ thParams smp
groupSub :: Map SMPSub C.APrivateAuthKey -> ((QueueId, C.APrivateAuthKey), Either SMPClientError ()) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId])
Expand Down Expand Up @@ -412,14 +412,22 @@ removeSubscription :: SMPClientAgent -> SMPServer -> SMPSub -> STM ()
removeSubscription = removeSub_ . srvSubs
{-# INLINE removeSubscription #-}

removePendingSub :: SMPClientAgent -> SMPServer -> SMPSub -> STM ()
removePendingSub = removeSub_ . pendingSrvSubs
{-# INLINE removePendingSub #-}

removeSub_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSub -> STM ()
removeSub_ subs srv s = TM.lookup srv subs >>= mapM_ (TM.delete s)

removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> [QueueId] -> STM ()
removeSubscriptions :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
removeSubscriptions = removeSubs_ . srvSubs
{-# INLINE removeSubscriptions #-}

removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
removePendingSubs = removeSubs_ . pendingSrvSubs
{-# INLINE removePendingSubs #-}

removeSubs_ :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey) -> SMPServer -> SMPSubParty -> [QueueId] -> STM ()
removeSubs_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
removeSubs_ subs srv party qs = TM.lookup srv subs >>= mapM_ (`modifyTVar'` (`M.withoutKeys` ss))
where
ss = S.fromList $ map (party,) qs
ss = S.map (party,) qs
15 changes: 9 additions & 6 deletions src/Simplex/Messaging/Notifications/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,10 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
TDEL -> do
logDebug "TDEL"
st <- asks store
qs <- atomically $ deleteNtfToken st tknId
forM_ qs $ \SMPQueueNtf {smpServer, notifierId} ->
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
ss <- atomically $ deleteNtfToken st tknId
forM_ (M.assocs ss) $ \(smpServer, nIds) -> do
atomically $ removeSubscriptions ca smpServer SPNotifier nIds
atomically $ removePendingSubs ca smpServer SPNotifier nIds
cancelInvervalNotifications tknId
withNtfLog (`logDeleteToken` tknId)
incNtfStatT token tknDeleted
Expand Down Expand Up @@ -732,9 +733,10 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
subId <- getId
sub <- atomically $ mkNtfSubData subId newSub
resp <-
atomically (addNtfSubscription st subId sub) >>= \case
Just _ -> atomically (writeTBQueue newSubQ [NtfSub sub]) $> NRSubId subId
_ -> pure $ NRErr AUTH
ifM
(atomically $ addNtfSubscription st subId sub)
(atomically (writeTBQueue newSubQ [NtfSub sub]) $> NRSubId subId)
(pure $ NRErr AUTH)
withNtfLog (`logCreateSubscription` sub)
incNtfStat subCreated
pure (corrId, NoEntity, resp)
Expand All @@ -756,6 +758,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
st <- asks store
atomically $ deleteNtfSubscription st subId
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
atomically $ removePendingSub ca smpServer (SPNotifier, notifierId)
withNtfLog (`logDeleteSubscription` subId)
incNtfStat subDeleted
pure NROk
Expand Down
92 changes: 56 additions & 36 deletions src/Simplex/Messaging/Notifications/Server/Store.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}

module Simplex.Messaging.Notifications.Server.Store where

Expand All @@ -16,15 +17,16 @@ import Control.Monad
import Data.ByteString.Char8 (ByteString)
import Data.Functor (($>))
import Data.List.NonEmpty (NonEmpty (..), (<|))
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import Data.Maybe (isNothing)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Word (Word16)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Protocol (NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer)
import Simplex.Messaging.Protocol (NotifierId, NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer)
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
Expand All @@ -35,8 +37,11 @@ data NtfStore = NtfStore
-- multiple registrations exist to protect from malicious registrations if token is compromised
tokenRegistrations :: TMap DeviceToken (TMap ByteString NtfTokenId),
subscriptions :: TMap NtfSubscriptionId NtfSubData,
tokenSubscriptions :: TMap NtfTokenId (TVar (Set NtfSubscriptionId)),
subscriptionLookup :: TMap SMPQueueNtf NtfSubscriptionId,
-- the first set is used to delete from `subscriptions` when token is deleted, the second - to cancel SMP subsriptions.
-- TODO [notifications] it can be simplified once NtfSubData is fully removed.
tokenSubscriptions :: TMap NtfTokenId (TMap SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId))),
-- TODO [notifications] for subscriptions that "migrated" to server subscription, we may replace NtfSubData with NtfTokenId here (Either NtfSubData NtfTokenId).
subscriptionLookup :: TMap SMPServer (TMap NotifierId NtfSubData),
tokenLastNtfs :: TMap NtfTokenId (TVar (NonEmpty PNMessageData))
}

Expand Down Expand Up @@ -134,7 +139,7 @@ removeTokenRegistration st NtfTknData {ntfTknId = tId, token, tknVerifyKey} =
>>= mapM_ (\tId' -> when (tId == tId') $ TM.delete k regs)
k = C.toPubKey C.pubKeyBytes tknVerifyKey

deleteNtfToken :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf]
deleteNtfToken :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId))
deleteNtfToken st tknId = do
void $
TM.lookupDelete tknId (tokens st) $>>= \NtfTknData {token, tknVerifyKey} ->
Expand All @@ -147,25 +152,25 @@ deleteNtfToken st tknId = do
regs = tokenRegistrations st
regKey = C.toPubKey C.pubKeyBytes

deleteTokenSubs :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf]
deleteTokenSubs st tknId = do
qs <-
TM.lookupDelete tknId (tokenSubscriptions st)
>>= mapM (readTVar >=> mapM deleteSub . S.toList)
pure $ maybe [] catMaybes qs
deleteTokenSubs :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId))
deleteTokenSubs st tknId =
TM.lookupDelete tknId (tokenSubscriptions st)
>>= maybe (pure M.empty) (readTVar >=> deleteSrvSubs)
where
deleteSub subId = do
TM.lookupDelete subId (subscriptions st)
$>>= \NtfSubData {smpQueue} ->
TM.delete smpQueue (subscriptionLookup st) $> Just smpQueue
deleteSrvSubs :: Map SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM (Map SMPServer (Set NotifierId))
deleteSrvSubs = M.traverseWithKey $ \smpServer (sVar, nVar) -> do
sIds <- readTVar sVar
modifyTVar' (subscriptions st) (`M.withoutKeys` sIds)
nIds <- readTVar nVar
TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (`modifyTVar'` (`M.withoutKeys` nIds))
pure nIds

getNtfSubscriptionIO :: NtfStore -> NtfSubscriptionId -> IO (Maybe NtfSubData)
getNtfSubscriptionIO st subId = TM.lookupIO subId (subscriptions st)

findNtfSubscription :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfSubData)
findNtfSubscription st smpQueue = do
TM.lookup smpQueue (subscriptionLookup st)
$>>= \subId -> TM.lookup subId (subscriptions st)
findNtfSubscription st SMPQueueNtf {smpServer, notifierId} =
TM.lookup smpServer (subscriptionLookup st) $>>= TM.lookup notifierId

findNtfSubscriptionToken :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfTknData)
findNtfSubscriptionToken st smpQueue = do
Expand All @@ -183,30 +188,45 @@ mkNtfSubData ntfSubId (NewNtfSub tokenId smpQueue notifierKey) = do
subStatus <- newTVar NSNew
pure NtfSubData {ntfSubId, smpQueue, tokenId, subStatus, notifierKey}

addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM (Maybe ())
addNtfSubscription st subId sub@NtfSubData {smpQueue, tokenId} =
TM.lookup tokenId (tokenSubscriptions st) >>= maybe newTokenSub pure >>= insertSub
-- returns False if subscription existed before
addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM Bool
addNtfSubscription st subId sub@NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} =
TM.lookup tokenId (tokenSubscriptions st)
>>= maybe newTokenSubs pure
>>= \ts -> TM.lookup smpServer ts
>>= maybe (newTokenSrvSubs ts) pure
>>= insertSub
where
newTokenSub = do
ts <- newTVar S.empty
newTokenSubs = do
ts <- newTVar M.empty
TM.insert tokenId ts $ tokenSubscriptions st
pure ts
insertSub ts = do
modifyTVar' ts $ S.insert subId
newTokenSrvSubs ts = do
tss <- (,) <$> newTVar S.empty <*> newTVar S.empty
TM.insert smpServer tss ts
pure tss
insertSub :: (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM Bool
insertSub (sIds, nIds) = do
modifyTVar' sIds $ S.insert subId
modifyTVar' nIds $ S.insert notifierId
TM.insert subId sub $ subscriptions st
TM.insert smpQueue subId (subscriptionLookup st)
-- return Nothing if subscription existed before
pure $ Just ()
TM.lookup smpServer (subscriptionLookup st)
>>= maybe newSubs pure
>>= fmap isNothing . TM.lookupInsert notifierId sub
newSubs = do
ss <- newTVar M.empty
TM.insert smpServer ss $ subscriptionLookup st
pure ss

deleteNtfSubscription :: NtfStore -> NtfSubscriptionId -> STM ()
deleteNtfSubscription st subId = do
TM.lookupDelete subId (subscriptions st)
>>= mapM_
( \NtfSubData {smpQueue, tokenId} -> do
TM.delete smpQueue $ subscriptionLookup st
ts_ <- TM.lookup tokenId (tokenSubscriptions st)
forM_ ts_ $ \ts -> modifyTVar' ts $ S.delete subId
)
deleteNtfSubscription st subId = TM.lookupDelete subId (subscriptions st) >>= mapM_ deleteSubIndices
where
deleteSubIndices NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} = do
TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (TM.delete notifierId)
tss_ <- TM.lookup tokenId (tokenSubscriptions st) $>>= TM.lookup smpServer
forM_ tss_ $ \(sIds, nIds) -> do
modifyTVar' sIds $ S.delete subId
modifyTVar' nIds $ S.delete notifierId

addTokenLastNtf :: NtfStore -> NtfTokenId -> PNMessageData -> IO (NonEmpty PNMessageData)
addTokenLastNtf st tknId newNtf =
Expand Down
7 changes: 2 additions & 5 deletions tests/ServerTests/SchemaDump.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations
import Simplex.Messaging.Util (ifM)
import System.Directory (doesFileExist, removeFile)
import System.Environment (lookupEnv)
import System.Process (readCreateProcess, readCreateProcessWithExitCode, shell)
import System.Process (readCreateProcess, shell)
import Test.Hspec

testDBSchema :: B.ByteString
Expand Down Expand Up @@ -87,10 +87,7 @@ getSchema schemaPath = do
("pg_dump " <> B.unpack testServerDBConnstr <> " --schema " <> B.unpack testDBSchema)
<> " --schema-only --no-owner --no-privileges --no-acl --no-subscriptions --no-tablespaces > "
<> schemaPath
(code, out, err) <- readCreateProcessWithExitCode (shell cmd) ""
print code
putStrLn $ "out: " <> out
putStrLn $ "err: " <> err
void $ readCreateProcess (shell cmd) ""
threadDelay 20000
let sed = (if ci then "sed -i" else "sed -i ''")
void $ readCreateProcess (shell $ sed <> " '/^--/d' " <> schemaPath) ""
Expand Down
Loading