Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 100 additions & 79 deletions src/Simplex/Messaging/Notifications/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Simplex.Messaging.Notifications.Server where

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (mapConcurrently)
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
Expand Down Expand Up @@ -68,8 +69,8 @@ import Simplex.Messaging.Server.Control (CPClientRole (..))
import Simplex.Messaging.Server.Env.STM (StartOptions (..))
import Simplex.Messaging.Server.QueueStore (getSystemDate)
import Simplex.Messaging.Server.Stats (PeriodStats (..), PeriodStatCounts (..), periodStatCounts, periodStatDataCounts, updatePeriodStats)
import Simplex.Messaging.Session
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport (..), THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..), TransportPeer (..), defaultSupportedParams)
import Simplex.Messaging.Transport.Buffer (trimCR)
import Simplex.Messaging.Transport.Server (AddHTTP, runTransportServer, runLocalTCPServer)
Expand All @@ -78,7 +79,8 @@ import System.Environment (lookupEnv)
import System.Exit (exitFailure, exitSuccess)
import System.IO (BufferMode (..), hClose, hPrint, hPutStrLn, hSetBuffering, hSetNewlineMode, universalNewlineMode)
import System.Mem.Weak (deRefWeak)
import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, unliftIO, withFile)
import System.Timeout (timeout)
import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, race_, unliftIO, withFile)
import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId)
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
Expand Down Expand Up @@ -140,9 +142,13 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
logNote "Saving server state..."
saveServer
NtfSubscriber {smpSubscribers, smpAgent} <- asks subscriber
liftIO $ readTVarIO smpSubscribers >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (deRefWeak >=> mapM_ killThread))
liftIO $ readTVarIO smpSubscribers >>= mapM_ stopSubscriber
liftIO $ closeSMPClientAgent smpAgent
logNote "Server stopped"
where
stopSubscriber v =
atomically (tryReadTMVar $ sessionVar v)
>>= mapM (deRefWeak . subThreadId >=> mapM_ killThread)

saveServer :: M ()
saveServer = asks store >>= liftIO . closeNtfDbStore >> saveServerStats
Expand Down Expand Up @@ -440,98 +446,101 @@ resubscribe NtfSubscriber {smpAgent = ca} = do
afterSubId_' = Just $ fst $ last subs
if len < dbBatchSize then pure n' else loop n' afterSubId_'

-- this function is concurrency-safe - only one subscriber per server can be created at a time,
-- other threads would wait for the first thread to create it.
--
-- Obtains the SMPSubscriber for smpServer (creating it or waiting for it, see helpers below)
-- and, if one was obtained, writes ntfSubs to its subscriberSubQ.
-- NOTE(review): when the helpers return Nothing (creation exception or wait timeout),
-- ntfSubs is not enqueued by this function - confirm that callers expect that.
subscribeNtfs :: NtfSubscriber -> NtfPostgresStore -> SMPServer -> NonEmpty ServerNtfSub -> IO ()
subscribeNtfs NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent = ca} st smpServer ntfSubs =
getSubscriberVar
-- Left is handled by creating the subscriber, Right by waiting for it
-- (presumably getSessVar returns Left for a var it has just added to the map - confirm in Simplex.Messaging.Session)
>>= either createSMPSubscriber waitForSMPSubscriber
-- mapM_ over Maybe: the batch is queued only when a subscriber is available
>>= mapM_ (\sub -> atomically $ writeTQueue (subscriberSubQ sub) ntfSubs)
where
-- looks up / registers the session var for smpServer in smpSubscribers in one STM transaction,
-- passing the current time to getSessVar
getSubscriberVar :: IO (Either SMPSubscriberVar SMPSubscriberVar)
getSubscriberVar = atomically . getSessVar subscriberSeq smpServer smpSubscribers =<< getCurrentTime

-- creates the queue, forks the subscriber thread (keeping only a weak reference to its thread id)
-- and puts the resulting SMPSubscriber into the session var.
-- Any exception is logged and results in the var being removed and Nothing returned.
createSMPSubscriber :: SMPSubscriberVar -> IO (Maybe SMPSubscriber)
createSMPSubscriber v =
E.handle (\(e :: SomeException) -> logError ("SMP subscriber exception: " <> tshow e) >> removeSubscriber v) $ do
q <- newTQueueIO
tId <- mkWeakThreadId =<< forkIO (runSMPSubscriber q)
let sub = SMPSubscriber {smpServer, subscriberSubQ = q, subThreadId = tId}
atomically $ putTMVar (sessionVar v) sub -- this makes it available for other threads
pure $ Just sub

-- returns the subscriber from the session var, waiting for it to be filled if necessary;
-- the wait is limited to 10000000 microseconds (10 seconds), after which the error is logged,
-- the var is removed and Nothing is returned.
waitForSMPSubscriber :: SMPSubscriberVar -> IO (Maybe SMPSubscriber)
waitForSMPSubscriber v =
-- reading without timeout first to avoid creating extra thread for timeout
atomically (tryReadTMVar $ sessionVar v)
>>= maybe (timeout 10000000 $ atomically $ readTMVar $ sessionVar v) (pure . Just)
>>= maybe (logError "SMP subscriber timeout" >> removeSubscriber v) (pure . Just)

-- create/waitForSMPSubscriber should never throw, removing it from map in case it did
-- (always returns Nothing so it can be used as the failure branch of both helpers)
removeSubscriber v = do
atomically $ removeSessVar v smpServer smpSubscribers
pure Nothing

-- subscriber thread body: forever takes the next batch from the queue, marks it as NSPending
-- in the store, logs the counts and passes the batch (without subscription IDs) to the SMP client agent.
runSMPSubscriber :: TQueue (NonEmpty ServerNtfSub) -> IO ()
runSMPSubscriber q = forever $ do
-- TODO [ntfdb] possibly, the subscriptions can be batched here and sent every say 5 seconds
-- this should be analysed once we have prometheus stats
subs <- atomically $ readTQueue q
updated <- batchUpdateSubStatus st subs NSPending
logSubStatus smpServer "subscribing" (L.length subs) updated
subscribeQueuesNtfs ca smpServer $ L.map snd subs

ntfSubscriber :: NtfSubscriber -> M ()
ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do
raceAny_ [subscribe, receiveSMP, receiveAgent]
ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
race_ receiveSMP receiveAgent
where
subscribe :: M ()
subscribe = forever $ do
(srv, subs) <- atomically $ readTBQueue newSubQ
SMPSubscriber {subscriberSubQ} <- getSMPSubscriber srv
atomically $ writeTQueue subscriberSubQ subs

-- TODO [ntfdb] this does not guarantee that only one subscriber per server is created (there should be TMVar in the map)
-- This does not need changing if single newSubQ remains, but if it is removed, it needs to change
getSMPSubscriber :: SMPServer -> M SMPSubscriber
getSMPSubscriber smpServer =
liftIO (TM.lookupIO smpServer smpSubscribers) >>= maybe createSMPSubscriber pure
where
createSMPSubscriber = do
sub@SMPSubscriber {subThreadId} <- liftIO $ newSMPSubscriber smpServer
atomically $ TM.insert smpServer sub smpSubscribers
tId <- mkWeakThreadId =<< forkIO (runSMPSubscriber sub)
atomically . writeTVar subThreadId $ Just tId
pure sub

runSMPSubscriber :: SMPSubscriber -> M ()
runSMPSubscriber SMPSubscriber {smpServer, subscriberSubQ} = do
receiveSMP = do
st <- asks store
forever $ do
-- TODO [ntfdb] possibly, the subscriptions can be batched here and sent every say 5 seconds
-- this should be analysed once we have prometheus stats
subs <- atomically $ readTQueue subscriberSubQ
updated <- liftIO $ batchUpdateSubStatus st subs NSPending
logSubStatus smpServer "subscribing" (L.length subs) updated
liftIO $ subscribeQueuesNtfs ca smpServer $ L.map snd subs

receiveSMP :: M ()
receiveSMP = forever $ do
((_, srv, _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ
forM ts $ \(ntfId, t) -> case t of
STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
STResponse {} -> pure () -- it was already reported as timeout error
STEvent msgOrErr -> do
let smpQueue = SMPQueueNtf srv ntfId
case msgOrErr of
Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
ntfTs <- liftIO getSystemTime
st <- asks store
NtfPushServer {pushQ} <- asks pushServer
stats <- asks serverStats
liftIO $ updatePeriodStats (activeSubs stats) ntfId
let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
ntfs_ <- liftIO $ addTokenLastNtf st newNtf
forM_ ntfs_ $ \(tkn, lastNtfs) -> atomically $ writeTBQueue pushQ (tkn, PNMessage lastNtfs)
incNtfStat ntfReceived
Right SMP.END -> do
whenM (atomically $ activeClientSession' ca sessionId srv) $ do
st <- asks store
void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd
Right SMP.DELD -> do
st <- asks store
void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted
Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
Right _ -> logError "SMP server unexpected response"
Left e -> logError $ "SMP client error: " <> tshow e
NtfPushServer {pushQ} <- asks pushServer
stats <- asks serverStats
liftIO $ forever $ do
((_, srv, _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ
forM ts $ \(ntfId, t) -> case t of
STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
STResponse {} -> pure () -- it was already reported as timeout error
STEvent msgOrErr -> do
let smpQueue = SMPQueueNtf srv ntfId
case msgOrErr of
Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
ntfTs <- getSystemTime
updatePeriodStats (activeSubs stats) ntfId
let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
ntfs_ <- addTokenLastNtf st newNtf
forM_ ntfs_ $ \(tkn, lastNtfs) -> atomically $ writeTBQueue pushQ (tkn, PNMessage lastNtfs)
incNtfStat_ stats ntfReceived
Right SMP.END ->
whenM (atomically $ activeClientSession' ca sessionId srv) $
void $ updateSrvSubStatus st smpQueue NSEnd
Right SMP.DELD ->
void $ updateSrvSubStatus st smpQueue NSDeleted
Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
Right _ -> logError "SMP server unexpected response"
Left e -> logError $ "SMP client error: " <> tshow e

receiveAgent = do
st <- asks store
forever $
liftIO $ forever $
atomically (readTBQueue agentQ) >>= \case
CAConnected srv ->
logInfo $ "SMP server reconnected " <> showServer' srv
CADisconnected srv subs -> do
forM_ (L.nonEmpty $ map snd $ S.toList subs) $ \nIds -> do
updated <- liftIO $ batchUpdateSrvSubStatus st srv nIds NSInactive
updated <- batchUpdateSrvSubStatus st srv nIds NSInactive
logSubStatus srv "disconnected" (L.length nIds) updated
CASubscribed srv _ nIds -> do
updated <- liftIO $ batchUpdateSrvSubStatus st srv nIds NSActive
updated <- batchUpdateSrvSubStatus st srv nIds NSActive
logSubStatus srv "subscribed" (L.length nIds) updated
CASubError srv _ errs -> do
forM_ (L.nonEmpty $ mapMaybe (\(nId, err) -> (nId,) <$> subErrorStatus err) $ L.toList errs) $ \subStatuses -> do
updated <- liftIO $ batchUpdateSrvSubStatuses st srv subStatuses
updated <- batchUpdateSrvSubStatuses st srv subStatuses
logSubErrors srv subStatuses updated

logSubStatus :: SMPServer -> T.Text -> Int -> Int64 -> M ()
logSubStatus srv event n updated =
logInfo $ "SMP server " <> event <> " " <> showServer' srv <> " (" <> tshow n <> " subs, " <> tshow updated <> " subs updated)"

logSubErrors :: SMPServer -> NonEmpty (SMP.NotifierId, NtfSubStatus) -> Int64 -> M ()
logSubErrors :: SMPServer -> NonEmpty (SMP.NotifierId, NtfSubStatus) -> Int64 -> IO ()
logSubErrors srv subs updated = forM_ (L.group $ L.sort $ L.map snd subs) $ \ss -> do
logError $ "SMP server subscription errors " <> showServer' srv <> ": " <> tshow (L.head ss) <> " (" <> tshow (length ss) <> " errors, " <> tshow updated <> " subs updated)"

showServer' = decodeLatin1 . strEncode . host

subErrorStatus :: SMPClientError -> Maybe NtfSubStatus
subErrorStatus = \case
PCEProtocolError AUTH -> Just NSAuth
Expand All @@ -549,6 +558,13 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
updateErr :: Show e => ByteString -> e -> Maybe NtfSubStatus
updateErr errType e = Just $ NSErr $ errType <> bshow e

-- Logs (at info level) a subscription status event for an SMP server:
-- the event name, the server host, the number of subscriptions in the batch
-- and the number of subscriptions reported as updated.
logSubStatus :: SMPServer -> T.Text -> Int -> Int64 -> IO ()
logSubStatus srv event n updated =
  logInfo $
    mconcat
      [ "SMP server ",
        event,
        " ",
        showServer' srv,
        " (",
        tshow n,
        " subs, ",
        tshow updated,
        " subs updated)"
      ]

-- Renders the server's host for log messages: the host is encoded with strEncode
-- and the resulting bytes are decoded as Latin-1 text.
showServer' :: SMPServer -> Text
showServer' srv = decodeLatin1 $ strEncode $ host srv

ntfPush :: NtfPushServer -> M ()
ntfPush s@NtfPushServer {pushQ} = forever $ do
(tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
Expand Down Expand Up @@ -703,7 +719,7 @@ verifyNtfTransmission st auth_ (tAuth, authorized, (corrId, entId, _)) = \case
e -> VRFailed e

client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M ()
client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPushServer {pushQ} =
client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServer {pushQ} =
forever $
atomically (readTBQueue rcvQ)
>>= mapM processCommand
Expand Down Expand Up @@ -781,7 +797,8 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
resp <-
withNtfStore (`addNtfSubscription` sub) $ \case
True -> do
atomically $ writeTBQueue newSubQ (srv, [(subId, (nId, nKey))])
st <- asks store
liftIO $ subscribeNtfs ns st srv [(subId, (nId, nKey))]
incNtfStat subCreated
pure $ NRSubId subId
False -> pure $ NRErr AUTH
Expand Down Expand Up @@ -823,11 +840,15 @@ withNtfStore stAction continue = do
-- Increments the selected server statistic, except for device tokens
-- whose push provider is PPApnsNull - these are not counted.
incNtfStatT :: DeviceToken -> (NtfServerStats -> IORef Int) -> M ()
incNtfStatT tkn statSel = case tkn of
  DeviceToken PPApnsNull _ -> pure ()
  _ -> incNtfStat statSel
{-# INLINE incNtfStatT #-}

incNtfStat :: (NtfServerStats -> IORef Int) -> M ()
incNtfStat statSel = do
stats <- asks serverStats
liftIO $ atomicModifyIORef'_ (statSel stats) (+ 1)
incNtfStat statSel = asks serverStats >>= liftIO . (`incNtfStat_` statSel)
{-# INLINE incNtfStat #-}

-- Adds 1 to the counter chosen by statSel from the given stats record.
-- Runs in IO, so it can be used where the stats record is already at hand.
incNtfStat_ :: NtfServerStats -> (NtfServerStats -> IORef Int) -> IO ()
incNtfStat_ stats statSel = atomicModifyIORef'_ counter (+ 1)
  where
    counter = statSel stats
{-# INLINE incNtfStat_ #-}

restoreServerLastNtfs :: NtfSTMStore -> FilePath -> IO ()
restoreServerLastNtfs st f =
Expand Down
19 changes: 8 additions & 11 deletions src/Simplex/Messaging/Notifications/Server/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import Simplex.Messaging.Server.Env.STM (StartOptions (..))
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..))
import Simplex.Messaging.Server.StoreLog (closeStoreLog)
import Simplex.Messaging.Session
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport, THandleParams, TransportPeer (..))
Expand Down Expand Up @@ -113,30 +114,26 @@ newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsCo
exitFailure

data NtfSubscriber = NtfSubscriber
{ smpSubscribers :: TMap SMPServer SMPSubscriber,
newSubQ :: TBQueue (SMPServer, NonEmpty ServerNtfSub),
{ smpSubscribers :: TMap SMPServer SMPSubscriberVar,
subscriberSeq :: TVar Int,
smpAgent :: SMPClientAgent
}

-- Session variable for a per-server SMPSubscriber, stored in the smpSubscribers map.
type SMPSubscriberVar = SessionVar SMPSubscriber

newNtfSubscriber :: Natural -> SMPClientAgentConfig -> TVar ChaChaDRG -> IO NtfSubscriber
newNtfSubscriber qSize smpAgentCfg random = do
smpSubscribers <- TM.emptyIO
newSubQ <- newTBQueueIO qSize
subscriberSeq <- newTVarIO 0
smpAgent <- newSMPClientAgent smpAgentCfg random
pure NtfSubscriber {smpSubscribers, newSubQ, smpAgent}
pure NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent}

data SMPSubscriber = SMPSubscriber
{ smpServer :: SMPServer,
subscriberSubQ :: TQueue (NonEmpty ServerNtfSub),
subThreadId :: TVar (Maybe (Weak ThreadId))
subThreadId :: Weak ThreadId
}

newSMPSubscriber :: SMPServer -> IO SMPSubscriber
newSMPSubscriber smpServer = do
subscriberSubQ <- newTQueueIO
subThreadId <- newTVarIO Nothing
pure SMPSubscriber {smpServer, subscriberSubQ, subThreadId}

data NtfPushServer = NtfPushServer
{ pushQ :: TBQueue (NtfTknRec, PushNotification),
pushClients :: TMap PushProvider PushProviderClient,
Expand Down
Loading