Skip to content

Commit f1da4f2

Browse files
authored
feat(BA-1428): Refactor event dispatcher and handlers directory structure (#4497)
1 parent aa43c2b commit f1da4f2

24 files changed

+230
-131
lines changed

changes/4497.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactor event dispatcher and handlers directory structure

src/ai/backend/manager/api/context.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from ai.backend.manager.config.provider import ManagerConfigProvider
1717
from ai.backend.manager.plugin.network import NetworkPluginContext
18+
from ai.backend.manager.scheduler.dispatcher import SchedulerDispatcher
1819
from ai.backend.manager.service.base import ServicesContext
1920
from ai.backend.manager.services.processors import Processors
2021

@@ -65,6 +66,7 @@ class RootContext(BaseContext):
6566

6667
registry: AgentRegistry
6768
agent_cache: AgentRPCCache
69+
scheduler_dispatcher: SchedulerDispatcher
6870

6971
error_monitor: ErrorPluginContext
7072
stats_monitor: StatsPluginContext

src/ai/backend/manager/config/unified.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1604,7 +1604,7 @@ class IdleCheckerConfig(BaseModel):
16041604
Enabled idle checkers.
16051605
Comma-separated list of checker names.
16061606
""",
1607-
examples=["timeout", "utilization"],
1607+
examples=["network_timeout", "utilization"],
16081608
)
16091609
app_streaming_packet_timeout: TimeDuration = Field(
16101610
default=_TimeDurationPydanticAnnotation.time_duration_validator("5m"),

src/ai/backend/manager/event_dispatcher/bgtask.py

Lines changed: 0 additions & 30 deletions
This file was deleted.

src/ai/backend/manager/event_dispatcher/dispatch.py

Lines changed: 95 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,15 @@
88
AgentTerminatedEvent,
99
DoAgentResourceCheckEvent,
1010
)
11+
from ai.backend.common.events.bgtask import (
12+
BgtaskCancelledEvent,
13+
BgtaskDoneEvent,
14+
BgtaskFailedEvent,
15+
BgtaskPartialSuccessEvent,
16+
BgtaskUpdatedEvent,
17+
)
1118
from ai.backend.common.events.dispatcher import (
19+
CoalescingOptions,
1220
EventDispatcher,
1321
)
1422
from ai.backend.common.events.hub.hub import EventHub
@@ -32,8 +40,15 @@
3240
ModelServiceStatusEvent,
3341
RouteCreatedEvent,
3442
)
43+
from ai.backend.common.events.schedule import (
44+
DoCheckPrecondEvent,
45+
DoScaleEvent,
46+
DoScheduleEvent,
47+
DoStartSessionEvent,
48+
)
3549
from ai.backend.common.events.session import (
3650
DoTerminateSessionEvent,
51+
DoUpdateSessionStatusEvent,
3752
SessionCancelledEvent,
3853
SessionEnqueuedEvent,
3954
SessionFailureEvent,
@@ -49,22 +64,24 @@
4964
VFolderDeletionSuccessEvent,
5065
)
5166
from ai.backend.common.plugin.event import EventDispatcherPluginContext
52-
from ai.backend.manager.event_dispatcher.propagator import PropagatorEventDispatcher
67+
from ai.backend.manager.event_dispatcher.handlers.propagator import PropagatorEventHandler
68+
from ai.backend.manager.event_dispatcher.handlers.schedule import ScheduleEventHandler
5369
from ai.backend.manager.registry import AgentRegistry
70+
from ai.backend.manager.scheduler.dispatcher import SchedulerDispatcher
5471

5572
from ..models.utils import ExtendedAsyncSAEngine
56-
from .agent import AgentEventHandler
57-
from .bgtask import dispatch_bgtask_events
58-
from .image import ImageEventHandler
59-
from .kernel import KernelEventHandler
60-
from .model_serving import ModelServingEventHandler
73+
from .handlers.agent import AgentEventHandler
74+
from .handlers.image import ImageEventHandler
75+
from .handlers.kernel import KernelEventHandler
76+
from .handlers.model_serving import ModelServingEventHandler
77+
from .handlers.session import SessionEventHandler
78+
from .handlers.vfolder import VFolderEventHandler
6179
from .reporters import EventLogger
62-
from .session import SessionEventHandler
63-
from .vfolder import VFolderEventHandler
6480

6581

6682
@dataclass
6783
class DispatcherArgs:
84+
scheduler_dispatcher: SchedulerDispatcher
6885
event_hub: EventHub
6986
agent_registry: AgentRegistry
7087
db: ExtendedAsyncSAEngine
@@ -73,10 +90,11 @@ class DispatcherArgs:
7390

7491
class Dispatchers:
7592
_db: ExtendedAsyncSAEngine
76-
_propagator_dispatcher: PropagatorEventDispatcher
93+
_propagator_handler: PropagatorEventHandler
7794
_agent_event_handler: AgentEventHandler
7895
_image_event_handler: ImageEventHandler
7996
_kernel_event_handler: KernelEventHandler
97+
_schedule_event_handler: ScheduleEventHandler
8098
_model_serving_event_handler: ModelServingEventHandler
8199
_session_event_handler: SessionEventHandler
82100
_vfolder_event_handler: VFolderEventHandler
@@ -87,10 +105,11 @@ def __init__(self, args: DispatcherArgs) -> None:
87105
"""
88106
self._db = args.db
89107
self._event_dispatcher_plugin_ctx = args.event_dispatcher_plugin_ctx
90-
self._propagator_dispatcher = PropagatorEventDispatcher(args.event_hub)
108+
self._propagator_handler = PropagatorEventHandler(args.event_hub)
91109
self._agent_event_handler = AgentEventHandler(args.agent_registry, args.db)
92110
self._image_event_handler = ImageEventHandler(args.agent_registry, args.db)
93111
self._kernel_event_handler = KernelEventHandler(args.agent_registry, args.db)
112+
self._schedule_event_handler = ScheduleEventHandler(args.scheduler_dispatcher)
94113
self._model_serving_event_handler = ModelServingEventHandler(args.agent_registry, args.db)
95114
self._session_event_handler = SessionEventHandler(args.agent_registry, args.db)
96115
self._vfolder_event_handler = VFolderEventHandler(args.db)
@@ -99,15 +118,39 @@ def dispatch(self, event_dispatcher: EventDispatcher) -> None:
99118
"""
100119
Register the handlers of every event group on the given event dispatcher.
101120
"""
102-
dispatch_bgtask_events(event_dispatcher, self._propagator_dispatcher)
121+
self._dispatch_bgtask_events(event_dispatcher)
103122
self._dispatch_agent_events(event_dispatcher)
104123
self._dispatch_error_monitor_events(event_dispatcher)
105124
self._dispatch_image_events(event_dispatcher)
106125
self._dispatch_kernel_events(event_dispatcher)
126+
self._dispatch_schedule_events(event_dispatcher)
107127
self._dispatch_model_serving_events(event_dispatcher)
108128
self._dispatch_session_events(event_dispatcher)
109129
self._dispatch_vfolder_events(event_dispatcher)
110130

131+
def _dispatch_bgtask_events(
132+
self,
133+
event_dispatcher: EventDispatcher,
134+
) -> None:
135+
"""
136+
Subscribe the propagator handler to background task events.
137+
"""
138+
event_dispatcher.subscribe(
139+
BgtaskUpdatedEvent, None, self._propagator_handler.propagate_event
140+
)
141+
event_dispatcher.subscribe(BgtaskDoneEvent, None, self._propagator_handler.propagate_event)
142+
event_dispatcher.subscribe(
143+
BgtaskPartialSuccessEvent,
144+
None,
145+
self._propagator_handler.propagate_event,
146+
)
147+
event_dispatcher.subscribe(
148+
BgtaskCancelledEvent, None, self._propagator_handler.propagate_event
149+
)
150+
event_dispatcher.subscribe(
151+
BgtaskFailedEvent, None, self._propagator_handler.propagate_event
152+
)
153+
111154
def _dispatch_agent_events(
112155
self,
113156
event_dispatcher: EventDispatcher,
@@ -229,6 +272,47 @@ def _dispatch_model_serving_events(self, event_dispatcher: EventDispatcher) -> N
229272
RouteCreatedEvent, None, self._model_serving_event_handler.handle_route_creation
230273
)
231274

275+
def _dispatch_schedule_events(self, event_dispatcher: EventDispatcher) -> None:
276+
coalescing_opts: CoalescingOptions = {
277+
"max_wait": 0.5,
278+
"max_batch_size": 32,
279+
}
280+
event_dispatcher.consume(
281+
SessionEnqueuedEvent,
282+
None,
283+
self._schedule_event_handler.handle_session_enqueued,
284+
coalescing_opts,
285+
name="dispatcher.schedule/enqueue",
286+
)
287+
event_dispatcher.consume(
288+
SessionTerminatedEvent,
289+
None,
290+
self._schedule_event_handler.handle_session_terminated,
291+
coalescing_opts,
292+
name="dispatcher.term",
293+
)
294+
event_dispatcher.consume(
295+
AgentStartedEvent,
296+
None,
297+
self._schedule_event_handler.handle_agent_started,
298+
name="dispatcher.schedule",
299+
)
300+
event_dispatcher.consume(
301+
DoScheduleEvent, None, self._schedule_event_handler.handle_do_schedule, coalescing_opts
302+
)
303+
event_dispatcher.consume(
304+
DoStartSessionEvent, None, self._schedule_event_handler.handle_do_start_session
305+
)
306+
event_dispatcher.consume(
307+
DoCheckPrecondEvent, None, self._schedule_event_handler.handle_do_check_precond
308+
)
309+
event_dispatcher.consume(DoScaleEvent, None, self._schedule_event_handler.handle_do_scale)
310+
event_dispatcher.consume(
311+
DoUpdateSessionStatusEvent,
312+
None,
313+
self._schedule_event_handler.handle_do_update_session_status,
314+
)
315+
232316
def _dispatch_session_events(self, event_dispatcher: EventDispatcher) -> None:
233317
# action-triggering events
234318
event_dispatcher.consume(
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
python_sources(name="src")

src/ai/backend/manager/event_dispatcher/handlers/__init__.py

Whitespace-only changes.

src/ai/backend/manager/event_dispatcher/agent.py renamed to src/ai/backend/manager/event_dispatcher/handlers/agent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
from ai.backend.manager.errors.exceptions import InstanceNotFound
1717
from ai.backend.manager.registry import AgentRegistry
1818

19-
from ..models.agent import AgentStatus, agents
20-
from ..models.utils import (
19+
from ...models.agent import AgentStatus, agents
20+
from ...models.utils import (
2121
ExtendedAsyncSAEngine,
2222
)
2323

src/ai/backend/manager/event_dispatcher/image.py renamed to src/ai/backend/manager/event_dispatcher/handlers/image.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from ai.backend.logging import BraceStyleAdapter
1313
from ai.backend.manager.registry import AgentRegistry
1414

15-
from ..models.utils import (
15+
from ...models.utils import (
1616
ExtendedAsyncSAEngine,
1717
)
1818

src/ai/backend/manager/event_dispatcher/kernel.py renamed to src/ai/backend/manager/event_dispatcher/handlers/kernel.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
from ai.backend.logging import BraceStyleAdapter
2222
from ai.backend.manager.registry import AgentRegistry
2323

24-
from ..models.kernel import kernels
25-
from ..models.utils import (
24+
from ...models.kernel import kernels
25+
from ...models.utils import (
2626
ExtendedAsyncSAEngine,
2727
execute_with_retry,
2828
)

src/ai/backend/manager/event_dispatcher/model_serving.py renamed to src/ai/backend/manager/event_dispatcher/handlers/model_serving.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,19 @@
1919
from ai.backend.manager.errors.exceptions import SessionNotFound
2020
from ai.backend.manager.registry import AgentRegistry
2121

22-
from ..models.endpoint import EndpointRow
23-
from ..models.image import ImageIdentifier, ImageRow
24-
from ..models.keypair import KeyPairRow
25-
from ..models.routing import RouteStatus, RoutingRow
26-
from ..models.session import KernelLoadingStrategy, SessionRow
27-
from ..models.user import UserRow
28-
from ..models.utils import (
22+
from ...models.endpoint import EndpointRow
23+
from ...models.image import ImageIdentifier, ImageRow
24+
from ...models.keypair import KeyPairRow
25+
from ...models.routing import RouteStatus, RoutingRow
26+
from ...models.session import KernelLoadingStrategy, SessionRow
27+
from ...models.user import UserRow
28+
from ...models.utils import (
2929
ExtendedAsyncSAEngine,
3030
execute_with_retry,
3131
is_db_retry_error,
3232
)
33-
from ..types import UserScope
34-
from ..utils import query_userinfo
33+
from ...types import UserScope
34+
from ...utils import query_userinfo
3535

3636
log = BraceStyleAdapter(logging.getLogger(__spec__.name))
3737

src/ai/backend/manager/event_dispatcher/propagator.py renamed to src/ai/backend/manager/event_dispatcher/handlers/propagator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from ai.backend.manager.errors.exceptions import InternalServerError
55

66

7-
class PropagatorEventDispatcher:
7+
class PropagatorEventHandler:
88
_event_hub: EventHub
99

1010
def __init__(self, event_hub: EventHub) -> None:

src/ai/backend/manager/event_dispatcher/handlers/schedule.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import logging
2+
3+
from ai.backend.common.events.agent import AgentStartedEvent
4+
from ai.backend.common.events.schedule import (
5+
DoCheckPrecondEvent,
6+
DoScaleEvent,
7+
DoScheduleEvent,
8+
DoStartSessionEvent,
9+
)
10+
from ai.backend.common.events.session import (
11+
DoUpdateSessionStatusEvent,
12+
SessionEnqueuedEvent,
13+
SessionTerminatedEvent,
14+
)
15+
from ai.backend.logging.utils import BraceStyleAdapter
16+
from ai.backend.manager.scheduler.dispatcher import SchedulerDispatcher
17+
18+
log = BraceStyleAdapter(logging.getLogger(__spec__.name))
19+
20+
21+
class ScheduleEventHandler:
22+
_scheduler_dispatcher: SchedulerDispatcher
23+
24+
def __init__(self, scheduler_dispatcher: SchedulerDispatcher) -> None:
25+
self._scheduler_dispatcher = scheduler_dispatcher
26+
27+
async def handle_session_enqueued(
28+
self, context: None, agent_id: str, ev: SessionEnqueuedEvent
29+
) -> None:
30+
await self._scheduler_dispatcher.schedule(ev.event_name())
31+
32+
async def handle_session_terminated(
33+
self, context: None, agent_id: str, ev: SessionTerminatedEvent
34+
) -> None:
35+
await self._scheduler_dispatcher.schedule(ev.event_name())
36+
37+
async def handle_agent_started(
38+
self, context: None, agent_id: str, ev: AgentStartedEvent
39+
) -> None:
40+
await self._scheduler_dispatcher.schedule(ev.event_name())
41+
42+
async def handle_do_schedule(self, context: None, agent_id: str, ev: DoScheduleEvent) -> None:
43+
await self._scheduler_dispatcher.schedule(ev.event_name())
44+
45+
async def handle_do_start_session(
46+
self, context: None, agent_id: str, ev: DoStartSessionEvent
47+
) -> None:
48+
await self._scheduler_dispatcher.start(ev.event_name())
49+
50+
async def handle_do_check_precond(
51+
self, context: None, agent_id: str, ev: DoCheckPrecondEvent
52+
) -> None:
53+
await self._scheduler_dispatcher.check_precond(ev.event_name())
54+
55+
async def handle_do_scale(self, context: None, agent_id: str, ev: DoScaleEvent) -> None:
56+
await self._scheduler_dispatcher.scale_services(ev.event_name())
57+
58+
async def handle_do_update_session_status(
59+
self, context: None, agent_id: str, ev: DoUpdateSessionStatusEvent
60+
) -> None:
61+
await self._scheduler_dispatcher.update_session_status()

src/ai/backend/manager/event_dispatcher/session.py renamed to src/ai/backend/manager/event_dispatcher/handlers/session.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@
3232
from ai.backend.manager.errors.exceptions import SessionNotFound
3333
from ai.backend.manager.registry import AgentRegistry
3434

35-
from ..models.endpoint import EndpointLifecycle, EndpointRow
36-
from ..models.routing import RouteStatus, RoutingRow
37-
from ..models.session import KernelLoadingStrategy, SessionRow
38-
from ..models.utils import (
35+
from ...models.endpoint import EndpointLifecycle, EndpointRow
36+
from ...models.routing import RouteStatus, RoutingRow
37+
from ...models.session import KernelLoadingStrategy, SessionRow
38+
from ...models.utils import (
3939
ExtendedAsyncSAEngine,
4040
execute_with_retry,
4141
is_db_retry_error,

src/ai/backend/manager/event_dispatcher/vfolder.py renamed to src/ai/backend/manager/event_dispatcher/handlers/vfolder.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
)
1010
from ai.backend.logging import BraceStyleAdapter
1111

12-
from ..models.utils import (
12+
from ...models.utils import (
1313
ExtendedAsyncSAEngine,
1414
)
15-
from ..models.vfolder import VFolderOperationStatus, update_vfolder_status
15+
from ...models.vfolder import VFolderOperationStatus, update_vfolder_status
1616

1717
log = BraceStyleAdapter(logging.getLogger(__spec__.name))
1818

0 commit comments

Comments
 (0)