Skip to content

Implement monitoring for registry values #184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distributed-process.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Library
Control.Distributed.Process.Internal.WeakTQueue,
Control.Distributed.Process.Management,
Control.Distributed.Process.Node,
Control.Distributed.Process.Node.RegistryAgent
Control.Distributed.Process.Serializable,
Control.Distributed.Process.UnsafePrimitives
Control.Distributed.Process.Management.Internal.Agent,
Expand Down
1 change: 1 addition & 0 deletions src/Control/Distributed/Process/Internal/Primitives.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ module Control.Distributed.Process.Internal.Primitives
, unlink
, monitor
, unmonitor
, unmonitorAsync
, withMonitor
-- * Logging
, say
Expand Down
2 changes: 2 additions & 0 deletions src/Control/Distributed/Process/Management/Internal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import Control.Distributed.Process.Internal.Types
, NodeId
)
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Fix (MonadFix)
import qualified Control.Monad.State as ST
( MonadState
, StateT
Expand Down Expand Up @@ -123,6 +124,7 @@ newtype MxAgent s a =
} deriving ( Functor
, Monad
, MonadIO
, MonadFix
, ST.MonadState (MxAgentState s)
, Typeable
, Applicative
Expand Down
2 changes: 2 additions & 0 deletions src/Control/Distributed/Process/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import Control.Exception
)
import qualified Control.Exception as Exception (Handler(..), catches, finally)
import Control.Concurrent (forkIO, forkIOWithUnmask, killThread, myThreadId)
import Control.Distributed.Process.Node.RegistryAgent (registryMonitorAgent)
import Control.Distributed.Process.Internal.StrictMVar
( newMVar
, withMVar
Expand Down Expand Up @@ -306,6 +307,7 @@ startServiceProcesses node = do
-- loops during tracing if the user reregisters the "logger" with a custom
-- process which uses 'send' or other primitives which are traced.
register "trace.logger" logger
void $ registryMonitorAgent
where
fork = forkProcess node

Expand Down
80 changes: 80 additions & 0 deletions src/Control/Distributed/Process/Node/RegistryAgent.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{-# LANGUAGE RecursiveDo #-}
-----------------------------------------------------------------------------
---- |
---- Module : Control.Distributed.Process.Node.RegistryAgent
---- Copyright : (c) Tweag I/O 2015
---- License : BSD3 (see the file LICENSE)
----
---- Maintainer : Tim Watson <watson.timothy@gmail.com>
---- Stability : experimental
---- Portability : non-portable (requires concurrency)
----
---- This module provides a registry monitoring agent, implemented as a
---- /distributed-process Management Agent/. Every 'node' starts this agent on
---- startup. The agent will monitor every remote process that was added to the
---- local registry, so the node removes the process from the registry when it
---- dies or when a network failure is detected.
----
-------------------------------------------------------------------------------

module Control.Distributed.Process.Node.RegistryAgent
( registryMonitorAgent
) where

import Control.Distributed.Process.Management
import Control.Distributed.Process.Internal.Types
import Control.Distributed.Process.Internal.Primitives
import Data.Foldable (forM_)
import Data.Map (Map)
import qualified Data.Map as Map

registryMonitorAgentId :: MxAgentId
registryMonitorAgentId = MxAgentId "service.registry.monitoring"

-- | Registry monitor agent
--
-- This agent listens for 'MxRegistered' and 'MxUnRegistered' events and tracks
-- all remote 'ProcessId's that are stored in the registry.
--
-- When a remote process is registered, the agent starts monitoring it until it
-- is unregistered or the monitor notification arrives.
--
-- The agent keeps the amount of labels associated with each registered remote
-- process. This is necessary so the process is unmonitored only when it has
-- been unregistered from all of the labels.
--
registryMonitorAgent :: Process ProcessId
registryMonitorAgent = do
nid <- getSelfNode
-- For each process the map associates the 'MonitorRef' used to monitor it and
-- the amount of labels associated with it.
mxAgent registryMonitorAgentId (Map.empty :: Map ProcessId (MonitorRef, Int))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any downside to simply having as many MonitorRef's as there are registered labels, rather than reference counting? I don't think the memory saving (if any) is worth it, seeing as multiple labels for the same ProcessId is not the common case. Dropping the reference counting would make the code simpler, I reckon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that will work, the differences are memory footprint and complexity of the MxUnRegistered case, that will be slightly higher (we will need traverse the structure and remove MonitorRef). MxRegistered will be simplified, another one will be more or less the same.
So I'm open to this change.

@facundominguez, any thoughts here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced the rewriting is worth the trouble. We already have a reasonable PR. Your call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complexity of MxUnRegistered case, that will be slightly higher (we will need traverse the structure and remove MonitorRef).

I don't see it. The following is untested but compiles:

        case ev of
          MxRegistered pid _
            | processNodeId pid /= nid -> do
              mref <- liftMX $ monitor pid
              mxUpdateLocal (Map.insert pid mref)
          MxUnRegistered pid _
            | processNodeId pid /= nid -> do
              mrefs <- mxGetLocal
              forM_ (pid `Map.lookup` mrefs) $ \mref -> do
                liftMX $ unmonitorAsync mref
                mxUpdateLocal (Map.delete pid)
          _ -> return ()

I expect the memory usage to be pretty much the exactly the same as with reference counting: monitors are cheap, and registering a process under multiple labels is uncommon anyways.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code above do not solve the problem that original version intended to solve, i.e. if process is registered multiple times but unregistered only once then it will be removed from map, but it shouldn't. Also I'm not terribly happy about "leaking" mrefs. I have to check code in order to understand if its safe to do. Also here we have a monitor call for each registered process (I can agree that it's a rare case).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops - late at night pasted wrong code snippet. I meant to paste this one:

        case ev of
          MxRegistered pid label
            | processNodeId pid /= nid -> do
              mref <- liftMX $ monitor pid
              mxUpdateLocal (Map.insert label mref)
          MxUnRegistered pid label
            | processNodeId pid /= nid -> do
              mrefs <- mxGetLocal
              forM_ (label `Map.lookup` mrefs) $ \mref -> do
                liftMX $ unmonitorAsync mref
                mxUpdateLocal (Map.delete label)
          _ -> return ()

i.e. as the previous comment was saying, one monitor ref per label, not per process. The only difference with the previous snippet is the key of the map. Everything else is the same. BUT, there is a complication, as you point out. It's just that it's not in the MxUnregistered case but in the ProcessMonitorNotification case, which when written out becomes something like:

mxSink $ \(ProcessMonitorNotification mref _ _) -> do
        mxUpdateLocal $ Map.filter (== mref)
        mxReady

I assume you meant to write ProcessMonitorNotification instead of MxUnregistered. The code is just as simple, but note the linear operation, which I also assume is what you were referring to. It's the same linear operation that's already here, over the exact same set of labels. Processes shouldn't be overusing the registry to register thousands of processes, so that's just about ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about Monoid m => Map ProcesaId (m MonitorRef) in that case complexity is in MxUnregister because delete is linear or logarithmic there. So I think I was correct but thought about different solution in design space.
About this code we need to check that reregistration MxUnregistred then MxRegistered exactly in this order, otherwise MxRegistered case is not correct, but I hope that ordering and existence of messages are provided.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, if events get dropped then even the current code will have trouble. Events on the management bus shouldn't get dropped.

If we must assume that messages do get reordered (because the Management API documentation claims to provide no guarantees), or dropped or duplicated, then I'm not sure that using the Management API for building this functionality is a great option after all.

As @facundominguez says, not worth agonizing over the implementation of this function, since it's basically correct as-is. Though on the assumption that the Management API does provide some guarantees. @hyperthunk could you confirm what those are precisely?

[ mxSink $ \(ProcessMonitorNotification _ pid _) -> do
mxUpdateLocal (Map.delete pid)
mxReady
, mxSink $ \ev -> do
case ev of
MxRegistered pid _
| processNodeId pid /= nid -> do
hm <- mxGetLocal
m <- liftMX $ mdo
let (v,m) = Map.insertLookupWithKey (\_ (m',r) _ -> (m',r+1))
pid (mref,1) hm
mref <- maybe (monitor pid) (return . fst) v
return m
mxSetLocal m
MxUnRegistered pid _
| processNodeId pid /= nid -> do
hm <- mxGetLocal
forM_ (pid `Map.lookup` hm) $ \(mref, i) ->
let i' = pred i
in if i' == 0
then do liftMX $ unmonitorAsync mref
mxSetLocal $! pid `Map.delete` hm
else mxSetLocal $ Map.insert pid (mref,i') hm
_ -> return ()
mxReady
-- remove async answers from mailbox
, mxSink $ \RegisterReply{} -> mxReady
, mxSink $ \DidUnmonitor{} -> mxReady
]