Skip to content

Commit

Permalink
Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed May 20, 2024
1 parent c52b388 commit b1ecf03
Show file tree
Hide file tree
Showing 20 changed files with 214 additions and 324 deletions.
37 changes: 8 additions & 29 deletions crates/ingress-dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl DispatchIngressRequest for IngressDispatcher {
.insert(ingress_response_key.1, response_sender);
(None, self.state.get_and_increment_msg_index(), None)
}
IngressRequestMode::WaitAttachNotification(id, tx) => {
IngressRequestMode::WaitSubmitNotification(id, tx) => {
self.state.waiting_submit_notification.insert(id, tx);
let msg_index = self.state.get_and_increment_msg_index();
(None, msg_index, None)
Expand Down Expand Up @@ -250,11 +250,11 @@ impl MessageHandler for IngressDispatcher {
}
}
}
IngressMessage::AttachedInvocationNotification(attach_idempotent_invocation) => {
IngressMessage::SubmittedInvocationNotification(attach_idempotent_invocation) => {
if let Some((_, sender)) = self
.state
.waiting_submit_notification
.remove(&attach_idempotent_invocation.submitted_invocation_id)
.remove(&attach_idempotent_invocation.original_invocation_id)
{
if let Err(response) = sender.send(SubmittedInvocationNotification {
invocation_id: attach_idempotent_invocation.attached_invocation_id,
Expand All @@ -266,13 +266,13 @@ impl MessageHandler for IngressDispatcher {
);
} else {
trace!(
restate.invocation.id = %attach_idempotent_invocation.submitted_invocation_id,
restate.invocation.id = %attach_idempotent_invocation.original_invocation_id,
partition_processor_peer = %peer,
"Sent response of invocation out"
);
}
} else {
trace!("Ignoring submit notification '{:?}' because no handler was found locally waiting for its invocation Id", &attach_idempotent_invocation.submitted_invocation_id);
trace!("Ignoring submit notification '{:?}' because no handler was found locally waiting for its invocation Id", &attach_idempotent_invocation.original_invocation_id);
}
}
}
Expand Down Expand Up @@ -337,7 +337,6 @@ mod tests {
use restate_wal_protocol::Envelope;
use std::time::Duration;
use test_log::test;
use tokio::sync::oneshot::error::TryRecvError;

#[test(tokio::test)]
async fn idempotent_invoke() -> anyhow::Result<()> {
Expand Down Expand Up @@ -452,7 +451,7 @@ mod tests {
}

#[test(tokio::test)]
async fn get_output_result_should_not_complete_pending_attach() {
async fn attach_invocation() {
// set it to 1 partition so that we know where the invocation for the IdempotentInvoker goes to
let mut env_builder = TestCoreEnvBuilder::new_with_mock_network()
.add_mock_nodes_config()
Expand All @@ -472,7 +471,7 @@ mod tests {

let invocation_id = InvocationId::mock_random();

let (attach_req, _, mut attach_res) =
let (attach_req, _, attach_res) =
IngressDispatcherRequest::attach(InvocationQuery::Invocation(invocation_id));
dispatcher.dispatch_ingress_request(attach_req).await?;

Expand All @@ -494,28 +493,8 @@ mod tests {
})))
);

// Now check that sending response for get output doesn't complete attach
let response = Bytes::from_static(b"vmoaifnuei");
node_env
.network_sender
.send(
metadata().my_node_id().into(),
&IngressMessage::InvocationResponse(InvocationResponse {
correlation_ids: InvocationResponseCorrelationIds::from_invocation_id(
invocation_id,
),
response: IngressResponseResult::Success(
InvocationTarget::mock_service(),
response.clone(),
),
}),
)
.await?;

// Should not be completed yet, but the channel should still be available
assert_that!(attach_res.try_recv(), err(eq(TryRecvError::Empty)));

// Now send the attach response
let response = Bytes::from_static(b"vmoaifnuei");
node_env
.network_sender
.send(
Expand Down
18 changes: 9 additions & 9 deletions crates/ingress-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use restate_types::identifiers::{
};
use restate_types::ingress::IngressResponseResult;
use restate_types::invocation::{
AttachNotificationSink, InvocationQuery, InvocationResponse, InvocationTarget,
InvocationTargetType, ServiceInvocation, ServiceInvocationResponseSink, SpanRelation,
VirtualObjectHandlerType, WorkflowHandlerType,
InvocationQuery, InvocationResponse, InvocationTarget, InvocationTargetType, ServiceInvocation,
ServiceInvocationResponseSink, SpanRelation, SubmitNotificationSink, VirtualObjectHandlerType,
WorkflowHandlerType,
};
use restate_types::message::MessageIndex;
use std::fmt::Display;
Expand Down Expand Up @@ -127,7 +127,7 @@ enum IngressRequestMode {
deduplication_id: IngressDeduplicationId,
proxying_partition_key: Option<PartitionKey>,
},
WaitAttachNotification(InvocationId, IngressSubmittedInvocationNotificationSender),
WaitSubmitNotification(InvocationId, IngressSubmittedInvocationNotificationSender),
FireAndForget,
}

Expand Down Expand Up @@ -204,14 +204,14 @@ impl IngressDispatcherRequest {
== InvocationTargetType::Workflow(WorkflowHandlerType::Workflow)
{
let my_node_id = metadata().my_node_id();
service_invocation.attach_notification_sink =
Some(AttachNotificationSink::Ingress(my_node_id));
service_invocation.submit_notification_sink =
Some(SubmitNotificationSink::Ingress(my_node_id));

let (tx, rx) = oneshot::channel();

(
IngressDispatcherRequest {
request_mode: IngressRequestMode::WaitAttachNotification(
request_mode: IngressRequestMode::WaitSubmitNotification(
service_invocation.invocation_id,
tx,
),
Expand Down Expand Up @@ -407,7 +407,7 @@ pub mod mocks {
service_invocation
}

pub fn expect_one_way_invocation_with_attach_notification(
pub fn expect_one_way_invocation_with_submit_notification(
self,
) -> (
ServiceInvocation,
Expand All @@ -416,7 +416,7 @@ pub mod mocks {
let_assert!(
IngressDispatcherRequest {
inner: IngressDispatcherRequestInner::Invoke(service_invocation),
request_mode: IngressRequestMode::WaitAttachNotification(_, tx),
request_mode: IngressRequestMode::WaitSubmitNotification(_, tx),
} = self
);
(service_invocation, tx)
Expand Down
105 changes: 18 additions & 87 deletions crates/ingress-http/src/handler/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ use super::HandlerError;

use crate::{GetOutputResult, InvocationStorageReader};
use bytes::Bytes;
use http::{header, Method, Request, Response};
use http::{Method, Request, Response};
use http_body_util::Full;
use restate_ingress_dispatcher::DispatchIngressRequest;
use restate_ingress_dispatcher::IngressDispatcherRequest;
use restate_schema_api::invocation_target::InvocationTargetResolver;
use restate_types::identifiers::InvocationId;
use restate_types::ingress::IngressResponseResult;
use restate_types::invocation::InvocationQuery;
use tracing::{info, trace, warn};
use tracing::{info, warn};

impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageReader>
where
Expand Down Expand Up @@ -97,50 +96,18 @@ where
return Err(HandlerError::Unavailable);
};

// Prepare response metadata
let mut response_builder = hyper::Response::builder();

// Add idempotency expiry time if available
// TODO reintroduce this once available
// if let Some(expiry_time) = response.idempotency_expiry_time() {
// response_builder = response_builder.header(IDEMPOTENCY_EXPIRES, expiry_time);
// }

match response.result {
IngressResponseResult::Success(invocation_target, response_payload) => {
trace!(rpc.response = ?response_payload, "Complete external HTTP request successfully");

// Resolve invocation target metadata.
// We need it for the output content type.
let invocation_target_metadata = self
.schemas
Self::reply_with_invocation_response(
response.result,
response.idempotency_expiry_time.as_deref(),
move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)?;

// Write out the content-type, if any
// TODO fix https://github.com/restatedev/restate/issues/1496
if let Some(ct) = invocation_target_metadata
.output_rules
.infer_content_type(response_payload.is_empty())
{
response_builder = response_builder.header(
header::CONTENT_TYPE,
// TODO we need this to_str().unwrap() because these two HeaderValue come from two different http crates
// We can remove it once https://github.com/restatedev/restate/issues/96 is done
ct.to_str().unwrap(),
)
}

Ok(response_builder.body(Full::new(response_payload)).unwrap())
}
IngressResponseResult::Failure(error) => {
info!(rpc.response = ?error, "Complete external HTTP request with a failure");
Ok(HandlerError::Invocation(error).fill_builder(response_builder))
}
}
.ok_or(HandlerError::NotFound)
},
)
}

pub(crate) async fn handle_invocation_get_output<B: http_body::Body>(
Expand Down Expand Up @@ -174,49 +141,13 @@ where
}
};

// Prepare response metadata
let mut response_builder = hyper::Response::builder();

// Add idempotency expiry time if available
// TODO reintroduce this once available
// if let Some(expiry_time) = response.idempotency_expiry_time() {
// response_builder = response_builder.header(IDEMPOTENCY_EXPIRES, expiry_time);
// }

match response.response {
IngressResponseResult::Success(invocation_target, response_payload) => {
trace!(rpc.response = ?response_payload, "Complete external HTTP request successfully");

// Resolve invocation target metadata.
// We need it for the output content type.
let invocation_target_metadata = self
.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)?;

// Write out the content-type, if any
// TODO fix https://github.com/restatedev/restate/issues/1496
if let Some(ct) = invocation_target_metadata
.output_rules
.infer_content_type(response_payload.is_empty())
{
response_builder = response_builder.header(
header::CONTENT_TYPE,
// TODO we need this to_str().unwrap() because these two HeaderValue come from two different http crates
// We can remove it once https://github.com/restatedev/restate/issues/96 is done
ct.to_str().unwrap(),
)
}

Ok(response_builder.body(Full::new(response_payload)).unwrap())
}
IngressResponseResult::Failure(error) => {
info!(rpc.response = ?error, "Complete external HTTP request with a failure");
Ok(HandlerError::Invocation(error).fill_builder(response_builder))
}
}
Self::reply_with_invocation_response(response.response, None, move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)
})
}
}
1 change: 1 addition & 0 deletions crates/ingress-http/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod error;
mod health;
mod invocation;
mod path_parsing;
mod responses;
mod service_handler;
#[cfg(test)]
mod tests;
Expand Down
73 changes: 73 additions & 0 deletions crates/ingress-http/src/handler/responses.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::handler::error::HandlerError;
use crate::handler::Handler;
use bytes::Bytes;
use http::{header, HeaderName, Response};
use http_body_util::Full;
use restate_schema_api::invocation_target::InvocationTargetMetadata;
use restate_types::ingress::IngressResponseResult;
use restate_types::invocation::InvocationTarget;
use tracing::{info, trace};

const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires");

impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageReader> {
pub(crate) fn reply_with_invocation_response(
response: IngressResponseResult,
idempotency_expiry_time: Option<&str>,
invocation_target_metadata_retriever: impl FnOnce(
&InvocationTarget,
) -> Result<
InvocationTargetMetadata,
HandlerError,
>,
) -> Result<Response<Full<Bytes>>, HandlerError> {
// Prepare response metadata
let mut response_builder = hyper::Response::builder();

// Add idempotency expiry time if available
if let Some(expiry_time) = idempotency_expiry_time {
response_builder = response_builder.header(IDEMPOTENCY_EXPIRES, expiry_time);
}

match response {
IngressResponseResult::Success(invocation_target, response_payload) => {
trace!(rpc.response = ?response_payload, "Complete external HTTP request successfully");

// Resolve invocation target metadata.
// We need it for the output content type.
let invocation_target_metadata =
invocation_target_metadata_retriever(&invocation_target)?;

// Write out the content-type, if any
// TODO fix https://github.com/restatedev/restate/issues/1496
if let Some(ct) = invocation_target_metadata
.output_rules
.infer_content_type(response_payload.is_empty())
{
response_builder = response_builder.header(
header::CONTENT_TYPE,
// TODO we need this to_str().unwrap() because these two HeaderValue come from two different http crates
// We can remove it once https://github.com/restatedev/restate/issues/96 is done
ct.to_str().unwrap(),
)
}

Ok(response_builder.body(Full::new(response_payload)).unwrap())
}
IngressResponseResult::Failure(error) => {
info!(rpc.response = ?error, "Complete external HTTP request with a failure");
Ok(HandlerError::Invocation(error).fill_builder(response_builder))
}
}
}
}
Loading

0 comments on commit b1ecf03

Please sign in to comment.