Skip to content

Commit 61507e8

Browse files
committed
[core][event] rename task_execution_event to task_lifecycle_event
Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent ea54444 commit 61507e8

File tree

12 files changed

+104
-90
lines changed

12 files changed

+104
-90
lines changed

doc/source/ray-observability/user-guides/ray-event-export.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ For each task, Ray exports two types of events: Task Definition Event and Task E
4545
and actor tasks respectively.
4646
* Task Execution Events contain task state transition information and metadata
4747
generated during task execution.
48-
See `src/ray/protobuf/public/events_task_execution_event.proto <https://github.com/ray-project/ray/blob/master/src/ray/protobuf/public/events_task_execution_event.proto>`_ for the event format.
48+
See `src/ray/protobuf/public/events_task_lifecycle_event.proto <https://github.com/ray-project/ray/blob/master/src/ray/protobuf/public/events_task_lifecycle_event.proto>`_ for the event format.
4949

5050
An example of a Task Definition Event and a Task Execution Event:
5151

python/ray/dashboard/modules/aggregator/aggregator_agent.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@
7979
# The list of all supported event types can be found in src/ray/protobuf/public/events_base_event.proto (EventType enum)
8080
# By default TASK_PROFILE_EVENT is not exposed to external services
8181
DEFAULT_EXPOSABLE_EVENT_TYPES = (
82-
"TASK_DEFINITION_EVENT,TASK_EXECUTION_EVENT,"
83-
"ACTOR_TASK_DEFINITION_EVENT,ACTOR_TASK_EXECUTION_EVENT,"
82+
"TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT,"
8483
"DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_EXECUTION_EVENT,"
8584
"ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,"
8685
"NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT,"

python/ray/dashboard/modules/aggregator/tests/test_aggregator_agent.py

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141
from ray.core.generated.events_task_definition_event_pb2 import (
4242
TaskDefinitionEvent,
4343
)
44-
from ray.core.generated.events_task_execution_event_pb2 import (
45-
TaskExecutionEvent,
44+
from ray.core.generated.events_task_lifecycle_event_pb2 import (
45+
TaskLifecycleEvent,
4646
)
4747
from ray.core.generated.events_task_profile_events_pb2 import TaskProfileEvents
4848
from ray.core.generated.profile_events_pb2 import ProfileEventEntry, ProfileEvents
@@ -547,20 +547,23 @@ def _verify_task_definition_event_json(req_json, expected_timestamp):
547547
}
548548

549549

550-
def _create_task_execution_event_proto(timestamp):
550+
def _create_task_lifecycle_event_proto(timestamp):
551551
return RayEvent(
552552
event_id=b"1",
553553
source_type=RayEvent.SourceType.CORE_WORKER,
554-
event_type=RayEvent.EventType.TASK_EXECUTION_EVENT,
554+
event_type=RayEvent.EventType.TASK_LIFECYCLE_EVENT,
555555
timestamp=timestamp,
556556
severity=RayEvent.Severity.INFO,
557557
session_name="test_session",
558-
task_execution_event=TaskExecutionEvent(
558+
task_lifecycle_event=TaskLifecycleEvent(
559559
task_id=b"1",
560560
task_attempt=1,
561-
task_state={
562-
TaskStatus.RUNNING: timestamp,
563-
},
561+
state_transitions=[
562+
TaskLifecycleEvent.StateTransition(
563+
state=TaskStatus.RUNNING,
564+
timestamp=timestamp,
565+
),
566+
],
564567
ray_error_info=RayErrorInfo(
565568
error_type=ErrorType.TASK_EXECUTION_EXCEPTION,
566569
),
@@ -571,13 +574,13 @@ def _create_task_execution_event_proto(timestamp):
571574
)
572575

573576

574-
def _verify_task_execution_event_json(req_json, expected_timestamp):
577+
def _verify_task_lifecycle_event_json(req_json, expected_timestamp):
575578
assert len(req_json) == 1
576579

577580
# Verify the base event fields
578581
assert req_json[0]["eventId"] == base64.b64encode(b"1").decode()
579582
assert req_json[0]["sourceType"] == "CORE_WORKER"
580-
assert req_json[0]["eventType"] == "TASK_EXECUTION_EVENT"
583+
assert req_json[0]["eventType"] == "TASK_LIFECYCLE_EVENT"
581584
assert req_json[0]["timestamp"] == expected_timestamp
582585
assert req_json[0]["severity"] == "INFO"
583586
assert (
@@ -587,23 +590,26 @@ def _verify_task_execution_event_json(req_json, expected_timestamp):
587590

588591
# Verify the task execution event specific fields
589592
assert (
590-
req_json[0]["taskExecutionEvent"]["taskId"] == base64.b64encode(b"1").decode()
593+
req_json[0]["taskLifecycleEvent"]["taskId"] == base64.b64encode(b"1").decode()
591594
)
592-
assert req_json[0]["taskExecutionEvent"]["taskAttempt"] == 1
593-
assert req_json[0]["taskExecutionEvent"]["taskState"] == {
594-
"8": expected_timestamp,
595-
}
595+
assert req_json[0]["taskLifecycleEvent"]["taskAttempt"] == 1
596+
assert req_json[0]["taskLifecycleEvent"]["stateTransitions"] == [
597+
{
598+
"state": "RUNNING",
599+
"timestamp": expected_timestamp,
600+
}
601+
]
596602
assert (
597-
req_json[0]["taskExecutionEvent"]["rayErrorInfo"]["errorType"]
603+
req_json[0]["taskLifecycleEvent"]["rayErrorInfo"]["errorType"]
598604
== "TASK_EXECUTION_EXCEPTION"
599605
)
600606
assert (
601-
req_json[0]["taskExecutionEvent"]["nodeId"] == base64.b64encode(b"1").decode()
607+
req_json[0]["taskLifecycleEvent"]["nodeId"] == base64.b64encode(b"1").decode()
602608
)
603609
assert (
604-
req_json[0]["taskExecutionEvent"]["workerId"] == base64.b64encode(b"1").decode()
610+
req_json[0]["taskLifecycleEvent"]["workerId"] == base64.b64encode(b"1").decode()
605611
)
606-
assert req_json[0]["taskExecutionEvent"]["workerPid"] == 1
612+
assert req_json[0]["taskLifecycleEvent"]["workerPid"] == 1
607613

608614

609615
def _create_profile_event_request(timestamp):
@@ -676,9 +682,9 @@ def _verify_profile_event_json(req_json, expected_timestamp):
676682
id="task_definition_event",
677683
),
678684
pytest.param(
679-
_create_task_execution_event_proto,
680-
_verify_task_execution_event_json,
681-
id="task_execution_event",
685+
_create_task_lifecycle_event_proto,
686+
_verify_task_lifecycle_event_json,
687+
id="task_lifecycle_event",
682688
),
683689
pytest.param(
684690
_create_profile_event_request, _verify_profile_event_json, id="profile_event"
@@ -693,7 +699,7 @@ def _verify_profile_event_json(req_json, expected_timestamp):
693699
{
694700
"env_vars": {
695701
"RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR": _EVENT_AGGREGATOR_AGENT_TARGET_ADDR,
696-
"RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES": "TASK_DEFINITION_EVENT,TASK_EXECUTION_EVENT,ACTOR_TASK_DEFINITION_EVENT,ACTOR_TASK_EXECUTION_EVENT,TASK_PROFILE_EVENT",
702+
"RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES": "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT,TASK_PROFILE_EVENT",
697703
},
698704
},
699705
],

src/ray/core_worker/task_event_buffer.cc

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -216,17 +216,20 @@ void TaskStatusEvent::PopulateRpcRayTaskDefinitionEvent(T &definition_event_data
216216
}
217217
}
218218

219-
void TaskStatusEvent::PopulateRpcRayTaskExecutionEvent(
220-
rpc::events::TaskExecutionEvent &execution_event_data,
219+
void TaskStatusEvent::PopulateRpcRayTaskLifecycleEvent(
220+
rpc::events::TaskLifecycleEvent &lifecycle_event_data,
221221
google::protobuf::Timestamp timestamp) {
222222
// Task identifier
223-
execution_event_data.set_task_id(task_id_.Binary());
224-
execution_event_data.set_task_attempt(attempt_number_);
223+
lifecycle_event_data.set_task_id(task_id_.Binary());
224+
lifecycle_event_data.set_task_attempt(attempt_number_);
225225

226226
// Task state
227-
auto &task_state = *execution_event_data.mutable_task_state();
228227
if (task_status_ != rpc::TaskStatus::NIL) {
229-
task_state[task_status_] = timestamp;
228+
rpc::events::TaskLifecycleEvent::StateTransition state_transition;
229+
state_transition.set_state(task_status_);
230+
state_transition.mutable_timestamp()->CopyFrom(timestamp);
231+
*lifecycle_event_data.mutable_state_transitions()->Add() =
232+
std::move(state_transition);
230233
}
231234

232235
// Task property updates
@@ -235,30 +238,30 @@ void TaskStatusEvent::PopulateRpcRayTaskExecutionEvent(
235238
}
236239

237240
if (state_update_->error_info_.has_value()) {
238-
execution_event_data.mutable_ray_error_info()->CopyFrom(*state_update_->error_info_);
241+
lifecycle_event_data.mutable_ray_error_info()->CopyFrom(*state_update_->error_info_);
239242
}
240243

241244
if (state_update_->node_id_.has_value()) {
242245
RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
243246
.WithField("TaskStatus", task_status_)
244247
<< "Node ID should be included when task status changes to "
245248
"SUBMITTED_TO_WORKER.";
246-
execution_event_data.set_node_id(state_update_->node_id_->Binary());
249+
lifecycle_event_data.set_node_id(state_update_->node_id_->Binary());
247250
}
248251

249252
if (state_update_->worker_id_.has_value()) {
250253
RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
251254
.WithField("TaskStatus", task_status_)
252255
<< "Worker ID should be included when task status changes to "
253256
"SUBMITTED_TO_WORKER.";
254-
execution_event_data.set_worker_id(state_update_->worker_id_->Binary());
257+
lifecycle_event_data.set_worker_id(state_update_->worker_id_->Binary());
255258
}
256259

257260
if (state_update_->pid_.has_value()) {
258-
execution_event_data.set_worker_pid(state_update_->pid_.value());
261+
lifecycle_event_data.set_worker_pid(state_update_->pid_.value());
259262
}
260263

261-
execution_event_data.set_job_id(job_id_.Binary());
264+
lifecycle_event_data.set_job_id(job_id_.Binary());
262265
}
263266

264267
void TaskStatusEvent::PopulateRpcRayEventBaseFields(
@@ -278,7 +281,7 @@ void TaskStatusEvent::PopulateRpcRayEventBaseFields(
278281
ray_event.set_event_type(rpc::events::RayEvent::TASK_DEFINITION_EVENT);
279282
}
280283
} else {
281-
ray_event.set_event_type(rpc::events::RayEvent::TASK_EXECUTION_EVENT);
284+
ray_event.set_event_type(rpc::events::RayEvent::TASK_LIFECYCLE_EVENT);
282285
}
283286
}
284287

@@ -301,14 +304,14 @@ void TaskStatusEvent::ToRpcRayEvents(RayEventsTuple &ray_events_tuple) {
301304
}
302305

303306
// Populate the task execution event
304-
PopulateRpcRayEventBaseFields(ray_events_tuple.task_execution_event.has_value()
305-
? ray_events_tuple.task_execution_event.value()
306-
: ray_events_tuple.task_execution_event.emplace(),
307+
PopulateRpcRayEventBaseFields(ray_events_tuple.task_lifecycle_event.has_value()
308+
? ray_events_tuple.task_lifecycle_event.value()
309+
: ray_events_tuple.task_lifecycle_event.emplace(),
307310
false,
308311
timestamp);
309-
auto task_execution_event =
310-
ray_events_tuple.task_execution_event.value().mutable_task_execution_event();
311-
PopulateRpcRayTaskExecutionEvent(*task_execution_event, timestamp);
312+
auto task_lifecycle_event =
313+
ray_events_tuple.task_lifecycle_event.value().mutable_task_lifecycle_event();
314+
PopulateRpcRayTaskLifecycleEvent(*task_lifecycle_event, timestamp);
312315
}
313316

314317
void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
@@ -629,9 +632,9 @@ TaskEventBufferImpl::CreateRayEventsDataToSend(
629632
auto events = data->add_events();
630633
*events = std::move(ray_events_tuple.task_definition_event.value());
631634
}
632-
if (ray_events_tuple.task_execution_event) {
635+
if (ray_events_tuple.task_lifecycle_event) {
633636
auto events = data->add_events();
634-
*events = std::move(ray_events_tuple.task_execution_event.value());
637+
*events = std::move(ray_events_tuple.task_lifecycle_event.value());
635638
}
636639
if (ray_events_tuple.task_profile_event) {
637640
auto events = data->add_events();
@@ -656,7 +659,7 @@ TaskEventBuffer::TaskEventDataToSend TaskEventBufferImpl::CreateDataToSend(
656659
const absl::flat_hash_set<TaskAttempt> &dropped_task_attempts_to_send) {
657660
// Aggregate the task events by TaskAttempt.
658661
absl::flat_hash_map<TaskAttempt, rpc::TaskEvents> agg_task_events;
659-
// (task_attempt, (task_definition_event, task_execution_event, task_profile_event))
662+
// (task_attempt, (task_definition_event, task_lifecycle_event, task_profile_event))
660663
absl::flat_hash_map<TaskAttempt, RayEventsTuple> agg_ray_events;
661664

662665
auto to_rpc_event_fn =

src/ray/core_worker/task_event_buffer.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,15 @@ namespace worker {
4444
using TaskAttempt = std::pair<TaskID, int32_t>;
4545

4646
/// A struct containing a tuple of rpc::events::RayEvent.
47-
/// When converting the TaskStatusEvent, task_definition_event and task_execution_event
47+
/// When converting the TaskStatusEvent, task_definition_event and task_lifecycle_event
4848
/// will be populated with rpc::events::TaskDefinitionEvent and
49-
/// rpc::events::TaskExecutionEvent respectively. When converting the TaskProfileEvent,
49+
/// rpc::events::TaskLifecycleEvent respectively. When converting the TaskProfileEvent,
5050
/// task_profile_event will be populated with rpc::events::TaskProfileEvent. A struct is
51-
/// needed because the TaskProfileEvent, TaskDefinitionEvent and TaskExecutionEvent all
51+
/// needed because the TaskProfileEvent, TaskDefinitionEvent and TaskLifecycleEvent all
5252
/// can share the same task_id and attempt_number.
5353
struct RayEventsTuple {
5454
std::optional<rpc::events::RayEvent> task_definition_event;
55-
std::optional<rpc::events::RayEvent> task_execution_event;
55+
std::optional<rpc::events::RayEvent> task_lifecycle_event;
5656
std::optional<rpc::events::RayEvent> task_profile_event;
5757
};
5858

@@ -171,7 +171,7 @@ class TaskStatusEvent : public TaskEvent {
171171

172172
/// The function to convert the TaskStatusEvent class to a pair of
173173
/// rpc::events::RayEvent with rpc::events::TaskDefinitionEvent and
174-
/// rpc::events::TaskExecutionEvent respectively. The TaskExecutionEvent will always
174+
/// rpc::events::TaskLifecycleEvent respectively. The TaskLifecycleEvent will always
175175
/// be populated. The TaskDefinitionEvent will be populated only when the task_spec_
176176
/// is not null.
177177
/// NOTE: this method will modify internal states by moving fields of task_spec_ to
@@ -189,9 +189,9 @@ class TaskStatusEvent : public TaskEvent {
189189
template <typename T>
190190
void PopulateRpcRayTaskDefinitionEvent(T &definition_event_data);
191191

192-
// Helper functions to populate the task execution event of rpc::events::RayEvent
193-
void PopulateRpcRayTaskExecutionEvent(
194-
rpc::events::TaskExecutionEvent &execution_event_data,
192+
// Helper functions to populate the task lifecycle event of rpc::events::RayEvent
193+
void PopulateRpcRayTaskLifecycleEvent(
194+
rpc::events::TaskLifecycleEvent &lifecycle_event_data,
195195
google::protobuf::Timestamp timestamp);
196196

197197
// Helper functions to populate the base fields of rpc::events::RayEvent

src/ray/core_worker/tests/task_event_buffer_test.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -439,9 +439,9 @@ TEST_P(TaskEventBufferTestDifferentDestination, TestFlushEvents) {
439439
auto new_event = expected_ray_events_data.add_events();
440440
*new_event = std::move(ray_events_tuple.task_definition_event.value());
441441
}
442-
if (ray_events_tuple.task_execution_event) {
442+
if (ray_events_tuple.task_lifecycle_event) {
443443
auto new_event = expected_ray_events_data.add_events();
444-
*new_event = std::move(ray_events_tuple.task_execution_event.value());
444+
*new_event = std::move(ray_events_tuple.task_lifecycle_event.value());
445445
}
446446
if (ray_events_tuple.task_profile_event) {
447447
auto new_event = expected_ray_events_data.add_events();
@@ -757,9 +757,9 @@ TEST_P(TaskEventBufferTestLimitBufferDifferentDestination,
757757
auto new_event = expected_ray_events_data.add_events();
758758
*new_event = std::move(ray_events_tuple.task_definition_event.value());
759759
}
760-
if (ray_events_tuple.task_execution_event) {
760+
if (ray_events_tuple.task_lifecycle_event) {
761761
auto new_event = expected_ray_events_data.add_events();
762-
*new_event = std::move(ray_events_tuple.task_execution_event.value());
762+
*new_event = std::move(ray_events_tuple.task_lifecycle_event.value());
763763
}
764764
if (ray_events_tuple.task_profile_event) {
765765
auto new_event = expected_ray_events_data.add_events();
@@ -960,7 +960,7 @@ TEST_F(TaskEventBufferTest, TestTaskProfileEventToRpcRayEvents) {
960960
// Verify that the second event is nullopt (empty)
961961
EXPECT_FALSE(ray_events_tuple.task_definition_event.has_value())
962962
<< "TaskProfileEvent should be populated in RayEventsTuple";
963-
EXPECT_FALSE(ray_events_tuple.task_execution_event.has_value())
963+
EXPECT_FALSE(ray_events_tuple.task_lifecycle_event.has_value())
964964
<< "TaskProfileEvent should be populated in RayEventsTuple";
965965

966966
// Verify that the first event contains the profile event
@@ -1086,9 +1086,9 @@ TEST_P(TaskEventBufferTestDifferentDestination,
10861086
auto new_event = expected_ray_events_data.add_events();
10871087
*new_event = std::move(ray_events_tuple.task_definition_event.value());
10881088
}
1089-
if (ray_events_tuple.task_execution_event) {
1089+
if (ray_events_tuple.task_lifecycle_event) {
10901090
auto new_event = expected_ray_events_data.add_events();
1091-
*new_event = std::move(ray_events_tuple.task_execution_event.value());
1091+
*new_event = std::move(ray_events_tuple.task_lifecycle_event.value());
10921092
}
10931093
if (ray_events_tuple.task_profile_event) {
10941094
auto new_event = expected_ray_events_data.add_events();

src/ray/gcs/gcs_ray_event_converter.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ GcsRayEventConverter::ConvertToTaskEventDataRequests(
3737
task_event = ConvertToTaskEvents(std::move(*event.mutable_task_definition_event()));
3838
break;
3939
}
40-
case rpc::events::RayEvent::TASK_EXECUTION_EVENT: {
41-
task_event = ConvertToTaskEvents(std::move(*event.mutable_task_execution_event()));
40+
case rpc::events::RayEvent::TASK_LIFECYCLE_EVENT: {
41+
task_event = ConvertToTaskEvents(std::move(*event.mutable_task_lifecycle_event()));
4242
break;
4343
}
4444
case rpc::events::RayEvent::TASK_PROFILE_EVENT: {
@@ -144,7 +144,7 @@ rpc::TaskEvents GcsRayEventConverter::ConvertToTaskEvents(
144144
}
145145

146146
rpc::TaskEvents GcsRayEventConverter::ConvertToTaskEvents(
147-
rpc::events::TaskExecutionEvent &&event) {
147+
rpc::events::TaskLifecycleEvent &&event) {
148148
rpc::TaskEvents task_event;
149149
task_event.set_task_id(event.task_id());
150150
task_event.set_attempt_number(event.task_attempt());
@@ -156,9 +156,9 @@ rpc::TaskEvents GcsRayEventConverter::ConvertToTaskEvents(
156156
task_state_update->set_worker_pid(event.worker_pid());
157157
task_state_update->mutable_error_info()->Swap(event.mutable_ray_error_info());
158158

159-
for (const auto &[state, timestamp] : event.task_state()) {
160-
int64_t ns = ProtoTimestampToAbslTimeNanos(timestamp);
161-
(*task_state_update->mutable_state_ts_ns())[state] = ns;
159+
for (const auto &state_transition : event.state_transitions()) {
160+
int64_t ns = ProtoTimestampToAbslTimeNanos(state_transition.timestamp());
161+
(*task_state_update->mutable_state_ts_ns())[state_transition.state()] = ns;
162162
}
163163
return task_event;
164164
}

src/ray/gcs/gcs_ray_event_converter.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ class GcsRayEventConverter {
5151
/// \return The output TaskEvents to populate.
5252
rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskProfileEvents &&event);
5353

54-
/// Convert a TaskExecutionEvent to a TaskEvents.
54+
/// Convert a TaskLifecycleEvent to a TaskEvents.
5555
///
56-
/// \param event The TaskExecutionEvent to convert.
56+
/// \param event The TaskLifecycleEvent to convert.
5757
/// \return The output TaskEvents to populate.
58-
rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskExecutionEvent &&event);
58+
rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskLifecycleEvent &&event);
5959

6060
/// Convert an ActorTaskDefinitionEvent to a TaskEvents.
6161
///
@@ -99,7 +99,7 @@ class GcsRayEventConverter {
9999
std::vector<rpc::AddTaskEventDataRequest> &requests_per_job_id,
100100
absl::flat_hash_map<std::string, size_t> &job_id_to_index);
101101

102-
FRIEND_TEST(GcsRayEventConverterTest, TestConvertTaskExecutionEvent);
102+
FRIEND_TEST(GcsRayEventConverterTest, TestConvertTaskLifecycleEvent);
103103
FRIEND_TEST(GcsRayEventConverterTest, TestConvertActorTaskDefinitionEvent);
104104
};
105105

0 commit comments

Comments
 (0)