Skip to content

Commit

Permalink
Add invocation target to invoker events. (#1472)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Apr 26, 2024
1 parent aba042c commit 3a9632c
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ where
.resolve_invocation(partition, &invocation_id)
{
trace!(
restate.invocation.target = %ism.invocation_target,
"Chosen deployment {}. Invocation state: {:?}",
deployment_id,
ism.invocation_state_debug()
Expand Down Expand Up @@ -597,6 +598,7 @@ where
.resolve_invocation(partition, &invocation_id)
{
trace!(
restate.invocation.target = %ism.invocation_target,
"x-restate-server header {}. Invocation state: {:?}",
x_restate_server_header,
ism.invocation_state_debug()
Expand Down Expand Up @@ -637,6 +639,7 @@ where
{
ism.notify_new_entry(entry_index, requires_ack);
trace!(
restate.invocation.target = %ism.invocation_target,
"Received a new entry. Invocation state: {:?}",
ism.invocation_state_debug()
);
Expand Down Expand Up @@ -679,6 +682,7 @@ where
.resolve_invocation(partition, &invocation_id)
{
trace!(
restate.invocation.target = %ism.invocation_target,
restate.journal.index = completion.entry_index,
"Notifying completion"
);
Expand All @@ -702,12 +706,14 @@ where
partition: PartitionLeaderEpoch,
invocation_id: InvocationId,
) {
if let Some((sender, _)) = self
if let Some((sender, ism)) = self
.invocation_state_machine_manager
.remove_invocation(partition, &invocation_id)
{
counter!(INVOKER_INVOCATION_TASK, "status" => TASK_OP_COMPLETED).increment(1);
trace!("Invocation task closed correctly");
trace!(
restate.invocation.target = %ism.invocation_target,
"Invocation task closed correctly");
self.quota.unreserve_slot();
self.status_store.on_end(&partition, &invocation_id);
let _ = sender
Expand Down Expand Up @@ -736,12 +742,14 @@ where
invocation_id: InvocationId,
entry_indexes: HashSet<EntryIndex>,
) {
if let Some((sender, _)) = self
if let Some((sender, ism)) = self
.invocation_state_machine_manager
.remove_invocation(partition, &invocation_id)
{
counter!(INVOKER_INVOCATION_TASK, "status" => TASK_OP_SUSPENDED).increment(1);
trace!("Suspending invocation");
trace!(
restate.invocation.target = %ism.invocation_target,
"Suspending invocation");
self.quota.unreserve_slot();
self.status_store.on_end(&partition, &invocation_id);
let _ = sender
Expand Down Expand Up @@ -801,7 +809,9 @@ where
.invocation_state_machine_manager
.remove_invocation(partition, &invocation_id)
{
trace!("Aborting invocation");
trace!(
restate.invocation.target = %ism.invocation_target,
"Aborting invocation");
ism.abort();
self.quota.unreserve_slot();
self.status_store.on_end(&partition, &invocation_id);
Expand All @@ -825,6 +835,7 @@ where
for (fid, mut ism) in invocation_state_machines.into_iter() {
trace!(
restate.invocation.id = %fid,
restate.invocation.target = %ism.invocation_target,
"Aborting invocation"
);
ism.abort();
Expand Down Expand Up @@ -865,6 +876,7 @@ where
warn_it!(
error,
restate.invocation.id = %invocation_id,
restate.invocation.target = %ism.invocation_target,
"Error when executing the invocation, retrying in {}.",
humantime::format_duration(next_retry_timer_duration));
trace!("Invocation state: {:?}.", ism.invocation_state_debug());
Expand Down Expand Up @@ -893,6 +905,7 @@ where
warn_it!(
error,
restate.invocation.id = %invocation_id,
restate.invocation.target = %ism.invocation_target,
"Error when executing the invocation, not going to retry.");
self.quota.unreserve_slot();
self.status_store.on_end(&partition, &invocation_id);
Expand Down Expand Up @@ -935,6 +948,7 @@ where
self.status_store.on_start(partition, invocation_id);
ism.start(abort_handle, completions_tx);
trace!(
restate.invocation.target = %ism.invocation_target,
"Invocation task started state. Invocation state: {:?}",
ism.invocation_state_debug()
);
Expand All @@ -958,7 +972,9 @@ where
{
f(&mut ism);
if ism.is_ready_to_retry() {
trace!("Going to retry now");
trace!(
restate.invocation.target = %ism.invocation_target,
"Going to retry now");
self.start_invocation_task(
options,
partition,
Expand All @@ -969,6 +985,7 @@ where
.await;
} else {
trace!(
restate.invocation.target = %ism.invocation_target,
"Not going to retry. Invocation state: {:?}",
ism.invocation_state_debug()
);
Expand Down

0 comments on commit 3a9632c

Please sign in to comment.