Skip to content

Commit d5b09a3

Browse files
committed
add parallel stdout/stderr streaming option
1 parent 7e0bb6b commit d5b09a3

File tree

2 files changed

+106
-10
lines changed

2 files changed

+106
-10
lines changed

pkg/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespaces = true
1515

1616
# ----------------------------------------- Project Metadata -------------------------------------
1717
[project]
18-
version = "0.1.0"
18+
version = "0.2.0"
1919
name = "PyShellMan"
2020
requires-python = ">=3.10"
2121
dependencies = [

pkg/src/pyshellman/shell.py

Lines changed: 105 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from types import SimpleNamespace as _SimpleNamespace
55
import subprocess as _subprocess
66
from pathlib import Path as _Path
7+
import threading as _threading
8+
import sys as _sys
79

810
from pyshellman import exception as _exception
911
from pyshellman.output import ShellOutput as _ShellOutput
@@ -15,8 +17,11 @@ class Runner:
1517

1618
def __init__(
1719
self,
20+
*,
1821
pre_command: list[str] | None = None,
1922
cwd: str | _Path | None = None,
23+
stream_stdout: bool = False,
24+
stream_stderr: bool = False,
2025
raise_execution: bool = True,
2126
raise_exit_code: bool = True,
2227
raise_stderr: bool = False,
@@ -31,6 +36,8 @@ def __init__(
3136
):
3237
self.pre_command = pre_command or []
3338
self.cwd = _Path(cwd).resolve() if cwd else None
39+
self.stream_stdout = stream_stdout
40+
self.stream_stderr = stream_stderr
3441
self.raise_execution = raise_execution
3542
self.raise_exit_code = raise_exit_code
3643
self.raise_stderr = raise_stderr
@@ -47,7 +54,10 @@ def __init__(
4754
def run(
4855
self,
4956
command: list[str],
57+
*,
5058
cwd: str | _Path | None = None,
59+
stream_stdout: bool | None = None,
60+
stream_stderr: bool | None = None,
5161
raise_execution: bool | None = None,
5262
raise_exit_code: bool | None = None,
5363
raise_stderr: bool | None = None,
@@ -62,16 +72,17 @@ def run(
6272
args = self._get_run_args(locals())
6373
command = self.pre_command + command
6474
cwd = _Path(args.cwd).resolve() if args.cwd else None
65-
try:
66-
process = _subprocess.run(command, text=args.text_output, cwd=args.cwd, capture_output=True)
67-
except FileNotFoundError:
68-
stdout = None
69-
stderr = None
70-
code = None
75+
76+
if args.stream_stdout or args.stream_stderr:
77+
func = self._run_stream
78+
kwargs = {
79+
"stream_stdout": args.stream_stdout,
80+
"stream_stderr": args.stream_stderr,
81+
}
7182
else:
72-
stdout = (process.stdout.strip() if args.text_output else process.stdout) or None
73-
stderr = (process.stderr.strip() if args.text_output else process.stderr) or None
74-
code = process.returncode
83+
func = self._run_nostream
84+
kwargs = {}
85+
stdout, stderr, code = func(command, args, **kwargs)
7586
output = _ShellOutput(
7687
title=args.log_title,
7788
command=command,
@@ -110,10 +121,93 @@ def _get_run_args(self, args: dict) -> _SimpleNamespace:
110121
}
111122
return _SimpleNamespace(**final_args)
112123

124+
@staticmethod
125+
def _run_nostream(
126+
command: list[str],
127+
args: _SimpleNamespace
128+
) -> tuple[str | bytes | None, str | bytes | None, int | None]:
129+
try:
130+
process = _subprocess.run(command, text=args.text_output, cwd=args.cwd, capture_output=True)
131+
except FileNotFoundError:
132+
return None, None, None
133+
stdout = (process.stdout.strip() if args.text_output else process.stdout) or None
134+
stderr = (process.stderr.strip() if args.text_output else process.stderr) or None
135+
code = process.returncode
136+
return stdout, stderr, code
137+
138+
@staticmethod
139+
def _run_stream(
140+
command: list[str],
141+
args: _SimpleNamespace,
142+
stream_stdout: bool,
143+
stream_stderr: bool,
144+
) -> tuple[str | bytes | None, str | bytes | None, int | None]:
145+
"""Run a subprocess while printing and capturing stdout/stderr."""
146+
try:
147+
process = _subprocess.Popen(
148+
command,
149+
text=args.text_output,
150+
cwd=args.cwd,
151+
stdout=_subprocess.PIPE,
152+
stderr=_subprocess.PIPE,
153+
bufsize=1 if args.text_output else 0,
154+
)
155+
except FileNotFoundError:
156+
return None, None, None
157+
158+
stdout_chunks: list[str] | list[bytes] = []
159+
stderr_chunks: list[str] | list[bytes] = []
160+
161+
def read_stream(stream, chunks, live: bool):
162+
if args.text_output:
163+
for line in iter(stream.readline, ''):
164+
chunks.append(line)
165+
if live:
166+
print(line, end='', flush=True)
167+
else:
168+
# Read in binary chunks; choose a reasonable block size (e.g., 4096)
169+
while True:
170+
chunk = stream.read(4096)
171+
if not chunk:
172+
break
173+
if live:
174+
# Write bytes directly to stdout.buffer (bypass text encoding)
175+
_sys.stdout.buffer.write(chunk)
176+
_sys.stdout.buffer.flush()
177+
chunks.append(chunk)
178+
stream.close()
179+
180+
threads = [
181+
_threading.Thread(
182+
target=read_stream,
183+
args=(process.stdout, stdout_chunks, stream_stdout),
184+
),
185+
_threading.Thread(
186+
target=read_stream,
187+
args=(process.stderr, stderr_chunks, stream_stderr),
188+
),
189+
]
190+
191+
for t in threads:
192+
t.start()
193+
process.wait()
194+
for t in threads:
195+
t.join()
196+
if args.text_output:
197+
stdout = ''.join(stdout_chunks)
198+
stderr = ''.join(stderr_chunks)
199+
else:
200+
stdout = b''.join(stdout_chunks)
201+
stderr = b''.join(stderr_chunks)
202+
return stdout, stderr, process.returncode
203+
113204

114205
def run(
115206
command: list[str],
207+
*,
116208
cwd: str | _Path | None = None,
209+
stream_stdout: bool = False,
210+
stream_stderr: bool = False,
117211
raise_execution: bool = True,
118212
raise_exit_code: bool = True,
119213
raise_stderr: bool = False,
@@ -128,6 +222,8 @@ def run(
128222
) -> _ShellOutput:
129223
return Runner(
130224
cwd=cwd,
225+
stream_stdout=stream_stdout,
226+
stream_stderr=stream_stderr,
131227
raise_execution=raise_execution,
132228
raise_exit_code=raise_exit_code,
133229
raise_stderr=raise_stderr,

0 commit comments

Comments
 (0)