Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fluidstack] Improve fluidstack provisioner for corner cases #3254

Merged
merged 14 commits into from
Mar 1, 2024
15 changes: 13 additions & 2 deletions sky/provision/fluidstack/fluidstack_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ def create_instance(
auth=(self.api_key, self.api_token),
json=body)
raise_fluidstack_error(response)
return response.json().get('multiple')
instance_ids = response.json().get('multiple')
assert all(id is not None for id in instance_ids), instance_ids
return instance_ids

def list_ssh_keys(self):
response = requests.get(ENDPOINT + 'ssh',
Expand Down Expand Up @@ -231,11 +233,20 @@ def status(self, instance_id: str):
response = self.info(instance_id)
return response['status']

def add_tags(self, instance_id: str, tags: Dict[str, str]):
def add_tags(self, instance_id: str, tags: Dict[str, str]) -> str:
    """Attach tags to an instance through its ``tag`` endpoint.

    The ``tags`` mapping is serialized to a JSON string and sent as the
    ``tags`` field of the PATCH request body.

    Args:
        instance_id: Identifier interpolated into the request URL.
        tags: Tag names mapped to tag values.

    Returns:
        The decoded JSON body of the response.
        NOTE(review): annotated as ``str``, but the actual type is
        whatever the API's JSON payload decodes to -- confirm.
    """
    url = ENDPOINT + f'server/{instance_id}/tag'
    credentials = (self.api_key, self.api_token)
    payload = {'tags': json.dumps(tags)}
    resp = requests.patch(url, auth=credentials, json=payload)
    # Let the shared helper inspect the response before the body is used.
    raise_fluidstack_error(resp)
    return resp.json()

def rename(self, instance_id: str, hostname: str) -> str:
    """Rename an instance through its ``rename`` endpoint.

    Args:
        instance_id: Identifier interpolated into the request URL.
        hostname: New name, sent as the ``name`` field of the PATCH body.

    Returns:
        The decoded JSON body of the response.
        NOTE(review): annotated as ``str``, but the actual type is
        whatever the API's JSON payload decodes to -- confirm.
    """
    url = ENDPOINT + f'server/{instance_id}/rename'
    credentials = (self.api_key, self.api_token)
    payload = {'name': hostname}
    resp = requests.patch(url, auth=credentials, json=payload)
    # Let the shared helper inspect the response before the body is used.
    raise_fluidstack_error(resp)
    return resp.json()
40 changes: 39 additions & 1 deletion sky/provision/fluidstack/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,55 @@ def run_instances(region: str, cluster_name_on_cloud: str,

while True:
instances = _filter_instances(cluster_name_on_cloud, pending_status)
if len(instances) > config.count:
raise RuntimeError(
f'Cluster {cluster_name_on_cloud} already has '
f'{len(instances)} nodes, but {config.count} are '
'required. Please try terminate the cluster and retry.')
if not instances:
break
logger.info(f'Waiting for {len(instances)} instances to be ready.')
time.sleep(POLL_INTERVAL)
exist_instances = _filter_instances(cluster_name_on_cloud, ['running'])
head_instance_id = _get_head_instance_id(exist_instances)

def rename(instance_id: str, new_name: str) -> None:
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
try:
utils.FluidstackClient().rename(instance_id, new_name)
except Exception as e:
logger.warning(f'run_instances error: {e}')
raise

for instance_id, instance in exist_instances.items():
if head_instance_id is None:
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
# It is possible that head instance does not exist because the
# worker instance provisioning succeeded, but failed for the head
# instance in a previous launch.
head_instance_id = instance_id
instance_name = f'{cluster_name_on_cloud}-head'
logger.info(f'Renaming head node {head_instance_id} to '
f'{instance_name}')
rename(instance_id, instance_name)
if (instance_id != head_instance_id and
instance['hostname'].endswith('-head')):
# Multiple head instances exist.
# This is a rare case when the instance name was manually modified
# on the cloud or some unexpected behavior happened.
instance_name = f'{cluster_name_on_cloud}-worker'
logger.info(f'Renaming worker node {instance_id} to '
f'{instance_name}.')
try:
utils.FluidstackClient().rename(instance_id, instance_name)
except Exception as e: # pylint: disable=broad-except
logger.warning(f'run_instances error: {e}')
raise

to_start_count = config.count - len(exist_instances)
if to_start_count < 0:
raise RuntimeError(
f'Cluster {cluster_name_on_cloud} already has '
f'{len(exist_instances)} nodes, but {config.count} are required.')
f'{len(exist_instances)} nodes, but {config.count} are '
'required. Please try terminate the cluster and retry.')
if to_start_count == 0:
if head_instance_id is None:
raise RuntimeError(
Expand Down Expand Up @@ -254,6 +291,7 @@ def query_instances(
'failed to create': status_lib.ClusterStatus.INIT,
'timeout error': status_lib.ClusterStatus.INIT,
'out of stock': status_lib.ClusterStatus.INIT,
'terminating': None,
'terminated': None,
}
statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {}
Expand Down
17 changes: 17 additions & 0 deletions sky/provision/instance_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,23 @@ def _setup_node(runner: command_runner.SSHCommandRunner,
stream_logs=False,
log_path=log_path,
require_outputs=True)
retry_cnt = 0
while returncode == 255 and retry_cnt < _MAX_RETRY:
# Hit a network connection issue during setup. This could
# happen when a setup step requires a reboot, e.g. nvidia-driver
# installation (happens for fluidstack). We should retry for it.
logger.info('Network connection issue during setup, this is '
'likely due to the reboot of the instance. '
'Retrying setup in 10 seconds.')
time.sleep(10)
retry_cnt += 1
returncode, stdout, stderr = runner.run(cmd,
stream_logs=False,
log_path=log_path,
require_outputs=True)
if not returncode:
break

if returncode:
raise RuntimeError(
'Failed to run setup commands on an instance. '
Expand Down
Loading