Skip to content

A closeLocalNode that cleans up after itself #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions src/Control/Distributed/Process/Internal/Messaging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import Data.Binary (Binary, encode)
import qualified Data.Map as Map (partitionWithKey, elems)
import qualified Data.ByteString.Lazy as BSL (toChunks)
import qualified Data.ByteString as BSS (ByteString)
import Control.Distributed.Process.Internal.StrictMVar (withMVar, modifyMVar_)
import Control.Distributed.Process.Serializable ()

import Control.Concurrent.Chan (writeChan)
Expand All @@ -30,6 +29,8 @@ import qualified Network.Transport as NT
)
import Control.Distributed.Process.Internal.Types
( LocalNode(localState, localEndPoint, localCtrlChan)
, withValidLocalState
, modifyValidLocalState_
, Identifier
, localConnections
, localConnectionBetween
Expand Down Expand Up @@ -102,7 +103,7 @@ setupConnBetween :: LocalNode
-> ImplicitReconnect
-> IO (Maybe NT.Connection)
setupConnBetween node from to implicitReconnect = do
mConn <- NT.connect endPoint
mConn <- NT.connect (localEndPoint node)
(nodeAddress . nodeOf $ to)
NT.ReliableOrdered
NT.defaultConnectHints
Expand All @@ -113,48 +114,45 @@ setupConnBetween node from to implicitReconnect = do
Left _ ->
return Nothing
Right () -> do
modifyMVar_ nodeState $ return .
(localConnectionBetween from to ^= Just (conn, implicitReconnect))
modifyValidLocalState_ node $
return . (localConnectionBetween from to ^= Just (conn, implicitReconnect))
return $ Just conn
Left _ ->
return Nothing
where
endPoint = localEndPoint node
nodeState = localState node

connBetween :: LocalNode
-> Identifier
-> Identifier
-> ImplicitReconnect
-> IO (Maybe NT.Connection)
connBetween node from to implicitReconnect = do
mConn <- withMVar nodeState $ return . (^. localConnectionBetween from to)
mConn <- withValidLocalState node $
return . (^. localConnectionBetween from to)
case mConn of
Just (conn, _) ->
return $ Just conn
Nothing ->
setupConnBetween node from to implicitReconnect
where
nodeState = localState node

disconnect :: LocalNode -> Identifier -> Identifier -> IO ()
disconnect node from to =
modifyMVar_ (localState node) $ \st ->
case st ^. localConnectionBetween from to of
modifyValidLocalState_ node $ \vst ->
case vst ^. localConnectionBetween from to of
Nothing ->
return st
return vst
Just (conn, _) -> do
NT.close conn
return (localConnectionBetween from to ^= Nothing $ st)
return $ localConnectionBetween from to ^= Nothing $ vst

closeImplicitReconnections :: LocalNode -> Identifier -> IO ()
closeImplicitReconnections node to =
modifyMVar_ (localState node) $ \st -> do
modifyValidLocalState_ node $ \vst -> do
let shouldClose (_, to') (_, WithImplicitReconnect) = to `impliesDeathOf` to'
shouldClose _ _ = False
let (affected, unaffected) = Map.partitionWithKey shouldClose (st ^. localConnections)
let (affected, unaffected) =
Map.partitionWithKey shouldClose (vst ^. localConnections)
mapM_ (NT.close . fst) (Map.elems affected)
return (localConnections ^= unaffected $ st)
return $ localConnections ^= unaffected $ vst

-- | @a `impliesDeathOf` b@ is true if the death of @a@ (for instance, a node)
-- implies the death of @b@ (for instance, a process on that node)
Expand Down
49 changes: 39 additions & 10 deletions src/Control/Distributed/Process/Internal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ module Control.Distributed.Process.Internal.Types
, nullProcessId
-- * Local nodes and processes
, LocalNode(..)
, LocalNodeState(..)
, ValidLocalNodeState(..)
, withValidLocalState
, modifyValidLocalState_
, Tracer(..)
, MxEventBus(..)
, LocalNodeState(..)
, LocalProcess(..)
, LocalProcessState(..)
, Process(..)
Expand Down Expand Up @@ -101,7 +104,7 @@ import qualified Data.ByteString.Lazy.Internal as BSL (ByteString(..))
import Data.Accessor (Accessor, accessor)
import Control.Category ((>>>))
import Control.DeepSeq (NFData(..))
import Control.Exception (Exception)
import Control.Exception (Exception, throwIO)
import Control.Concurrent (ThreadId)
import Control.Concurrent.Chan (Chan)
import Control.Concurrent.STM (STM)
Expand All @@ -121,7 +124,11 @@ import Control.Distributed.Process.Serializable
, showFingerprint
)
import Control.Distributed.Process.Internal.CQueue (CQueue)
import Control.Distributed.Process.Internal.StrictMVar (StrictMVar)
import Control.Distributed.Process.Internal.StrictMVar
( StrictMVar
, withMVar
, modifyMVar_
)
import Control.Distributed.Process.Internal.WeakTQueue (TQueue)
import Control.Distributed.Static (RemoteTable, Closure)
import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC (mapMaybe)
Expand Down Expand Up @@ -253,7 +260,11 @@ data ImplicitReconnect = WithImplicitReconnect | NoImplicitReconnect
deriving (Eq, Show)

-- | Local node state
data LocalNodeState = LocalNodeState
data LocalNodeState =
LocalNodeValid {-# UNPACK #-} !ValidLocalNodeState
| LocalNodeClosed

data ValidLocalNodeState = ValidLocalNodeState
{ -- | Processes running on this node
_localProcesses :: !(Map LocalProcessId LocalProcess)
-- | Counter to assign PIDs
Expand All @@ -266,6 +277,24 @@ data LocalNodeState = LocalNodeState
(NT.Connection, ImplicitReconnect))
}

-- | Wrapper around 'withMVar' that checks that the local node is still in
-- a valid state.
withValidLocalState :: LocalNode
-> (ValidLocalNodeState -> IO r)
-> IO r
withValidLocalState node f = withMVar (localState node) $ \st -> case st of
LocalNodeValid vst -> f vst
LocalNodeClosed -> throwIO $ userError $ "Node closed " ++ show (localNodeId node)

-- | Wrapper around 'modifyMVar_' that checks that the local node is still in
-- a valid state.
modifyValidLocalState_ :: LocalNode
-> (ValidLocalNodeState -> IO ValidLocalNodeState)
-> IO ()
modifyValidLocalState_ node f = modifyMVar_ (localState node) $ \st -> case st of
LocalNodeValid vst -> LocalNodeValid <$> f vst
LocalNodeClosed -> return LocalNodeClosed

-- | Processes running on our local node
data LocalProcess = LocalProcess
{ processQueue :: !(CQueue Message)
Expand Down Expand Up @@ -725,22 +754,22 @@ instance Binary ProcessInfoNone where
-- Accessors --
--------------------------------------------------------------------------------

localProcesses :: Accessor LocalNodeState (Map LocalProcessId LocalProcess)
localProcesses :: Accessor ValidLocalNodeState (Map LocalProcessId LocalProcess)
localProcesses = accessor _localProcesses (\procs st -> st { _localProcesses = procs })

localPidCounter :: Accessor LocalNodeState Int32
localPidCounter :: Accessor ValidLocalNodeState Int32
localPidCounter = accessor _localPidCounter (\ctr st -> st { _localPidCounter = ctr })

localPidUnique :: Accessor LocalNodeState Int32
localPidUnique :: Accessor ValidLocalNodeState Int32
localPidUnique = accessor _localPidUnique (\unq st -> st { _localPidUnique = unq })

localConnections :: Accessor LocalNodeState (Map (Identifier, Identifier) (NT.Connection, ImplicitReconnect))
localConnections :: Accessor ValidLocalNodeState (Map (Identifier, Identifier) (NT.Connection, ImplicitReconnect))
localConnections = accessor _localConnections (\conns st -> st { _localConnections = conns })

localProcessWithId :: LocalProcessId -> Accessor LocalNodeState (Maybe LocalProcess)
localProcessWithId :: LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId lpid = localProcesses >>> DAC.mapMaybe lpid

localConnectionBetween :: Identifier -> Identifier -> Accessor LocalNodeState (Maybe (NT.Connection, ImplicitReconnect))
localConnectionBetween :: Identifier -> Identifier -> Accessor ValidLocalNodeState (Maybe (NT.Connection, ImplicitReconnect))
localConnectionBetween from' to' = localConnections >>> DAC.mapMaybe (from', to')

monitorCounter :: Accessor LocalProcessState Int32
Expand Down
76 changes: 45 additions & 31 deletions src/Control/Distributed/Process/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ import Control.Exception
, uninterruptibleMask_
)
import qualified Control.Exception as Exception (Handler(..), catches, finally)
import Control.Concurrent (forkIO, forkIOWithUnmask, myThreadId)
import Control.Concurrent (forkIO, forkIOWithUnmask, killThread, myThreadId)
import Control.Distributed.Process.Internal.StrictMVar
( newMVar
, withMVar
Expand All @@ -76,7 +76,6 @@ import Control.Distributed.Process.Internal.StrictMVar
, newEmptyMVar
, putMVar
, takeMVar
, readMVar
)
import Control.Concurrent.Chan (newChan, writeChan, readChan)
import qualified Control.Concurrent.MVar as MVar (newEmptyMVar, takeMVar)
Expand Down Expand Up @@ -119,6 +118,9 @@ import Control.Distributed.Process.Internal.Types
, LocalNode(..)
, MxEventBus(..)
, LocalNodeState(..)
, ValidLocalNodeState(..)
, withValidLocalState
, modifyValidLocalState_
, LocalProcess(..)
, LocalProcessState(..)
, Process(..)
Expand Down Expand Up @@ -231,7 +233,7 @@ newLocalNode transport rtable = do
createBareLocalNode :: NT.EndPoint -> RemoteTable -> IO LocalNode
createBareLocalNode endPoint rtable = do
unq <- randomIO
state <- newMVar LocalNodeState
state <- newMVar $ LocalNodeValid $ ValidLocalNodeState
{ _localProcesses = Map.empty
, _localPidCounter = firstNonReservedProcessId
, _localPidUnique = unq
Expand Down Expand Up @@ -318,12 +320,21 @@ startServiceProcesses node = do
sendChan ch ()
]

-- | Force-close a local node
--
-- TODO: for now we just close the associated endpoint
-- | Force-close a local node, killing all processes on that node.
closeLocalNode :: LocalNode -> IO ()
closeLocalNode node =
-- TODO: close all our processes, surely!?
closeLocalNode node = do
modifyMVar_ (localState node) $ \st -> case st of
LocalNodeValid vst -> do
forM_ (vst ^. localProcesses) $ \lproc ->
-- Semantics of 'throwTo' guarantee that target thread will get
-- delivered an exception. Therefore, target thread will be killed
-- eventually and that's as good as we can do. No need to wait for
-- thread to actually finish dying.
killThread (processThread lproc)
return LocalNodeClosed
LocalNodeClosed -> return LocalNodeClosed
-- This call will have the effect of shutting down the NC as well (see
-- 'createBareLocalNode').
NT.closeEndPoint (localEndPoint node)

-- | Run a process on a local node and wait for it to finish
Expand All @@ -342,9 +353,9 @@ forkProcess node proc =
modifyMVarMasked (localState node) startProcess
where
startProcess :: LocalNodeState -> IO (LocalNodeState, ProcessId)
startProcess st = do
let lpid = LocalProcessId { lpidCounter = st ^. localPidCounter
, lpidUnique = st ^. localPidUnique
startProcess (LocalNodeValid vst) = do
let lpid = LocalProcessId { lpidCounter = vst ^. localPidCounter
, lpidUnique = vst ^. localPidUnique
}
let pid = ProcessId { processNodeId = localNodeId node
, processLocalId = lpid
Expand Down Expand Up @@ -376,7 +387,7 @@ forkProcess node proc =
(return . DiedException . (show :: SomeException -> String)))]

-- [Unified: Table 4, rules termination and exiting]
modifyMVar_ (localState node) (cleanupProcess pid)
modifyValidLocalState_ node (cleanupProcess pid)
writeChan (localCtrlChan node) NCMsg
{ ctrlMsgSender = ProcessIdentifier pid
, ctrlMsgSignal = Died (ProcessIdentifier pid) reason
Expand All @@ -391,27 +402,30 @@ forkProcess node proc =
-- TODO: this doesn't look right at all - how do we know
-- that newUnique represents a process id that is available!?
newUnique <- randomIO
return ( (localProcessWithId lpid ^= Just lproc)
return ( LocalNodeValid
$ (localProcessWithId lpid ^= Just lproc)
. (localPidCounter ^= firstNonReservedProcessId)
. (localPidUnique ^= newUnique)
$ st
$ vst
, pid
)
else
return ( (localProcessWithId lpid ^= Just lproc)
return ( LocalNodeValid
$ (localProcessWithId lpid ^= Just lproc)
. (localPidCounter ^: (+ 1))
$ st
$ vst
, pid
)
startProcess LocalNodeClosed = throwIO $ userError $ "Node closed " ++ show (localNodeId node)

cleanupProcess :: ProcessId -> LocalNodeState -> IO LocalNodeState
cleanupProcess pid st = do
cleanupProcess :: ProcessId -> ValidLocalNodeState -> IO ValidLocalNodeState
cleanupProcess pid vst = do
let pid' = ProcessIdentifier pid
let (affected, unaffected) = Map.partitionWithKey (\(fr, _to) !_v -> impliesDeathOf pid' fr) (st ^. localConnections)
let (affected, unaffected) = Map.partitionWithKey (\(fr, _to) !_v -> impliesDeathOf pid' fr) (vst ^. localConnections)
mapM_ (NT.close . fst) (Map.elems affected)
return $ (localProcessWithId (processLocalId pid) ^= Nothing)
. (localConnections ^= unaffected)
$ st
$ vst

-- note [tracer/forkProcess races]
--
Expand Down Expand Up @@ -500,7 +514,7 @@ handleIncomingMessages node = go initConnectionState
case decode (BSL.fromChunks payload) of
ProcessIdentifier pid -> do
let lpid = processLocalId pid
mProc <- withMVar state $ return . (^. localProcessWithId lpid)
mProc <- withValidLocalState node $ return . (^. localProcessWithId lpid)
case mProc of
Just proc ->
go (incomingAt cid ^= Just (src, ToProc pid (processWeakQ proc)) $ st)
Expand All @@ -509,7 +523,7 @@ handleIncomingMessages node = go initConnectionState
SendPortIdentifier chId -> do
let lcid = sendPortLocalId chId
lpid = processLocalId (sendPortProcessId chId)
mProc <- withMVar state $ return . (^. localProcessWithId lpid)
mProc <- withValidLocalState node $ return . (^. localProcessWithId lpid)
case mProc of
Just proc -> do
mChannel <- withMVar (processState proc) $ return . (^. typedChannelWithId lcid)
Expand Down Expand Up @@ -970,8 +984,8 @@ ncEffectGetInfo from pid =
them = (ProcessIdentifier pid)
in do
node <- ask
mProc <- liftIO $
withMVar (localState node) $ return . (^. localProcessWithId lpid)
mProc <- liftIO $ withValidLocalState node
$ return . (^. localProcessWithId lpid)
case mProc of
Nothing -> dispatch (isLocal node (ProcessIdentifier from))
from node (ProcessInfoNone DiedUnknownId)
Expand Down Expand Up @@ -1014,17 +1028,17 @@ ncEffectGetNodeStats :: ProcessId -> NodeId -> NC ()
ncEffectGetNodeStats from _nid = do
node <- ask
ncState <- StateT.get
nodeState <- liftIO $ readMVar (localState node)
let localProcesses' = nodeState ^. localProcesses
stats =
nodeState <- liftIO $ withValidLocalState node return
let stats =
NodeStats {
nodeStatsNode = localNodeId node
, nodeStatsRegisteredNames = Map.size $ ncState ^. registeredHere
, nodeStatsMonitors = Map.size $ ncState ^. monitors
, nodeStatsLinks = Map.size $ ncState ^. links
, nodeStatsProcesses = Map.size localProcesses'
, nodeStatsProcesses = Map.size (nodeState ^. localProcesses)
}
postAsMessage from stats

--------------------------------------------------------------------------------
-- Auxiliary --
--------------------------------------------------------------------------------
Expand Down Expand Up @@ -1097,7 +1111,7 @@ unClosure closure = do
isValidLocalIdentifier :: Identifier -> NC Bool
isValidLocalIdentifier ident = do
node <- ask
liftIO . withMVar (localState node) $ \nSt ->
liftIO . withValidLocalState node $ \nSt ->
case ident of
NodeIdentifier nid ->
return $ nid == localNodeId node
Expand Down Expand Up @@ -1135,8 +1149,8 @@ withLocalProc node pid p =
-- By [Unified: table 6, rule missing_process] messages to dead processes
-- can silently be dropped
let lpid = processLocalId pid in do
mProc <- withMVar (localState node) $ return . (^. localProcessWithId lpid)
forM_ mProc p
withValidLocalState node $ \vst ->
forM_ (vst ^. localProcessWithId lpid) p

--------------------------------------------------------------------------------
-- Accessors --
Expand Down