Commit f635de7

[core][event] rename TaskExecutionEvent to TaskLifecycleEvent (#56853)
This PR refactors the `TaskExecutionEvent` proto in two ways:
- Rename the file to `events_task_lifecycle_event.proto`
- Refactor the task_state from a map to an array of TaskState and timestamp. Also rename the field to `state_transitions` for consistency.

This PR depends on the upstream to update their logic to consume this new schema.

Test:
- CI

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> Renames task execution event to task lifecycle event and changes its schema from a state map to an ordered state_transitions list, updating core, GCS, dashboard, builds, tests, and docs.
>
> - **Proto/API changes (breaking)**
>   - Rename `TaskExecutionEvent` → `TaskLifecycleEvent` and update `RayEvent.EventType` (`TASK_EXECUTION_EVENT` → `TASK_LIFECYCLE_EVENT`).
>   - Replace `task_state` map with `state_transitions` (list of `{state, timestamp}`) in `events_task_lifecycle_event.proto`.
>   - Update `events_base_event.proto` field from `task_execution_event` → `task_lifecycle_event` and imports/BUILD deps accordingly.
> - **Core worker**
>   - Update buffer/conversion logic in `src/ray/core_worker/task_event_buffer.{h,cc}` to populate/emit `TaskLifecycleEvent` with `state_transitions`.
> - **GCS**
>   - Update `GcsRayEventConverter` to consume `TASK_LIFECYCLE_EVENT` and convert `state_transitions` to `state_ts_ns`.
> - **Dashboard/Aggregator**
>   - Switch exposable type defaults/env to `TASK_LIFECYCLE_EVENT` in `python/.../aggregator_agent.py`.
> - **Tests**
>   - Adjust unit tests to new event/type and schema across core worker, GCS, and dashboard.
> - **Docs**
>   - Update event export guide references to new lifecycle event proto.
>
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 61507e8. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent 6f4b784 commit f635de7
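
Because the commit message says downstream consumers must adapt to the new schema, here is a minimal sketch of how a consumer of the exported JSON might accept both shapes during a migration. The helper name `normalize_state_transitions` is hypothetical, and the legacy enum mapping is deliberately partial: it contains only the `TaskStatus` values that appear in this diff (1, 2, 5 and 8), not the full enum from `common.proto`.

```python
from typing import Any

# Partial legacy mapping (assumption: only the values visible in this diff).
_LEGACY_STATUS_NAMES = {
    "1": "PENDING_ARGS_AVAIL",
    "2": "PENDING_NODE_ASSIGNMENT",
    "5": "SUBMITTED_TO_WORKER",
    "8": "RUNNING",
}


def normalize_state_transitions(event: dict[str, Any]) -> list[dict[str, str]]:
    """Return a list of {"state", "timestamp"} dicts for either schema.

    Hypothetical helper, not part of Ray.
    """
    # New schema: taskLifecycleEvent.stateTransitions is already a list.
    if "taskLifecycleEvent" in event:
        return list(event["taskLifecycleEvent"].get("stateTransitions", []))

    # Old schema: taskExecutionEvent.taskState maps enum integers (as JSON
    # string keys) to timestamps. Unknown keys are passed through unchanged.
    legacy = event.get("taskExecutionEvent", {}).get("taskState", {})
    return [
        {"state": _LEGACY_STATUS_NAMES.get(key, key), "timestamp": ts}
        for key, ts in legacy.items()
    ]
```

The old map carried no ordering, so the list produced from a legacy event simply follows the key order of the parsed JSON object.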

File tree

12 files changed

+119
-109
lines changed

doc/source/ray-observability/user-guides/ray-event-export.rst

Lines changed: 19 additions & 21 deletions
@@ -45,7 +45,7 @@ For each task, Ray exports two types of events: Task Definition Event and Task E
   and actor tasks respectively.
 * Task Execution Events contain task state transition information and metadata
   generated during task execution.
-  See `src/ray/protobuf/public/events_task_execution_event.proto <https://github.com/ray-project/ray/blob/master/src/ray/protobuf/public/events_task_execution_event.proto>`_ for the event format.
+  See `src/ray/protobuf/public/events_task_lifecycle_event.proto <https://github.com/ray-project/ray/blob/master/src/ray/protobuf/public/events_task_lifecycle_event.proto>`_ for the event format.
 
 An example of a Task Definition Event and a Task Execution Event:
 
@@ -73,16 +73,7 @@ An example of a Task Definition Event and a Task Execution Event:
             "requiredResources":{
                 "CPU":1.0
             },
-            "runtimeEnvInfo":{
-                "serializedRuntimeEnv":"{}",
-                "runtimeEnvConfig":{
-                    "setupTimeoutSeconds":600,
-                    "eagerInstall":true,
-                    "logFiles":[
-
-                    ]
-                }
-            },
+            "serialized_runtime_env": "{}",
             "jobId":"AQAAAA==",
             "parentTaskId":"//////////////////////////8BAAAA",
             "placementGroupId":"////////////////////////",
@@ -96,23 +87,30 @@ An example of a Task Definition Event and a Task Execution Event:
         "message":""
     }
 
-    // task execution event
+    // task lifecycle event
     {
         "eventId":"vkIaAHlQC5KoppGosqs2kBq5k2WzsAAbawDDbQ==",
         "sourceType":"CORE_WORKER",
-        "eventType":"TASK_EXECUTION_EVENT",
+        "eventType":"TASK_LIFECYCLE_EVENT",
         "timestamp":"2025-09-03T18:52:14.469074Z",
         "severity":"INFO",
         "sessionName":"session_2025-09-03_11-52-12_635210_85618",
-        "taskExecutionEvent":{
+        "taskLifecycleEvent":{
             "taskId":"yO9FzNARJXH///////////////8BAAAA",
-            "taskState":{
-                // key is the integer value of TaskStatus enum in common.proto at
-                // https://github.com/ray-project/ray/blob/master/src/ray/protobuf/common.proto
-                "2":"2025-09-03T18:52:14.467402Z", // PENDING_NODE_ASSIGNMENT
-                "1":"2025-09-03T18:52:14.467290Z", // PENDING_ARGS_AVAIL
-                "5":"2025-09-03T18:52:14.469074Z" // SUBMITTED_TO_WORKER
-            },
+            "stateTransitions": [
+                {
+                    "state":"PENDING_NODE_ASSIGNMENT",
+                    "timestamp":"2025-09-03T18:52:14.467402Z"
+                },
+                {
+                    "state":"PENDING_ARGS_AVAIL",
+                    "timestamp":"2025-09-03T18:52:14.467290Z"
+                },
+                {
+                    "state":"SUBMITTED_TO_WORKER",
+                    "timestamp":"2025-09-03T18:52:14.469074Z"
+                }
+            ],
             "nodeId":"ZvxTI6x9dlMFqMlIHErJpg5UEGK1INsKhW2zyg==",
             "workerId":"hMybCNYIFi+/yInYYhdc+qH8yMF65j/8+uCTmw==",
             "jobId":"AQAAAA==",

python/ray/dashboard/modules/aggregator/aggregator_agent.py

Lines changed: 1 addition & 2 deletions
@@ -54,8 +54,7 @@
 # The list of all supported event types can be found in src/ray/protobuf/public/events_base_event.proto (EventType enum)
 # By default TASK_PROFILE_EVENT is not exposed to external services
 DEFAULT_EXPOSABLE_EVENT_TYPES = (
-    "TASK_DEFINITION_EVENT,TASK_EXECUTION_EVENT,"
-    "ACTOR_TASK_DEFINITION_EVENT,ACTOR_TASK_EXECUTION_EVENT,"
+    "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT,"
     "DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT,"
     "ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,"
     "NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT,"

python/ray/dashboard/modules/aggregator/tests/test_aggregator_agent.py

Lines changed: 29 additions & 23 deletions
@@ -41,8 +41,8 @@
 from ray.core.generated.events_task_definition_event_pb2 import (
     TaskDefinitionEvent,
 )
-from ray.core.generated.events_task_execution_event_pb2 import (
-    TaskExecutionEvent,
+from ray.core.generated.events_task_lifecycle_event_pb2 import (
+    TaskLifecycleEvent,
 )
 from ray.core.generated.events_task_profile_events_pb2 import TaskProfileEvents
 from ray.core.generated.profile_events_pb2 import ProfileEventEntry, ProfileEvents
@@ -551,20 +551,23 @@ def _verify_task_definition_event_json(req_json, expected_timestamp):
     }
 
 
-def _create_task_execution_event_proto(timestamp):
+def _create_task_lifecycle_event_proto(timestamp):
     return RayEvent(
         event_id=b"1",
         source_type=RayEvent.SourceType.CORE_WORKER,
-        event_type=RayEvent.EventType.TASK_EXECUTION_EVENT,
+        event_type=RayEvent.EventType.TASK_LIFECYCLE_EVENT,
         timestamp=timestamp,
         severity=RayEvent.Severity.INFO,
         session_name="test_session",
-        task_execution_event=TaskExecutionEvent(
+        task_lifecycle_event=TaskLifecycleEvent(
             task_id=b"1",
             task_attempt=1,
-            task_state={
-                TaskStatus.RUNNING: timestamp,
-            },
+            state_transitions=[
+                TaskLifecycleEvent.StateTransition(
+                    state=TaskStatus.RUNNING,
+                    timestamp=timestamp,
+                ),
+            ],
             ray_error_info=RayErrorInfo(
                 error_type=ErrorType.TASK_EXECUTION_EXCEPTION,
             ),
@@ -575,13 +578,13 @@ def _create_task_execution_event_proto(timestamp):
     )
 
 
-def _verify_task_execution_event_json(req_json, expected_timestamp):
+def _verify_task_lifecycle_event_json(req_json, expected_timestamp):
     assert len(req_json) == 1
 
     # Verify the base event fields
     assert req_json[0]["eventId"] == base64.b64encode(b"1").decode()
     assert req_json[0]["sourceType"] == "CORE_WORKER"
-    assert req_json[0]["eventType"] == "TASK_EXECUTION_EVENT"
+    assert req_json[0]["eventType"] == "TASK_LIFECYCLE_EVENT"
     assert req_json[0]["timestamp"] == expected_timestamp
     assert req_json[0]["severity"] == "INFO"
     assert (
@@ -591,23 +594,26 @@ def _verify_task_execution_event_json(req_json, expected_timestamp):
 
     # Verify the task execution event specific fields
     assert (
-        req_json[0]["taskExecutionEvent"]["taskId"] == base64.b64encode(b"1").decode()
+        req_json[0]["taskLifecycleEvent"]["taskId"] == base64.b64encode(b"1").decode()
     )
-    assert req_json[0]["taskExecutionEvent"]["taskAttempt"] == 1
-    assert req_json[0]["taskExecutionEvent"]["taskState"] == {
-        "8": expected_timestamp,
-    }
+    assert req_json[0]["taskLifecycleEvent"]["taskAttempt"] == 1
+    assert req_json[0]["taskLifecycleEvent"]["stateTransitions"] == [
+        {
+            "state": "RUNNING",
+            "timestamp": expected_timestamp,
+        }
+    ]
     assert (
-        req_json[0]["taskExecutionEvent"]["rayErrorInfo"]["errorType"]
+        req_json[0]["taskLifecycleEvent"]["rayErrorInfo"]["errorType"]
         == "TASK_EXECUTION_EXCEPTION"
     )
     assert (
-        req_json[0]["taskExecutionEvent"]["nodeId"] == base64.b64encode(b"1").decode()
+        req_json[0]["taskLifecycleEvent"]["nodeId"] == base64.b64encode(b"1").decode()
     )
     assert (
-        req_json[0]["taskExecutionEvent"]["workerId"] == base64.b64encode(b"1").decode()
+        req_json[0]["taskLifecycleEvent"]["workerId"] == base64.b64encode(b"1").decode()
     )
-    assert req_json[0]["taskExecutionEvent"]["workerPid"] == 1
+    assert req_json[0]["taskLifecycleEvent"]["workerPid"] == 1
 
 
 def _create_profile_event_request(timestamp):
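
The exported JSON now carries `stateTransitions` as a list of `{state, timestamp}` objects. In the documentation example touched by this commit, the entries are not listed in timestamp order (`PENDING_NODE_ASSIGNMENT` appears before the earlier `PENDING_ARGS_AVAIL`), so a consumer that needs chronological order may want to sort by timestamp itself. The sketch below is one way to do that; the helper names are hypothetical, and it assumes timestamps are UTC strings ending in `Z` with microsecond precision, as in the documentation example.

```python
from datetime import datetime


def parse_event_timestamp(ts: str) -> datetime:
    """Parse a timestamp such as "2025-09-03T18:52:14.467402Z".

    Assumes a trailing "Z" and at most microsecond precision; nanosecond
    precision would need extra handling that is not shown here.
    """
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def transitions_in_time_order(lifecycle_event: dict) -> list[dict]:
    """Return the event's stateTransitions sorted by timestamp (hypothetical helper)."""
    transitions = lifecycle_event.get("stateTransitions", [])
    return sorted(transitions, key=lambda t: parse_event_timestamp(t["timestamp"]))
```

Here `lifecycle_event` is the value of the `taskLifecycleEvent` key in an exported event.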
@@ -680,9 +686,9 @@ def _verify_profile_event_json(req_json, expected_timestamp):
             id="task_definition_event",
         ),
         pytest.param(
-            _create_task_execution_event_proto,
-            _verify_task_execution_event_json,
-            id="task_execution_event",
+            _create_task_lifecycle_event_proto,
+            _verify_task_lifecycle_event_json,
+            id="task_lifecycle_event",
         ),
         pytest.param(
             _create_profile_event_request, _verify_profile_event_json, id="profile_event"
@@ -697,7 +703,7 @@ def _verify_profile_event_json(req_json, expected_timestamp):
         {
             "env_vars": {
                 "RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR": _EVENT_AGGREGATOR_AGENT_TARGET_ADDR,
-                "RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES": "TASK_DEFINITION_EVENT,TASK_EXECUTION_EVENT,ACTOR_TASK_DEFINITION_EVENT,ACTOR_TASK_EXECUTION_EVENT,TASK_PROFILE_EVENT",
+                "RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES": "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT,TASK_PROFILE_EVENT",
             },
         },
     ],
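
The exposable event types are configured as a single comma-separated string, both in `DEFAULT_EXPOSABLE_EVENT_TYPES` and in the `RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES` environment variable used by this test. The sketch below shows one way such a string could be turned into a set of names. It is not the aggregator agent's actual parsing code, the function names are hypothetical, and the default used here is a shortened illustrative value rather than Ray's full default list.

```python
import os
from typing import Mapping

ENV_VAR = "RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES"

# Shortened, illustrative default (assumption: not the full list used by Ray).
_ILLUSTRATIVE_DEFAULT = (
    "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT"
)


def parse_exposable_event_types(raw: str) -> set[str]:
    """Split a comma-separated list, ignoring whitespace and empty entries."""
    return {name.strip() for name in raw.split(",") if name.strip()}


def exposable_event_types(environ: Mapping[str, str] = os.environ) -> set[str]:
    """Read the env var, falling back to the illustrative default."""
    return parse_exposable_event_types(environ.get(ENV_VAR, _ILLUSTRATIVE_DEFAULT))
```

Deployments that set this variable explicitly with the old names (`TASK_EXECUTION_EVENT`, `ACTOR_TASK_EXECUTION_EVENT`) would need to switch to `TASK_LIFECYCLE_EVENT`, as the test's env var value does in this diff.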

python/ray/dashboard/modules/aggregator/tests/test_multi_consumer_event_buffer.py

Lines changed: 1 addition & 1 deletion
@@ -62,7 +62,7 @@ async def test_add_event_buffer_overflow(self):
         events = []
         event_types = [
             RayEvent.EventType.TASK_DEFINITION_EVENT,
-            RayEvent.EventType.TASK_EXECUTION_EVENT,
+            RayEvent.EventType.TASK_LIFECYCLE_EVENT,
             RayEvent.EventType.ACTOR_TASK_DEFINITION_EVENT,
         ]
         for i in range(3):

src/ray/core_worker/task_event_buffer.cc

Lines changed: 24 additions & 21 deletions
@@ -216,17 +216,20 @@ void TaskStatusEvent::PopulateRpcRayTaskDefinitionEvent(T &definition_event_data
   }
 }
 
-void TaskStatusEvent::PopulateRpcRayTaskExecutionEvent(
-    rpc::events::TaskExecutionEvent &execution_event_data,
+void TaskStatusEvent::PopulateRpcRayTaskLifecycleEvent(
+    rpc::events::TaskLifecycleEvent &lifecycle_event_data,
     google::protobuf::Timestamp timestamp) {
   // Task identifier
-  execution_event_data.set_task_id(task_id_.Binary());
-  execution_event_data.set_task_attempt(attempt_number_);
+  lifecycle_event_data.set_task_id(task_id_.Binary());
+  lifecycle_event_data.set_task_attempt(attempt_number_);
 
   // Task state
-  auto &task_state = *execution_event_data.mutable_task_state();
   if (task_status_ != rpc::TaskStatus::NIL) {
-    task_state[task_status_] = timestamp;
+    rpc::events::TaskLifecycleEvent::StateTransition state_transition;
+    state_transition.set_state(task_status_);
+    state_transition.mutable_timestamp()->CopyFrom(timestamp);
+    *lifecycle_event_data.mutable_state_transitions()->Add() =
+        std::move(state_transition);
   }
 
   // Task property updates
@@ -235,30 +238,30 @@ void TaskStatusEvent::PopulateRpcRayTaskExecutionEvent(
   }
 
   if (state_update_->error_info_.has_value()) {
-    execution_event_data.mutable_ray_error_info()->CopyFrom(*state_update_->error_info_);
+    lifecycle_event_data.mutable_ray_error_info()->CopyFrom(*state_update_->error_info_);
   }
 
   if (state_update_->node_id_.has_value()) {
     RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
             .WithField("TaskStatus", task_status_)
         << "Node ID should be included when task status changes to "
            "SUBMITTED_TO_WORKER.";
-    execution_event_data.set_node_id(state_update_->node_id_->Binary());
+    lifecycle_event_data.set_node_id(state_update_->node_id_->Binary());
   }
 
   if (state_update_->worker_id_.has_value()) {
     RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
             .WithField("TaskStatus", task_status_)
         << "Worker ID should be included when task status changes to "
            "SUBMITTED_TO_WORKER.";
-    execution_event_data.set_worker_id(state_update_->worker_id_->Binary());
+    lifecycle_event_data.set_worker_id(state_update_->worker_id_->Binary());
   }
 
   if (state_update_->pid_.has_value()) {
-    execution_event_data.set_worker_pid(state_update_->pid_.value());
+    lifecycle_event_data.set_worker_pid(state_update_->pid_.value());
   }
 
-  execution_event_data.set_job_id(job_id_.Binary());
+  lifecycle_event_data.set_job_id(job_id_.Binary());
 }
 
 void TaskStatusEvent::PopulateRpcRayEventBaseFields(
@@ -278,7 +281,7 @@ void TaskStatusEvent::PopulateRpcRayEventBaseFields(
       ray_event.set_event_type(rpc::events::RayEvent::TASK_DEFINITION_EVENT);
     }
   } else {
-    ray_event.set_event_type(rpc::events::RayEvent::TASK_EXECUTION_EVENT);
+    ray_event.set_event_type(rpc::events::RayEvent::TASK_LIFECYCLE_EVENT);
   }
 }
 
@@ -301,14 +304,14 @@ void TaskStatusEvent::ToRpcRayEvents(RayEventsTuple &ray_events_tuple) {
   }
 
   // Populate the task execution event
-  PopulateRpcRayEventBaseFields(ray_events_tuple.task_execution_event.has_value()
-                                    ? ray_events_tuple.task_execution_event.value()
-                                    : ray_events_tuple.task_execution_event.emplace(),
+  PopulateRpcRayEventBaseFields(ray_events_tuple.task_lifecycle_event.has_value()
+                                    ? ray_events_tuple.task_lifecycle_event.value()
+                                    : ray_events_tuple.task_lifecycle_event.emplace(),
                                 false,
                                 timestamp);
-  auto task_execution_event =
-      ray_events_tuple.task_execution_event.value().mutable_task_execution_event();
-  PopulateRpcRayTaskExecutionEvent(*task_execution_event, timestamp);
+  auto task_lifecycle_event =
+      ray_events_tuple.task_lifecycle_event.value().mutable_task_lifecycle_event();
+  PopulateRpcRayTaskLifecycleEvent(*task_lifecycle_event, timestamp);
 }
 
 void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
@@ -629,9 +632,9 @@ TaskEventBufferImpl::CreateRayEventsDataToSend(
       auto events = data->add_events();
       *events = std::move(ray_events_tuple.task_definition_event.value());
     }
-    if (ray_events_tuple.task_execution_event) {
+    if (ray_events_tuple.task_lifecycle_event) {
       auto events = data->add_events();
-      *events = std::move(ray_events_tuple.task_execution_event.value());
+      *events = std::move(ray_events_tuple.task_lifecycle_event.value());
     }
     if (ray_events_tuple.task_profile_event) {
       auto events = data->add_events();
@@ -656,7 +659,7 @@ TaskEventBuffer::TaskEventDataToSend TaskEventBufferImpl::CreateDataToSend(
     const absl::flat_hash_set<TaskAttempt> &dropped_task_attempts_to_send) {
   // Aggregate the task events by TaskAttempt.
   absl::flat_hash_map<TaskAttempt, rpc::TaskEvents> agg_task_events;
-  // (task_attempt, (task_definition_event, task_execution_event, task_profile_event))
+  // (task_attempt, (task_definition_event, task_lifecycle_event, task_profile_event))
   absl::flat_hash_map<TaskAttempt, RayEventsTuple> agg_ray_events;
 
   auto to_rpc_event_fn =
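
The commit summary says the GCS converter turns `state_transitions` back into `state_ts_ns`; that C++ code is not part of this excerpt. As a rough illustration of what such a conversion involves, the Python sketch below collapses a transition list into a per-state nanosecond timestamp map. It assumes `state_ts_ns` maps a state to nanoseconds since the epoch and that a later transition overwrites an earlier one for the same state; both are assumptions, and `to_state_ts_ns` is a hypothetical name. The `seconds` and `nanos` keys mirror the fields of `google.protobuf.Timestamp`.

```python
def to_state_ts_ns(transitions: list[dict]) -> dict[str, int]:
    """Collapse ordered transitions into a {state: nanoseconds} map.

    Each transition is a dict with "state", "seconds" and "nanos" keys.
    Later entries overwrite earlier ones for the same state (assumption).
    """
    state_ts_ns: dict[str, int] = {}
    for transition in transitions:
        total_ns = transition["seconds"] * 1_000_000_000 + transition["nanos"]
        state_ts_ns[transition["state"]] = total_ns
    return state_ts_ns
```

Unlike the old map field, the list can record the same state more than once, so any conversion back to a map has to pick a rule for duplicates.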

src/ray/core_worker/task_event_buffer.h

Lines changed: 8 additions & 8 deletions
@@ -44,15 +44,15 @@ namespace worker {
 using TaskAttempt = std::pair<TaskID, int32_t>;
 
 /// A struct containing a tuple of rpc::events::RayEvent.
-/// When converting the TaskStatusEvent, task_definition_event and task_execution_event
+/// When converting the TaskStatusEvent, task_definition_event and task_lifecycle_event
 /// will be populated with rpc::events::TaskDefinitionEvent and
-/// rpc::events::TaskExecutionEvent respectively. When converting the TaskProfileEvent,
+/// rpc::events::TaskLifecycleEvent respectively. When converting the TaskProfileEvent,
 /// task_profile_event will be populated with rpc::events::TaskProfileEvent. A struct is
-/// needed because the TaskProfileEvent, TaskDefinitionEvent and TaskExecutionEvent all
+/// needed because the TaskProfileEvent, TaskDefinitionEvent and TaskLifecycleEvent all
 /// can share the same task_id and attempt_number.
 struct RayEventsTuple {
   std::optional<rpc::events::RayEvent> task_definition_event;
-  std::optional<rpc::events::RayEvent> task_execution_event;
+  std::optional<rpc::events::RayEvent> task_lifecycle_event;
   std::optional<rpc::events::RayEvent> task_profile_event;
 };
 
@@ -171,7 +171,7 @@ class TaskStatusEvent : public TaskEvent {
 
   /// The function to convert the TaskStatusEvent class to a pair of
   /// rpc::events::RayEvent with rpc::events::TaskDefinitionEvent and
-  /// rpc::events::TaskExecutionEvent respectively. The TaskExecutionEvent will always
+  /// rpc::events::TaskLifecycleEvent respectively. The TaskLifecycleEvent will always
   /// be populated. The TaskDefinitionEvent will be populated only when the task_spec_
   /// is not null.
   /// NOTE: this method will modify internal states by moving fields of task_spec_ to
@@ -189,9 +189,9 @@ class TaskStatusEvent : public TaskEvent {
   template <typename T>
   void PopulateRpcRayTaskDefinitionEvent(T &definition_event_data);
 
-  // Helper functions to populate the task execution event of rpc::events::RayEvent
-  void PopulateRpcRayTaskExecutionEvent(
-      rpc::events::TaskExecutionEvent &execution_event_data,
+  // Helper functions to populate the task lifecycle event of rpc::events::RayEvent
+  void PopulateRpcRayTaskLifecycleEvent(
+      rpc::events::TaskLifecycleEvent &lifecycle_event_data,
       google::protobuf::Timestamp timestamp);
 
   // Helper functions to populate the base fields of rpc::events::RayEvent
