Skip to content

Commit dd25776

Browse files
Implement Side effect retries
* Introduce Protocol Version V2 * Also adds a workaround to make sure typescript SDK <= 1.2.1 works with Restate 1.1
1 parent bbf4f29 commit dd25776

File tree

13 files changed

+200
-48
lines changed

13 files changed

+200
-48
lines changed

crates/invoker-api/src/journal_reader.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use restate_types::identifiers::InvocationId;
1414
use restate_types::invocation::ServiceInvocationSpanContext;
1515
use restate_types::journal::raw::PlainRawEntry;
1616
use restate_types::journal::EntryIndex;
17+
use restate_types::time::MillisSinceEpoch;
1718
use std::future::Future;
1819

1920
/// Metadata associated with a journal
@@ -22,18 +23,25 @@ pub struct JournalMetadata {
2223
pub length: EntryIndex,
2324
pub span_context: ServiceInvocationSpanContext,
2425
pub pinned_deployment: Option<PinnedDeployment>,
26+
/// This value is not agreed among Partition processor replicas right now.
27+
///
28+
/// The upper bound for the total clock skew is the clock skew of the different machines
29+
/// and the max time difference between two replicas applying the journal append command.
30+
pub last_modification_date: MillisSinceEpoch,
2531
}
2632

2733
impl JournalMetadata {
2834
pub fn new(
2935
length: EntryIndex,
3036
span_context: ServiceInvocationSpanContext,
3137
pinned_deployment: Option<PinnedDeployment>,
38+
last_modification_date: MillisSinceEpoch,
3239
) -> Self {
3340
Self {
3441
pinned_deployment,
3542
span_context,
3643
length,
44+
last_modification_date,
3745
}
3846
}
3947
}

crates/invoker-api/src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub mod test_util {
3333
use restate_types::invocation::{InvocationTarget, ServiceInvocationSpanContext};
3434
use restate_types::journal::raw::PlainRawEntry;
3535
use restate_types::journal::Completion;
36+
use restate_types::time::MillisSinceEpoch;
3637
use std::convert::Infallible;
3738
use std::iter::empty;
3839
use std::marker::PhantomData;
@@ -51,7 +52,12 @@ pub mod test_util {
5152
_sid: &'a InvocationId,
5253
) -> Result<(JournalMetadata, Self::JournalStream), Self::Error> {
5354
Ok((
54-
JournalMetadata::new(0, ServiceInvocationSpanContext::empty(), None),
55+
JournalMetadata::new(
56+
0,
57+
ServiceInvocationSpanContext::empty(),
58+
None,
59+
MillisSinceEpoch::UNIX_EPOCH,
60+
),
5561
futures::stream::empty(),
5662
))
5763
}

crates/invoker-impl/src/invocation_state_machine.rs

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub(super) struct InvocationStateMachine {
2323
pub(super) invocation_target: InvocationTarget,
2424
invocation_state: InvocationState,
2525
retry_iter: retries::RetryIter<'static>,
26+
pub(super) retry_count_since_last_stored_entry: u32,
2627
}
2728

2829
/// This struct tracks which entries the invocation task generates,
@@ -57,7 +58,7 @@ impl JournalTracker {
5758
self.last_entry_sent_to_partition_processor,
5859
) {
5960
(_, None) => {
60-
// The invocation task didn't generated new entries.
61+
// The invocation task didn't generate new entries.
6162
// We're always good to retry in this case.
6263
true
6364
}
@@ -135,6 +136,7 @@ impl InvocationStateMachine {
135136
invocation_target,
136137
invocation_state: InvocationState::New,
137138
retry_iter: retry_policy.into_iter(),
139+
retry_count_since_last_stored_entry: 0,
138140
}
139141
}
140142

@@ -202,6 +204,8 @@ impl InvocationStateMachine {
202204
InvocationState::InFlight { .. }
203205
));
204206

207+
self.retry_count_since_last_stored_entry = 0;
208+
205209
if let InvocationState::InFlight {
206210
journal_tracker,
207211
entries_to_ack,
@@ -271,7 +275,10 @@ impl InvocationStateMachine {
271275
}
272276

273277
/// Returns Some() with the timer for the next retry, otherwise None if retry limit exhausted
274-
pub(super) fn handle_task_error(&mut self) -> Option<Duration> {
278+
pub(super) fn handle_task_error(
279+
&mut self,
280+
next_retry_interval_override: Option<Duration>,
281+
) -> Option<Duration> {
275282
let journal_tracker = match &self.invocation_state {
276283
InvocationState::InFlight {
277284
journal_tracker, ..
@@ -289,9 +296,10 @@ impl InvocationStateMachine {
289296
*journal_tracker
290297
}
291298
};
292-
let next_timer = self.retry_iter.next();
293299

300+
let next_timer = next_retry_interval_override.or_else(|| self.retry_iter.next());
294301
if next_timer.is_some() {
302+
self.retry_count_since_last_stored_entry += 1;
295303
self.invocation_state = InvocationState::WaitingRetry {
296304
timer_fired: false,
297305
journal_tracker,
@@ -339,16 +347,67 @@ mod tests {
339347
RetryPolicy::fixed_delay(Duration::from_secs(1), Some(10)),
340348
);
341349

342-
assert!(invocation_state_machine.handle_task_error().is_some());
350+
assert!(invocation_state_machine.handle_task_error(None).is_some());
343351
check!(let InvocationState::WaitingRetry { .. } = invocation_state_machine.invocation_state);
344352

345353
invocation_state_machine.notify_retry_timer_fired();
346354

347355
// We stay in `WaitingForRetry`
348-
assert!(invocation_state_machine.handle_task_error().is_some());
356+
assert!(invocation_state_machine.handle_task_error(None).is_some());
349357
check!(let InvocationState::WaitingRetry { .. } = invocation_state_machine.invocation_state);
350358
}
351359

360+
#[test(tokio::test)]
361+
async fn handle_error_counts_attempts_on_same_entry() {
362+
let mut invocation_state_machine = InvocationStateMachine::create(
363+
InvocationTarget::mock_virtual_object(),
364+
RetryPolicy::fixed_delay(Duration::from_secs(1), Some(10)),
365+
);
366+
367+
// Start invocation
368+
invocation_state_machine.start(
369+
tokio::spawn(async {}).abort_handle(),
370+
mpsc::unbounded_channel().0,
371+
);
372+
373+
// Notify error
374+
assert!(invocation_state_machine.handle_task_error(None).is_some());
375+
assert_eq!(
376+
invocation_state_machine.retry_count_since_last_stored_entry,
377+
1
378+
);
379+
380+
// Try to start again
381+
invocation_state_machine.start(
382+
tokio::spawn(async {}).abort_handle(),
383+
mpsc::unbounded_channel().0,
384+
);
385+
386+
// Get error again
387+
assert!(invocation_state_machine.handle_task_error(None).is_some());
388+
assert_eq!(
389+
invocation_state_machine.retry_count_since_last_stored_entry,
390+
2
391+
);
392+
393+
// Try to start again
394+
invocation_state_machine.start(
395+
tokio::spawn(async {}).abort_handle(),
396+
mpsc::unbounded_channel().0,
397+
);
398+
assert_eq!(
399+
invocation_state_machine.retry_count_since_last_stored_entry,
400+
2
401+
);
402+
403+
// Now complete the entry
404+
invocation_state_machine.notify_new_entry(1, false);
405+
assert_eq!(
406+
invocation_state_machine.retry_count_since_last_stored_entry,
407+
0
408+
);
409+
}
410+
352411
#[test(tokio::test)]
353412
async fn handle_requires_ack() {
354413
let mut invocation_state_machine = InvocationStateMachine::create(

crates/invoker-impl/src/invocation_task/mod.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ use tracing::instrument;
5656
const SERVICE_PROTOCOL_VERSION_V1: HeaderValue =
5757
HeaderValue::from_static("application/vnd.restate.invocation.v1");
5858

59+
#[allow(clippy::declare_interior_mutable_const)]
60+
const SERVICE_PROTOCOL_VERSION_V2: HeaderValue =
61+
HeaderValue::from_static("application/vnd.restate.invocation.v2");
62+
5963
#[allow(clippy::declare_interior_mutable_const)]
6064
const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server");
6165

@@ -126,12 +130,14 @@ pub(crate) enum InvocationTaskError {
126130
#[code(unknown)]
127131
EntryEnrichment(EntryIndex, EntryType, #[source] InvocationError),
128132

129-
#[error("Error message received from the SDK with related entry {0:?}: {1}")]
133+
#[error("Error message received from the SDK with related entry {related_entry:?}: {error}")]
130134
#[code(restate_errors::RT0007)]
131-
ErrorMessageReceived(
132-
Option<InvocationErrorRelatedEntry>,
133-
#[source] InvocationError,
134-
),
135+
ErrorMessageReceived {
136+
related_entry: Option<InvocationErrorRelatedEntry>,
137+
next_retry_interval_override: Option<Duration>,
138+
#[source]
139+
error: InvocationError,
140+
},
135141
#[error("cannot talk to service endpoint '{0}' because its service protocol versions [{}, {}] are incompatible with the server's service protocol versions [{}, {}].", .1.start(), .1.end(), i32::from(MIN_SERVICE_PROTOCOL_VERSION), i32::from(MAX_SERVICE_PROTOCOL_VERSION))]
136142
#[code(restate_errors::RT0013)]
137143
IncompatibleServiceEndpoint(DeploymentId, RangeInclusive<i32>),
@@ -156,9 +162,19 @@ impl InvocationTaskError {
156162
true
157163
}
158164

165+
pub(crate) fn next_retry_interval_override(&self) -> Option<Duration> {
166+
match self {
167+
InvocationTaskError::ErrorMessageReceived {
168+
next_retry_interval_override,
169+
..
170+
} => *next_retry_interval_override,
171+
_ => None,
172+
}
173+
}
174+
159175
pub(crate) fn into_invocation_error(self) -> InvocationError {
160176
match self {
161-
InvocationTaskError::ErrorMessageReceived(_, e) => e,
177+
InvocationTaskError::ErrorMessageReceived { error, .. } => error,
162178
InvocationTaskError::EntryEnrichment(entry_index, entry_type, e) => {
163179
let msg = format!(
164180
"Error when processing entry {} of type {}: {}",
@@ -179,9 +195,10 @@ impl InvocationTaskError {
179195
pub(crate) fn into_invocation_error_report(mut self) -> InvocationErrorReport {
180196
let doc_error_code = codederror::CodedError::code(&self);
181197
let maybe_related_entry = match self {
182-
InvocationTaskError::ErrorMessageReceived(ref mut related_entry, _) => {
183-
related_entry.take()
184-
}
198+
InvocationTaskError::ErrorMessageReceived {
199+
ref mut related_entry,
200+
..
201+
} => related_entry.take(),
185202
_ => None,
186203
}
187204
.unwrap_or_default();
@@ -247,6 +264,7 @@ pub(super) struct InvocationTask<SR, JR, EE, DMR> {
247264
disable_eager_state: bool,
248265
message_size_warning: usize,
249266
message_size_limit: Option<usize>,
267+
retry_count_since_last_stored_entry: u32,
250268

251269
// Invoker tx/rx
252270
state_reader: SR,
@@ -307,6 +325,7 @@ where
307325
disable_eager_state: bool,
308326
message_size_warning: usize,
309327
message_size_limit: Option<usize>,
328+
retry_count_since_last_stored_entry: u32,
310329
state_reader: SR,
311330
journal_reader: JR,
312331
entry_enricher: EE,
@@ -330,6 +349,7 @@ where
330349
invoker_rx,
331350
message_size_limit,
332351
message_size_warning,
352+
retry_count_since_last_stored_entry,
333353
}
334354
}
335355

@@ -479,6 +499,7 @@ fn service_protocol_version_to_header_value(
479499
unreachable!("unknown protocol version should never be chosen")
480500
}
481501
ServiceProtocolVersion::V1 => SERVICE_PROTOCOL_VERSION_V1,
502+
ServiceProtocolVersion::V2 => SERVICE_PROTOCOL_VERSION_V2,
482503
}
483504
}
484505

crates/invoker-impl/src/invocation_task/service_protocol_runner.rs

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@ use restate_types::schema::deployment::{
3737
Deployment, DeploymentMetadata, DeploymentType, ProtocolType,
3838
};
3939
use restate_types::service_protocol::ServiceProtocolVersion;
40+
use restate_types::time::MillisSinceEpoch;
4041
use std::collections::HashSet;
4142
use std::future::poll_fn;
43+
use std::time::Duration;
4244
use tokio::sync::mpsc;
4345
use tokio_stream::wrappers::ReceiverStream;
4446
use tracing::{debug, info, trace, warn, Span};
@@ -152,8 +154,14 @@ where
152154
);
153155

154156
crate::shortcircuit!(
155-
self.write_start(&mut http_stream_tx, journal_size, state_iter)
156-
.await
157+
self.write_start(
158+
&mut http_stream_tx,
159+
journal_size,
160+
state_iter,
161+
self.invocation_task.retry_count_since_last_stored_entry,
162+
journal_metadata.last_modification_date
163+
)
164+
.await
157165
);
158166

159167
// Initialize the response stream state
@@ -346,10 +354,8 @@ where
346354
ResponseChunk::Data(buf) => crate::shortcircuit!(self.handle_read(parent_span_context, buf)),
347355
ResponseChunk::End => {
348356
// Response stream was closed without SuspensionMessage, EndMessage or ErrorMessage
349-
return TerminalLoopState::Failed(InvocationTaskError::ErrorMessageReceived(
350-
None,
351-
InvocationError::default()
352-
))
357+
return TerminalLoopState::Failed(InvocationTaskError::ErrorMessageReceived{
358+
related_entry: None,next_retry_interval_override: None,error: InvocationError::default(),})
353359
}
354360
}
355361
},
@@ -376,10 +382,11 @@ where
376382
ResponseChunk::Data(buf) => crate::shortcircuit!(self.handle_read(parent_span_context, buf)),
377383
ResponseChunk::End => {
378384
// Response stream was closed without SuspensionMessage, EndMessage or ErrorMessage
379-
return TerminalLoopState::Failed(InvocationTaskError::ErrorMessageReceived(
380-
None,
381-
InvocationError::default()
382-
))
385+
return TerminalLoopState::Failed(InvocationTaskError::ErrorMessageReceived {
386+
related_entry: None,
387+
next_retry_interval_override: None,
388+
error: InvocationError::default(),
389+
})
383390
}
384391
}
385392
},
@@ -398,6 +405,8 @@ where
398405
http_stream_tx: &mut InvokerRequestStreamSender,
399406
journal_size: u32,
400407
state_entries: EagerState<I>,
408+
retry_count_since_last_stored_entry: u32,
409+
duration_since_last_stored_entry: MillisSinceEpoch,
401410
) -> Result<(), InvocationTaskError> {
402411
let is_partial = state_entries.is_partial();
403412

@@ -414,6 +423,8 @@ where
414423
journal_size,
415424
is_partial,
416425
state_entries,
426+
retry_count_since_last_stored_entry,
427+
duration_since_last_stored_entry,
417428
),
418429
)
419430
.await
@@ -530,8 +541,8 @@ where
530541
TerminalLoopState::Suspended(suspension_indexes)
531542
}
532543
ProtocolMessage::Error(e) => {
533-
TerminalLoopState::Failed(InvocationTaskError::ErrorMessageReceived(
534-
Some(InvocationErrorRelatedEntry {
544+
TerminalLoopState::Failed(InvocationTaskError::ErrorMessageReceived {
545+
related_entry: Some(InvocationErrorRelatedEntry {
535546
related_entry_index: e.related_entry_index,
536547
related_entry_name: e.related_entry_name.clone(),
537548
related_entry_type: e
@@ -540,8 +551,9 @@ where
540551
.and_then(|idx| MessageType::try_from(idx).ok())
541552
.and_then(|mt| EntryType::try_from(mt).ok()),
542553
}),
543-
InvocationError::from(e),
544-
))
554+
next_retry_interval_override: e.next_retry_delay.map(Duration::from_millis),
555+
error: InvocationError::from(e),
556+
})
545557
}
546558
ProtocolMessage::End(_) => TerminalLoopState::Closed,
547559
ProtocolMessage::UnparsedEntry(entry) => {

0 commit comments

Comments
 (0)