Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/sorunlib/smurf.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,14 +346,36 @@ def shutdown(concurrent=True, settling_time=0):
settling_time=settling_time)


def stream(state, tag=None, subtype=None, **kwargs):
def _wait_for_stream_start(smurf, timeout):
"""Wait for, at most, timeout seconds until the stream for the specified
smurf client is enabled.

Args:
smurf (ocs.ocs_client.OCSClient): pysmurf-controller client.
timeout (int): Timeout for the check in seconds.

"""
for i in range(int(timeout)):
resp = smurf.stream.status()
stream_on = resp.session['data']['stream_on']
if stream_on:
return
time.sleep(1)

raise RuntimeError(f"Stream for {smurf} did not turn on within {timeout} seconds.")


def stream(state, tag=None, subtype=None, wait_for_stream=True, **kwargs):
"""Stream data on all SMuRF Controllers.

Args:
state (str): Streaming state, either 'on' or 'off'.
tag (str, optional): Tag or comma-separated listed of tags to attach to
the operation.
subtype (str, optional): Operation subtype used to tag the stream.
wait_for_stream (bool, optional): If True, block until the streams are
all enabled. If False, check that the client call goes through, but
do not wait. Defaults to True.
**kwargs: Additional keyword arguments. Passed through to the SMuRF
controller unmodified. See the `controller documentation
<https://socs.readthedocs.io/en/main/agents/pysmurf-controller.html#socs.agents.pysmurf_controller.agent.PysmurfController.stream>`_.
Expand All @@ -368,7 +390,9 @@ def stream(state, tag=None, subtype=None, **kwargs):
for smurf in run.CLIENTS['smurf']:
resp = smurf.stream.status()
try:
check_started(smurf, resp, timeout=120)
check_started(smurf, resp, timeout=60)
if wait_for_stream:
_wait_for_stream_start(smurf, timeout=120)
except RuntimeError as e:
print(f"Failed to start stream on {smurf}, removing from targets list.")
print(e)
Expand Down
25 changes: 24 additions & 1 deletion tests/test_smurf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@

import pytest

import ocs
from ocs.client_http import ControlClientError
from ocs.ocs_client import OCSReply

import sorunlib as run
from sorunlib import smurf
from util import create_patch_clients
from util import create_patch_clients, create_session

os.environ["SORUNLIB_CONFIG"] = "./data/example_config.yaml"

Expand Down Expand Up @@ -202,6 +203,28 @@ def test_stream_single_failure(state):
client.stream.stop.assert_called_once()


@patch('sorunlib.smurf.time.sleep', MagicMock())
def test_wait_for_stream_timeout():
# Make smurf1 stream not turn on
session = create_session('stream', status='running')
session.data = {'stream_on': False}
reply = OCSReply(ocs.OK, 'msg', session.encoded())
smurf.run.CLIENTS['smurf'][0].stream.start = MagicMock(return_value=reply)
smurf.run.CLIENTS['smurf'][0].stream.status = MagicMock(return_value=reply)

# Verify session.data modification
for client in smurf.run.CLIENTS['smurf']:
print(client.stream.status().session['data'])

# Turn on stream, which should drop smurf1
smurf.stream(state='on')
assert len(smurf.run.CLIENTS['smurf']) == 2

# Remaining streams should turn on
for client in smurf.run.CLIENTS['smurf']:
client.stream.start.assert_called_once()


def test_stream_agent_unavailable_on_stop():
# Replace 'smurf1' client with one that will error on stream.stop()
run.CLIENTS['smurf'][0] = ErrorClient('smurf1')
Expand Down
1 change: 1 addition & 0 deletions tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def _mock_smurf_client(instance_id):

# smurf.stream
session = create_session('stream', status='running')
session.data = {'stream_on': True} # Always assume stream turns on
reply = OCSReply(ocs.OK, 'msg', session.encoded())
smurf.stream.start = MagicMock(return_value=reply)
smurf.stream.status = MagicMock(return_value=reply)
Expand Down