Skip to content

Commit 39adc52

Browse files
Refactor Endpoint internals. It's ok to depend on http crate, and this reduces the number of additional abstractions needed.
1 parent 24c829d commit 39adc52

File tree

4 files changed

+638
-626
lines changed

4 files changed

+638
-626
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ tracing-span-filter = ["dep:tracing-subscriber"]
2727
bytes = "1.10"
2828
futures = "0.3"
2929
http = "1.3"
30+
http-body = "1.0.1"
3031
http-body-util = { version = "0.1", optional = true }
3132
hyper = { version = "1.6", optional = true}
3233
hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true }
@@ -35,7 +36,7 @@ rand = { version = "0.9", optional = true }
3536
regress = "0.10"
3637
restate-sdk-macros = { version = "0.6", path = "macros" }
3738
restate-sdk-shared-core = { version = "=0.4.0", features = ["request_identity", "sha2_random_seed", "http"] }
38-
schemars = { version = "1.0.0-alpha.17", optional = true }
39+
schemars = { version = "1.0.0", optional = true }
3940
serde = "1.0"
4041
serde_json = "1.0"
4142
thiserror = "2.0"

src/endpoint/builder.rs

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
use crate::endpoint::{BoxedService, Endpoint, EndpointInner, Error};
2+
use crate::service::{Discoverable, Service};
3+
use futures::future::BoxFuture;
4+
use restate_sdk_shared_core::{IdentityVerifier, KeyError};
5+
use std::collections::HashMap;
6+
use std::sync::Arc;
7+
use std::time::Duration;
8+
9+
/// Various configuration options that can be provided when binding a service
10+
#[derive(Default, Debug, Clone)]
11+
pub struct ServiceOptions {
12+
pub(crate) metadata: HashMap<String, String>,
13+
pub(crate) inactivity_timeout: Option<Duration>,
14+
pub(crate) abort_timeout: Option<Duration>,
15+
pub(crate) idempotency_retention: Option<Duration>,
16+
pub(crate) journal_retention: Option<Duration>,
17+
pub(crate) enable_lazy_state: Option<bool>,
18+
pub(crate) ingress_private: Option<bool>,
19+
pub(crate) handler_options: HashMap<String, HandlerOptions>,
20+
21+
_priv: (),
22+
}
23+
24+
impl ServiceOptions {
25+
pub fn new() -> Self {
26+
Self::default()
27+
}
28+
29+
/// This timer guards against stalled invocations. Once it expires, Restate triggers a graceful
30+
/// termination by asking the invocation to suspend (which preserves intermediate progress).
31+
///
32+
/// The abort_timeout is used to abort the invocation, in case it doesn't react to the request to
33+
/// suspend.
34+
///
35+
/// This overrides the default inactivity timeout configured in the restate-server for all
36+
/// invocations to this service.
37+
pub fn inactivity_timeout(mut self, timeout: Duration) -> Self {
38+
self.inactivity_timeout = Some(timeout);
39+
self
40+
}
41+
42+
/// This timer guards against stalled service/handler invocations that are supposed to terminate. The
43+
/// abort timeout is started after the inactivity_timeout has expired and the service/handler
44+
/// invocation has been asked to gracefully terminate. Once the timer expires, it will abort the
45+
/// service/handler invocation.
46+
///
47+
/// This timer potentially *interrupts* user code. If the user code needs longer to gracefully
48+
/// terminate, then this value needs to be set accordingly.
49+
///
50+
/// This overrides the default abort timeout configured in the restate-server for all invocations to
51+
/// this service.
52+
pub fn abort_timeout(mut self, timeout: Duration) -> Self {
53+
self.abort_timeout = Some(timeout);
54+
self
55+
}
56+
57+
/// The retention duration of idempotent requests to this service.
58+
pub fn idempotency_retention(mut self, retention: Duration) -> Self {
59+
self.idempotency_retention = Some(retention);
60+
self
61+
}
62+
63+
/// The journal retention. When set, this applies to all requests to all handlers of this service.
64+
///
65+
/// In case the request has an idempotency key, the idempotency_retention caps the journal retention
66+
/// time.
67+
pub fn journal_retention(mut self, retention: Duration) -> Self {
68+
self.journal_retention = Some(retention);
69+
self
70+
}
71+
72+
/// When set to `true`, lazy state will be enabled for all invocations to this service. This is
73+
/// relevant only for workflows and virtual objects.
74+
pub fn enable_lazy_state(mut self, enable: bool) -> Self {
75+
self.enable_lazy_state = Some(enable);
76+
self
77+
}
78+
79+
/// When set to `true` this service, with all its handlers, cannot be invoked from the restate-server
80+
/// HTTP and Kafka ingress, but only from other services.
81+
pub fn ingress_private(mut self, private: bool) -> Self {
82+
self.ingress_private = Some(private);
83+
self
84+
}
85+
86+
/// Custom metadata of this service definition. This metadata is shown on the Admin API when querying the service definition.
87+
pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
88+
self.metadata.insert(key.into(), value.into());
89+
self
90+
}
91+
92+
/// Handler-specific options.
93+
///
94+
/// *Note*: If you provide a handler name for a non-existing handler, binding the service will *panic!*.
95+
pub fn handler(mut self, handler_name: impl Into<String>, options: HandlerOptions) -> Self {
96+
self.handler_options.insert(handler_name.into(), options);
97+
self
98+
}
99+
}
100+
101+
/// Various configuration options that can be provided when binding a service handler
102+
#[derive(Default, Debug, Clone)]
103+
pub struct HandlerOptions {
104+
pub(crate) metadata: HashMap<String, String>,
105+
pub(crate) inactivity_timeout: Option<Duration>,
106+
pub(crate) abort_timeout: Option<Duration>,
107+
pub(crate) idempotency_retention: Option<Duration>,
108+
pub(crate) workflow_retention: Option<Duration>,
109+
pub(crate) journal_retention: Option<Duration>,
110+
pub(crate) ingress_private: Option<bool>,
111+
pub(crate) enable_lazy_state: Option<bool>,
112+
113+
_priv: (),
114+
}
115+
116+
impl HandlerOptions {
117+
pub fn new() -> Self {
118+
Self::default()
119+
}
120+
121+
/// Custom metadata of this handler definition. This metadata is shown on the Admin API when querying the service/handler definition.
122+
pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
123+
self.metadata.insert(key.into(), value.into());
124+
self
125+
}
126+
127+
/// This timer guards against stalled invocations. Once it expires, Restate triggers a graceful
128+
/// termination by asking the invocation to suspend (which preserves intermediate progress).
129+
///
130+
/// The abort_timeout is used to abort the invocation, in case it doesn't react to the request to
131+
/// suspend.
132+
///
133+
/// This overrides the inactivity timeout set for the service and the default set in restate-server.
134+
pub fn inactivity_timeout(mut self, timeout: Duration) -> Self {
135+
self.inactivity_timeout = Some(timeout);
136+
self
137+
}
138+
139+
/// This timer guards against stalled invocations that are supposed to terminate. The abort timeout
140+
/// is started after the inactivity_timeout has expired and the invocation has been asked to
141+
/// gracefully terminate. Once the timer expires, it will abort the invocation.
142+
///
143+
/// This timer potentially *interrupts* user code. If the user code needs longer to gracefully
144+
/// terminate, then this value needs to be set accordingly.
145+
///
146+
/// This overrides the abort timeout set for the service and the default set in restate-server.
147+
pub fn abort_timeout(mut self, timeout: Duration) -> Self {
148+
self.abort_timeout = Some(timeout);
149+
self
150+
}
151+
152+
/// The retention duration of idempotent requests to this service.
153+
pub fn idempotency_retention(mut self, retention: Duration) -> Self {
154+
self.idempotency_retention = Some(retention);
155+
self
156+
}
157+
158+
/// The retention duration for this workflow handler.
159+
pub fn workflow_retention(mut self, retention: Duration) -> Self {
160+
self.workflow_retention = Some(retention);
161+
self
162+
}
163+
164+
/// The journal retention for invocations to this handler.
165+
///
166+
/// In case the request has an idempotency key, the idempotency_retention caps the journal retention
167+
/// time.
168+
pub fn journal_retention(mut self, retention: Duration) -> Self {
169+
self.journal_retention = Some(retention);
170+
self
171+
}
172+
173+
/// When set to `true` this handler cannot be invoked from the restate-server HTTP and Kafka ingress,
174+
/// but only from other services.
175+
pub fn ingress_private(mut self, private: bool) -> Self {
176+
self.ingress_private = Some(private);
177+
self
178+
}
179+
180+
/// When set to `true`, lazy state will be enabled for all invocations to this handler. This is
181+
/// relevant only for workflows and virtual objects.
182+
pub fn enable_lazy_state(mut self, enable: bool) -> Self {
183+
self.enable_lazy_state = Some(enable);
184+
self
185+
}
186+
}
187+
188+
/// Builder for [`Endpoint`]
189+
pub struct Builder {
190+
svcs: HashMap<String, BoxedService>,
191+
discovery: crate::discovery::Endpoint,
192+
identity_verifier: IdentityVerifier,
193+
}
194+
195+
impl Default for Builder {
196+
fn default() -> Self {
197+
Self {
198+
svcs: Default::default(),
199+
discovery: crate::discovery::Endpoint {
200+
max_protocol_version: 5,
201+
min_protocol_version: 5,
202+
protocol_mode: Some(crate::discovery::ProtocolMode::BidiStream),
203+
services: vec![],
204+
},
205+
identity_verifier: Default::default(),
206+
}
207+
}
208+
}
209+
210+
impl Builder {
211+
/// Create a new builder for [`Endpoint`].
212+
pub fn new() -> Self {
213+
Self::default()
214+
}
215+
216+
/// Add a [`Service`] to this endpoint.
217+
///
218+
/// When using the [`service`](macro@crate::service), [`object`](macro@crate::object) or [`workflow`](macro@crate::workflow) macros,
219+
/// you need to pass the result of the `serve` method.
220+
pub fn bind<
221+
S: Service<Future = BoxFuture<'static, Result<(), Error>>>
222+
+ Discoverable
223+
+ Send
224+
+ Sync
225+
+ 'static,
226+
>(
227+
self,
228+
s: S,
229+
) -> Self {
230+
self.bind_with_options(s, ServiceOptions::default())
231+
}
232+
233+
/// Like [`bind`], but providing options
234+
pub fn bind_with_options<
235+
S: Service<Future = BoxFuture<'static, Result<(), Error>>>
236+
+ Discoverable
237+
+ Send
238+
+ Sync
239+
+ 'static,
240+
>(
241+
mut self,
242+
s: S,
243+
service_options: ServiceOptions,
244+
) -> Self {
245+
// Discover and apply options
246+
let mut service_metadata = S::discover();
247+
service_metadata.apply_options(service_options);
248+
249+
let boxed_service = BoxedService::new(s);
250+
self.svcs
251+
.insert(service_metadata.name.to_string(), boxed_service);
252+
self.discovery.services.push(service_metadata);
253+
self
254+
}
255+
256+
/// Add identity key, e.g. `publickeyv1_ChjENKeMvCtRnqG2mrBK1HmPKufgFUc98K8B3ononQvp`.
257+
pub fn identity_key(mut self, key: &str) -> Result<Self, KeyError> {
258+
self.identity_verifier = self.identity_verifier.with_key(key)?;
259+
Ok(self)
260+
}
261+
262+
/// Build the [`Endpoint`].
263+
pub fn build(self) -> Endpoint {
264+
Endpoint(Arc::new(EndpointInner {
265+
svcs: self.svcs,
266+
discovery: self.discovery,
267+
identity_verifier: self.identity_verifier,
268+
}))
269+
}
270+
}

0 commit comments

Comments
 (0)