Skip to content

Commit 63d7fc0

Browse files
The actor offers publishing via control protocol (+test)
1 parent d5664e9 commit 63d7fc0

File tree

2 files changed

+37
-2
lines changed

2 files changed

+37
-2
lines changed

pyleco/actors/actor.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import zmq
3030

3131
from ..utils.message_handler import MessageHandler
32-
from ..utils.data_publisher import DataPublisher
32+
from ..utils.extended_data_publisher import ExtendedDataPublisher as DataPublisher
3333
from ..utils.timers import RepeatingTimer
3434

3535

@@ -101,7 +101,9 @@ def __init__(
101101
self.pipeL.connect(f"inproc://listenerPipe:{pipe_port}")
102102

103103
self.timer = RepeatingTimer(interval=periodic_reading, function=self.queue_readout)
104-
self.publisher = DataPublisher(full_name=name, log=self.root_logger)
104+
self.publisher = DataPublisher(
105+
full_name=name, log=self.root_logger, send_message_method=self.send_message
106+
)
105107

106108
if auto_connect:
107109
self.connect(**auto_connect)
@@ -118,6 +120,8 @@ def register_rpc_methods(self) -> None:
118120
self.register_rpc_method(self.set_polling_interval)
119121
self.register_rpc_method(self.connect)
120122
self.register_rpc_method(self.disconnect)
123+
self.register_rpc_method(self.register_subscriber)
124+
self.register_rpc_method(self.unregister_subscriber)
121125

122126
def register_device_method(self, method: Callable) -> None:
123127
"""Make a device method available via RPC. The method name is prefixed with `device.`."""
@@ -271,3 +275,9 @@ def call_action(self, action: str, args: Optional[Sequence] = None,
271275
for attr in path[:-1]:
272276
obj = getattr(obj, attr)
273277
return getattr(obj, path[-1])(*args, **kwargs)
278+
279+
def register_subscriber(self):
280+
self.publisher.register_subscriber(self.current_message.sender)
281+
282+
def unregister_subscriber(self):
283+
self.publisher.unregister_subscriber(self.current_message.sender)

tests/acceptance_tests/test_director_actor.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import pytest
3131

32+
from pyleco.core.message import MessageTypes
3233
from pyleco.coordinators.coordinator import Coordinator
3334
from pyleco.actors.actor import Actor
3435
from pyleco.directors.director import Director
@@ -83,10 +84,14 @@ def binary_method_created(additional_payload: list[bytes]) -> tuple[None, list[b
8384
"""Receive binary data and return it. Create binary method by registering it."""
8485
return None, [additional_payload[0] * 2]
8586

87+
def publish():
88+
actor.publisher.send_data("super content")
89+
8690
actor.register_rpc_method(binary_method_manually)
8791
actor.register_binary_rpc_method(
8892
binary_method_created, accept_binary_input=True, return_binary_output=True
8993
)
94+
actor.register_rpc_method(publish)
9095
actor.connect()
9196
actor.rpc.method()(actor.device.triple)
9297
actor.register_device_method(actor.device.triple)
@@ -165,3 +170,23 @@ def test_binary_data_transfer_created(director: Director):
165170
assert director.ask_rpc(
166171
method="binary_method_created", additional_payload=[b"123"], extract_additional_payload=True
167172
) == (None, [b"123123"])
173+
174+
175+
def test_data_via_control_protocol(director: Director):
176+
# act
177+
director.ask_rpc("register_subscriber")
178+
director.ask_rpc("publish")
179+
180+
msg = director.communicator.read_message()
181+
director.communicator.send(
182+
receiver=director.actor, # type: ignore
183+
data={"jsonrpc": "2.0", "id": 1, "result": None},
184+
conversation_id=msg.conversation_id,
185+
message_type=MessageTypes.JSON,
186+
)
187+
188+
# teardown
189+
director.ask_rpc("unregister_subscriber")
190+
191+
assert msg.data == {"jsonrpc": "2.0", "id": 1, "method": "set_subscription_message"}
192+
assert msg.payload[1:] == [b'super content']

0 commit comments

Comments
 (0)