Skip to content

Commit

Permalink
Add check for output/attach on invocations created without idempotenc…
Browse files Browse the repository at this point in the history
…y id or workflow runs
  • Loading branch information
slinkydeveloper committed Jun 3, 2024
1 parent 0d4a6ad commit 1997128
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 12 deletions.
7 changes: 6 additions & 1 deletion crates/ingress-http/src/handler/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ pub(crate) enum HandlerError {
NotReady,
#[error("method not allowed")]
MethodNotAllowed,
#[error(
"cannot get output for the given invocation. You can get output only for invocations created with an idempotency key, or for workflow methods."
)]
UnsupportedGetOutput,
#[error("invocation error: {0:?}")]
Invocation(InvocationError),
#[error("input validation error: {0}")]
Expand Down Expand Up @@ -106,7 +110,8 @@ impl HandlerError {
| HandlerError::BadInvocationId(_, _)
| HandlerError::BadWorkflowPath
| HandlerError::InputValidation(_)
| HandlerError::UnsupportedIdempotencyKey => StatusCode::BAD_REQUEST,
| HandlerError::UnsupportedIdempotencyKey
| HandlerError::UnsupportedGetOutput => StatusCode::BAD_REQUEST,
HandlerError::Body(_) => StatusCode::INTERNAL_SERVER_ERROR,
HandlerError::Unavailable => StatusCode::SERVICE_UNAVAILABLE,
HandlerError::MethodNotAllowed => StatusCode::METHOD_NOT_ALLOWED,
Expand Down
1 change: 1 addition & 0 deletions crates/ingress-http/src/handler/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ where
Ok(GetOutputResult::Ready(out)) => out,
Ok(GetOutputResult::NotFound) => return Err(HandlerError::NotFound),
Ok(GetOutputResult::NotReady) => return Err(HandlerError::NotReady),
Ok(GetOutputResult::NotSupported) => return Err(HandlerError::UnsupportedGetOutput),
Err(e) => {
warn!(
restate.invocation.query = ?invocation_query,
Expand Down
1 change: 1 addition & 0 deletions crates/ingress-http/src/handler/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ where
Ok(GetOutputResult::Ready(out)) => out,
Ok(GetOutputResult::NotFound) => return Err(HandlerError::NotFound),
Ok(GetOutputResult::NotReady) => return Err(HandlerError::NotReady),
Ok(GetOutputResult::NotSupported) => return Err(HandlerError::UnsupportedGetOutput),
Err(e) => {
warn!(
restate.workflow.id = %workflow_id,
Expand Down
1 change: 1 addition & 0 deletions crates/ingress-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl ConnectInfo {
pub enum GetOutputResult {
NotFound,
NotReady,
NotSupported,
Ready(InvocationResponse),
}

Expand Down
11 changes: 11 additions & 0 deletions crates/storage-api/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ impl InvocationStatus {
}
}

#[inline]
pub fn idempotency_key(&self) -> Option<&ByteString> {
match self {
InvocationStatus::Inboxed(metadata) => metadata.idempotency_key.as_ref(),
InvocationStatus::Invoked(metadata) => metadata.idempotency_key.as_ref(),
InvocationStatus::Suspended { metadata, .. } => metadata.idempotency_key.as_ref(),
InvocationStatus::Completed(completed) => completed.idempotency_key.as_ref(),
_ => None,
}
}

#[inline]
pub fn into_journal_metadata(self) -> Option<JournalMetadata> {
match self {
Expand Down
3 changes: 3 additions & 0 deletions crates/types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ pub const GONE_INVOCATION_ERROR: InvocationError = InvocationError::new_static(c
pub const NOT_FOUND_INVOCATION_ERROR: InvocationError =
InvocationError::new_static(codes::NOT_FOUND, "not found");

pub const ATTACH_NOT_SUPPORTED_INVOCATION_ERROR: InvocationError =
InvocationError::new_static(codes::BAD_REQUEST, "attach not supported for this invocation. You can attach only to invocations created with an idempotency key, or for workflow methods.");

pub const ALREADY_COMPLETED_INVOCATION_ERROR: InvocationError =
InvocationError::new_static(codes::CONFLICT, "promise was already completed");

Expand Down
16 changes: 14 additions & 2 deletions crates/worker/src/ingress_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use restate_storage_api::service_status_table::{
};
use restate_types::identifiers::WithPartitionKey;
use restate_types::ingress::{IngressResponseResult, InvocationResponse};
use restate_types::invocation::{InvocationQuery, ResponseResult};
use restate_types::invocation::{
InvocationQuery, InvocationTarget, InvocationTargetType, ResponseResult, WorkflowHandlerType,
};
use restate_types::partition_table::FindPartition;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -75,6 +77,17 @@ impl InvocationStorageReader for InvocationStorageReaderImpl {
.await?;

match invocation_status {
InvocationStatus::Free => Ok(GetOutputResult::NotFound),
is if is.idempotency_key().is_none()
|| is
.invocation_target()
.map(InvocationTarget::invocation_target_ty)
!= Some(InvocationTargetType::Workflow(
WorkflowHandlerType::Workflow,
)) =>
{
Ok(GetOutputResult::NotSupported)
}
InvocationStatus::Completed(completed) => {
Ok(GetOutputResult::Ready(InvocationResponse {
request_id: Default::default(),
Expand All @@ -87,7 +100,6 @@ impl InvocationStorageReader for InvocationStorageReaderImpl {
invocation_id: Some(invocation_id),
}))
}
InvocationStatus::Free => Ok(GetOutputResult::NotFound),
_ => Ok(GetOutputResult::NotReady),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use restate_storage_api::timer_table::Timer;
use restate_storage_api::Result as StorageResult;
use restate_types::errors::{
InvocationError, InvocationErrorCode, ALREADY_COMPLETED_INVOCATION_ERROR,
CANCELED_INVOCATION_ERROR, GONE_INVOCATION_ERROR, KILLED_INVOCATION_ERROR,
NOT_FOUND_INVOCATION_ERROR,
ATTACH_NOT_SUPPORTED_INVOCATION_ERROR, CANCELED_INVOCATION_ERROR, GONE_INVOCATION_ERROR,
KILLED_INVOCATION_ERROR, NOT_FOUND_INVOCATION_ERROR,
};
use restate_types::identifiers::{
EntryIndex, IdempotencyId, InvocationId, JournalEntryId, PartitionKey, ServiceId,
Expand Down Expand Up @@ -1926,6 +1926,29 @@ where
}
};
match Self::get_invocation_status_and_trace(state, &invocation_id, effects).await? {
InvocationStatus::Free => self.send_response_to_sinks(
effects,
vec![attach_invocation_request.response_sink],
NOT_FOUND_INVOCATION_ERROR,
Some(invocation_id),
None,
),
is if is.idempotency_key().is_none()
|| is
.invocation_target()
.map(InvocationTarget::invocation_target_ty)
!= Some(InvocationTargetType::Workflow(
WorkflowHandlerType::Workflow,
)) =>
{
self.send_response_to_sinks(
effects,
vec![attach_invocation_request.response_sink],
ATTACH_NOT_SUPPORTED_INVOCATION_ERROR,
Some(invocation_id),
None,
)
}
is @ InvocationStatus::Invoked(_)
| is @ InvocationStatus::Suspended { .. }
| is @ InvocationStatus::Inboxed(_) => {
Expand All @@ -1944,13 +1967,6 @@ where
Some(&completed.invocation_target),
);
}
InvocationStatus::Free => self.send_response_to_sinks(
effects,
vec![attach_invocation_request.response_sink],
NOT_FOUND_INVOCATION_ERROR,
Some(invocation_id),
None,
),
}

Ok(())
Expand Down

0 comments on commit 1997128

Please sign in to comment.