Skip to content

Commit 467a611

Browse files
committed
Update comments to reflect what we're doing
1 parent 2c9691b commit 467a611

File tree

3 files changed

+92
-27
lines changed
  • distributed-process-tests/src/Control/Distributed/Process/Tests
  • src/Control/Distributed/Process

3 files changed

+92
-27
lines changed

distributed-process-tests/src/Control/Distributed/Process/Tests/CH.hs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import Control.Distributed.Process.Internal.Types
3939
import Control.Distributed.Process.Node
4040
import Control.Distributed.Process.Debug
4141
import Control.Distributed.Process.Management.Internal.Types
42+
import Control.Distributed.Process.Tests.Internal.Utils (shouldBe)
4243
import Control.Distributed.Process.Serializable (Serializable)
4344

4445
import Test.HUnit (Assertion, assertFailure, assertBool)
@@ -1404,6 +1405,9 @@ testRegistryMonitoring TestTransport{..} = do
14041405
w <- whereis "test" :: Process (Maybe ProcessId)
14051406
forM_ w (const waitpoll)
14061407
in do register "test" remote1
1408+
-- [race-condition] invalid test if we force exit before waitpoll runs
1409+
registered <- whereis "test"
1410+
registered `shouldBe` (equalTo (Just remote1))
14071411
send remote1 ()
14081412
waitpoll
14091413
return ()
@@ -1412,15 +1416,19 @@ testRegistryMonitoring TestTransport{..} = do
14121416
-- Many labels. Test if all labels associated with process
14131417
-- are removed from registry when it dies.
14141418
remote2 <- testRemote remoteNode
1419+
remoteX <- testRemote remoteNode
14151420
runProcess localNode $
14161421
let waitpoll = do
14171422
w1 <- whereis "test-3" :: Process (Maybe ProcessId)
14181423
w2 <- whereis "test-4" :: Process (Maybe ProcessId)
14191424
forM_ (w1 <|> w2) (const waitpoll)
14201425
in do register "test-3" remote2
14211426
register "test-4" remote2
1427+
register "test-X" remoteX
14221428
send remote2 ()
14231429
waitpoll
1430+
rX <- whereis "test-X"
1431+
rX `shouldBe` (equalTo (Just remoteX))
14241432
return ()
14251433

14261434
{- XXX: waiting including patch for nsend for remote process

distributed-process-tests/src/Control/Distributed/Process/Tests/Mx.hs

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,14 +176,14 @@ testAgentEventHandling result = do
176176

177177
stash result $ seenAlive && seenDead
178178

179-
testMxMonitorEvents :: Process ()
180-
testMxMonitorEvents = do
179+
testMxRegEvents :: Process ()
180+
testMxRegEvents = do
181181

182182
{- This test only deals with the local case, to ensure that we are being
183183
notified in the expected order - the remote cases related to the
184184
behaviour of the node controller are contained in the CH test suite. -}
185185

186-
let label = "testMxMonitorEvents"
186+
let label = "testMxRegEvents"
187187
let agentLabel = "listener-agent"
188188
let delay = 1000000
189189
(regChan, regSink) <- newChan
@@ -223,9 +223,65 @@ testMxMonitorEvents = do
223223

224224
mapM_ (flip kill $ "test-complete") [agent, p1, p2]
225225

226+
testMxRegMon :: LocalNode -> Process ()
227+
testMxRegMon remoteNode = do
228+
229+
-- ensure that when a registered process dies, we get a notification that
230+
-- it has been unregistered as well as seeing the name get removed
231+
232+
let label1 = "aaaaa"
233+
let label2 = "bbbbb"
234+
let isValid l = l ==label1 || l == label2
235+
let agentLabel = "listener-agent"
236+
let delay = 1000000
237+
(regChan, regSink) <- newChan
238+
(unRegChan, unRegSink) <- newChan
239+
agent <- mxAgent (MxAgentId agentLabel) () [
240+
mxSink $ \ev -> do
241+
case ev of
242+
MxRegistered pid label
243+
| isValid label -> liftMX $ sendChan regChan (label, pid)
244+
MxUnRegistered pid label
245+
| isValid label -> liftMX $ sendChan unRegChan (label, pid)
246+
_ -> return ()
247+
mxReady
248+
]
249+
250+
(sp, rp) <- newChan
251+
liftIO $ forkProcess remoteNode $ do
252+
getSelfPid >>= sendChan sp
253+
expect :: Process ()
254+
255+
p1 <- receiveChan rp
256+
257+
register label1 p1
258+
reg1 <- receiveChanTimeout delay regSink
259+
reg1 `shouldBe` equalTo (Just (label1, p1))
260+
261+
register label2 p1
262+
reg2 <- receiveChanTimeout delay regSink
263+
reg2 `shouldBe` equalTo (Just (label2, p1))
264+
265+
n1 <- whereis label1
266+
n1 `shouldBe` equalTo (Just p1)
267+
268+
n2 <- whereis label2
269+
n2 `shouldBe` equalTo (Just p1)
270+
271+
kill p1 "goodbye"
272+
273+
unreg1 <- receiveChanTimeout delay unRegSink
274+
unreg2 <- receiveChanTimeout delay unRegSink
275+
276+
sort [unreg1, unreg2]
277+
`shouldBe` equalTo [Just (label1, p1), Just (label2, p1)]
278+
279+
kill agent "test-complete"
280+
226281
tests :: TestTransport -> IO [Test]
227282
tests TestTransport{..} = do
228283
node1 <- newLocalNode testTransport initRemoteTable
284+
node2 <- newLocalNode testTransport initRemoteTable
229285
return [
230286
testGroup "Mx Agents" [
231287
testCase "Event Handling"
@@ -252,7 +308,9 @@ tests TestTransport{..} = do
252308
"fifth"]) testAgentPrioritisation)
253309
]
254310
, testGroup "Mx Events" [
255-
testCase "Monitor Events"
256-
(runProcess node1 testMxMonitorEvents)
311+
testCase "Name Registration Events"
312+
(runProcess node1 testMxRegEvents)
313+
, testCase "Post Death Name UnRegistration Events"
314+
(runProcess node1 (testMxRegMon node2))
257315
]
258316
]

src/Control/Distributed/Process/Node.hs

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import qualified Data.Map as Map
3434
, filter
3535
, insert
3636
, lookup
37-
, insertLookupWithKey
37+
, partition
3838
, partitionWithKey
3939
, elems
4040
, size
@@ -328,28 +328,23 @@ startDefaultTracer node' = do
328328
registryMonitorAgentId :: MxAgentId
329329
registryMonitorAgentId = MxAgentId "service.registry.monitoring"
330330

331-
-- note [registry monitoring agent]
332-
-- this agent listens for 'MxRegistered' and 'MxUnRegistered' events and tracks
333-
-- all remote 'ProcessId's that are stored in the registry.
334-
--
335-
-- When a remote process is registered, the agent starts monitoring it until it
336-
-- is unregistered or the monitor notification arrives.
337-
--
338-
-- The agent keeps the amount of labels associated with each registered remote
339-
-- process. This is necessary so the process is unmonitored only when it has
340-
-- been unregistered from all of the labels.
341-
--
331+
{- note [registry monitoring agent]
332+
This agent listens for 'MxRegistered' and 'MxUnRegistered' events and tracks
333+
all labels for remote 'ProcessId's that are stored in the registry.
334+
335+
When a remote process is registered, the agent starts monitoring it until it
336+
is unregistered. We can safely ignore ProcessMonitorNotification's, since the
337+
node controller is obligated to issue an MxUnRegistered event for any labels
338+
connected to a dead process. Thus, our only responsibility here is to ensure
339+
that we set up monitors for locally registered processes, and tear them down
340+
once unregistration occurs.
341+
-}
342342

343343
registryMonitorAgent :: Process ProcessId
344344
registryMonitorAgent = do
345345
nid <- getSelfNode
346-
-- For each process the map associates the 'MonitorRef' used to monitor it and
347-
-- the amount of labels associated with it.
348346
mxAgent registryMonitorAgentId (Map.empty :: Map String MonitorRef)
349-
[ mxSink $ \(ProcessMonitorNotification mref _ _) -> do
350-
mxUpdateLocal $ Map.filter (== mref)
351-
mxReady
352-
, mxSink $ \ev -> do
347+
[ mxSink $ \ev -> do
353348
case ev of
354349
MxRegistered pid label
355350
| processNodeId pid /= nid -> do
@@ -363,9 +358,9 @@ registryMonitorAgent = do
363358
mxUpdateLocal (Map.delete label)
364359
_ -> return ()
365360
mxReady
366-
-- remove async answers from mailbox
367-
, mxSink $ \RegisterReply{} -> mxReady
368-
, mxSink $ \DidUnmonitor{} -> mxReady
361+
-- [note: no need to remove async answers from mailbox]
362+
-- The framework simply discards any input you don't have a handler for.
363+
-- See Management.hs `runAgent` for details.
369364
]
370365

371366
-- | Start and register the service processes on a node
@@ -961,7 +956,11 @@ ncEffectDied ident reason = do
961956

962957
modify' $ (links ^= unaffectedLinks') . (monitors ^= unaffectedMons')
963958

964-
modify' $ registeredHere ^: Map.filter (\pid -> not $ ident `impliesDeathOf` ProcessIdentifier pid)
959+
-- we now consider all labels for this identifier unregistered
960+
let toDrop pid = not $ ident `impliesDeathOf` ProcessIdentifier pid
961+
(keepNames, dropNames) <- Map.partition toDrop <$> gets (^. registeredHere)
962+
mapM_ (\(p, l) -> liftIO $ trace node (MxUnRegistered l p)) (Map.toList dropNames)
963+
modify' $ registeredHere ^= keepNames
965964

966965
remaining <- fmap Map.toList (gets (^. registeredOnNodes)) >>=
967966
mapM (\(pid,nidlist) ->

0 commit comments

Comments
 (0)