Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions framework/docs/source/ref-exit-codes/201.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[201] SERVERAPP_EXCEPTION
=========================

Description
-----------

An unhandled exception was raised during the execution of your ServerApp code.

How to Resolve
--------------

This error indicates that your ServerApp code failed with an unhandled exception. Check
the exception details in the logs, review your code carefully, and fix the underlying
issue causing the error.
31 changes: 31 additions & 0 deletions framework/py/flwr/common/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2025 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower application exceptions."""


class AppExitException(Exception):
    """Base exception for all application-level errors in ServerApp and ClientApp.

    When raised, the process will exit and report a telemetry event with the
    associated exit code.

    Every subclass must set the ``exit_code`` class attribute to something other
    than the ``-1`` placeholder. This is verified once, when the subclass is
    defined, so a missing exit code surfaces at import time rather than when the
    exception is eventually raised.
    """

    # Placeholder exit code — subclasses must override
    exit_code: int = -1

    def __init_subclass__(cls, **kwargs: object) -> None:
        """Ensure subclasses override the exit_code attribute.

        Parameters
        ----------
        **kwargs : object
            Class keyword arguments, forwarded untouched to the next
            ``__init_subclass__`` in the MRO.

        Raises
        ------
        ValueError
            If the new subclass still resolves ``exit_code`` to ``-1``.
        """
        # Forward to the rest of the MRO so that mixins defining their own
        # ``__init_subclass__`` hook (optionally with class keyword arguments)
        # keep working when combined with this base class.
        super().__init_subclass__(**kwargs)
        if cls.exit_code == -1:
            raise ValueError("Subclasses must override the exit_code attribute.")
2 changes: 2 additions & 0 deletions framework/py/flwr/common/exit/exit_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ExitCode:

# ServerApp-specific exit codes (200-299)
SERVERAPP_STRATEGY_PRECONDITION_UNMET = 200
SERVERAPP_EXCEPTION = 201
SERVERAPP_STRATEGY_AGGREGATION_ERROR = 202

# SuperNode-specific exit codes (300-399)
Expand Down Expand Up @@ -90,6 +91,7 @@ def __new__(cls) -> ExitCode:
"perform weighted average (e.g. in FedAvg) please ensure the returned "
"MetricRecord from ClientApps do include this key."
),
ExitCode.SERVERAPP_EXCEPTION: "An unhandled exception occurred in the ServerApp.",
ExitCode.SERVERAPP_STRATEGY_AGGREGATION_ERROR: (
"The strategy encountered an error during aggregation. Please check the logs "
"for more details."
Expand Down
69 changes: 41 additions & 28 deletions framework/py/flwr/server/serverapp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
Status,
SubStatus,
)
from flwr.common.exit import ExitCode, flwr_exit
from flwr.common.exception import AppExitException
from flwr.common.exit import ExitCode, add_exit_handler, flwr_exit
from flwr.common.heartbeat import HeartbeatSender, get_grpc_app_heartbeat_fn
from flwr.common.logger import (
log,
Expand Down Expand Up @@ -133,12 +134,34 @@ def run_serverapp( # pylint: disable=R0913, R0914, R0915, R0917, W0212
# Resolve directory where FABs are installed
flwr_dir_ = get_flwr_dir(flwr_dir)
log_uploader = None
success = True
hash_run_id = None
run_status = None
heartbeat_sender = None
grid = None
context = None
exit_code = ExitCode.SUCCESS

def on_exit() -> None:
# Stop heartbeat sender
if heartbeat_sender:
heartbeat_sender.stop()

# Stop log uploader for this run and upload final logs
if log_uploader:
stop_log_uploader(log_queue, log_uploader)

# Update run status
if run_status and grid:
run_status_proto = run_status_to_proto(run_status)
grid._stub.UpdateRunStatus(
UpdateRunStatusRequest(run_id=run.run_id, run_status=run_status_proto)
)

# Close the Grpc connection
if grid:
grid.close()

add_exit_handler(on_exit)

try:
# Initialize the GrpcGrid
Expand Down Expand Up @@ -229,43 +252,33 @@ def run_serverapp( # pylint: disable=R0913, R0914, R0915, R0917, W0212
_ = grid._stub.PushAppOutputs(out_req)

run_status = RunStatus(Status.FINISHED, SubStatus.COMPLETED, "")

# Raised when the run is already stopped by the user
except RunNotRunningException:
log(INFO, "")
log(INFO, "Run ID %s stopped.", run.run_id)
log(INFO, "")
run_status = None
success = False
# No need to update the exit code since this is expected behavior

except Exception as ex: # pylint: disable=broad-exception-caught
exc_entity = "ServerApp"
log(ERROR, "%s raised an exception", exc_entity, exc_info=ex)
run_status = RunStatus(Status.FINISHED, SubStatus.FAILED, str(ex))
success = False

finally:
# Stop heartbeat sender
if heartbeat_sender:
heartbeat_sender.stop()

# Stop log uploader for this run and upload final logs
if log_uploader:
stop_log_uploader(log_queue, log_uploader)

# Update run status
if run_status and grid:
run_status_proto = run_status_to_proto(run_status)
grid._stub.UpdateRunStatus(
UpdateRunStatusRequest(run_id=run.run_id, run_status=run_status_proto)
)

# Close the Grpc connection
if grid:
grid.close()

event(
EventType.FLWR_SERVERAPP_RUN_LEAVE,
event_details={"run-id-hash": hash_run_id, "success": success},
)
# Set exit code
exit_code = ExitCode.SERVERAPP_EXCEPTION # General exit code
if isinstance(ex, AppExitException):
exit_code = ex.exit_code

flwr_exit(
code=exit_code,
event_type=EventType.FLWR_SERVERAPP_RUN_LEAVE,
event_details={
"run-id-hash": hash_run_id,
"success": exit_code == ExitCode.SUCCESS,
},
)


def _parse_args_run_flwr_serverapp() -> argparse.ArgumentParser:
Expand Down
29 changes: 9 additions & 20 deletions framework/py/flwr/serverapp/strategy/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
from typing import Callable, Optional

from flwr.common import ArrayRecord, ConfigRecord, Message, MetricRecord, log
from flwr.common.exit import ExitCode, flwr_exit
from flwr.server import Grid

from .result import Result
from .strategy_utils import InconsistentMessageReplies, log_strategy_start_info
from .strategy_utils import log_strategy_start_info


class Strategy(ABC):
Expand Down Expand Up @@ -218,15 +217,10 @@ def start(
)

# Aggregate train
try:
agg_arrays, agg_train_metrics = self.aggregate_train(
current_round,
train_replies,
)
except InconsistentMessageReplies as e:
flwr_exit(
ExitCode.SERVERAPP_STRATEGY_PRECONDITION_UNMET, message=str(e)
)
agg_arrays, agg_train_metrics = self.aggregate_train(
current_round,
train_replies,
)

# Log training metrics and append to history
if agg_arrays is not None:
Expand All @@ -253,15 +247,10 @@ def start(
)

# Aggregate evaluate
try:
agg_evaluate_metrics = self.aggregate_evaluate(
current_round,
evaluate_replies,
)
except InconsistentMessageReplies as e:
flwr_exit(
ExitCode.SERVERAPP_STRATEGY_PRECONDITION_UNMET, message=str(e)
)
agg_evaluate_metrics = self.aggregate_evaluate(
current_round,
evaluate_replies,
)

# Log evaluation metrics and append to history
if agg_evaluate_metrics is not None:
Expand Down
6 changes: 5 additions & 1 deletion framework/py/flwr/serverapp/strategy/strategy_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
RecordDict,
log,
)
from flwr.common.exception import AppExitException
from flwr.common.exit import ExitCode
from flwr.server import Grid


class InconsistentMessageReplies(Exception):
class InconsistentMessageReplies(AppExitException):
"""Exception triggered when replies are inconsistent and therefore aggregation must
be skipped."""

exit_code = ExitCode.SERVERAPP_STRATEGY_PRECONDITION_UNMET

def __init__(self, reason: str):
super().__init__(reason)

Expand Down
59 changes: 37 additions & 22 deletions framework/py/flwr/simulation/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
Status,
SubStatus,
)
from flwr.common.exit import ExitCode, flwr_exit
from flwr.common.exception import AppExitException
from flwr.common.exit import ExitCode, add_exit_handler, flwr_exit
from flwr.common.heartbeat import HeartbeatSender, get_grpc_app_heartbeat_fn
from flwr.common.logger import (
log,
Expand Down Expand Up @@ -143,9 +144,29 @@ def run_simulation_process( # pylint: disable=R0913, R0914, R0915, R0917, W0212

# Resolve directory where FABs are installed
flwr_dir = get_flwr_dir(flwr_dir_)
hash_run_id = None
log_uploader = None
heartbeat_sender = None
run_status = None
exit_code = ExitCode.SUCCESS

def on_exit() -> None:
# Stop heartbeat sender
if heartbeat_sender:
heartbeat_sender.stop()

# Stop log uploader for this run and upload final logs
if log_uploader:
stop_log_uploader(log_queue, log_uploader)

# Update run status
if run_status:
run_status_proto = run_status_to_proto(run_status)
conn._stub.UpdateRunStatus(
UpdateRunStatusRequest(run_id=run.run_id, run_status=run_status_proto)
)

add_exit_handler(on_exit)

try:
# Pull SimulationInputs from LinkState
Expand All @@ -155,6 +176,8 @@ def run_simulation_process( # pylint: disable=R0913, R0914, R0915, R0917, W0212
run = run_from_proto(res.run)
fab = fab_from_proto(res.fab)

hash_run_id = get_sha256_hash(run.run_id)

# Start log uploader for this run
log_uploader = start_log_uploader(
log_queue=log_queue,
Expand Down Expand Up @@ -264,27 +287,19 @@ def run_simulation_process( # pylint: disable=R0913, R0914, R0915, R0917, W0212
log(ERROR, "%s raised an exception", exc_entity, exc_info=ex)
run_status = RunStatus(Status.FINISHED, SubStatus.FAILED, str(ex))

finally:
# Stop heartbeat sender
if heartbeat_sender:
heartbeat_sender.stop()

# Stop log uploader for this run and upload final logs
if log_uploader:
stop_log_uploader(log_queue, log_uploader)

# Update run status
if run_status:
run_status_proto = run_status_to_proto(run_status)
conn._stub.UpdateRunStatus(
UpdateRunStatusRequest(run_id=run.run_id, run_status=run_status_proto)
)

# Clean up the Context if it exists
try:
del updated_context
except NameError:
pass
# Set exit code
exit_code = ExitCode.SERVERAPP_EXCEPTION # General exit code
if isinstance(ex, AppExitException):
exit_code = ex.exit_code

flwr_exit(
code=exit_code,
event_type=EventType.FLWR_SIMULATION_RUN_LEAVE,
event_details={
"run-id-hash": hash_run_id,
"success": exit_code == ExitCode.SUCCESS,
},
)


def _parse_args_run_flwr_simulation() -> argparse.ArgumentParser:
Expand Down
Loading