Skip to content

Commit a386058

Browse files
committed
Merge pull request #188 from mboes/localnode-closed-state
A closeLocalNode that cleans up after itself
2 parents 2b770e6 + f575a46 commit a386058

File tree

3 files changed

+99
-58
lines changed

3 files changed

+99
-58
lines changed

src/Control/Distributed/Process/Internal/Messaging.hs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import Data.Binary (Binary, encode)
1313
import qualified Data.Map as Map (partitionWithKey, elems)
1414
import qualified Data.ByteString.Lazy as BSL (toChunks)
1515
import qualified Data.ByteString as BSS (ByteString)
16-
import Control.Distributed.Process.Internal.StrictMVar (withMVar, modifyMVar_)
1716
import Control.Distributed.Process.Serializable ()
1817

1918
import Control.Concurrent.Chan (writeChan)
@@ -30,6 +29,8 @@ import qualified Network.Transport as NT
3029
)
3130
import Control.Distributed.Process.Internal.Types
3231
( LocalNode(localState, localEndPoint, localCtrlChan)
32+
, withValidLocalState
33+
, modifyValidLocalState_
3334
, Identifier
3435
, localConnections
3536
, localConnectionBetween
@@ -102,7 +103,7 @@ setupConnBetween :: LocalNode
102103
-> ImplicitReconnect
103104
-> IO (Maybe NT.Connection)
104105
setupConnBetween node from to implicitReconnect = do
105-
mConn <- NT.connect endPoint
106+
mConn <- NT.connect (localEndPoint node)
106107
(nodeAddress . nodeOf $ to)
107108
NT.ReliableOrdered
108109
NT.defaultConnectHints
@@ -113,48 +114,45 @@ setupConnBetween node from to implicitReconnect = do
113114
Left _ ->
114115
return Nothing
115116
Right () -> do
116-
modifyMVar_ nodeState $ return .
117-
(localConnectionBetween from to ^= Just (conn, implicitReconnect))
117+
modifyValidLocalState_ node $
118+
return . (localConnectionBetween from to ^= Just (conn, implicitReconnect))
118119
return $ Just conn
119120
Left _ ->
120121
return Nothing
121-
where
122-
endPoint = localEndPoint node
123-
nodeState = localState node
124122

125123
connBetween :: LocalNode
126124
-> Identifier
127125
-> Identifier
128126
-> ImplicitReconnect
129127
-> IO (Maybe NT.Connection)
130128
connBetween node from to implicitReconnect = do
131-
mConn <- withMVar nodeState $ return . (^. localConnectionBetween from to)
129+
mConn <- withValidLocalState node $
130+
return . (^. localConnectionBetween from to)
132131
case mConn of
133132
Just (conn, _) ->
134133
return $ Just conn
135134
Nothing ->
136135
setupConnBetween node from to implicitReconnect
137-
where
138-
nodeState = localState node
139136

140137
disconnect :: LocalNode -> Identifier -> Identifier -> IO ()
141138
disconnect node from to =
142-
modifyMVar_ (localState node) $ \st ->
143-
case st ^. localConnectionBetween from to of
139+
modifyValidLocalState_ node $ \vst ->
140+
case vst ^. localConnectionBetween from to of
144141
Nothing ->
145-
return st
142+
return vst
146143
Just (conn, _) -> do
147144
NT.close conn
148-
return (localConnectionBetween from to ^= Nothing $ st)
145+
return $ localConnectionBetween from to ^= Nothing $ vst
149146

150147
closeImplicitReconnections :: LocalNode -> Identifier -> IO ()
151148
closeImplicitReconnections node to =
152-
modifyMVar_ (localState node) $ \st -> do
149+
modifyValidLocalState_ node $ \vst -> do
153150
let shouldClose (_, to') (_, WithImplicitReconnect) = to `impliesDeathOf` to'
154151
shouldClose _ _ = False
155-
let (affected, unaffected) = Map.partitionWithKey shouldClose (st ^. localConnections)
152+
let (affected, unaffected) =
153+
Map.partitionWithKey shouldClose (vst ^. localConnections)
156154
mapM_ (NT.close . fst) (Map.elems affected)
157-
return (localConnections ^= unaffected $ st)
155+
return $ localConnections ^= unaffected $ vst
158156

159157
-- | @a `impliesDeathOf` b@ is true if the death of @a@ (for instance, a node)
160158
-- implies the death of @b@ (for instance, a process on that node)

src/Control/Distributed/Process/Internal/Types.hs

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@ module Control.Distributed.Process.Internal.Types
2121
, nullProcessId
2222
-- * Local nodes and processes
2323
, LocalNode(..)
24+
, LocalNodeState(..)
25+
, ValidLocalNodeState(..)
26+
, withValidLocalState
27+
, modifyValidLocalState_
2428
, Tracer(..)
2529
, MxEventBus(..)
26-
, LocalNodeState(..)
2730
, LocalProcess(..)
2831
, LocalProcessState(..)
2932
, Process(..)
@@ -102,7 +105,7 @@ import qualified Data.ByteString.Lazy.Internal as BSL (ByteString(..))
102105
import Data.Accessor (Accessor, accessor)
103106
import Control.Category ((>>>))
104107
import Control.DeepSeq (NFData(..))
105-
import Control.Exception (Exception)
108+
import Control.Exception (Exception, throwIO)
106109
import Control.Concurrent (ThreadId)
107110
import Control.Concurrent.Chan (Chan)
108111
import Control.Concurrent.STM (STM)
@@ -122,7 +125,11 @@ import Control.Distributed.Process.Serializable
122125
, showFingerprint
123126
)
124127
import Control.Distributed.Process.Internal.CQueue (CQueue)
125-
import Control.Distributed.Process.Internal.StrictMVar (StrictMVar)
128+
import Control.Distributed.Process.Internal.StrictMVar
129+
( StrictMVar
130+
, withMVar
131+
, modifyMVar_
132+
)
126133
import Control.Distributed.Process.Internal.WeakTQueue (TQueue)
127134
import Control.Distributed.Static (RemoteTable, Closure)
128135
import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC (mapMaybe)
@@ -258,7 +265,11 @@ data ImplicitReconnect = WithImplicitReconnect | NoImplicitReconnect
258265
deriving (Eq, Show)
259266

260267
-- | Local node state
261-
data LocalNodeState = LocalNodeState
268+
data LocalNodeState =
269+
LocalNodeValid {-# UNPACK #-} !ValidLocalNodeState
270+
| LocalNodeClosed
271+
272+
data ValidLocalNodeState = ValidLocalNodeState
262273
{ -- | Processes running on this node
263274
_localProcesses :: !(Map LocalProcessId LocalProcess)
264275
-- | Counter to assign PIDs
@@ -271,6 +282,24 @@ data LocalNodeState = LocalNodeState
271282
(NT.Connection, ImplicitReconnect))
272283
}
273284

285+
-- | Wrapper around 'withMVar' that checks that the local node is still in
286+
-- a valid state.
287+
withValidLocalState :: LocalNode
288+
-> (ValidLocalNodeState -> IO r)
289+
-> IO r
290+
withValidLocalState node f = withMVar (localState node) $ \st -> case st of
291+
LocalNodeValid vst -> f vst
292+
LocalNodeClosed -> throwIO $ userError $ "Node closed " ++ show (localNodeId node)
293+
294+
-- | Wrapper around 'modifyMVar_' that checks that the local node is still in
295+
-- a valid state.
296+
modifyValidLocalState_ :: LocalNode
297+
-> (ValidLocalNodeState -> IO ValidLocalNodeState)
298+
-> IO ()
299+
modifyValidLocalState_ node f = modifyMVar_ (localState node) $ \st -> case st of
300+
LocalNodeValid vst -> LocalNodeValid <$> f vst
301+
LocalNodeClosed -> return LocalNodeClosed
302+
274303
-- | Processes running on our local node
275304
data LocalProcess = LocalProcess
276305
{ processQueue :: !(CQueue Message)
@@ -746,22 +775,22 @@ instance Binary ProcessInfoNone where
746775
-- Accessors --
747776
--------------------------------------------------------------------------------
748777

749-
localProcesses :: Accessor LocalNodeState (Map LocalProcessId LocalProcess)
778+
localProcesses :: Accessor ValidLocalNodeState (Map LocalProcessId LocalProcess)
750779
localProcesses = accessor _localProcesses (\procs st -> st { _localProcesses = procs })
751780

752-
localPidCounter :: Accessor LocalNodeState Int32
781+
localPidCounter :: Accessor ValidLocalNodeState Int32
753782
localPidCounter = accessor _localPidCounter (\ctr st -> st { _localPidCounter = ctr })
754783

755-
localPidUnique :: Accessor LocalNodeState Int32
784+
localPidUnique :: Accessor ValidLocalNodeState Int32
756785
localPidUnique = accessor _localPidUnique (\unq st -> st { _localPidUnique = unq })
757786

758-
localConnections :: Accessor LocalNodeState (Map (Identifier, Identifier) (NT.Connection, ImplicitReconnect))
787+
localConnections :: Accessor ValidLocalNodeState (Map (Identifier, Identifier) (NT.Connection, ImplicitReconnect))
759788
localConnections = accessor _localConnections (\conns st -> st { _localConnections = conns })
760789

761-
localProcessWithId :: LocalProcessId -> Accessor LocalNodeState (Maybe LocalProcess)
790+
localProcessWithId :: LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
762791
localProcessWithId lpid = localProcesses >>> DAC.mapMaybe lpid
763792

764-
localConnectionBetween :: Identifier -> Identifier -> Accessor LocalNodeState (Maybe (NT.Connection, ImplicitReconnect))
793+
localConnectionBetween :: Identifier -> Identifier -> Accessor ValidLocalNodeState (Maybe (NT.Connection, ImplicitReconnect))
765794
localConnectionBetween from' to' = localConnections >>> DAC.mapMaybe (from', to')
766795

767796
monitorCounter :: Accessor LocalProcessState Int32

src/Control/Distributed/Process/Node.hs

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ import Control.Exception
6767
, uninterruptibleMask_
6868
)
6969
import qualified Control.Exception as Exception (Handler(..), catches, finally)
70-
import Control.Concurrent (forkIO, forkIOWithUnmask, myThreadId)
70+
import Control.Concurrent (forkIO, forkIOWithUnmask, killThread, myThreadId)
7171
import Control.Distributed.Process.Internal.StrictMVar
7272
( newMVar
7373
, withMVar
@@ -76,7 +76,6 @@ import Control.Distributed.Process.Internal.StrictMVar
7676
, newEmptyMVar
7777
, putMVar
7878
, takeMVar
79-
, readMVar
8079
)
8180
import Control.Concurrent.Chan (newChan, writeChan, readChan)
8281
import qualified Control.Concurrent.MVar as MVar (newEmptyMVar, takeMVar)
@@ -119,6 +118,9 @@ import Control.Distributed.Process.Internal.Types
119118
, LocalNode(..)
120119
, MxEventBus(..)
121120
, LocalNodeState(..)
121+
, ValidLocalNodeState(..)
122+
, withValidLocalState
123+
, modifyValidLocalState_
122124
, LocalProcess(..)
123125
, LocalProcessState(..)
124126
, Process(..)
@@ -233,7 +235,7 @@ newLocalNode transport rtable = do
233235
createBareLocalNode :: NT.EndPoint -> RemoteTable -> IO LocalNode
234236
createBareLocalNode endPoint rtable = do
235237
unq <- randomIO
236-
state <- newMVar LocalNodeState
238+
state <- newMVar $ LocalNodeValid $ ValidLocalNodeState
237239
{ _localProcesses = Map.empty
238240
, _localPidCounter = firstNonReservedProcessId
239241
, _localPidUnique = unq
@@ -320,12 +322,21 @@ startServiceProcesses node = do
320322
sendChan ch ()
321323
]
322324

323-
-- | Force-close a local node
324-
--
325-
-- TODO: for now we just close the associated endpoint
325+
-- | Force-close a local node, killing all processes on that node.
326326
closeLocalNode :: LocalNode -> IO ()
327-
closeLocalNode node =
328-
-- TODO: close all our processes, surely!?
327+
closeLocalNode node = do
328+
modifyMVar_ (localState node) $ \st -> case st of
329+
LocalNodeValid vst -> do
330+
forM_ (vst ^. localProcesses) $ \lproc ->
331+
-- Semantics of 'throwTo' guarantee that target thread will get
332+
-- delivered an exception. Therefore, target thread will be killed
333+
-- eventually and that's as good as we can do. No need to wait for
334+
-- thread to actually finish dying.
335+
killThread (processThread lproc)
336+
return LocalNodeClosed
337+
LocalNodeClosed -> return LocalNodeClosed
338+
-- This call will have the effect of shutting down the NC as well (see
339+
-- 'createBareLocalNode').
329340
NT.closeEndPoint (localEndPoint node)
330341

331342
-- | Run a process on a local node and wait for it to finish
@@ -344,9 +355,9 @@ forkProcess node proc =
344355
modifyMVarMasked (localState node) startProcess
345356
where
346357
startProcess :: LocalNodeState -> IO (LocalNodeState, ProcessId)
347-
startProcess st = do
348-
let lpid = LocalProcessId { lpidCounter = st ^. localPidCounter
349-
, lpidUnique = st ^. localPidUnique
358+
startProcess (LocalNodeValid vst) = do
359+
let lpid = LocalProcessId { lpidCounter = vst ^. localPidCounter
360+
, lpidUnique = vst ^. localPidUnique
350361
}
351362
let pid = ProcessId { processNodeId = localNodeId node
352363
, processLocalId = lpid
@@ -378,7 +389,7 @@ forkProcess node proc =
378389
(return . DiedException . (show :: SomeException -> String)))]
379390

380391
-- [Unified: Table 4, rules termination and exiting]
381-
modifyMVar_ (localState node) (cleanupProcess pid)
392+
modifyValidLocalState_ node (cleanupProcess pid)
382393
writeChan (localCtrlChan node) NCMsg
383394
{ ctrlMsgSender = ProcessIdentifier pid
384395
, ctrlMsgSignal = Died (ProcessIdentifier pid) reason
@@ -393,27 +404,30 @@ forkProcess node proc =
393404
-- TODO: this doesn't look right at all - how do we know
394405
-- that newUnique represents a process id that is available!?
395406
newUnique <- randomIO
396-
return ( (localProcessWithId lpid ^= Just lproc)
407+
return ( LocalNodeValid
408+
$ (localProcessWithId lpid ^= Just lproc)
397409
. (localPidCounter ^= firstNonReservedProcessId)
398410
. (localPidUnique ^= newUnique)
399-
$ st
411+
$ vst
400412
, pid
401413
)
402414
else
403-
return ( (localProcessWithId lpid ^= Just lproc)
415+
return ( LocalNodeValid
416+
$ (localProcessWithId lpid ^= Just lproc)
404417
. (localPidCounter ^: (+ 1))
405-
$ st
418+
$ vst
406419
, pid
407420
)
421+
startProcess LocalNodeClosed = throwIO $ userError $ "Node closed " ++ show (localNodeId node)
408422

409-
cleanupProcess :: ProcessId -> LocalNodeState -> IO LocalNodeState
410-
cleanupProcess pid st = do
423+
cleanupProcess :: ProcessId -> ValidLocalNodeState -> IO ValidLocalNodeState
424+
cleanupProcess pid vst = do
411425
let pid' = ProcessIdentifier pid
412-
let (affected, unaffected) = Map.partitionWithKey (\(fr, _to) !_v -> impliesDeathOf pid' fr) (st ^. localConnections)
426+
let (affected, unaffected) = Map.partitionWithKey (\(fr, _to) !_v -> impliesDeathOf pid' fr) (vst ^. localConnections)
413427
mapM_ (NT.close . fst) (Map.elems affected)
414428
return $ (localProcessWithId (processLocalId pid) ^= Nothing)
415429
. (localConnections ^= unaffected)
416-
$ st
430+
$ vst
417431

418432
-- note [tracer/forkProcess races]
419433
--
@@ -502,7 +516,7 @@ handleIncomingMessages node = go initConnectionState
502516
case decode (BSL.fromChunks payload) of
503517
ProcessIdentifier pid -> do
504518
let lpid = processLocalId pid
505-
mProc <- withMVar state $ return . (^. localProcessWithId lpid)
519+
mProc <- withValidLocalState node $ return . (^. localProcessWithId lpid)
506520
case mProc of
507521
Just proc ->
508522
go (incomingAt cid ^= Just (src, ToProc pid (processWeakQ proc)) $ st)
@@ -511,7 +525,7 @@ handleIncomingMessages node = go initConnectionState
511525
SendPortIdentifier chId -> do
512526
let lcid = sendPortLocalId chId
513527
lpid = processLocalId (sendPortProcessId chId)
514-
mProc <- withMVar state $ return . (^. localProcessWithId lpid)
528+
mProc <- withValidLocalState node $ return . (^. localProcessWithId lpid)
515529
case mProc of
516530
Just proc -> do
517531
mChannel <- withMVar (processState proc) $ return . (^. typedChannelWithId lcid)
@@ -978,8 +992,8 @@ ncEffectGetInfo from pid =
978992
them = (ProcessIdentifier pid)
979993
in do
980994
node <- ask
981-
mProc <- liftIO $
982-
withMVar (localState node) $ return . (^. localProcessWithId lpid)
995+
mProc <- liftIO $ withValidLocalState node
996+
$ return . (^. localProcessWithId lpid)
983997
case mProc of
984998
Nothing -> dispatch (isLocal node (ProcessIdentifier from))
985999
from node (ProcessInfoNone DiedUnknownId)
@@ -1022,17 +1036,17 @@ ncEffectGetNodeStats :: ProcessId -> NodeId -> NC ()
10221036
ncEffectGetNodeStats from _nid = do
10231037
node <- ask
10241038
ncState <- StateT.get
1025-
nodeState <- liftIO $ readMVar (localState node)
1026-
let localProcesses' = nodeState ^. localProcesses
1027-
stats =
1039+
nodeState <- liftIO $ withValidLocalState node return
1040+
let stats =
10281041
NodeStats {
10291042
nodeStatsNode = localNodeId node
10301043
, nodeStatsRegisteredNames = Map.size $ ncState ^. registeredHere
10311044
, nodeStatsMonitors = Map.size $ ncState ^. monitors
10321045
, nodeStatsLinks = Map.size $ ncState ^. links
1033-
, nodeStatsProcesses = Map.size localProcesses'
1046+
, nodeStatsProcesses = Map.size (nodeState ^. localProcesses)
10341047
}
10351048
postAsMessage from stats
1049+
10361050
--------------------------------------------------------------------------------
10371051
-- Auxiliary --
10381052
--------------------------------------------------------------------------------
@@ -1105,7 +1119,7 @@ unClosure closure = do
11051119
isValidLocalIdentifier :: Identifier -> NC Bool
11061120
isValidLocalIdentifier ident = do
11071121
node <- ask
1108-
liftIO . withMVar (localState node) $ \nSt ->
1122+
liftIO . withValidLocalState node $ \nSt ->
11091123
case ident of
11101124
NodeIdentifier nid ->
11111125
return $ nid == localNodeId node
@@ -1145,8 +1159,8 @@ withLocalProc node pid p =
11451159
-- By [Unified: table 6, rule missing_process] messages to dead processes
11461160
-- can silently be dropped
11471161
let lpid = processLocalId pid in do
1148-
mProc <- withMVar (localState node) $ return . (^. localProcessWithId lpid)
1149-
forM_ mProc p
1162+
withValidLocalState node $ \vst ->
1163+
forM_ (vst ^. localProcessWithId lpid) p
11501164

11511165
--------------------------------------------------------------------------------
11521166
-- Accessors --

0 commit comments

Comments
 (0)