Skip to content

Commit

Permalink
Fix a situation where server start succeeds but client start fails (NVIDIA#630)
Browse files Browse the repository at this point in the history

* Fix a situation where server and client deployment succeeds and the server starts successfully, but the client fails to start

* Make sure reply is not None

* Add deletion of running jobs and fire an abort event when an exception happens
  • Loading branch information
YuanTingHsieh authored Jun 1, 2022
1 parent 16edf78 commit 15bebb3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 13 deletions.
4 changes: 1 addition & 3 deletions nvflare/private/fed/server/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ def check_client_replies(replies: List[ClientReply], client_sites: List[str], co

error_msg = ""
for r, client_name in zip(replies, client_sites):
if r.reply is None:
error_msg += f"\t{client_name}: reply is None\n"
elif ERROR_MSG_PREFIX in r.reply.body:
if r.reply and ERROR_MSG_PREFIX in r.reply.body:
error_msg += f"\t{client_name}: {r.reply.body}\n"
if error_msg != "":
raise RuntimeError(f"Failed to {command} to the following clients: \n{error_msg}")
Expand Down
18 changes: 9 additions & 9 deletions nvflare/private/fed/server/job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _deploy_job(self, job: Job, sites: dict, fl_ctx: FLContext) -> str:
fl_ctx: FLContext
Returns:
run_number
"""
fl_ctx.remove_prop(FLContextKey.JOB_RUN_NUMBER)
engine = fl_ctx.get_engine()
Expand All @@ -98,7 +98,7 @@ def _deploy_job(self, job: Job, sites: dict, fl_ctx: FLContext) -> str:
if p == "server":
success = deploy_app(app_name=app_name, site_name="server", workspace=workspace, app_data=app_data)
self.log_info(
fl_ctx, f"Application {app_name} deployed to the server for run:{run_number}", fire_event=False
fl_ctx, f"Application {app_name} deployed to the server for run: {run_number}", fire_event=False
)
if not success:
raise RuntimeError(f"Failed to deploy the App: {app_name} to the server")
Expand All @@ -112,23 +112,20 @@ def _deploy_job(self, job: Job, sites: dict, fl_ctx: FLContext) -> str:
display_sites = ",".join(client_sites)
self.log_info(
fl_ctx,
f"Application {app_name} deployed to the clients: {display_sites} for run:{run_number}",
f"Application {app_name} deployed to the clients: {display_sites} for run: {run_number}",
fire_event=False,
)

self.fire_event(EventType.JOB_DEPLOYED, fl_ctx)
return run_number

def _start_run(self, run_number, job: Job, client_sites: dict, fl_ctx: FLContext):
def _start_run(self, run_number: str, job: Job, client_sites: dict, fl_ctx: FLContext):
"""Start the application
Args:
run_number: run_number
client_sites: participating sites
fl_ctx: FLContext
Returns:
"""
engine = fl_ctx.get_engine()
job_clients = engine.get_job_clients(client_sites)
Expand Down Expand Up @@ -245,6 +242,7 @@ def run(self, fl_ctx: FLContext):
if ready_job:
with self.lock:
client_sites = {k: v for k, v in sites.items() if k != "server"}
run_number = None
try:
self.log_info(fl_ctx, f"Got the job:{ready_job.job_id} from the scheduler to run")
fl_ctx.set_prop(FLContextKey.CURRENT_JOB_ID, ready_job.job_id)
Expand All @@ -259,10 +257,12 @@ def run(self, fl_ctx: FLContext):
self.running_jobs[run_number] = ready_job
job_manager.set_status(ready_job.job_id, RunStatus.RUNNING, fl_ctx)
except Exception as e:
run_number = fl_ctx.get_prop(FLContextKey.JOB_RUN_NUMBER)
if run_number:
self._delete_run(run_number, list(client_sites.keys()), fl_ctx)
if run_number in self.running_jobs:
del self.running_jobs[run_number]
self._stop_run(run_number, fl_ctx)
job_manager.set_status(ready_job.job_id, RunStatus.FAILED_TO_RUN, fl_ctx)
self.fire_event(EventType.JOB_ABORTED, fl_ctx)
self.log_error(fl_ctx, f"Failed to run the Job ({ready_job.job_id}): {e}")

time.sleep(1.0)
Expand Down
2 changes: 1 addition & 1 deletion nvflare/private/fed/server/server_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ def start_client_job(self, run_number, client_sites):
requests.update({client.token: request})
replies = []
if requests:
replies = self._send_admin_requests(requests)
replies = self._send_admin_requests(requests, timeout_secs=20)
return replies

def stop_all_jobs(self):
Expand Down

0 comments on commit 15bebb3

Please sign in to comment.