[Fluidstack] Improve fluidstack provisioner for corner cases (#3254)
* Improve error for corner cases

* review

* Add retry for reboot in setup

* format

* rename

* Add comments

* comments

* Fix instance creation waiting

* Only get the instances just created

* logging update

* larger max retry
Michaelvll authored Mar 1, 2024
1 parent 0a49a00 commit 095acab
Showing 3 changed files with 119 additions and 20 deletions.
15 changes: 13 additions & 2 deletions sky/provision/fluidstack/fluidstack_utils.py
@@ -153,7 +153,9 @@ def create_instance(
auth=(self.api_key, self.api_token),
json=body)
raise_fluidstack_error(response)
return response.json().get('multiple')
instance_ids = response.json().get('multiple')
assert all(id is not None for id in instance_ids), instance_ids
return instance_ids

def list_ssh_keys(self):
response = requests.get(ENDPOINT + 'ssh',
@@ -231,11 +233,20 @@ def status(self, instance_id: str):
response = self.info(instance_id)
return response['status']

def add_tags(self, instance_id: str, tags: Dict[str, str]):
def add_tags(self, instance_id: str, tags: Dict[str, str]) -> str:
response = requests.patch(
ENDPOINT + f'server/{instance_id}/tag',
auth=(self.api_key, self.api_token),
json=dict(tags=json.dumps(tags)),
)
raise_fluidstack_error(response)
return response.json()

def rename(self, instance_id: str, hostname: str) -> str:
response = requests.patch(
ENDPOINT + f'server/{instance_id}/rename',
auth=(self.api_key, self.api_token),
json=dict(name=hostname),
)
raise_fluidstack_error(response)
return response.json()
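
For orientation, a minimal sketch of how the new tag and rename endpoints might be exercised by a caller; the import path, instance ID, tag, and hostname below are assumptions for illustration, not part of this diff:

```python
from sky.provision.fluidstack import fluidstack_utils as utils  # assumed import path

client = utils.FluidstackClient()
# '0abc123' and the names below are placeholders.
client.add_tags('0abc123', {'purpose': 'example'})   # PATCH server/{id}/tag
client.rename('0abc123', 'my-cluster-head')          # PATCH server/{id}/rename
```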
103 changes: 87 additions & 16 deletions sky/provision/fluidstack/instance.py
@@ -43,8 +43,10 @@ def get_internal_ip(node_info: Dict[str, Any]) -> None:
node_info['internal_ip'] = result[1].strip()


def _filter_instances(cluster_name_on_cloud: str,
status_filters: Optional[List[str]]) -> Dict[str, Any]:
def _filter_instances(
cluster_name_on_cloud: str,
status_filters: Optional[List[str]],
include_instances: Optional[List[str]] = None) -> Dict[str, Any]:

instances = utils.FluidstackClient().list_instances()
possible_names = [
@@ -56,6 +58,9 @@ def _filter_instances(cluster_name_on_cloud: str,
if (status_filters is not None and
instance['status'] not in status_filters):
continue
if (include_instances is not None and
instance['id'] not in include_instances):
continue
if instance.get('hostname') in possible_names:
filtered_instances[instance['id']] = instance
return filtered_instances
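
As a hedged sketch of the intent behind `include_instances` (scoping the lookup to the instances created by the current launch), assuming it runs inside this module; the cluster name and IDs are placeholders:

```python
# Placeholders for illustration; create_instance() is the source of these IDs.
created_instance_ids = ['0abc123', '0def456']
just_created = _filter_instances('my-cluster-abcd',
                                 status_filters=None,
                                 include_instances=created_instance_ids)
# Instances from other launches are excluded, so counts and statuses
# reflect only this provisioning attempt.
statuses = [inst['status'] for inst in just_created.values()]
```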
@@ -86,21 +91,63 @@ def run_instances(region: str, cluster_name_on_cloud: str,
'reboot',
'rebooting',
]

while True:
instances = _filter_instances(cluster_name_on_cloud, pending_status)
if len(instances) > config.count:
raise RuntimeError(
f'Cluster {cluster_name_on_cloud} already has '
f'{len(instances)} nodes, but {config.count} are '
'required. Please try terminating the cluster and retry.')
if not instances:
break
logger.info(f'Waiting for {len(instances)} instances to be ready.')
instance_statuses = [
instance['status'] for instance in instances.values()
]
logger.info(f'Waiting for {len(instances)} instances to be ready: '
f'{instance_statuses}')
time.sleep(POLL_INTERVAL)
exist_instances = _filter_instances(cluster_name_on_cloud, ['running'])
head_instance_id = _get_head_instance_id(exist_instances)

def rename(instance_id: str, new_name: str) -> None:
try:
utils.FluidstackClient().rename(instance_id, new_name)
except Exception as e:
logger.warning(f'run_instances error: {e}')
raise

for instance_id, instance in exist_instances.items():
if head_instance_id is None:
# It is possible that head instance does not exist because the
# worker instance provisioning succeeded, but failed for the head
# instance in a previous launch.
head_instance_id = instance_id
instance_name = f'{cluster_name_on_cloud}-head'
logger.info(f'Renaming head node {head_instance_id} to '
f'{instance_name}')
rename(instance_id, instance_name)
if (instance_id != head_instance_id and
instance['hostname'].endswith('-head')):
# Multiple head instances exist.
# This is a rare case when the instance name was manually modified
# on the cloud or some unexpected behavior happened.
# TODO(zhwu): This may not be necessary. An alternative can be
# terminating those head instances.
instance_name = f'{cluster_name_on_cloud}-worker'
logger.info(f'Renaming worker node {instance_id} to '
f'{instance_name}.')
try:
utils.FluidstackClient().rename(instance_id, instance_name)
except Exception as e: # pylint: disable=broad-except
logger.warning(f'run_instances error: {e}')
raise

to_start_count = config.count - len(exist_instances)
if to_start_count < 0:
raise RuntimeError(
f'Cluster {cluster_name_on_cloud} already has '
f'{len(exist_instances)} nodes, but {config.count} are required.')
f'{len(exist_instances)} nodes, but {config.count} are '
'required. Please try terminating the cluster and retry.')
if to_start_count == 0:
if head_instance_id is None:
raise RuntimeError(
@@ -134,20 +181,43 @@ def run_instances(region: str, cluster_name_on_cloud: str,

# Wait for instances to be ready.
while True:
instances = _filter_instances(cluster_name_on_cloud, ['running'])
ready_instance_cnt = len(instances)
instances = _filter_instances(cluster_name_on_cloud,
pending_status + ['running'])
if len(instances) < config.count:
# Some of the pending instances have been converted to a state that
# will not convert to `running` status. This can be due to resource
# availability issues.
all_instances = _filter_instances(
cluster_name_on_cloud,
status_filters=None,
include_instances=created_instance_ids)
all_statuses = [
instance['status'] for instance in all_instances.values()
]
failed_instance_cnt = config.count - len(instances)
logger.error(f'Failed to create {failed_instance_cnt} '
f'instances for cluster {cluster_name_on_cloud}, '
f'with statuses: {all_statuses}')
raise RuntimeError(
f'Failed to create {failed_instance_cnt} instances, '
f'with statuses: {all_statuses}')

ready_instances = []
pending_instances = []
for instance in instances.values():
if instance['status'] == 'running':
ready_instances.append(instance)
else:
pending_instances.append(instance)
ready_instance_cnt = len(ready_instances)
pending_statuses = [
instance['status'] for instance in pending_instances
]
logger.info('Waiting for instances to be ready: '
f'({ready_instance_cnt}/{config.count}).')
f'({ready_instance_cnt}/{config.count}).\n'
f' Pending instance statuses: {pending_statuses}')
if ready_instance_cnt == config.count:
break
failed_instances = _filter_instances(
cluster_name_on_cloud,
['timeout error', 'failed to create', 'out of stock'])
if failed_instances:
logger.error(f'Failed to create {len(failed_instances)}'
f'instances for cluster {cluster_name_on_cloud}')
raise RuntimeError(
f'Failed to create {len(failed_instances)} instances.')

time.sleep(POLL_INTERVAL)
assert head_instance_id is not None, 'head_instance_id should not be None'
@@ -254,6 +324,7 @@ def query_instances(
'failed to create': status_lib.ClusterStatus.INIT,
'timeout error': status_lib.ClusterStatus.INIT,
'out of stock': status_lib.ClusterStatus.INIT,
'terminating': None,
'terminated': None,
}
statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {}
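
The rest of query_instances is outside this hunk; purely as an illustration of how such a map is typically consumed (`status_map` and `instances` are assumed names, not taken from the diff):

```python
# Hypothetical sketch: a None mapping (e.g. 'terminating', 'terminated')
# means the instance is not reported as part of the cluster.
for inst_id, instance in instances.items():
    statuses[inst_id] = status_map[instance['status']]
```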
21 changes: 19 additions & 2 deletions sky/provision/instance_setup.py
@@ -24,7 +24,7 @@
_START_TITLE = '\n' + '-' * 20 + 'Start: {} ' + '-' * 20
_END_TITLE = '-' * 20 + 'End: {} ' + '-' * 20 + '\n'

_MAX_RETRY = 5
_MAX_RETRY = 6

# Increase the limit of the number of open files for the raylet process,
# as the `ulimit` may not take effect at this point, because it requires
@@ -200,6 +200,23 @@ def _setup_node(runner: command_runner.SSHCommandRunner,
stream_logs=False,
log_path=log_path,
require_outputs=True)
retry_cnt = 0
while returncode == 255 and retry_cnt < _MAX_RETRY:
# A network connection issue occurred during setup. This could
# happen when a setup step requires a reboot, e.g. nvidia-driver
# installation (happens for fluidstack). We should retry in that case.
logger.info('Network connection issue during setup; this is '
'likely due to a reboot of the instance. '
'Retrying setup in 10 seconds.')
time.sleep(10)
retry_cnt += 1
returncode, stdout, stderr = runner.run(cmd,
stream_logs=False,
log_path=log_path,
require_outputs=True)
if not returncode:
break

if returncode:
raise RuntimeError(
'Failed to run setup commands on an instance. '
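
Lifted out of the diff context, the retry added above behaves roughly like this hedged sketch; `runner`, `cmd`, and `log_path` are placeholders, and 255 is the conventional SSH exit code for a dropped connection (e.g. across a reboot):

```python
import time

_MAX_RETRY = 6            # mirrors the constant bumped above
_RETRY_WAIT_SECONDS = 10  # assumed wait between attempts

def run_setup_with_reboot_retry(runner, cmd: str, log_path: str):
    """Sketch: re-run a setup command when the SSH connection drops (code 255)."""
    returncode, stdout, stderr = runner.run(cmd,
                                            stream_logs=False,
                                            log_path=log_path,
                                            require_outputs=True)
    retry_cnt = 0
    while returncode == 255 and retry_cnt < _MAX_RETRY:
        # Exit code 255 usually means SSH itself failed, which is expected
        # while the instance reboots (e.g. after a driver install).
        time.sleep(_RETRY_WAIT_SECONDS)
        retry_cnt += 1
        returncode, stdout, stderr = runner.run(cmd,
                                                stream_logs=False,
                                                log_path=log_path,
                                                require_outputs=True)
    return returncode, stdout, stderr
```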
@@ -283,7 +300,7 @@ def start_ray_on_head_node(cluster_name: str, custom_resource: Optional[str],
require_outputs=True)
if returncode:
raise RuntimeError('Failed to start ray on the head node '
f'(exit code {returncode}). Error: '
f'(exit code {returncode}). Error: \n'
f'===== stdout ===== \n{stdout}\n'
f'===== stderr ====={stderr}')

