diff --git a/sky/spot/spot_utils.py b/sky/spot/spot_utils.py index d87d9492088..ac3dcc72b5d 100644 --- a/sky/spot/spot_utils.py +++ b/sky/spot/spot_utils.py @@ -9,6 +9,7 @@ import colorama import filelock +import rich from sky import backends from sky import exceptions @@ -37,6 +38,11 @@ _LOG_STREAM_CHECK_CONTROLLER_GAP_SECONDS = 5 +_JOB_WAITING_STATUS_MESSAGE = ('[bold cyan]Waiting for the job to start' + '{status_str}.[/] It may take a few minutes.') +_JOB_CANCELLED_MESSAGE = ('[bold cyan]Waiting for the job status to be updated.' + '[/] It may take a minute.') + class UserSignal(enum.Enum): """The signal to be sent to the user.""" @@ -197,89 +203,121 @@ def cancel_job_by_name(job_name: str) -> str: def stream_logs_by_id(job_id: int, follow: bool = True) -> str: """Stream logs by job id.""" controller_status = job_lib.get_status(job_id) - while (controller_status != job_lib.JobStatus.RUNNING and - (controller_status is None or not controller_status.is_terminal())): - status_str = 'None' - if controller_status is not None: - status_str = controller_status.value - logger.info( - 'Waiting for the spot controller process to be RUNNING (status: ' - f'{status_str}).') - time.sleep(_LOG_STREAM_CHECK_CONTROLLER_GAP_SECONDS) - controller_status = job_lib.get_status(job_id) - - job_status = spot_state.get_status(job_id) - while job_status is None: - logger.info('Waiting for the spot job to be started.') - time.sleep(1) + status_msg = ('[bold cyan]Waiting for controller process to be RUNNING ' + '{status_str}[/]. 
It may take a few minutes.') + status_display = rich.status.Status(status_msg.format(status_str='')) + with status_display: + prev_msg = None + while (controller_status != job_lib.JobStatus.RUNNING and + (controller_status is None or + not controller_status.is_terminal())): + status_str = 'None' + if controller_status is not None: + status_str = controller_status.value + msg = status_msg.format(status_str=f' (status: {status_str})') + if msg != prev_msg: + status_display.update(msg) + prev_msg = msg + time.sleep(_LOG_STREAM_CHECK_CONTROLLER_GAP_SECONDS) + controller_status = job_lib.get_status(job_id) + + msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str='') + status_display.update(msg) + prev_msg = msg job_status = spot_state.get_status(job_id) + while job_status is None: + time.sleep(1) + job_status = spot_state.get_status(job_id) - if job_status.is_terminal(): - job_msg = '' - if job_status.is_failed(): - job_msg = ('\nFor detailed error message, please check: ' - f'{colorama.Style.BRIGHT}sky logs ' - f'{SPOT_CONTROLLER_NAME} {job_id}' - f'{colorama.Style.RESET_ALL}') - return ( - f'Job {job_id} is already in terminal state {job_status.value}. ' - f'Logs will not be shown.{job_msg}') - task_name = spot_state.get_task_name_by_job_id(job_id) - cluster_name = generate_spot_cluster_name(task_name, job_id) - backend = backends.CloudVmRayBackend() - spot_status = spot_state.get_status(job_id) - # spot_status can be None if the controller process just started and has - # not updated the spot status yet. - while spot_status is None or not spot_status.is_terminal(): - handle = global_user_state.get_handle_from_cluster_name(cluster_name) - # Check the handle: The cluster can be preempted and removed from the - # table before the spot state is updated by the controller. In this - # case, we should skip the logging, and wait for the next round of - # status check. 
- if handle is None or spot_status != spot_state.SpotStatus.RUNNING: - status_help_str = '' - if (spot_status is not None and - spot_status != spot_state.SpotStatus.RUNNING): - status_help_str = f', as the spot job is {spot_status.value}' - logger.info(f'INFO: The log is not ready yet{status_help_str}. ' - f'Waiting for {JOB_STATUS_CHECK_GAP_SECONDS} seconds.') - time.sleep(JOB_STATUS_CHECK_GAP_SECONDS) + if job_status.is_terminal(): + job_msg = '' + if job_status.is_failed(): + job_msg = ('\nFor detailed error message, please check: ' + f'{colorama.Style.BRIGHT}sky logs ' + f'{SPOT_CONTROLLER_NAME} {job_id}' + f'{colorama.Style.RESET_ALL}') + return (f'Job {job_id} is already in terminal state ' + f'{job_status.value}. Logs will not be shown.{job_msg}') + task_name = spot_state.get_task_name_by_job_id(job_id) + cluster_name = generate_spot_cluster_name(task_name, job_id) + backend = backends.CloudVmRayBackend() + spot_status = spot_state.get_status(job_id) + + # spot_status can be None if the controller process just started and has + # not updated the spot status yet. + while spot_status is None or not spot_status.is_terminal(): + handle = global_user_state.get_handle_from_cluster_name( + cluster_name) + # Check the handle: The cluster can be preempted and removed from + # the table before the spot state is updated by the controller. In + # this case, we should skip the logging, and wait for the next + # round of status check. + if handle is None or spot_status != spot_state.SpotStatus.RUNNING: + status_str = '' + if (spot_status is not None and + spot_status != spot_state.SpotStatus.RUNNING): + status_str = f' (status: {spot_status.value})' + logger.debug( + f'INFO: The log is not ready yet{status_str}. 
' + f'Waiting for {JOB_STATUS_CHECK_GAP_SECONDS} seconds.') + msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str=status_str) + if msg != prev_msg: + status_display.update(msg) + prev_msg = msg + time.sleep(JOB_STATUS_CHECK_GAP_SECONDS) + spot_status = spot_state.get_status(job_id) + continue + status_display.stop() + returncode = backend.tail_logs(handle, + job_id=None, + spot_job_id=job_id, + follow=follow) + if returncode == 0: + # If the log tailing exits successfully (the real job can be + # SUCCEEDED or FAILED), we can safely break the loop. We use the + # status in job queue to show the information, as the spot_state + # is not updated yet. + job_statuses = backend.get_job_status(handle, stream_logs=False) + job_status = list(job_statuses.values())[0] + assert job_status is not None, 'No job found.' + if job_status != job_lib.JobStatus.CANCELLED: + break + # The job can be cancelled by the user or the controller (when + # the cluster is partially preempted). + logger.debug( + 'INFO: Job is cancelled. Waiting for the status update in ' + f'{JOB_STATUS_CHECK_GAP_SECONDS} seconds.') + else: + logger.debug( + f'INFO: (Log streaming) Got return code {returncode}. ' + f'Retrying in {JOB_STATUS_CHECK_GAP_SECONDS} seconds.') + # Finish early if the spot status is already in terminal state. spot_status = spot_state.get_status(job_id) - continue - returncode = backend.tail_logs(handle, - job_id=None, - spot_job_id=job_id, - follow=follow) - if returncode == 0: - # If the log tailing exit successfully (the real job can be - # SUCCEEDED or FAILED), we can safely break the loop. We use the - # status in job queue to show the information, as the spot_state is - # not updated yet. - job_statuses = backend.get_job_status(handle, stream_logs=False) - job_status = list(job_statuses.values())[0] - assert job_status is not None, 'No job found.' 
- if job_status != job_lib.JobStatus.CANCELLED: - logger.info(f'Logs finished for job {job_id} ' - f'(status: {job_status.value}).') + if spot_status.is_terminal(): break - # The job can be cancelled by the user or the controller (when the - # the cluster is partially preempted). - logger.info('INFO: (Log streaming) Job is cancelled. Waiting ' - 'for the status update in ' - f'{JOB_STATUS_CHECK_GAP_SECONDS} seconds.') - else: - logger.info( - f'INFO: (Log streaming) Got return code {returncode}. Retrying ' - f'in {JOB_STATUS_CHECK_GAP_SECONDS} seconds.') - # If the tailing fails, it is likely that the cluster fails, so we wait - # a while to make sure the spot state is updated by the controller, and - # check the spot queue again. - time.sleep(JOB_STATUS_CHECK_GAP_SECONDS) + logger.info(f'{colorama.Fore.YELLOW}The job is preempted.' + f'{colorama.Style.RESET_ALL}') + msg = _JOB_CANCELLED_MESSAGE + status_display.update(msg) + prev_msg = msg + status_display.start() + # If the tailing fails, it is likely that the cluster fails, so we + # wait a while to make sure the spot state is updated by the + # controller, and check the spot queue again. + # Wait a bit longer than the controller, so as to make sure the + # spot state is updated. + time.sleep(3 * JOB_STATUS_CHECK_GAP_SECONDS) + spot_status = spot_state.get_status(job_id) + + # The spot_status may not be in terminal status yet, since the controller + # has not updated the spot state yet. We wait for a while, until the spot + # state is updated. + spot_status = spot_state.get_status(job_id) + while not spot_status.is_terminal(): + time.sleep(1) spot_status = spot_state.get_status(job_id) - else: - # The spot_status is in terminal state. - logger.info(f'Logs finished for job {job_id} ' - f'(status: {spot_state.get_status(job_id).value}).') + logger.info(f'Logs finished for job {job_id} ' + f'(status: {spot_status.value}).') return ''