Skip to content

Commit 76e0520

Browse files
committed
stdout logging to stdout.log
1 parent de18278 commit 76e0520

File tree

3 files changed

+147
-0
lines changed

3 files changed

+147
-0
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Manual smoke test for MLflow's stdout logging (`log_stdout=True`).

Run this file directly against a tracking server listening on
http://127.0.0.1:5002. Messages printed while the run is active are expected
to show up both in the terminal and in the run's 'stdout.log' artifact.
"""

import time

import mlflow


def main():
    """Start a run with stdout logging enabled and print a message every second."""
    # Configure the tracking server here rather than at import time so that
    # importing this module has no side effects.
    mlflow.set_tracking_uri("http://127.0.0.1:5002")
    mlflow.set_experiment("stdout_test")

    print("Testing stdout logging integration...")

    with mlflow.start_run(log_stdout=True, log_stdout_interval=3) as run:
        print(f"MLflow Run ID: {run.info.run_id}")
        print("This should appear in both terminal and MLflow!")

        N_LOGS = 30
        for i in range(N_LOGS):
            print(f"Message {i + 1}/{N_LOGS}")
            # One message per second, so several 3-second upload intervals elapse.
            time.sleep(1)

        # NOTE(review): assumed to be printed while the run is still active
        # (i.e. captured in stdout.log) — confirm against the original layout.
        print("Test completed!")

    print("This message should only appear in terminal (run has ended)")


if __name__ == "__main__":
    main()

mlflow/tracking/fluent.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105

106106

107107
run_id_to_system_metrics_monitor = {}
108+
run_id_to_stdout_logger = {}
108109

109110

110111
_active_run_stack = ThreadLocalVariable(default_factory=lambda: [])
@@ -267,6 +268,8 @@ def start_run(
267268
tags: Optional[dict[str, Any]] = None,
268269
description: Optional[str] = None,
269270
log_system_metrics: Optional[bool] = None,
271+
log_stdout: Optional[bool] = None,
272+
log_stdout_interval: int = 5,
270273
) -> ActiveRun:
271274
"""
272275
Start a new MLflow run, setting it as the active run under which metrics and parameters
@@ -309,6 +312,11 @@ def start_run(
309312
to MLflow, e.g., cpu/gpu utilization. If None, we will check environment variable
310313
`MLFLOW_ENABLE_SYSTEM_METRICS_LOGGING` to determine whether to log system metrics.
311314
System metrics logging is an experimental feature in MLflow 2.8 and subject to change.
315+
log_stdout: bool, defaults to None. If True, stdout will be captured and periodically
316+
logged to MLflow as an artifact named 'stdout.log'. If False, stdout logging is
317+
disabled. If None, stdout logging is disabled by default.
318+
log_stdout_interval: int, defaults to 5. The interval in seconds at which to log
319+
the captured stdout to MLflow. Only used when log_stdout is True.
312320
313321
Returns:
314322
:py:class:`mlflow.ActiveRun` object that acts as a context manager wrapping the
@@ -502,6 +510,19 @@ def start_run(
502510
_logger.error(f"Failed to start system metrics monitoring: {e}.")
503511

504512
active_run_stack.append(ActiveRun(active_run_obj))
513+
514+
if log_stdout:
515+
try:
516+
from mlflow.utils.stdout_logging import log_stdout_stream
517+
518+
# Create a context manager that will be entered when the ActiveRun is used
519+
stdout_logger = log_stdout_stream(interval_seconds=log_stdout_interval)
520+
run_id_to_stdout_logger[active_run_obj.info.run_id] = stdout_logger
521+
# Start the stdout logging
522+
stdout_logger.__enter__()
523+
except Exception as e:
524+
_logger.error(f"Failed to start stdout logging: {e}.")
525+
505526
return active_run_stack[-1]
506527

507528

@@ -548,6 +569,12 @@ def end_run(status: str = RunStatus.to_string(RunStatus.FINISHED)) -> None:
548569
if last_active_run_id in run_id_to_system_metrics_monitor:
549570
system_metrics_monitor = run_id_to_system_metrics_monitor.pop(last_active_run_id)
550571
system_metrics_monitor.finish()
572+
if last_active_run_id in run_id_to_stdout_logger:
573+
stdout_logger = run_id_to_stdout_logger.pop(last_active_run_id)
574+
try:
575+
stdout_logger.__exit__(None, None, None)
576+
except Exception as e:
577+
_logger.error(f"Failed to stop stdout logging: {e}.")
551578

552579

553580
def _safe_end_run():

mlflow/utils/stdout_logging.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import sys
2+
import threading
3+
import time
4+
from contextlib import contextmanager
5+
from io import StringIO
6+
7+
import mlflow
8+
9+
10+
class TeeStringIO:
    """A file-like object that writes to both original stdout and a StringIO buffer.

    Args:
        original_stdout: The stream that receives every write first (typically
            the real ``sys.stdout``).
        string_buffer: The buffer that receives a copy of every write.

    Any attribute not defined on this class is looked up on ``original_stdout``.
    """

    def __init__(self, original_stdout, string_buffer):
        self.original_stdout = original_stdout
        self.string_buffer = string_buffer

    def write(self, data):
        """Write ``data`` to both streams and return ``len(data)``."""
        # Write to both original stdout and our buffer
        self.original_stdout.write(data)
        self.string_buffer.write(data)
        return len(data)

    def flush(self):
        """Flush both underlying streams."""
        self.original_stdout.flush()
        self.string_buffer.flush()

    def __getattr__(self, name):
        """Delegate attributes not found on this object to the original stdout.

        Raises:
            AttributeError: If ``name`` is one of this class's own instance
                attributes and it has not been set yet, or if the original
                stdout does not have the attribute either.
        """
        # __getattr__ only runs after normal lookup has failed. If one of our
        # own attributes is missing (e.g. the instance was created without
        # __init__ by copy/pickle), reading self.original_stdout below would
        # re-enter __getattr__ forever, so fail fast instead.
        if name in ("original_stdout", "string_buffer"):
            raise AttributeError(name)
        # Delegate other attributes to original stdout
        return getattr(self.original_stdout, name)
30+
31+
32+
@contextmanager
def log_stdout_stream(interval_seconds=5):
    """
    A context manager to stream stdout to an MLflow artifact.

    This context manager replaces `sys.stdout` with a tee that writes to both
    the original stdout and an in-memory buffer. A background thread
    periodically logs the full buffer contents to an MLflow artifact file
    named 'stdout.log'. On exit the thread is stopped, the buffer is logged
    one last time, and `sys.stdout` is restored.

    Args:
        interval_seconds (int): The interval in seconds at which to log
            the stdout buffer to MLflow.

    Raises:
        RuntimeError: If there is no active MLflow run when the context
            manager is entered.

    Example:
        import time
        import mlflow

        with mlflow.start_run():
            with log_stdout_stream():
                print("This is the start of my script.")
                time.sleep(6)
                print("This message will appear in the first log upload.")
                time.sleep(6)
                print("And this will be in the second.")
            # The context manager will automatically handle final log upload
            # and cleanup.
            print("Stdout is now back to normal.")
    """
    active_run = mlflow.active_run()
    if not active_run:
        raise RuntimeError("An active MLflow run is required to stream stdout.")

    # Resolve the run ID in the calling thread. The uploads happen on a
    # background thread, which must not depend on the caller's active run
    # being visible there, so the target run is passed explicitly.
    run_id = active_run.info.run_id

    original_stdout = sys.stdout
    stdout_buffer = StringIO()
    stop_event = threading.Event()

    def _log_current_stdout():
        # The whole buffer is uploaded each time, overwriting 'stdout.log'.
        content = stdout_buffer.getvalue()
        if content:
            mlflow.log_text(content, "stdout.log", run_id=run_id)

    def _log_loop():
        # Event.wait() returns True as soon as a stop is requested, so exiting
        # the context manager does not have to sit out a full interval.
        while not stop_event.wait(interval_seconds):
            _log_current_stdout()

    log_thread = threading.Thread(
        target=_log_loop, name="mlflow-stdout-logging", daemon=True
    )

    sys.stdout = TeeStringIO(original_stdout, stdout_buffer)
    try:
        log_thread.start()
        yield
    finally:
        try:
            stop_event.set()
            # The thread may never have started (start() failed) or may have
            # already died; join() is only valid for a started thread.
            if log_thread.is_alive():
                log_thread.join()

            # Final log to capture any remaining output
            _log_current_stdout()
        finally:
            # Restore stdout even if the final upload raised, so a failed
            # upload cannot leave the process writing to a stale tee.
            sys.stdout = original_stdout
            stdout_buffer.close()

0 commit comments

Comments
 (0)