Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement /restate/invocation/{id}/attach and /restate/invocation/{id}/output #1503

Merged
merged 4 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Feedback
  • Loading branch information
slinkydeveloper committed May 20, 2024
commit 80a6ef9008ce3f3677e3dd4d559f436e9d8100cd
37 changes: 8 additions & 29 deletions crates/ingress-dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl DispatchIngressRequest for IngressDispatcher {
.insert(ingress_response_key.1, response_sender);
(None, self.state.get_and_increment_msg_index(), None)
}
IngressRequestMode::WaitAttachNotification(id, tx) => {
IngressRequestMode::WaitSubmitNotification(id, tx) => {
self.state.waiting_submit_notification.insert(id, tx);
let msg_index = self.state.get_and_increment_msg_index();
(None, msg_index, None)
Expand Down Expand Up @@ -250,11 +250,11 @@ impl MessageHandler for IngressDispatcher {
}
}
}
IngressMessage::AttachedInvocationNotification(attach_idempotent_invocation) => {
IngressMessage::SubmittedInvocationNotification(attach_idempotent_invocation) => {
if let Some((_, sender)) = self
.state
.waiting_submit_notification
.remove(&attach_idempotent_invocation.submitted_invocation_id)
.remove(&attach_idempotent_invocation.original_invocation_id)
{
if let Err(response) = sender.send(SubmittedInvocationNotification {
invocation_id: attach_idempotent_invocation.attached_invocation_id,
Expand All @@ -266,13 +266,13 @@ impl MessageHandler for IngressDispatcher {
);
} else {
trace!(
restate.invocation.id = %attach_idempotent_invocation.submitted_invocation_id,
restate.invocation.id = %attach_idempotent_invocation.original_invocation_id,
partition_processor_peer = %peer,
"Sent response of invocation out"
);
}
} else {
trace!("Ignoring submit notification '{:?}' because no handler was found locally waiting for its invocation Id", &attach_idempotent_invocation.submitted_invocation_id);
trace!("Ignoring submit notification '{:?}' because no handler was found locally waiting for its invocation Id", &attach_idempotent_invocation.original_invocation_id);
}
}
}
Expand Down Expand Up @@ -337,7 +337,6 @@ mod tests {
use restate_wal_protocol::Envelope;
use std::time::Duration;
use test_log::test;
use tokio::sync::oneshot::error::TryRecvError;

#[test(tokio::test)]
async fn idempotent_invoke() -> anyhow::Result<()> {
Expand Down Expand Up @@ -452,7 +451,7 @@ mod tests {
}

#[test(tokio::test)]
async fn get_output_result_should_not_complete_pending_attach() {
async fn attach_invocation() {
// set it to 1 partition so that we know where the invocation for the IdempotentInvoker goes to
let mut env_builder = TestCoreEnvBuilder::new_with_mock_network()
.add_mock_nodes_config()
Expand All @@ -472,7 +471,7 @@ mod tests {

let invocation_id = InvocationId::mock_random();

let (attach_req, _, mut attach_res) =
let (attach_req, _, attach_res) =
IngressDispatcherRequest::attach(InvocationQuery::Invocation(invocation_id));
dispatcher.dispatch_ingress_request(attach_req).await?;

Expand All @@ -494,28 +493,8 @@ mod tests {
})))
);

// Now check that sending response for get output doesn't complete attach
let response = Bytes::from_static(b"vmoaifnuei");
node_env
.network_sender
.send(
metadata().my_node_id().into(),
&IngressMessage::InvocationResponse(InvocationResponse {
correlation_ids: InvocationResponseCorrelationIds::from_invocation_id(
invocation_id,
),
response: IngressResponseResult::Success(
InvocationTarget::mock_service(),
response.clone(),
),
}),
)
.await?;

// Should not be completed yet, but the channel should still be available
assert_that!(attach_res.try_recv(), err(eq(TryRecvError::Empty)));

// Now send the attach response
let response = Bytes::from_static(b"vmoaifnuei");
node_env
.network_sender
.send(
Expand Down
18 changes: 9 additions & 9 deletions crates/ingress-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use restate_types::identifiers::{
};
use restate_types::ingress::IngressResponseResult;
use restate_types::invocation::{
AttachNotificationSink, InvocationQuery, InvocationResponse, InvocationTarget,
InvocationTargetType, ServiceInvocation, ServiceInvocationResponseSink, SpanRelation,
VirtualObjectHandlerType, WorkflowHandlerType,
InvocationQuery, InvocationResponse, InvocationTarget, InvocationTargetType, ServiceInvocation,
ServiceInvocationResponseSink, SpanRelation, SubmitNotificationSink, VirtualObjectHandlerType,
WorkflowHandlerType,
};
use restate_types::message::MessageIndex;
use std::fmt::Display;
Expand Down Expand Up @@ -127,7 +127,7 @@ enum IngressRequestMode {
deduplication_id: IngressDeduplicationId,
proxying_partition_key: Option<PartitionKey>,
},
WaitAttachNotification(InvocationId, IngressSubmittedInvocationNotificationSender),
WaitSubmitNotification(InvocationId, IngressSubmittedInvocationNotificationSender),
FireAndForget,
}

Expand Down Expand Up @@ -204,14 +204,14 @@ impl IngressDispatcherRequest {
== InvocationTargetType::Workflow(WorkflowHandlerType::Workflow)
{
let my_node_id = metadata().my_node_id();
service_invocation.attach_notification_sink =
Some(AttachNotificationSink::Ingress(my_node_id));
service_invocation.submit_notification_sink =
Some(SubmitNotificationSink::Ingress(my_node_id));

let (tx, rx) = oneshot::channel();

(
IngressDispatcherRequest {
request_mode: IngressRequestMode::WaitAttachNotification(
request_mode: IngressRequestMode::WaitSubmitNotification(
service_invocation.invocation_id,
tx,
),
Expand Down Expand Up @@ -407,7 +407,7 @@ pub mod mocks {
service_invocation
}

pub fn expect_one_way_invocation_with_attach_notification(
pub fn expect_one_way_invocation_with_submit_notification(
self,
) -> (
ServiceInvocation,
Expand All @@ -416,7 +416,7 @@ pub mod mocks {
let_assert!(
IngressDispatcherRequest {
inner: IngressDispatcherRequestInner::Invoke(service_invocation),
request_mode: IngressRequestMode::WaitAttachNotification(_, tx),
request_mode: IngressRequestMode::WaitSubmitNotification(_, tx),
} = self
);
(service_invocation, tx)
Expand Down
105 changes: 18 additions & 87 deletions crates/ingress-http/src/handler/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ use super::HandlerError;

use crate::{GetOutputResult, InvocationStorageReader};
use bytes::Bytes;
use http::{header, Method, Request, Response};
use http::{Method, Request, Response};
use http_body_util::Full;
use restate_ingress_dispatcher::DispatchIngressRequest;
use restate_ingress_dispatcher::IngressDispatcherRequest;
use restate_schema_api::invocation_target::InvocationTargetResolver;
use restate_types::identifiers::InvocationId;
use restate_types::ingress::IngressResponseResult;
use restate_types::invocation::InvocationQuery;
use tracing::{info, trace, warn};
use tracing::{info, warn};

impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageReader>
where
Expand Down Expand Up @@ -97,50 +96,18 @@ where
return Err(HandlerError::Unavailable);
};

// Prepare response metadata
let mut response_builder = hyper::Response::builder();

// Add idempotency expiry time if available
// TODO reintroduce this once available
// if let Some(expiry_time) = response.idempotency_expiry_time() {
// response_builder = response_builder.header(IDEMPOTENCY_EXPIRES, expiry_time);
// }

match response.result {
IngressResponseResult::Success(invocation_target, response_payload) => {
trace!(rpc.response = ?response_payload, "Complete external HTTP request successfully");

// Resolve invocation target metadata.
// We need it for the output content type.
let invocation_target_metadata = self
.schemas
Self::reply_with_invocation_response(
response.result,
response.idempotency_expiry_time.as_deref(),
move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)?;

// Write out the content-type, if any
// TODO fix https://github.com/restatedev/restate/issues/1496
if let Some(ct) = invocation_target_metadata
.output_rules
.infer_content_type(response_payload.is_empty())
{
response_builder = response_builder.header(
header::CONTENT_TYPE,
// TODO we need this to_str().unwrap() because these two HeaderValue come from two different http crates
// We can remove it once https://github.com/restatedev/restate/issues/96 is done
ct.to_str().unwrap(),
)
}

Ok(response_builder.body(Full::new(response_payload)).unwrap())
}
IngressResponseResult::Failure(error) => {
info!(rpc.response = ?error, "Complete external HTTP request with a failure");
Ok(HandlerError::Invocation(error).fill_builder(response_builder))
}
}
.ok_or(HandlerError::NotFound)
},
)
}

pub(crate) async fn handle_invocation_get_output<B: http_body::Body>(
Expand Down Expand Up @@ -174,49 +141,13 @@ where
}
};

// Prepare response metadata
let mut response_builder = hyper::Response::builder();

// Add idempotency expiry time if available
// TODO reintroduce this once available
// if let Some(expiry_time) = response.idempotency_expiry_time() {
// response_builder = response_builder.header(IDEMPOTENCY_EXPIRES, expiry_time);
// }

match response.response {
IngressResponseResult::Success(invocation_target, response_payload) => {
trace!(rpc.response = ?response_payload, "Complete external HTTP request successfully");

// Resolve invocation target metadata.
// We need it for the output content type.
let invocation_target_metadata = self
.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)?;

// Write out the content-type, if any
// TODO fix https://github.com/restatedev/restate/issues/1496
if let Some(ct) = invocation_target_metadata
.output_rules
.infer_content_type(response_payload.is_empty())
{
response_builder = response_builder.header(
header::CONTENT_TYPE,
// TODO we need this to_str().unwrap() because these two HeaderValue come from two different http crates
// We can remove it once https://github.com/restatedev/restate/issues/96 is done
ct.to_str().unwrap(),
)
}

Ok(response_builder.body(Full::new(response_payload)).unwrap())
}
IngressResponseResult::Failure(error) => {
info!(rpc.response = ?error, "Complete external HTTP request with a failure");
Ok(HandlerError::Invocation(error).fill_builder(response_builder))
}
}
Self::reply_with_invocation_response(response.response, None, move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)
})
}
}
1 change: 1 addition & 0 deletions crates/ingress-http/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod error;
mod health;
mod invocation;
mod path_parsing;
mod responses;
mod service_handler;
#[cfg(test)]
mod tests;
Expand Down
73 changes: 73 additions & 0 deletions crates/ingress-http/src/handler/responses.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::handler::error::HandlerError;
use crate::handler::Handler;
use bytes::Bytes;
use http::{header, HeaderName, Response};
use http_body_util::Full;
use restate_schema_api::invocation_target::InvocationTargetMetadata;
use restate_types::ingress::IngressResponseResult;
use restate_types::invocation::InvocationTarget;
use tracing::{info, trace};

const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires");

impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageReader> {
pub(crate) fn reply_with_invocation_response(
response: IngressResponseResult,
idempotency_expiry_time: Option<&str>,
invocation_target_metadata_retriever: impl FnOnce(
&InvocationTarget,
) -> Result<
InvocationTargetMetadata,
HandlerError,
>,
) -> Result<Response<Full<Bytes>>, HandlerError> {
// Prepare response metadata
let mut response_builder = hyper::Response::builder();

// Add idempotency expiry time if available
if let Some(expiry_time) = idempotency_expiry_time {
response_builder = response_builder.header(IDEMPOTENCY_EXPIRES, expiry_time);
}

match response {
IngressResponseResult::Success(invocation_target, response_payload) => {
trace!(rpc.response = ?response_payload, "Complete external HTTP request successfully");

// Resolve invocation target metadata.
// We need it for the output content type.
let invocation_target_metadata =
invocation_target_metadata_retriever(&invocation_target)?;

// Write out the content-type, if any
// TODO fix https://github.com/restatedev/restate/issues/1496
if let Some(ct) = invocation_target_metadata
.output_rules
.infer_content_type(response_payload.is_empty())
{
response_builder = response_builder.header(
header::CONTENT_TYPE,
// TODO we need this to_str().unwrap() because these two HeaderValue come from two different http crates
// We can remove it once https://github.com/restatedev/restate/issues/96 is done
ct.to_str().unwrap(),
)
}

Ok(response_builder.body(Full::new(response_payload)).unwrap())
}
IngressResponseResult::Failure(error) => {
info!(rpc.response = ?error, "Complete external HTTP request with a failure");
Ok(HandlerError::Invocation(error).fill_builder(response_builder))
}
}
}
}
Loading