Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fluidstack] Improve fluidstack provisioner for corner cases #3254

Merged
merged 14 commits into from
Mar 1, 2024
15 changes: 13 additions & 2 deletions sky/provision/fluidstack/fluidstack_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ def create_instance(
auth=(self.api_key, self.api_token),
json=body)
raise_fluidstack_error(response)
return response.json().get('multiple')
instance_ids = response.json().get('multiple')
assert all(id is not None for id in instance_ids), instance_ids
return instance_ids

def list_ssh_keys(self):
response = requests.get(ENDPOINT + 'ssh',
Expand Down Expand Up @@ -231,11 +233,20 @@ def status(self, instance_id: str):
response = self.info(instance_id)
return response['status']

def add_tags(self, instance_id: str, tags: Dict[str, str]):
def add_tags(self, instance_id: str, tags: Dict[str, str]) -> str:
response = requests.patch(
ENDPOINT + f'server/{instance_id}/tag',
auth=(self.api_key, self.api_token),
json=dict(tags=json.dumps(tags)),
)
raise_fluidstack_error(response)
return response.json()

def rename(self, instance_id: str, hostname: str) -> str:
response = requests.patch(
ENDPOINT + f'server/{instance_id}/rename',
auth=(self.api_key, self.api_token),
json=dict(name=hostname),
)
raise_fluidstack_error(response)
return response.json()
103 changes: 87 additions & 16 deletions sky/provision/fluidstack/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ def get_internal_ip(node_info: Dict[str, Any]) -> None:
node_info['internal_ip'] = result[1].strip()


def _filter_instances(cluster_name_on_cloud: str,
status_filters: Optional[List[str]]) -> Dict[str, Any]:
def _filter_instances(
cluster_name_on_cloud: str,
status_filters: Optional[List[str]],
include_instances: Optional[List[str]] = None) -> Dict[str, Any]:

instances = utils.FluidstackClient().list_instances()
possible_names = [
Expand All @@ -56,6 +58,9 @@ def _filter_instances(cluster_name_on_cloud: str,
if (status_filters is not None and
instance['status'] not in status_filters):
continue
if (include_instances is not None and
instance['id'] not in include_instances):
continue
if instance.get('hostname') in possible_names:
filtered_instances[instance['id']] = instance
return filtered_instances
Expand Down Expand Up @@ -86,21 +91,63 @@ def run_instances(region: str, cluster_name_on_cloud: str,
'reboot',
'rebooting',
]

while True:
instances = _filter_instances(cluster_name_on_cloud, pending_status)
if len(instances) > config.count:
raise RuntimeError(
f'Cluster {cluster_name_on_cloud} already has '
f'{len(instances)} nodes, but {config.count} are '
'required. Please try terminate the cluster and retry.')
if not instances:
break
logger.info(f'Waiting for {len(instances)} instances to be ready.')
instance_statuses = [
instance['status'] for instance in instances.values()
]
logger.info(f'Waiting for {len(instances)} instances to be ready: '
f'{instance_statuses}')
time.sleep(POLL_INTERVAL)
exist_instances = _filter_instances(cluster_name_on_cloud, ['running'])
head_instance_id = _get_head_instance_id(exist_instances)

def rename(instance_id: str, new_name: str) -> None:
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
try:
utils.FluidstackClient().rename(instance_id, new_name)
except Exception as e:
logger.warning(f'run_instances error: {e}')
raise

for instance_id, instance in exist_instances.items():
if head_instance_id is None:
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
# It is possible that head instance does not exist because the
# worker instance provisioning succeeded, but failed for the head
# instance in a previous launch.
head_instance_id = instance_id
instance_name = f'{cluster_name_on_cloud}-head'
logger.info(f'Renaming head node {head_instance_id} to '
f'{instance_name}')
rename(instance_id, instance_name)
if (instance_id != head_instance_id and
instance['hostname'].endswith('-head')):
# Multiple head instances exist.
# This is a rare case when the instance name was manually modified
# on the cloud or some unexpected behavior happened.
# TODO(zhwu): This may not be necessary. An althernative can be
# terminating those head instances.
instance_name = f'{cluster_name_on_cloud}-worker'
logger.info(f'Renaming worker node {instance_id} to '
f'{instance_name}.')
try:
utils.FluidstackClient().rename(instance_id, instance_name)
except Exception as e: # pylint: disable=broad-except
logger.warning(f'run_instances error: {e}')
raise

to_start_count = config.count - len(exist_instances)
if to_start_count < 0:
raise RuntimeError(
f'Cluster {cluster_name_on_cloud} already has '
f'{len(exist_instances)} nodes, but {config.count} are required.')
f'{len(exist_instances)} nodes, but {config.count} are '
'required. Please try terminate the cluster and retry.')
if to_start_count == 0:
if head_instance_id is None:
raise RuntimeError(
Expand Down Expand Up @@ -134,20 +181,43 @@ def run_instances(region: str, cluster_name_on_cloud: str,

# Wait for instances to be ready.
while True:
instances = _filter_instances(cluster_name_on_cloud, ['running'])
ready_instance_cnt = len(instances)
instances = _filter_instances(cluster_name_on_cloud,
pending_status + ['running'])
if len(instances) < config.count:
# Some of pending instances have been convert to a state that will
# not convert to `running` status. This can be due to resource
# availability issue.
all_instances = _filter_instances(
cluster_name_on_cloud,
status_filters=None,
include_instances=created_instance_ids)
all_statuses = [
instance['status'] for instance in all_instances.values()
]
failed_instance_cnt = config.count - len(instances)
logger.error(f'Failed to create {failed_instance_cnt} '
f'instances for cluster {cluster_name_on_cloud}, '
f'with statuses: {all_statuses}')
raise RuntimeError(
f'Failed to create {failed_instance_cnt} instances, '
f'with statuses: {all_statuses}')

ready_instances = []
pending_instances = []
for instance in instances.values():
if instance['status'] == 'running':
ready_instances.append(instance)
else:
pending_instances.append(instance)
ready_instance_cnt = len(ready_instances)
pending_statuses = [
instance['status'] for instance in pending_instances
]
logger.info('Waiting for instances to be ready: '
f'({ready_instance_cnt}/{config.count}).')
f'({ready_instance_cnt}/{config.count}).\n'
f' Pending instance statuses: {pending_statuses}')
if ready_instance_cnt == config.count:
break
failed_instances = _filter_instances(
cluster_name_on_cloud,
['timeout error', 'failed to create', 'out of stock'])
if failed_instances:
logger.error(f'Failed to create {len(failed_instances)}'
f'instances for cluster {cluster_name_on_cloud}')
raise RuntimeError(
f'Failed to create {len(failed_instances)} instances.')

time.sleep(POLL_INTERVAL)
assert head_instance_id is not None, 'head_instance_id should not be None'
Expand Down Expand Up @@ -254,6 +324,7 @@ def query_instances(
'failed to create': status_lib.ClusterStatus.INIT,
'timeout error': status_lib.ClusterStatus.INIT,
'out of stock': status_lib.ClusterStatus.INIT,
'terminating': None,
'terminated': None,
}
statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {}
Expand Down
21 changes: 19 additions & 2 deletions sky/provision/instance_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
_START_TITLE = '\n' + '-' * 20 + 'Start: {} ' + '-' * 20
_END_TITLE = '-' * 20 + 'End: {} ' + '-' * 20 + '\n'

_MAX_RETRY = 5
_MAX_RETRY = 6

# Increase the limit of the number of open files for the raylet process,
# as the `ulimit` may not take effect at this point, because it requires
Expand Down Expand Up @@ -200,6 +200,23 @@ def _setup_node(runner: command_runner.SSHCommandRunner,
stream_logs=False,
log_path=log_path,
require_outputs=True)
retry_cnt = 0
while returncode == 255 and retry_cnt < _MAX_RETRY:
# Got network connection issue occur during setup. This could
# happen when a setup step requires a reboot, e.g. nvidia-driver
# installation (happens for fluidstack). We should retry for it.
logger.info('Network connection issue during setup, this is '
'likely due to the reboot of the instance. '
'Retrying setup in 10 seconds.')
time.sleep(10)
retry_cnt += 1
returncode, stdout, stderr = runner.run(cmd,
stream_logs=False,
log_path=log_path,
require_outputs=True)
if not returncode:
break

if returncode:
raise RuntimeError(
'Failed to run setup commands on an instance. '
Expand Down Expand Up @@ -283,7 +300,7 @@ def start_ray_on_head_node(cluster_name: str, custom_resource: Optional[str],
require_outputs=True)
if returncode:
raise RuntimeError('Failed to start ray on the head node '
f'(exit code {returncode}). Error: '
f'(exit code {returncode}). Error: \n'
f'===== stdout ===== \n{stdout}\n'
f'===== stderr ====={stderr}')

Expand Down
Loading