Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAP] [Feature] Get list of actors from directory service. #2073

Merged
merged 22 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f8417af
Search directory for list of actors using regex '.*' gets all actors
rajan-chari Mar 18, 2024
3b7d95a
docs changes
rajan-chari Mar 19, 2024
a6cd5ac
pre-commit fixes
rajan-chari Mar 19, 2024
3055b78
Use ActorInfo from protobuf
rajan-chari Mar 19, 2024
5858a9c
Merge branch 'main' into rajan/list-actors
rajan-chari Mar 19, 2024
2c79561
Merge branch 'main' into rajan/list-actors
rajan-chari Mar 19, 2024
bed19d7
pre-commit
rajan-chari Mar 19, 2024
588e4b7
Merge branch 'main' into rajan/list-actors
rajan-chari Mar 19, 2024
8963331
Merge branch 'main' into rajan/list-actors
rajan-chari Mar 21, 2024
603804e
Added zmq tests to work on removing sleeps
rajan-chari Mar 25, 2024
11019dd
Merge branch 'rajan/list-actors' of https://github.com/rajan-chari/au…
rajan-chari Mar 25, 2024
16c7b93
minor refactor of zmq tests
rajan-chari Mar 26, 2024
cd35cb1
1) Change DirSvr to user Broker. 2) Add req-router to broker 3) In A…
rajan-chari Mar 26, 2024
5b2b727
1) Change DirSvr to user Broker. 2) Add req-router to broker 3) In A…
rajan-chari Mar 26, 2024
b5f8907
move socket creation to thread with recv
rajan-chari Mar 26, 2024
ecf4fef
move socket creation to thread with recv
rajan-chari Mar 26, 2024
cf18ebf
Better logging for DirectorySvc
rajan-chari Mar 26, 2024
51d0f5c
better logging for directory svc
rajan-chari Mar 26, 2024
bb03669
Use logging config
rajan-chari Mar 26, 2024
bbdf193
Start removing sleeps
rajan-chari Mar 26, 2024
74fc690
pre-commit
rajan-chari Mar 26, 2024
c28fd9e
Cleanup monitor socket
rajan-chari Mar 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
pre-commit
  • Loading branch information
rajan-chari committed Mar 26, 2024
commit 74fc690eddcbb4791a1364688520c7658f527a66
9 changes: 5 additions & 4 deletions samples/apps/cap/py/autogencap/ActorConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .Config import xsub_url, xpub_url, router_url
from typing import Any, Dict


class ActorConnector:
def __init__(self, context, topic):
self._context = context
Expand All @@ -29,10 +30,10 @@ def _send_recv_router_msg(self):
req_socket = self._context.socket(zmq.REQ)
req_socket.connect(router_url)
try:
Debug("ActorConnector","Broker Check Request Sent")
Debug("ActorConnector", "Broker Check Request Sent")
req_socket.send_string("Request")
_ = req_socket.recv_string()
Debug("ActorConnector","Broker Check Response Received")
Debug("ActorConnector", "Broker Check Response Received")
finally:
req_socket.close()

Expand All @@ -46,8 +47,8 @@ def _connect_pub_socket(self):
evt: Dict[str, Any] = {}
mon_evt = recv_monitor_message(monitor)
evt.update(mon_evt)
if evt['event'] == zmq.EVENT_MONITOR_STOPPED or evt['event'] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
Debug("ActorConnector","Handshake received (Or Monitor stopped)")
if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
Debug("ActorConnector", "Handshake received (Or Monitor stopped)")
break
monitor.close()
self._send_recv_router_msg()
Expand Down
22 changes: 13 additions & 9 deletions samples/apps/cap/py/autogencap/Broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from autogencap.DebugLog import Debug, Info, Warn
from autogencap.Config import xsub_url, xpub_url, router_url


class Broker:
def __init__(self, context: zmq.Context = zmq.Context()):
self._context: zmq.Context = context
Expand Down Expand Up @@ -37,17 +38,18 @@ def _init_sockets(self):
if self._router:
self._router.close()
return False

def start(self) -> bool:
Debug("BROKER", f"Trying to start broker.")
Debug("BROKER", "Trying to start broker.")
self._run = True
self._broker_thread: threading.Thread = threading.Thread(target=self.thread_fn)
self._broker_thread.start()
time.sleep(0.01)
return True

def stop(self):
if not self._run: return
if not self._run:
return
# Error("BROKER_ERR", "fix cleanup self._context.term()")
Debug("BROKER", "stopped")
self._run = False
Expand All @@ -64,9 +66,9 @@ def thread_fn(self):
try:
if not self._init_sockets():
Debug("BROKER", "Receive thread not started since sockets were not initialized")
self._run = False
self._run = False
return

# Poll sockets for events
self._poller: zmq.Poller = zmq.Poller()
self._poller.register(self._xpub, zmq.POLLIN)
Expand All @@ -81,27 +83,28 @@ def thread_fn(self):
message = self._xpub.recv_multipart()
Debug("BROKER", f"subscription message: {message[0]}")
self._xsub.send_multipart(message)

if self._xsub in events:
message = self._xsub.recv_multipart()
Debug("BROKER", f"publishing message: {message}")
self._xpub.send_multipart(message)

if self._router in events:
message = self._router.recv_multipart()
Debug("BROKER", f"router message: {message}")
# Mirror it back for now to confirm connectivity
# More interesting reserved point to point
# More interesting reserved point to point
# routing coming in the the future
self._router.send_multipart(message)

except Exception as e:
Debug("BROKER", f"thread encountered an error: {e}")
finally:
self._run = False
Debug("BROKER", "thread ended")
return


# Run a standalone broker that all other Actors can connect to.
# This can also run inproc with the other actors.
def main():
Expand Down Expand Up @@ -133,5 +136,6 @@ def main():
Info("BROKER", "KeyboardInterrupt. Stopping the broker.")
broker.stop()


if __name__ == "__main__":
main()
4 changes: 4 additions & 0 deletions samples/apps/cap/py/autogencap/DirectorySvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# TODO (Future DirectorySv PR) use actor description, network_id, other properties to make directory
# service more generic and powerful


class DirectoryActor(Actor):
def __init__(self, topic: str, name: str):
super().__init__(topic, name)
Expand Down Expand Up @@ -86,6 +87,7 @@ def _actor_lookup_msg_handler(self, topic: str, msg_type: str, msg: bytes, sende
serialized_msg = actor_lookup_resp.SerializeToString()
sender_connection.send_bin_msg(ActorLookupResponse.__name__, serialized_msg)


class DirectorySvc:
def __init__(self, context: zmq.Context = zmq.Context()):
self._context: zmq.Context = context
Expand Down Expand Up @@ -151,6 +153,7 @@ def lookup_actor_info_by_name(self, actor_name: str) -> ActorInfoCollection:
return actor_lookup_resp.actor
return None


# Run a standalone directory service
def main():
context: zmq.Context = zmq.Context()
Expand Down Expand Up @@ -192,5 +195,6 @@ def main():
context.term()
Info("main", "Done.")


if __name__ == "__main__":
main()
25 changes: 16 additions & 9 deletions samples/apps/cap/py/demo/zmq_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from zmq.utils.monitor import recv_monitor_message
from autogencap.Config import xsub_url, xpub_url, router_url, dealer_url


def zmq_sub_test():
context = zmq.Context()
sub_socket = context.socket(zmq.SUB)
Expand All @@ -29,18 +30,20 @@ def zmq_sub_test():
print(f"No message received in {elapsed_time:.2f} seconds")
sub_socket.close()


def event_monitor(pub_socket: zmq.Socket) -> None:
monitor = pub_socket.get_monitor_socket()
while monitor.poll():
evt: Dict[str, Any] = {}
mon_evt = recv_monitor_message(monitor)
evt.update(mon_evt)
print(evt)
if evt['event'] == zmq.EVENT_MONITOR_STOPPED or evt['event'] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
break
monitor.close()

def zmq_pub_test():

def zmq_pub_test():
context = zmq.Context()
pub_socket = context.socket(zmq.PUB)
pub_socket.setsockopt(zmq.XPUB_VERBOSE, 1)
Expand All @@ -52,6 +55,7 @@ def zmq_pub_test():
pub_socket.send_string(str(i))
pub_socket.close()


def zmq_router_dealer_test():
context = zmq.Context()
router_socket = context.socket(zmq.ROUTER)
Expand Down Expand Up @@ -85,13 +89,14 @@ def zmq_router_dealer_test():
message = dealer_socket.recv_multipart()
print("BROKER", f"publishing message: {message[0]}")
router_socket.send_multipart(message)

except Exception as e:
print("BROKER", f"thread encountered an error: {e}")
finally:
print("BROKER", "thread ended")
return


def zmq_req_test(context: zmq.Context = None):
if context is None:
context = zmq.Context()
Expand All @@ -106,6 +111,7 @@ def zmq_req_test(context: zmq.Context = None):
finally:
req_socket.close()


def zmq_rep_test():
context = zmq.Context()
rep_socket = context.socket(zmq.REP)
Expand All @@ -120,19 +126,20 @@ def zmq_rep_test():
finally:
rep_socket.close()


if __name__ == "__main__":
if len(sys.argv) > 1:
if sys.argv[1] == 'pub':
if sys.argv[1] == "pub":
zmq_pub_test()
elif sys.argv[1] == 'sub':
elif sys.argv[1] == "sub":
zmq_sub_test()
elif sys.argv[1] == 'router':
elif sys.argv[1] == "router":
zmq_router_dealer_test()
elif sys.argv[1] == 'req':
elif sys.argv[1] == "req":
zmq_req_test()
elif sys.argv[1] == 'rep':
elif sys.argv[1] == "rep":
zmq_rep_test()
else:
print("Invalid argument. Please use 'pub', 'sub' 'router', 'req', 'rep'")
else:
print("Please provide an argument. Please use 'pub', 'sub' 'router', 'req', 'rep'")
print("Please provide an argument. Please use 'pub', 'sub' 'router', 'req', 'rep'")
Loading