-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Non blocking reads from process output #105
Changes from all commits
4f2502b
35ef937
6e6dabc
53c6c88
f501d29
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,70 @@ | ||
from dataclasses import dataclass, field | ||
from typing import Any, Dict | ||
from queue import Empty, Queue | ||
from threading import Thread | ||
from typing import Any, Dict, List | ||
|
||
from pyreisejl.utility.helpers import get_scenario_status | ||
|
||
|
||
class Listener: | ||
"""Runs in the background to read from stdout/stderr of a long lived | ||
process""" | ||
|
||
def __init__(self, stream): | ||
self.stream = stream | ||
self.queue = Queue() | ||
self._start() | ||
|
||
def _start(self): | ||
t = Thread(target=self._enqueue_output) | ||
t.daemon = True | ||
t.start() | ||
|
||
def _enqueue_output(self): | ||
for line in self.stream: | ||
s = line.decode().strip() | ||
if len(s) > 0: | ||
self.queue.put(s) | ||
self.stream.close() | ||
|
||
def poll(self): | ||
"""Get the latest output from the stream | ||
|
||
:return: (*list*) -- list of lines since previous poll | ||
""" | ||
result = [] | ||
try: | ||
while True: | ||
line = self.queue.get_nowait() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From: https://docs.python.org/3/library/queue.html#queue.Queue.get_nowait """
""" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just making sure, does the |
||
result.append(line) | ||
except Empty: # noqa | ||
pass | ||
return result | ||
|
||
|
||
@dataclass | ||
class SimulationState: | ||
"""Track the state of an ongoing simulation""" | ||
|
||
_EXCLUDE = ["proc", "out_listener", "err_listener"] | ||
|
||
scenario_id: int | ||
proc: Any = field(default=None, repr=False, compare=False, hash=False) | ||
output: str = field(default="", repr=False, compare=False, hash=False) | ||
errors: str = field(default="", repr=False, compare=False, hash=False) | ||
output: List = field(default_factory=list, repr=False, compare=False, hash=False) | ||
errors: List = field(default_factory=list, repr=False, compare=False, hash=False) | ||
status: str = None | ||
|
||
def __post_init__(self): | ||
self.out_listener = Listener(self.proc.stdout) | ||
self.err_listener = Listener(self.proc.stderr) | ||
|
||
def _refresh(self): | ||
"""Set the latest status and append the latest output from standard | ||
streams. | ||
""" | ||
self.status = get_scenario_status(self.scenario_id) | ||
self.output += self.proc.stdout.read().decode() | ||
self.errors += self.proc.stderr.read().decode() | ||
self.output += self.out_listener.poll() | ||
self.errors += self.err_listener.poll() | ||
|
||
def as_dict(self): | ||
"""Return custom dict which omits the process attribute which is not | ||
|
@@ -27,11 +73,13 @@ def as_dict(self): | |
:return: (*dict*) -- dict of the instance attributes | ||
""" | ||
self._refresh() | ||
return {k: v for k, v in self.__dict__.items() if k != "proc"} | ||
return {k: v for k, v in self.__dict__.items() if k not in self._EXCLUDE} | ||
|
||
|
||
@dataclass | ||
class ApplicationState: | ||
"""Tracks all simulations during the lifetime of the application""" | ||
|
||
ongoing: Dict[int, SimulationState] = field(default_factory=dict) | ||
|
||
def add(self, entry): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,41 +1,36 @@ | ||
from pyreisejl.utility.state import ApplicationState, SimulationState | ||
|
||
from subprocess import PIPE, Popen | ||
|
||
class FakeIOStream: | ||
def __init__(self): | ||
self.counter = 0 | ||
import pytest | ||
|
||
def read(self): | ||
self.counter += 1 | ||
return bytes(str(self.counter).encode()) | ||
from pyreisejl.utility.state import ApplicationState, SimulationState | ||
|
||
|
||
class FakeProcess: | ||
def __init__(self): | ||
self.stdout = FakeIOStream() | ||
self.stderr = FakeIOStream() | ||
@pytest.fixture | ||
def test_proc(): | ||
cmd = ["echo", "foo"] | ||
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, start_new_session=True) | ||
return proc | ||
|
||
|
||
def test_scenario_state_refresh(): | ||
entry = SimulationState(1234, FakeProcess()) | ||
entry.as_dict() | ||
assert entry.output == "1" | ||
assert entry.errors == "1" | ||
def test_scenario_state_refresh(test_proc): | ||
entry = SimulationState(123, test_proc) | ||
entry.as_dict() | ||
assert entry.output == "12" | ||
assert entry.errors == "12" | ||
assert entry.output == ["foo"] | ||
assert entry.errors == [] | ||
|
||
|
||
def test_scenario_state_serializable(): | ||
entry = SimulationState(1234, FakeProcess()) | ||
assert "proc" not in entry.as_dict().keys() | ||
def test_scenario_state_serializable(test_proc): | ||
entry = SimulationState(123, test_proc) | ||
keys = entry.as_dict().keys() | ||
assert "proc" not in keys | ||
assert all(["listener" not in k for k in keys]) | ||
|
||
|
||
def test_app_state_get(): | ||
def test_app_state_get(test_proc): | ||
state = ApplicationState() | ||
assert len(state.ongoing) == 0 | ||
|
||
entry = SimulationState(1234, FakeProcess()) | ||
entry = SimulationState(123, test_proc) | ||
state.add(entry) | ||
assert len(state.ongoing) == 1 | ||
assert state.get(1234) is not None | ||
assert state.get(123) is not None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unbounded Queue leaves you vulnerable to OOM issues. Would it be better to limit and throw an exception when out of bounds, or shed oldest entries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, not sure how much of an issue it is but I will make a note to revisit this. For context, the size of the streams is much smaller than the output data, which is most likely to cause OOM issues.