Skip to content

Commit b8e1e6c

Browse files
Implement negotiation
1 parent 4d22ec0 commit b8e1e6c

File tree

1 file changed

+105
-9
lines changed

1 file changed

+105
-9
lines changed

src/endpoint/mod.rs

Lines changed: 105 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use std::task::{Context, Poll};
2121
use std::time::Duration;
2222
use tracing::{info_span, Instrument};
2323

24-
const DISCOVERY_CONTENT_TYPE: &str = "application/vnd.restate.endpointmanifest.v3+json";
24+
const DISCOVERY_CONTENT_TYPE_V2: &str = "application/vnd.restate.endpointmanifest.v2+json";
25+
const DISCOVERY_CONTENT_TYPE_V3: &str = "application/vnd.restate.endpointmanifest.v3+json";
2526

2627
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
2728

@@ -93,7 +94,8 @@ impl Error {
9394
| ErrorInner::Deserialization { .. }
9495
| ErrorInner::Serialization { .. }
9596
| ErrorInner::HandlerResult { .. } => 500,
96-
ErrorInner::BadDiscovery(_) => 415,
97+
ErrorInner::FieldRequiresMinimumVersion { .. } => 500,
98+
ErrorInner::BadDiscoveryVersion(_) => 415,
9799
ErrorInner::Header { .. } | ErrorInner::BadPath { .. } => 400,
98100
ErrorInner::IdentityVerification(_) => 401,
99101
}
@@ -112,8 +114,10 @@ pub(crate) enum ErrorInner {
112114
IdentityVerification(#[from] VerifyError),
113115
#[error("Cannot convert header '{0}', reason: {1}")]
114116
Header(String, #[source] BoxError),
115-
#[error("Cannot reply to discovery, got accept header '{0}' but currently supported discovery is {DISCOVERY_CONTENT_TYPE}")]
116-
BadDiscovery(String),
117+
#[error("Cannot reply to discovery, got accept header '{0}' but currently supported discovery versions are v2 and v3")]
118+
BadDiscoveryVersion(String),
119+
#[error("The field '{0}' was set in the service/handler options, but it requires minimum discovery protocol version {1}")]
120+
FieldRequiresMinimumVersion(&'static str, u32),
117121
#[error("Bad path '{0}', expected either '/discover' or '/invoke/service/handler'")]
118122
BadPath(String),
119123
#[error("Suspended")]
@@ -395,21 +399,113 @@ impl Endpoint {
395399
}
396400

397401
if parts.last() == Some(&"discover") {
402+
// Extract Accept header from request
398403
let accept_header = headers
399404
.extract("accept")
400405
.map_err(|e| ErrorInner::Header("accept".to_owned(), Box::new(e)))?;
401-
if accept_header.is_some() {
402-
let accept = accept_header.unwrap();
403-
if !accept.contains("application/vnd.restate.endpointmanifest.v3+json") {
404-
return Err(Error(ErrorInner::BadDiscovery(accept.to_owned())));
406+
407+
// Negotiate discovery protocol version
408+
let mut version = 2;
409+
let mut content_type = DISCOVERY_CONTENT_TYPE_V2;
410+
if let Some(accept) = accept_header {
411+
if accept.contains(DISCOVERY_CONTENT_TYPE_V3) {
412+
version = 3;
413+
content_type = DISCOVERY_CONTENT_TYPE_V3;
414+
} else if accept.contains(DISCOVERY_CONTENT_TYPE_V2) {
415+
version = 2;
416+
content_type = DISCOVERY_CONTENT_TYPE_V2;
417+
} else {
418+
Err(ErrorInner::BadDiscoveryVersion(accept.to_owned()))?
419+
}
420+
}
421+
422+
// Validate that new discovery fields aren't used with older protocol versions
423+
if version <= 2 {
424+
// Check for new discovery fields in version 3 that shouldn't be used in version 2 or lower
425+
for service in &self.0.discovery.services {
426+
if service.inactivity_timeout.is_some() {
427+
Err(ErrorInner::FieldRequiresMinimumVersion(
428+
"inactivity_timeout",
429+
3,
430+
))?;
431+
}
432+
if service.abort_timeout.is_some() {
433+
Err(ErrorInner::FieldRequiresMinimumVersion("abort_timeout", 3))?;
434+
}
435+
if service.idempotency_retention.is_some() {
436+
Err(ErrorInner::FieldRequiresMinimumVersion(
437+
"idempotency_retention",
438+
3,
439+
))?;
440+
}
441+
if service.journal_retention.is_some() {
442+
Err(ErrorInner::FieldRequiresMinimumVersion(
443+
"journal_retention",
444+
3,
445+
))?;
446+
}
447+
if service.enable_lazy_state.is_some() {
448+
Err(ErrorInner::FieldRequiresMinimumVersion(
449+
"enable_lazy_state",
450+
3,
451+
))?;
452+
}
453+
if service.ingress_private.is_some() {
454+
Err(ErrorInner::FieldRequiresMinimumVersion(
455+
"ingress_private",
456+
3,
457+
))?;
458+
}
459+
460+
for handler in &service.handlers {
461+
if handler.inactivity_timeout.is_some() {
462+
Err(ErrorInner::FieldRequiresMinimumVersion(
463+
"inactivity_timeout",
464+
3,
465+
))?;
466+
}
467+
if handler.abort_timeout.is_some() {
468+
Err(ErrorInner::FieldRequiresMinimumVersion("abort_timeout", 3))?;
469+
}
470+
if handler.idempotency_retention.is_some() {
471+
Err(ErrorInner::FieldRequiresMinimumVersion(
472+
"idempotency_retention",
473+
3,
474+
))?;
475+
}
476+
if handler.journal_retention.is_some() {
477+
Err(ErrorInner::FieldRequiresMinimumVersion(
478+
"journal_retention",
479+
3,
480+
))?;
481+
}
482+
if handler.workflow_completion_retention.is_some() {
483+
Err(ErrorInner::FieldRequiresMinimumVersion(
484+
"workflow_retention",
485+
3,
486+
))?;
487+
}
488+
if handler.enable_lazy_state.is_some() {
489+
Err(ErrorInner::FieldRequiresMinimumVersion(
490+
"enable_lazy_state",
491+
3,
492+
))?;
493+
}
494+
if handler.ingress_private.is_some() {
495+
Err(ErrorInner::FieldRequiresMinimumVersion(
496+
"ingress_private",
497+
3,
498+
))?;
499+
}
500+
}
405501
}
406502
}
407503

408504
return Ok(Response::ReplyNow {
409505
status_code: 200,
410506
headers: vec![Header {
411507
key: "content-type".into(),
412-
value: DISCOVERY_CONTENT_TYPE.into(),
508+
value: content_type.into(),
413509
}],
414510
body: Bytes::from(
415511
serde_json::to_string(&self.0.discovery)

0 commit comments

Comments
 (0)