[Serve] Update log pattern in _follow_replica_logs for new UX 3.0 #4333

Merged (9 commits) on Nov 14, 2024
5 changes: 1 addition & 4 deletions sky/serve/core.py
@@ -687,10 +687,7 @@ def tail_logs(
"""
if isinstance(target, str):
target = serve_utils.ServiceComponent(target)
if not isinstance(target, serve_utils.ServiceComponent):
with ux_utils.print_exception_no_traceback():
raise ValueError(f'`target` must be a string or '
f'sky.serve.ServiceComponent, got {type(target)}.')

if target == serve_utils.ServiceComponent.REPLICA:
if replica_id is None:
with ux_utils.print_exception_no_traceback():
169 changes: 88 additions & 81 deletions sky/serve/serve_utils.py
@@ -12,7 +12,7 @@
import time
import typing
from typing import (Any, Callable, DefaultDict, Dict, Generic, Iterator, List,
Optional, TextIO, Type, TypeVar)
Optional, Set, TextIO, Type, TypeVar)
import uuid

import colorama
@@ -46,8 +46,9 @@
constants.CONTROLLER_MEMORY_USAGE_GB)
_CONTROLLER_URL = 'http://localhost:{CONTROLLER_PORT}'

_SKYPILOT_PROVISION_LOG_PATTERN = r'.*tail -n100 -f (.*provision\.log).*'
_SKYPILOT_LOG_PATTERN = r'.*tail -n100 -f (.*\.log).*'
# Log paths should always appear after a space and start with ~
_SKYPILOT_PROVISION_LOG_PATTERN = r'.* (~/.*provision\.log)'
_SKYPILOT_LOG_PATTERN = r'.* (~/.*\.log)'
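For illustration, a minimal check of the new provision-log pattern against a line in the new UX format (the cluster name and path below are made up for the example):

```python
import re

_SKYPILOT_PROVISION_LOG_PATTERN = r'.* (~/.*provision\.log)'

# Hypothetical new-UX log line; cluster name and path are illustrative.
line = '✓ Cluster launched: new-http.  View logs at: ~/sky_logs/sky-2024-11-14/provision.log'
match = re.match(_SKYPILOT_PROVISION_LOG_PATTERN, line)
assert match is not None
print(match.group(1))  # ~/sky_logs/sky-2024-11-14/provision.log
```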
# TODO(tian): Find all existing replica id and print here.
_FAILED_TO_FIND_REPLICA_MSG = (
f'{colorama.Fore.RED}Failed to find replica '
@@ -592,67 +593,84 @@ def get_latest_version_with_min_replicas(


def _follow_replica_logs(
file: TextIO,
cluster_name: str,
*,
finish_stream: Callable[[], bool],
exit_if_stream_end: bool = False,
no_new_content_timeout: Optional[int] = None) -> Iterator[str]:
line = ''
log_file = None
no_new_content_cnt = 0
file: TextIO,
cluster_name: str,
*,
should_stop: Callable[[], bool],
stop_on_eof: bool = False,
idle_timeout_seconds: Optional[int] = None,
_visited_logs: Optional[Set[str]] = None,
) -> Iterator[str]:
"""Follows logs for a replica, handling nested log files.

Args:
file: Log file to read from.
cluster_name: Name of the cluster being launched.
should_stop: Callback that returns True when streaming should stop.
stop_on_eof: If True, stop when reaching end of file.
idle_timeout_seconds: If set, stop after this many seconds without
new content.
_visited_logs: Internal parameter to prevent infinite recursion.

Yields:
Log lines from the main file and any nested log files.
"""
visited_logs: Set[
str] = _visited_logs if _visited_logs is not None else set()

def cluster_is_up() -> bool:
cluster_record = global_user_state.get_cluster_from_name(cluster_name)
if cluster_record is None:
return False
return cluster_record['status'] == status_lib.ClusterStatus.UP

while True:
tmp = file.readline()
if tmp is not None and tmp != '':
no_new_content_cnt = 0
line += tmp
if '\n' in line or '\r' in line:
# Tailing detailed progress for user. All logs in skypilot is
# of format `To view detailed progress: tail -n100 -f *.log`.
x = re.match(_SKYPILOT_PROVISION_LOG_PATTERN, line)
if x is not None:
log_file = os.path.expanduser(x.group(1))
elif re.match(_SKYPILOT_LOG_PATTERN, line) is None:
# Not print other logs (file sync logs) since we lack
# utility to determine when these log files are finished
# writing.
# TODO(tian): Not skip these logs since there are small
# chance that error will happen in file sync. Need to find
# a better way to do this.
yield line
# Output next line first since it indicates the process is
# starting. For our launching logs, it's always:
# Launching on <cloud> <region> (<zone>)
if log_file is not None:
with open(log_file, 'r', newline='',
encoding='utf-8') as f:
# We still exit if more than 10 seconds without new
# content to avoid any internal bug that causes
# the launch failed and cluster status remains INIT.
for l in _follow_replica_logs(
f,
cluster_name,
finish_stream=cluster_is_up,
exit_if_stream_end=exit_if_stream_end,
no_new_content_timeout=10):
yield l
log_file = None
line = ''
else:
if exit_if_stream_end or finish_stream():
break
if no_new_content_timeout is not None:
if no_new_content_cnt >= no_new_content_timeout:
break
no_new_content_cnt += 1
time.sleep(1)
def process_line(line: str) -> Iterator[str]:
# The line might be directing users to view logs, like
# `✓ Cluster launched: new-http. View logs at: *.log`
# We should tail the detailed logs for the user.
provision_log_prompt = re.match(_SKYPILOT_PROVISION_LOG_PATTERN, line)
log_prompt = re.match(_SKYPILOT_LOG_PATTERN, line)

if provision_log_prompt is not None:
nested_log_path = os.path.expanduser(provision_log_prompt.group(1))

if nested_log_path in visited_logs:
return
visited_logs.add(nested_log_path)

try:
with open(nested_log_path, 'r', newline='',
encoding='utf-8') as f:
# We still exit if there is no new content for more than 10
# seconds, to guard against any internal bug that causes the
# launch to fail while the cluster status remains INIT.
yield from _follow_replica_logs(f,
cluster_name,
should_stop=cluster_is_up,
stop_on_eof=stop_on_eof,
idle_timeout_seconds=10,
_visited_logs=visited_logs)
except FileNotFoundError:
# Ignore missing log files
pass
return

if log_prompt is not None:
# For now, we skip other logs (file sync logs) since we lack
# a way to determine when these log files have finished
# being written.
# TODO(tian): We should not skip these logs, since there is a
# small chance that an error will happen during file sync.
# Need to find a better way to do this.
return

yield line

return log_utils.follow_logs(file,
should_stop=should_stop,
stop_on_eof=stop_on_eof,
process_line=process_line,
idle_timeout_seconds=idle_timeout_seconds)
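To make the nested-log handling concrete, here is a hedged sketch of how the rewritten `_follow_replica_logs` (an internal helper) expands a matching line into the contents of the referenced provision log. The cluster name, log file, and log contents are invented for the demo; the file is written under `~` only because the new patterns require paths starting with `~/`. With `stop_on_eof=True`, streaming stops at end of file, so no real cluster status is consulted:

```python
import io
import os

from sky.serve import serve_utils

# Hypothetical nested provision log (the patterns only match paths under '~').
nested = os.path.expanduser('~/demo_provision.log')
with open(nested, 'w', encoding='utf-8') as f:
    f.write('Launching on AWS us-east-1 (us-east-1a)\n')

main_log = io.StringIO(
    'Creating a new replica...\n'
    '✓ Cluster launched: demo.  View logs at: ~/demo_provision.log\n'
    'Replica is up.\n')

for line in serve_utils._follow_replica_logs(main_log,
                                             cluster_name='demo-cluster',
                                             should_stop=lambda: True,
                                             stop_on_eof=True):
    print(line, end='')
# The "View logs at" line is replaced by the nested log's contents;
# the other lines are streamed through unchanged.

os.remove(nested)
```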


def stream_replica_logs(service_name: str, replica_id: int,
@@ -687,14 +705,17 @@ def _get_replica_status() -> serve_state.ReplicaStatus:
raise ValueError(
_FAILED_TO_FIND_REPLICA_MSG.format(replica_id=replica_id))

finish_stream = (
replica_provisioned = (
lambda: _get_replica_status() != serve_state.ReplicaStatus.PROVISIONING)
with open(launch_log_file_name, 'r', newline='', encoding='utf-8') as f:
for line in _follow_replica_logs(f,
replica_cluster_name,
finish_stream=finish_stream,
exit_if_stream_end=not follow):
for line in _follow_replica_logs(
f,
replica_cluster_name,
should_stop=replica_provisioned,
stop_on_eof=not follow,
):
print(line, end='', flush=True)

if (not follow and
_get_replica_status() == serve_state.ReplicaStatus.PROVISIONING):
# Early exit if not following the logs.
@@ -719,22 +740,6 @@ def _get_replica_status() -> serve_state.ReplicaStatus:
return ''


def _follow_logs(file: TextIO, *, finish_stream: Callable[[], bool],
exit_if_stream_end: bool) -> Iterator[str]:
line = ''
while True:
tmp = file.readline()
if tmp is not None and tmp != '':
line += tmp
if '\n' in line or '\r' in line:
yield line
line = ''
else:
if exit_if_stream_end or finish_stream():
break
time.sleep(1)


def stream_serve_process_logs(service_name: str, stream_controller: bool,
follow: bool) -> str:
msg = check_service_status_healthy(service_name)
@@ -753,9 +758,11 @@ def _service_is_terminal() -> bool:

with open(os.path.expanduser(log_file), 'r', newline='',
encoding='utf-8') as f:
for line in _follow_logs(f,
finish_stream=_service_is_terminal,
exit_if_stream_end=not follow):
for line in log_utils.follow_logs(
f,
should_stop=_service_is_terminal,
stop_on_eof=not follow,
):
print(line, end='', flush=True)
return ''

5 changes: 1 addition & 4 deletions sky/skylet/log_lib.py
@@ -315,11 +315,8 @@ def run_bash_command_with_log(bash_command: str,
# Need this `-i` option to make sure `source ~/.bashrc` work.
inner_command = f'/bin/bash -i {script_path}'

subprocess_cmd: Union[str, List[str]]
subprocess_cmd = inner_command

return run_with_log(
subprocess_cmd,
inner_command,
log_path,
stream_logs=stream_logs,
with_ray=with_ray,
53 changes: 52 additions & 1 deletion sky/utils/log_utils.py
@@ -1,7 +1,8 @@
"""Logging utils."""
import enum
import time
import types
from typing import List, Optional, Type
from typing import Callable, Iterator, List, Optional, TextIO, Type

import colorama
import pendulum
@@ -284,3 +285,53 @@ def readable_time_duration(start: Optional[float],
diff = diff.replace('hour', 'hr')

return diff


def follow_logs(
file: TextIO,
*,
should_stop: Callable[[], bool],
stop_on_eof: bool = False,
process_line: Optional[Callable[[str], Iterator[str]]] = None,
idle_timeout_seconds: Optional[int] = None,
) -> Iterator[str]:
"""Streams and processes logs line by line from a file.

Args:
file: File object to read logs from.
should_stop: Callback that returns True when streaming should stop.
stop_on_eof: If True, stop when reaching end of file.
process_line: Optional callback to transform/filter each line.
idle_timeout_seconds: If set, stop after this many seconds without
new content.

Yields:
Log lines, possibly transformed by process_line if provided.
"""
current_line: str = ''
seconds_without_content: int = 0

while True:
content = file.readline()

if not content:
if stop_on_eof or should_stop():
break

if idle_timeout_seconds is not None:
if seconds_without_content >= idle_timeout_seconds:
break
seconds_without_content += 1

time.sleep(1)
continue

seconds_without_content = 0
current_line += content

if '\n' in current_line or '\r' in current_line:
if process_line is not None:
yield from process_line(current_line)
else:
yield current_line
current_line = ''
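A small usage sketch of the new helper, showing that `process_line` may yield zero, one, or many output lines per input line (the filtering callback here is invented for the example):

```python
import io
from typing import Iterator

from sky.utils import log_utils


def drop_blank_lines(line: str) -> Iterator[str]:
    # Yield the line only if it has visible content.
    if line.strip():
        yield line


buf = io.StringIO('starting\n\ndone\n')
for out in log_utils.follow_logs(buf,
                                 should_stop=lambda: True,
                                 stop_on_eof=True,
                                 process_line=drop_blank_lines):
    print(out, end='')
# Prints "starting" and "done"; the blank line is filtered out.
```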