Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non blocking reads from process output #105

Merged
merged 5 commits into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified pyreisejl/utility/__init__.py
100755 → 100644
Empty file.
60 changes: 54 additions & 6 deletions pyreisejl/utility/state.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,70 @@
from dataclasses import dataclass, field
from typing import Any, Dict
from queue import Empty, Queue
from threading import Thread
from typing import Any, Dict, List

from pyreisejl.utility.helpers import get_scenario_status


class Listener:
"""Runs in the background to read from stdout/stderr of a long lived
process"""

def __init__(self, stream):
self.stream = stream
self.queue = Queue()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unbounded Queue leaves you vulnerable to OOM issues. Would it be better to limit and throw an exception when out of bounds, or shed oldest entries?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, not sure how much of an issue it is but I will make a note to revisit this. For context, the size of the streams is much smaller than the output data, which is most likely to cause OOM issues.

self._start()

def _start(self):
t = Thread(target=self._enqueue_output)
t.daemon = True
t.start()

def _enqueue_output(self):
for line in self.stream:
s = line.decode().strip()
if len(s) > 0:
self.queue.put(s)
self.stream.close()

def poll(self):
"""Get the latest output from the stream

:return: (*list*) -- list of lines since previous poll
"""
result = []
try:
while True:
line = self.queue.get_nowait()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From: https://docs.python.org/3/library/queue.html#queue.Queue.get_nowait

"""
exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

"""
Seems to indicate that this call could raise an exception... do you want to catch and ignore here or annotate your docstring to indicate this could be raised?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just making sure, does the except block below already do this?

result.append(line)
except Empty: # noqa
pass
return result


@dataclass
class SimulationState:
"""Track the state of an ongoing simulation"""

_EXCLUDE = ["proc", "out_listener", "err_listener"]

scenario_id: int
proc: Any = field(default=None, repr=False, compare=False, hash=False)
output: str = field(default="", repr=False, compare=False, hash=False)
errors: str = field(default="", repr=False, compare=False, hash=False)
output: List = field(default_factory=list, repr=False, compare=False, hash=False)
errors: List = field(default_factory=list, repr=False, compare=False, hash=False)
status: str = None

def __post_init__(self):
self.out_listener = Listener(self.proc.stdout)
self.err_listener = Listener(self.proc.stderr)

def _refresh(self):
"""Set the latest status and append the latest output from standard
streams.
"""
self.status = get_scenario_status(self.scenario_id)
self.output += self.proc.stdout.read().decode()
self.errors += self.proc.stderr.read().decode()
self.output += self.out_listener.poll()
self.errors += self.err_listener.poll()

def as_dict(self):
"""Return custom dict which omits the process attribute which is not
Expand All @@ -27,11 +73,13 @@ def as_dict(self):
:return: (*dict*) -- dict of the instance attributes
"""
self._refresh()
return {k: v for k, v in self.__dict__.items() if k != "proc"}
return {k: v for k, v in self.__dict__.items() if k not in self._EXCLUDE}


@dataclass
class ApplicationState:
"""Tracks all simulations during the lifetime of the application"""

ongoing: Dict[int, SimulationState] = field(default_factory=dict)

def add(self, entry):
Expand Down
45 changes: 20 additions & 25 deletions pyreisejl/utility/tests/test_state.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,36 @@
from pyreisejl.utility.state import ApplicationState, SimulationState

from subprocess import PIPE, Popen

class FakeIOStream:
def __init__(self):
self.counter = 0
import pytest

def read(self):
self.counter += 1
return bytes(str(self.counter).encode())
from pyreisejl.utility.state import ApplicationState, SimulationState


class FakeProcess:
def __init__(self):
self.stdout = FakeIOStream()
self.stderr = FakeIOStream()
@pytest.fixture
def test_proc():
cmd = ["echo", "foo"]
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, start_new_session=True)
return proc


def test_scenario_state_refresh():
entry = SimulationState(1234, FakeProcess())
entry.as_dict()
assert entry.output == "1"
assert entry.errors == "1"
def test_scenario_state_refresh(test_proc):
entry = SimulationState(123, test_proc)
entry.as_dict()
assert entry.output == "12"
assert entry.errors == "12"
assert entry.output == ["foo"]
assert entry.errors == []


def test_scenario_state_serializable():
entry = SimulationState(1234, FakeProcess())
assert "proc" not in entry.as_dict().keys()
def test_scenario_state_serializable(test_proc):
entry = SimulationState(123, test_proc)
keys = entry.as_dict().keys()
assert "proc" not in keys
assert all(["listener" not in k for k in keys])


def test_app_state_get():
def test_app_state_get(test_proc):
state = ApplicationState()
assert len(state.ongoing) == 0

entry = SimulationState(1234, FakeProcess())
entry = SimulationState(123, test_proc)
state.add(entry)
assert len(state.ongoing) == 1
assert state.get(1234) is not None
assert state.get(123) is not None