Skip to content

Commit 75d1ebd

Browse files
Expose all run retry options (#137)
* Expose all run retry options
* Remove post_init
* Make retry happen instantaneously on errors.
* Fix incorrect try/catch
* Move take_and_send_output inside the do progress loop, and enqueue the restate event always.
* Move take_and_send_output inside the do progress loop, and enqueue the restate event always.
* Format
1 parent 7c52251 commit 75d1ebd

File tree

5 files changed

+110
-51
lines changed

5 files changed

+110
-51
lines changed

python/restate/context.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
HandlerType = Union[Callable[[Any, I], Awaitable[O]], Callable[[Any], Awaitable[O]]]
3232
RunAction = Union[Callable[..., Coroutine[Any, Any, T]], Callable[..., T]]
3333

34+
# pylint: disable=R0902
3435
@dataclass
3536
class RunOptions(typing.Generic[T]):
3637
"""
@@ -40,15 +41,32 @@ class RunOptions(typing.Generic[T]):
4041
serde: Serde[T] = DefaultSerde()
4142
"""The serialization/deserialization mechanism. If the default serde is used, a default serializer will be chosen based on the type.
4243
See also 'type_hint'."""
43-
max_attempts: Optional[int] = None
44-
"""The maximum number of retry attempts, including the initial attempt, to complete the action.
45-
If None, the action will be retried indefinitely, until it succeeds.
46-
Otherwise, the action will be retried until the maximum number of attempts is reached and then it will raise a TerminalError."""
47-
max_retry_duration: Optional[timedelta] = None
48-
"""The maximum duration for retrying. If None, the action will be retried indefinitely, until it succeeds.
49-
Otherwise, the action will be retried until the maximum duration is reached and then it will raise a TerminalError."""
5044
type_hint: Optional[typing.Type[T]] = None
5145
"""The type hint of the return value of the action. This is used to pick the serializer. If None, the type hint will be inferred from the action's return type, or the provided serializer."""
46+
max_attempts: Optional[int] = None
47+
"""Max number of attempts (including the initial), before giving up.
48+
49+
When giving up, `ctx.run` will throw a `TerminalError` wrapping the original error message."""
50+
max_duration: Optional[timedelta] = None
51+
"""Max duration of retries, before giving up.
52+
53+
When giving up, `ctx.run` will throw a `TerminalError` wrapping the original error message."""
54+
initial_retry_interval: Optional[timedelta] = None
55+
"""Initial interval for the first retry attempt.
56+
Retry interval will grow by a factor specified in `retry_interval_factor`.
57+
58+
If any of the other retry-related fields is specified, the default for this field is 50 milliseconds; otherwise Restate will fall back to the overall invocation retry policy."""
59+
max_retry_interval: Optional[timedelta] = None
60+
"""Max interval between retries.
61+
Retry interval will grow by a factor specified in `retry_interval_factor`.
62+
63+
The default is 10 seconds."""
64+
retry_interval_factor: Optional[float] = None
65+
"""Exponentiation factor to use when computing the next retry delay.
66+
67+
If any of the other retry-related fields is specified, the default for this field is `2`, meaning the retry interval will double at each attempt; otherwise Restate will fall back to the overall invocation retry policy."""
68+
max_retry_duration: Optional[timedelta] = None
69+
"""Deprecated: Use max_duration instead."""
5270

5371
# pylint: disable=R0903
5472
class RestateDurableFuture(typing.Generic[T], Awaitable[T]):

python/restate/server_context.py

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -408,18 +408,14 @@ async def must_take_notification(self, handle):
408408

409409
async def create_poll_or_cancel_coroutine(self, handles: typing.List[int]) -> None:
410410
"""Create a coroutine to poll the handle."""
411-
await self.take_and_send_output()
412411
while True:
412+
await self.take_and_send_output()
413413
do_progress_response = self.vm.do_progress(handles)
414-
if isinstance(do_progress_response, Exception):
415-
# We might need to write out something at this point.
416-
await self.take_and_send_output()
414+
if isinstance(do_progress_response, BaseException):
417415
# Print this exception, might be relevant for the user
418416
traceback.print_exception(do_progress_response)
419417
await cancel_current_task()
420418
if isinstance(do_progress_response, Suspended):
421-
# We might need to write out something at this point.
422-
await self.take_and_send_output()
423419
await cancel_current_task()
424420
if isinstance(do_progress_response, DoProgressAnyCompleted):
425421
# One of the handles completed
@@ -432,9 +428,10 @@ async def create_poll_or_cancel_coroutine(self, handles: typing.List[int]) -> No
432428
assert fn is not None
433429

434430
async def wrapper(f):
435-
await f()
436-
await self.take_and_send_output()
437-
await self.receive.enqueue_restate_event({ 'type' : 'restate.run_completed', 'data': None})
431+
try:
432+
await f()
433+
finally:
434+
await self.receive.enqueue_restate_event({ 'type' : 'restate.run_completed', 'data': None})
438435

439436
task = asyncio.create_task(wrapper(fn))
440437
self.tasks.add(task)
@@ -543,9 +540,13 @@ async def create_run_coroutine(self,
543540
action: RunAction[T],
544541
serde: Serde[T],
545542
max_attempts: Optional[int] = None,
546-
max_retry_duration: Optional[timedelta] = None,
543+
max_duration: Optional[timedelta] = None,
544+
initial_retry_interval: Optional[timedelta] = None,
545+
max_retry_interval: Optional[timedelta] = None,
546+
retry_interval_factor: Optional[float] = None,
547547
):
548548
"""Run the action for the given handle; on a non-terminal failure, propose a transient run completion (with the retry config) to the VM."""
549+
start = time.time()
549550
try:
550551
if inspect.iscoroutinefunction(action):
551552
action_result: T = await action() # type: ignore
@@ -566,15 +567,20 @@ async def create_run_coroutine(self,
566567
raise e from None
567568
# pylint: disable=W0718
568569
except Exception as e:
569-
if max_attempts is None and max_retry_duration is None:
570-
# no retry policy
571-
# todo: log the error
572-
self.vm.notify_error(repr(e), traceback.format_exc())
573-
else:
574-
failure = Failure(code=500, message=str(e))
575-
max_duration_ms = None if max_retry_duration is None else int(max_retry_duration.total_seconds() * 1000)
576-
config = RunRetryConfig(max_attempts=max_attempts, max_duration=max_duration_ms)
577-
self.vm.propose_run_completion_transient(handle, failure=failure, attempt_duration_ms=1, config=config)
570+
end = time.time()
571+
attempt_duration = int((end - start) * 1000)
572+
failure = Failure(code=500, message=str(e))
573+
max_duration_ms = None if max_duration is None else int(max_duration.total_seconds() * 1000)
574+
initial_retry_interval_ms = None if initial_retry_interval is None else int(initial_retry_interval.total_seconds() * 1000)
575+
max_retry_interval_ms = None if max_retry_interval is None else int(max_retry_interval.total_seconds() * 1000)
576+
config = RunRetryConfig(
577+
max_attempts=max_attempts,
578+
max_duration=max_duration_ms,
579+
initial_interval=initial_retry_interval_ms,
580+
max_interval=max_retry_interval_ms,
581+
interval_factor=retry_interval_factor
582+
)
583+
self.vm.propose_run_completion_transient(handle, failure=failure, attempt_duration_ms=attempt_duration, config=config)
578584
# pylint: disable=W0236
579585
# pylint: disable=R0914
580586
def run(self,
@@ -601,7 +607,7 @@ def run(self,
601607
else:
602608
# todo: we can also verify by looking at the signature that there are no missing parameters
603609
noargs_action = action # type: ignore
604-
self.run_coros_to_execute[handle] = lambda : self.create_run_coroutine(handle, noargs_action, serde, max_attempts, max_retry_duration)
610+
self.run_coros_to_execute[handle] = lambda : self.create_run_coroutine(handle, noargs_action, serde, max_attempts, max_retry_duration, None, None, None)
605611
return self.create_future(handle, serde) # type: ignore
606612

607613
def run_typed(
@@ -624,7 +630,16 @@ def run_typed(
624630
update_restate_context_is_replaying(self.vm)
625631

626632
func = functools.partial(action, *args, **kwargs)
627-
self.run_coros_to_execute[handle] = lambda : self.create_run_coroutine(handle, func, options.serde, options.max_attempts, options.max_retry_duration)
633+
self.run_coros_to_execute[handle] = lambda : self.create_run_coroutine(
634+
handle,
635+
func,
636+
options.serde,
637+
options.max_attempts,
638+
options.max_duration,
639+
options.initial_retry_interval,
640+
options.max_retry_interval,
641+
options.retry_interval_factor
642+
)
628643
return self.create_future(handle, options.serde)
629644

630645
def sleep(self, delta: timedelta, name: Optional[str] = None) -> RestateDurableSleepFuture:

python/restate/vm.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,15 @@ class Invocation:
3333
@dataclass
3434
class RunRetryConfig:
3535
"""
36-
Expo Retry Configuration
36+
Exponential Retry Configuration
37+
38+
All duration/interval values are in milliseconds.
3739
"""
3840
initial_interval: typing.Optional[int] = None
3941
max_attempts: typing.Optional[int] = None
4042
max_duration: typing.Optional[int] = None
43+
max_interval: typing.Optional[int] = None
44+
interval_factor: typing.Optional[float] = None
4145

4246
@dataclass
4347
class Failure:
@@ -394,22 +398,20 @@ def propose_run_completion_failure(self, handle: int, output: Failure) -> int:
394398
return self.vm.propose_run_completion_failure(handle, res)
395399

396400
# pylint: disable=line-too-long
397-
def propose_run_completion_transient(self, handle: int, failure: Failure, attempt_duration_ms: int, config: RunRetryConfig) -> int | None:
401+
def propose_run_completion_transient(self, handle: int, failure: Failure, attempt_duration_ms: int, config: RunRetryConfig):
398402
"""
399403
Exit a side effect with a transient Error.
400404
This requires a retry policy to be provided.
401405
"""
402406
py_failure = PyFailure(failure.code, failure.message)
403-
py_config = PyExponentialRetryConfig(config.initial_interval, config.max_attempts, config.max_duration)
404-
try:
405-
handle = self.vm.propose_run_completion_failure_transient(handle, py_failure, attempt_duration_ms, py_config)
406-
# The VM decided not to retry, therefore we get back an handle that will be resolved
407-
# with a terminal failure.
408-
return handle
409-
# pylint: disable=bare-except
410-
except:
411-
# The VM decided to retry, therefore we tear down the current execution
412-
return None
407+
py_config = PyExponentialRetryConfig(
408+
config.initial_interval,
409+
config.max_attempts,
410+
config.max_duration,
411+
config.max_interval,
412+
config.interval_factor
413+
)
414+
self.vm.propose_run_completion_failure_transient(handle, py_failure, attempt_duration_ms, py_config)
413415

414416
def sys_end(self):
415417
"""

src/lib.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
use std::fmt;
21
use pyo3::create_exception;
32
use pyo3::prelude::*;
43
use pyo3::types::{PyBytes, PyNone, PyString};
4+
use restate_sdk_shared_core::fmt::{set_error_formatter, ErrorFormatter};
55
use restate_sdk_shared_core::{
66
CallHandle, CoreVM, DoProgressResponse, Error, Header, IdentityVerifier, Input, NonEmptyValue,
77
NotificationHandle, ResponseHead, RetryPolicy, RunExitResult, TakeOutputResult, Target,
88
TerminalFailure, VMOptions, Value, CANCEL_NOTIFICATION_HANDLE, VM,
99
};
10+
use std::fmt;
1011
use std::time::{Duration, SystemTime};
11-
use restate_sdk_shared_core::fmt::{set_error_formatter, ErrorFormatter};
1212

1313
// Current crate version
1414
const CURRENT_VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -115,33 +115,55 @@ struct PyExponentialRetryConfig {
115115
max_attempts: Option<u32>,
116116
#[pyo3(get, set)]
117117
max_duration: Option<u64>,
118+
#[pyo3(get, set)]
119+
max_interval: Option<u64>,
120+
#[pyo3(get, set)]
121+
factor: Option<f64>,
118122
}
119123

120124
#[pymethods]
121125
impl PyExponentialRetryConfig {
122-
#[pyo3(signature = (initial_interval=None, max_attempts=None, max_duration=None))]
126+
#[pyo3(signature = (initial_interval=None, max_attempts=None, max_duration=None, max_interval=None, factor=None))]
123127
#[new]
124128
fn new(
125129
initial_interval: Option<u64>,
126130
max_attempts: Option<u32>,
127131
max_duration: Option<u64>,
132+
max_interval: Option<u64>,
133+
factor: Option<f64>,
128134
) -> Self {
129135
Self {
130136
initial_interval,
131137
max_attempts,
132138
max_duration,
139+
max_interval,
140+
factor,
133141
}
134142
}
135143
}
136144

137145
impl From<PyExponentialRetryConfig> for RetryPolicy {
138146
fn from(value: PyExponentialRetryConfig) -> Self {
139-
RetryPolicy::Exponential {
140-
initial_interval: Duration::from_millis(value.initial_interval.unwrap_or(10)),
141-
max_attempts: value.max_attempts,
142-
max_duration: value.max_duration.map(Duration::from_millis),
143-
factor: 2.0,
144-
max_interval: None,
147+
if value.initial_interval.is_some()
148+
|| value.max_attempts.is_some()
149+
|| value.max_duration.is_some()
150+
|| value.max_interval.is_some()
151+
|| value.factor.is_some()
152+
{
153+
// If any of the values are set, then let's create the exponential retry policy
154+
RetryPolicy::Exponential {
155+
initial_interval: Duration::from_millis(value.initial_interval.unwrap_or(50)),
156+
max_attempts: value.max_attempts,
157+
max_duration: value.max_duration.map(Duration::from_millis),
158+
factor: value.factor.unwrap_or(2.0) as f32,
159+
max_interval: value
160+
.max_interval
161+
.map(Duration::from_millis)
162+
.or_else(|| Some(Duration::from_secs(10))),
163+
}
164+
} else {
165+
// No retry option was set: use the infinite retry policy, which hands control back to the invocation retry policy
166+
RetryPolicy::Infinite
145167
}
146168
}
147169
}

test-services/services/failing.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
1010
#
1111
"""example.py"""
12+
from datetime import timedelta
13+
1214
# pylint: disable=C0116
1315
# pylint: disable=W0613
1416
# pylint: disable=W0622
@@ -61,7 +63,7 @@ def side_effect():
6163
return eventual_success_side_effects
6264
raise ValueError(f"Failed at attempt: {eventual_success_side_effects}")
6365

64-
options: RunOptions[int] = RunOptions(max_attempts=minimum_attempts + 1)
66+
options: RunOptions[int] = RunOptions(max_attempts=minimum_attempts + 1, initial_retry_interval=timedelta(milliseconds=1), retry_interval_factor=1.0)
6567
return await ctx.run_typed("sideEffect", side_effect, options)
6668

6769
eventual_failure_side_effects = 0
@@ -75,7 +77,7 @@ def side_effect():
7577
raise ValueError(f"Failed at attempt: {eventual_failure_side_effects}")
7678

7779
try:
78-
options: RunOptions[int] = RunOptions(max_attempts=retry_policy_max_retry_count)
80+
options: RunOptions[int] = RunOptions(max_attempts=retry_policy_max_retry_count, initial_retry_interval=timedelta(milliseconds=1), retry_interval_factor=1.0)
7981
await ctx.run_typed("sideEffect", side_effect, options)
8082
raise ValueError("Side effect did not fail.")
8183
except TerminalError as t:

0 commit comments

Comments
 (0)