Skip to content

Registry #178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distributed-process.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Library
Control.Distributed.Process.Internal.WeakTQueue,
Control.Distributed.Process.Management,
Control.Distributed.Process.Node,
Control.Distributed.Process.Node.RegistryAgent
Control.Distributed.Process.Serializable,
Control.Distributed.Process.UnsafePrimitives
Control.Distributed.Process.Management.Internal.Agent,
Expand Down
17 changes: 17 additions & 0 deletions doc/semantics/CloudHaskellSemantics.tex
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,23 @@ \subsection{Ordering and Typed Channels}
for all messages from $P$ to $Q$, rather than using an ordered connection per
typed channel plus one for direct messages.

\subsection{Registry}
The identifiers of both local and remote processes can be stored in the Registry.
The operation \texttt{registerRemoteAsync} can register processes at remote nodes.
When a message is sent to a remote node using \texttt{nsendRemote} there is no
guarantee that the process that should receive the message is located at the
node; thus it may be necessary to relay the message to a process on yet another node.

Both operations \texttt{nsend} and \texttt{nsendRemote} discard the messages if
no process is registered with the given name, or already dead.

Both \texttt{nsend} and \texttt{nsendRemote} guarantee ordering between messages
sent between two processes using one of those mechanism, however ordering between
messages sent by \texttt{send} and \texttt{nsend} is not preserved.

Current implementation for monitoring processes that are stored in registry
are left unspecified, this may be changed in future.

\bibliographystyle{apalike}
\bibliography{references}

Expand Down
5 changes: 3 additions & 2 deletions src/Control/Distributed/Process/Internal/Primitives.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ module Control.Distributed.Process.Internal.Primitives
, unlink
, monitor
, unmonitor
, unmonitorAsync
, withMonitor
-- * Logging
, say
Expand Down Expand Up @@ -121,6 +122,7 @@ import Prelude hiding (catch)
#endif

import Data.Binary (decode)
import Data.Foldable (traverse_)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (formatTime)
import System.Locale (defaultTimeLocale)
Expand Down Expand Up @@ -1110,8 +1112,7 @@ whereisRemoteAsync nid label =

-- | Named send to a process in the local registry (asynchronous)
nsend :: Serializable a => String -> a -> Process ()
nsend label msg =
sendCtrlMsg Nothing (NamedSend label (createUnencodedMessage msg))
nsend label msg = traverse_ (`send` msg) =<< whereis label

-- | Named send to a process in the local registry (asynchronous).
-- This function makes /no/ attempt to serialize and (in the case when the
Expand Down
24 changes: 18 additions & 6 deletions src/Control/Distributed/Process/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import Control.Exception
)
import qualified Control.Exception as Exception (Handler(..), catches, finally)
import Control.Concurrent (forkIO, forkIOWithUnmask, myThreadId)
import Control.Distributed.Process.Node.RegistryAgent (registryMonitorAgent)
import Control.Distributed.Process.Internal.StrictMVar
( newMVar
, withMVar
Expand Down Expand Up @@ -159,7 +160,8 @@ import Control.Distributed.Process.Internal.Types
, createUnencodedMessage
, runLocalProcess
, firstNonReservedProcessId
, ImplicitReconnect(WithImplicitReconnect)
, ImplicitReconnect(NoImplicitReconnect, WithImplicitReconnect)
, messageToPayload
)
import Control.Distributed.Process.Management.Internal.Agent
( mxAgentController
Expand Down Expand Up @@ -187,6 +189,7 @@ import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Messaging
( sendBinary
, sendMessage
, sendPayload
, closeImplicitReconnections
, impliesDeathOf
)
Expand Down Expand Up @@ -296,6 +299,7 @@ startServiceProcesses node = do
runProcess node $ register Table.mxTableCoordinator tableCoordinatorPid
logger <- forkProcess node loop
runProcess node $ register "logger" logger
runProcess node $ void $ registryMonitorAgent
where
fork = forkProcess node

Expand Down Expand Up @@ -686,8 +690,8 @@ nodeController = do
ncEffectRegister from label atnode pid force
NCMsg (ProcessIdentifier from) (WhereIs label) ->
ncEffectWhereIs from label
NCMsg _ (NamedSend label msg') ->
ncEffectNamedSend label msg'
NCMsg (ProcessIdentifier from) (NamedSend label msg') ->
ncEffectNamedSend from label msg'
NCMsg _ (LocalSend to msg') ->
ncEffectLocalSend node to msg'
NCMsg _ (LocalPortSend to msg') ->
Expand Down Expand Up @@ -904,11 +908,19 @@ ncEffectWhereIs from label = do
(WhereIsReply label mPid)

-- [Unified: Table 14]
ncEffectNamedSend :: String -> Message -> NC ()
ncEffectNamedSend label msg = do
ncEffectNamedSend :: ProcessId -> String -> Message -> NC ()
ncEffectNamedSend from label msg = do
mPid <- gets (^. registeredHereFor label)
-- If mPid is Nothing, we just ignore the named send (as per Table 14)
forM_ mPid $ \pid -> postMessage pid msg
node <- ask
forM_ mPid $ \pid ->
if isLocal node (ProcessIdentifier pid)
then postMessage pid msg
else liftIO $ sendPayload node
(ProcessIdentifier from)
(ProcessIdentifier pid)
NoImplicitReconnect
(messageToPayload msg)

-- [Issue #DP-20]
ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC ()
Expand Down
64 changes: 64 additions & 0 deletions src/Control/Distributed/Process/Node/RegistryAgent.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{-# LANGUAGE BangPatterns #-}
-----------------------------------------------------------------------------
---- |
---- Module : Control.Distributed.Process.Node.RegistryAgent
---- Copyright : (c) Tweag I/O 2015
---- License : BSD3 (see the file LICENSE)
----
---- Maintainer : Tim Watson <watson.timothy@gmail.com>
---- Stability : experimental
---- Portability : non-portable (requires concurrency)
----
---- This module provides registry monitoring agent, implemented as
---- a /distributed-process Management Agent/. Once 'node' starts it run this
---- agent, such agent will monitor every remove process that is added to node
---- and remove Processes from registry if they die.
----
-------------------------------------------------------------------------------

module Control.Distributed.Process.Node.RegistryAgent
( registryMonitorAgent
) where

import Control.Distributed.Process.Management
import Control.Distributed.Process.Internal.Types
import Control.Distributed.Process.Internal.Primitives
import Data.Foldable (forM_)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

registryMonitorAgentId :: MxAgentId
registryMonitorAgentId = MxAgentId "service.registry.monitoring"

registryMonitorAgent :: Process ProcessId
registryMonitorAgent = do
mxAgent registryMonitorAgentId initState
[ mxSink $ \(ProcessMonitorNotification _ pid _) -> do
mxUpdateLocal (Map.delete pid)
mxReady
, mxSink $ \ev ->
let act = case ev of
MxRegistered pid _ -> do
hm <- mxGetLocal
case pid `Map.lookup` hm of
Nothing -> do
mon <- liftMX $ monitor pid
mxUpdateLocal (Map.insert pid (mon, 1))
Just _ -> return ()
MxUnRegistered pid _ -> do
hm <- mxGetLocal
forM_ (pid `Map.lookup` hm) $ \(mref, i) ->
let !i' = succ i
in if i' == 0
then do liftMX $ unmonitorAsync mref
mxSetLocal $! pid `Map.delete` hm
else mxSetLocal $ Map.insert pid (mref,i') hm
_ -> return ()
in act >> mxReady
-- remove async answers from mailbox
, mxSink $ \RegisterReply{} -> mxReady
, mxSink $ \DidUnmonitor{} -> mxReady
]
where
initState :: Map ProcessId (MonitorRef,Int)
initState = Map.empty