[Spot] Better spot logs (skypilot-org#1412)
* Add cluster status check even when the job is RUNNING for multi-node

* Disable autoscaler logs and fix the finished-state detection when partially preempted

* format

* Add test

* Better spot logging

* Add logs

* format

* address comments

* address comments part 2

* Finish the logging early

* format

* better logging

* Address comments

* Fix message

* Address comments
Michaelvll authored and Sumanth committed Jan 15, 2023
1 parent 0563248 commit 78ed58c
Showing 1 changed file with 116 additions and 78 deletions.
194 changes: 116 additions & 78 deletions sky/spot/spot_utils.py
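The core of the change is replacing the repeated logger.info polling messages with a single rich.status.Status spinner that is updated in place while the controller and the spot job start up. Below is a minimal standalone sketch of that pattern, not the SkyPilot code itself: poll_status and the status object's is_terminal()/value interface are stand-ins for the job_lib/spot_state lookups used in the diff.

import time

import rich.status

# A sketch of the new waiting-status pattern (assumed names, not SkyPilot code).
_WAITING_MESSAGE = ('[bold cyan]Waiting for the job to start'
                    '{status_str}.[/] It may take a few minutes.')


def wait_with_status(poll_status, gap_seconds=5):
    """poll_status() stands in for the job_lib/spot_state status lookups."""
    status_display = rich.status.Status(_WAITING_MESSAGE.format(status_str=''))
    with status_display:
        prev_msg = None
        status = poll_status()
        while status is None or not status.is_terminal():
            status_str = '' if status is None else f' (status: {status.value})'
            msg = _WAITING_MESSAGE.format(status_str=status_str)
            if msg != prev_msg:  # Only re-render when the message changes.
                status_display.update(msg)
                prev_msg = msg
            time.sleep(gap_seconds)
            status = poll_status()
    return status

Tracking prev_msg and only calling update() when the message changes mirrors the dedup the diff applies before each status_display.update call, so the spinner line is not re-rendered on every poll.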
@@ -9,6 +9,7 @@

import colorama
import filelock
import rich

from sky import backends
from sky import exceptions
@@ -37,6 +38,11 @@

_LOG_STREAM_CHECK_CONTROLLER_GAP_SECONDS = 5

_JOB_WAITING_STATUS_MESSAGE = ('[bold cyan]Waiting for the job to start'
'{status_str}.[/] It may take a few minutes.')
_JOB_CANCELLED_MESSAGE = ('[bold cyan]Waiting for the job status to be updated.'
'[/] It may take a minute.')


class UserSignal(enum.Enum):
"""The signal to be sent to the user."""
@@ -197,89 +203,121 @@ def cancel_job_by_name(job_name: str) -> str:
def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
"""Stream logs by job id."""
controller_status = job_lib.get_status(job_id)
while (controller_status != job_lib.JobStatus.RUNNING and
(controller_status is None or not controller_status.is_terminal())):
status_str = 'None'
if controller_status is not None:
status_str = controller_status.value
logger.info(
'Waiting for the spot controller process to be RUNNING (status: '
f'{status_str}).')
time.sleep(_LOG_STREAM_CHECK_CONTROLLER_GAP_SECONDS)
controller_status = job_lib.get_status(job_id)

job_status = spot_state.get_status(job_id)
while job_status is None:
logger.info('Waiting for the spot job to be started.')
time.sleep(1)
status_msg = ('[bold cyan]Waiting for controller process to be RUNNING '
'{status_str}[/]. It may take a few minutes.')
status_display = rich.status.Status(status_msg.format(status_str=''))
with status_display:
prev_msg = None
while (controller_status != job_lib.JobStatus.RUNNING and
(controller_status is None or
not controller_status.is_terminal())):
status_str = 'None'
if controller_status is not None:
status_str = controller_status.value
msg = status_msg.format(status_str=f' (status: {status_str})')
if msg != prev_msg:
status_display.update(msg)
prev_msg = msg
time.sleep(_LOG_STREAM_CHECK_CONTROLLER_GAP_SECONDS)
controller_status = job_lib.get_status(job_id)

msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str='')
status_display.update(msg)
prev_msg = msg
job_status = spot_state.get_status(job_id)
while job_status is None:
time.sleep(1)
job_status = spot_state.get_status(job_id)

if job_status.is_terminal():
job_msg = ''
if job_status.is_failed():
job_msg = ('\nFor detailed error message, please check: '
f'{colorama.Style.BRIGHT}sky logs '
f'{SPOT_CONTROLLER_NAME} {job_id}'
f'{colorama.Style.RESET_ALL}')
return (
f'Job {job_id} is already in terminal state {job_status.value}. '
f'Logs will not be shown.{job_msg}')
task_name = spot_state.get_task_name_by_job_id(job_id)
cluster_name = generate_spot_cluster_name(task_name, job_id)
backend = backends.CloudVmRayBackend()
spot_status = spot_state.get_status(job_id)
# spot_status can be None if the controller process just started and has
# not updated the spot status yet.
while spot_status is None or not spot_status.is_terminal():
handle = global_user_state.get_handle_from_cluster_name(cluster_name)
# Check the handle: The cluster can be preempted and removed from the
# table before the spot state is updated by the controller. In this
# case, we should skip the logging, and wait for the next round of
# status check.
if handle is None or spot_status != spot_state.SpotStatus.RUNNING:
status_help_str = ''
if (spot_status is not None and
spot_status != spot_state.SpotStatus.RUNNING):
status_help_str = f', as the spot job is {spot_status.value}'
logger.info(f'INFO: The log is not ready yet{status_help_str}. '
f'Waiting for {JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
if job_status.is_terminal():
job_msg = ''
if job_status.is_failed():
job_msg = ('\nFor detailed error message, please check: '
f'{colorama.Style.BRIGHT}sky logs '
f'{SPOT_CONTROLLER_NAME} {job_id}'
f'{colorama.Style.RESET_ALL}')
return (f'Job {job_id} is already in terminal state '
f'{job_status.value}. Logs will not be shown.{job_msg}')
task_name = spot_state.get_task_name_by_job_id(job_id)
cluster_name = generate_spot_cluster_name(task_name, job_id)
backend = backends.CloudVmRayBackend()
spot_status = spot_state.get_status(job_id)

# spot_status can be None if the controller process just started and has
# not updated the spot status yet.
while spot_status is None or not spot_status.is_terminal():
handle = global_user_state.get_handle_from_cluster_name(
cluster_name)
# Check the handle: The cluster can be preempted and removed from
# the table before the spot state is updated by the controller. In
# this case, we should skip the logging, and wait for the next
# round of status check.
if handle is None or spot_status != spot_state.SpotStatus.RUNNING:
status_str = ''
if (spot_status is not None and
spot_status != spot_state.SpotStatus.RUNNING):
status_str = f' (status: {spot_status.value})'
logger.debug(
f'INFO: The log is not ready yet{status_str}. '
f'Waiting for {JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str=status_str)
if msg != prev_msg:
status_display.update(msg)
prev_msg = msg
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
spot_status = spot_state.get_status(job_id)
continue
status_display.stop()
returncode = backend.tail_logs(handle,
job_id=None,
spot_job_id=job_id,
follow=follow)
if returncode == 0:
# If the log tailing exits successfully (the real job can be
# SUCCEEDED or FAILED), we can safely break the loop. We use the
# status in job queue to show the information, as the spot_state
# is not updated yet.
job_statuses = backend.get_job_status(handle, stream_logs=False)
job_status = list(job_statuses.values())[0]
assert job_status is not None, 'No job found.'
if job_status != job_lib.JobStatus.CANCELLED:
break
# The job can be cancelled by the user or the controller (when
# the cluster is partially preempted).
logger.debug(
'INFO: Job is cancelled. Waiting for the status update in '
f'{JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
else:
logger.debug(
f'INFO: (Log streaming) Got return code {returncode}. '
f'Retrying in {JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
# Finish early if the spot status is already in terminal state.
spot_status = spot_state.get_status(job_id)
continue
returncode = backend.tail_logs(handle,
job_id=None,
spot_job_id=job_id,
follow=follow)
if returncode == 0:
# If the log tailing exits successfully (the real job can be
# SUCCEEDED or FAILED), we can safely break the loop. We use the
# status in job queue to show the information, as the spot_state is
# not updated yet.
job_statuses = backend.get_job_status(handle, stream_logs=False)
job_status = list(job_statuses.values())[0]
assert job_status is not None, 'No job found.'
if job_status != job_lib.JobStatus.CANCELLED:
logger.info(f'Logs finished for job {job_id} '
f'(status: {job_status.value}).')
if spot_status.is_terminal():
break
# The job can be cancelled by the user or the controller (when the
# cluster is partially preempted).
logger.info('INFO: (Log streaming) Job is cancelled. Waiting '
'for the status update in '
f'{JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
else:
logger.info(
f'INFO: (Log streaming) Got return code {returncode}. Retrying '
f'in {JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
# If the tailing fails, it is likely that the cluster fails, so we wait
# a while to make sure the spot state is updated by the controller, and
# check the spot queue again.
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
logger.info(f'{colorama.Fore.YELLOW}The job is preempted.'
f'{colorama.Style.RESET_ALL}')
msg = _JOB_CANCELLED_MESSAGE
status_display.update(msg)
prev_msg = msg
status_display.start()
# If the tailing fails, it is likely that the cluster has failed, so
# we wait a while to make sure the spot state is updated by the
# controller, and check the spot queue again.
# Wait a bit longer than the controller, so as to make sure the
# spot state is updated.
time.sleep(3 * JOB_STATUS_CHECK_GAP_SECONDS)
spot_status = spot_state.get_status(job_id)

# The spot_status may not be in a terminal status yet, since the
# controller has not updated the spot state yet. We wait for a while
# until the spot state is updated.
spot_status = spot_state.get_status(job_id)
while not spot_status.is_terminal():
time.sleep(1)
spot_status = spot_state.get_status(job_id)
else:
# The spot_status is in terminal state.
logger.info(f'Logs finished for job {job_id} '
f'(status: {spot_state.get_status(job_id).value}).')
logger.info(f'Logs finished for job {job_id} '
f'(status: {spot_status.value}).')
return ''
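
Once the spinner stops, the new loop tails the job logs and branches on the return code: zero means the tailing exited cleanly (the job finished, or was cancelled and the spot state still needs to catch up), while a non-zero code is treated as a likely preemption, so the loop waits for the controller to refresh the spot state before tailing again. A simplified, hypothetical sketch of that control flow follows; tail_logs, get_job_status, and get_spot_status are stand-ins rather than real SkyPilot APIs.

import time

CHECK_GAP_SECONDS = 15  # Stand-in for JOB_STATUS_CHECK_GAP_SECONDS in the diff.


def stream_until_done(tail_logs, get_job_status, get_spot_status):
    """Simplified sketch: the three callables are stand-ins, not SkyPilot APIs.

    tail_logs() -> int returncode; get_job_status() -> str or None;
    get_spot_status() -> object with .is_terminal(), or None.
    """
    spot_status = get_spot_status()
    while spot_status is None or not spot_status.is_terminal():
        returncode = tail_logs()
        if returncode == 0:
            # Tailing exited cleanly: the job finished or was cancelled.
            job_status = get_job_status()
            if job_status is not None and job_status != 'CANCELLED':
                break
            # Cancelled, possibly by the controller on a partial preemption:
            # wait for the spot state to catch up before deciding.
            time.sleep(CHECK_GAP_SECONDS)
        else:
            # Non-zero return code: the cluster was likely preempted. Give the
            # controller extra time to refresh the spot state, then retry.
            time.sleep(3 * CHECK_GAP_SECONDS)
        spot_status = get_spot_status()

The longer wait on the preemption path mirrors the diff's time.sleep(3 * JOB_STATUS_CHECK_GAP_SECONDS), which gives the controller a head start over the log streamer before the next status check.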


