Skip to content

Commit

Permalink
Schemas -> Live<Schema>
Browse files Browse the repository at this point in the history
This changes how we pass Schema around, instead of implying being updateable, we invert control to allow users to decide when and how to look at a consistent snapshot of the schema.
  • Loading branch information
AhmedSoliman committed Jun 27, 2024
1 parent 295517d commit 777b3cc
Show file tree
Hide file tree
Showing 27 changed files with 134 additions and 168 deletions.
10 changes: 7 additions & 3 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
mod manager;
pub use manager::MetadataManager;
use restate_types::live::{Live, Pinned};
use restate_types::schema::{Schema, UpdateableSchema};
use restate_types::schema::Schema;

use std::sync::{Arc, OnceLock};

Expand Down Expand Up @@ -148,8 +148,12 @@ impl Metadata {
self.inner.schema.load().version()
}

pub fn schema_updateable(&self) -> UpdateableSchema {
UpdateableSchema::from(Arc::clone(&self.inner.schema))
pub fn schema_ref(&self) -> Pinned<Schema> {
Pinned::new(&self.inner.schema)
}

pub fn updateable_schema(&self) -> Live<Schema> {
Live::from(self.inner.schema.clone())
}

// Returns when the metadata kind is at the provided version (or newer)
Expand Down
3 changes: 2 additions & 1 deletion crates/ingress-http/src/handler/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
Schemas: ServiceMetadataResolver + Send + Sync + 'static,
{
pub(crate) fn handle_health<B: http_body::Body>(
&self,
&mut self,
req: Request<B>,
) -> Result<Response<Full<Bytes>>, HandlerError> {
if req.method() != Method::GET {
Expand All @@ -39,6 +39,7 @@ where
let response = HealthResponse {
services: self
.schemas
.pinned()
.list_services()
.into_iter()
.map(|c| c.name)
Expand Down
2 changes: 2 additions & 0 deletions crates/ingress-http/src/handler/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ where
response.idempotency_expiry_time.as_deref(),
move |invocation_target| {
self.schemas
.pinned()
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
Expand Down Expand Up @@ -172,6 +173,7 @@ where
None,
move |invocation_target| {
self.schemas
.pinned()
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
Expand Down
37 changes: 20 additions & 17 deletions crates/ingress-http/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,20 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use super::*;
mod awakeables;
mod error;
mod health;
mod invocation;
mod path_parsing;
mod responses;
mod service_handler;
#[cfg(test)]
mod tests;
mod tracing;
mod workflow;

use std::convert::Infallible;
use std::task::{Context, Poll};

use error::HandlerError;
use futures::future::BoxFuture;
Expand All @@ -17,36 +30,26 @@ use http_body_util::Full;
use hyper::http::HeaderValue;
use hyper::{Request, Response};
use path_parsing::RequestType;

use restate_ingress_dispatcher::DispatchIngressRequest;
use restate_types::live::Live;
use restate_types::schema::invocation_target::InvocationTargetResolver;
use restate_types::schema::service::ServiceMetadataResolver;
use std::convert::Infallible;
use std::task::{Context, Poll};

mod awakeables;
mod error;
mod health;
mod invocation;
mod path_parsing;
mod responses;
mod service_handler;
#[cfg(test)]
mod tests;
mod tracing;
mod workflow;
use super::*;

const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");

#[derive(Clone)]
pub(crate) struct Handler<Schemas, Dispatcher, StorageReader> {
schemas: Schemas,
schemas: Live<Schemas>,
dispatcher: Dispatcher,
storage_reader: StorageReader,
}

impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageReader> {
pub(crate) fn new(
schemas: Schemas,
schemas: Live<Schemas>,
dispatcher: Dispatcher,
storage_reader: StorageReader,
) -> Self {
Expand Down Expand Up @@ -79,7 +82,7 @@ where
fn call(&mut self, req: Request<Body>) -> Self::Future {
let res = self.parse_path(req.uri());

let this = self.clone();
let mut this = self.clone();
async move {
match res? {
RequestType::Health => this.handle_health(req),
Expand Down
7 changes: 4 additions & 3 deletions crates/ingress-http/src/handler/path_parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,19 +229,20 @@ where
StorageReader: InvocationStorageReader + Clone + Send + Sync + 'static,
{
/// This function takes care of parsing the path of the request, inferring the correct request type
pub(crate) fn parse_path(&self, uri: &Uri) -> Result<RequestType, HandlerError> {
pub(crate) fn parse_path(&mut self, uri: &Uri) -> Result<RequestType, HandlerError> {
let mut path_parts = uri.path().split('/').skip(1);

let first_segment = path_parts.next().ok_or(HandlerError::NotFound)?;

let schema = self.schemas.live_load();
match first_segment {
"restate" => match path_parts.next().ok_or(HandlerError::NotFound)? {
"health" => Ok(RequestType::Health),
"awakeables" | "a" => Ok(RequestType::Awakeable(
AwakeableRequestType::from_path_chunks(path_parts)?,
)),
"invocation" => Ok(RequestType::Invocation(
InvocationRequestType::from_path_chunks(path_parts, &self.schemas)?,
InvocationRequestType::from_path_chunks(path_parts, schema)?,
)),
"workflow" => Ok(RequestType::Workflow(
WorkflowRequestType::from_path_chunks(path_parts)?,
Expand All @@ -252,7 +253,7 @@ where
segment => Ok(RequestType::Service(ServiceRequestType::from_path_chunks(
path_parts,
segment.to_owned(),
&self.schemas,
schema,
)?)),
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ where

let invocation_target_meta = if let Some(invocation_target) = self
.schemas
.pinned()
.resolve_latest_invocation_target(&service_name, &handler_name)
{
if !invocation_target.public {
Expand Down
8 changes: 7 additions & 1 deletion crates/ingress-http/src/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use googletest::prelude::*;
use http::StatusCode;
use http::{Method, Request, Response};
use http_body_util::{BodyExt, Empty, Full};
use restate_types::live::Live;
use tokio::sync::mpsc;
use tower::ServiceExt;
use tracing_test::traced_test;
Expand Down Expand Up @@ -1093,7 +1094,12 @@ where
let handler_fut = node_env.tc.run_in_scope(
"ingress",
None,
Handler::new(schemas, dispatcher, invocation_storage_reader).oneshot(req),
Handler::new(
Live::from_value(schemas),
dispatcher,
invocation_storage_reader,
)
.oneshot(req),
);

// Mock the service invocation receiver
Expand Down
2 changes: 2 additions & 0 deletions crates/ingress-http/src/handler/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ where
response.idempotency_expiry_time.as_deref(),
move |invocation_target| {
self.schemas
.pinned()
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
Expand Down Expand Up @@ -146,6 +147,7 @@ where
None,
move |invocation_target| {
self.schemas
.pinned()
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
Expand Down
9 changes: 5 additions & 4 deletions crates/ingress-http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use hyper_util::server::conn::auto;
use restate_core::{cancellation_watcher, task_center, TaskKind};
use restate_ingress_dispatcher::{DispatchIngressRequest, IngressDispatcher};
use restate_types::config::IngressOptions;
use restate_types::live::Live;
use restate_types::schema::invocation_target::InvocationTargetResolver;
use restate_types::schema::service::ServiceMetadataResolver;
use std::convert::Infallible;
Expand Down Expand Up @@ -56,7 +57,7 @@ pub struct HyperServerIngress<Schemas, Dispatcher, StorageReader> {
concurrency_limit: usize,

// Parameters to build the layers
schemas: Schemas,
schemas: Live<Schemas>,
dispatcher: Dispatcher,
storage_reader: StorageReader,

Expand All @@ -72,7 +73,7 @@ where
pub fn from_options(
ingress_options: &IngressOptions,
dispatcher: IngressDispatcher,
schemas: Schemas,
schemas: Live<Schemas>,
storage_reader: StorageReader,
) -> HyperServerIngress<Schemas, IngressDispatcher, StorageReader> {
crate::metric_definitions::describe_metrics();
Expand All @@ -97,7 +98,7 @@ where
pub(crate) fn new(
listening_addr: SocketAddr,
concurrency_limit: usize,
schemas: Schemas,
schemas: Live<Schemas>,
dispatcher: Dispatcher,
storage_reader: StorageReader,
) -> (Self, StartSignal) {
Expand Down Expand Up @@ -346,7 +347,7 @@ mod tests {
let (ingress, start_signal) = HyperServerIngress::new(
"0.0.0.0:0".parse().unwrap(),
Semaphore::MAX_PERMITS,
mock_schemas(),
Live::from_value(mock_schemas()),
MockDispatcher::new(ingress_request_tx),
MockStorageReader::default(),
);
Expand Down
4 changes: 2 additions & 2 deletions crates/invoker-api/src/entry_enricher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use restate_types::journal::raw::PlainRawEntry;

pub trait EntryEnricher {
fn enrich_entry(
&self,
&mut self,
entry: PlainRawEntry,
current_invocation_target: &InvocationTarget,
current_invocation_span_context: &ServiceInvocationSpanContext,
Expand All @@ -38,7 +38,7 @@ pub mod test_util {

impl EntryEnricher for MockEntryEnricher {
fn enrich_entry(
&self,
&mut self,
entry: PlainRawEntry,
_current_invocation_target: &InvocationTarget,
current_invocation_span_context: &ServiceInvocationSpanContext,
Expand Down
7 changes: 5 additions & 2 deletions crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use restate_types::identifiers::{DeploymentId, EntryIndex, InvocationId, Partiti
use restate_types::invocation::InvocationTarget;
use restate_types::journal::enriched::EnrichedRawEntry;
use restate_types::journal::EntryType;
use restate_types::live::Live;
use restate_types::schema::deployment::DeploymentResolver;
use restate_types::service_protocol::ServiceProtocolVersion;
use restate_types::service_protocol::{MAX_SERVICE_PROTOCOL_VERSION, MIN_SERVICE_PROTOCOL_VERSION};
Expand Down Expand Up @@ -261,7 +262,7 @@ pub(super) struct InvocationTask<SR, JR, EE, DMR> {
state_reader: SR,
journal_reader: JR,
entry_enricher: EE,
deployment_metadata_resolver: DMR,
deployment_metadata_resolver: Live<DMR>,
invoker_tx: mpsc::UnboundedSender<InvocationTaskOutput>,
invoker_rx: mpsc::UnboundedReceiver<Notification>,
}
Expand Down Expand Up @@ -319,7 +320,7 @@ where
state_reader: SR,
journal_reader: JR,
entry_enricher: EE,
deployment_metadata_resolver: DMR,
deployment_metadata_resolver: Live<DMR>,
invoker_tx: mpsc::UnboundedSender<InvocationTaskOutput>,
invoker_rx: mpsc::UnboundedReceiver<Notification>,
) -> Self {
Expand Down Expand Up @@ -409,6 +410,7 @@ where
// deployments have been registered for the same service.
let deployment_metadata = shortcircuit!(self
.deployment_metadata_resolver
.live_load()
.get_deployment(&pinned_deployment.deployment_id)
.ok_or_else(|| InvocationTaskError::UnknownDeployment(
pinned_deployment.deployment_id
Expand All @@ -432,6 +434,7 @@ where
// of the registered service.
let deployment = shortcircuit!(self
.deployment_metadata_resolver
.live_load()
.resolve_latest_deployment_for_service(self.invocation_target.service_name())
.ok_or(InvocationTaskError::NoDeploymentForService));

Expand Down
17 changes: 8 additions & 9 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use restate_types::identifiers::{EntryIndex, PartitionLeaderEpoch};
use restate_types::journal::enriched::EnrichedRawEntry;
use restate_types::journal::raw::PlainRawEntry;
use restate_types::journal::Completion;
use restate_types::live::LiveLoad;
use restate_types::live::{Live, LiveLoad};
use restate_types::retries::RetryPolicy;
use restate_types::schema::deployment::DeploymentResolver;
use status_store::InvocationStatusStore;
Expand Down Expand Up @@ -88,11 +88,10 @@ trait InvocationTaskRunner<SR> {
) -> AbortHandle;
}

#[derive(Debug)]
struct DefaultInvocationTaskRunner<EE, DMR> {
client: ServiceClient,
entry_enricher: EE,
deployment_metadata_resolver: DMR,
deployment_metadata_resolver: Live<DMR>,
}

impl<SR, EE, DMR> InvocationTaskRunner<SR> for DefaultInvocationTaskRunner<EE, DMR>
Expand All @@ -101,7 +100,7 @@ where
<SR as JournalReader>::JournalStream: Unpin + Send + 'static,
<SR as StateReader>::StateIter: Send,
EE: EntryEnricher + Clone + Send + Sync + 'static,
DMR: DeploymentResolver + Clone + Send + 'static,
DMR: DeploymentResolver + Clone + Send + Sync + 'static,
{
fn start_invocation_task(
&self,
Expand Down Expand Up @@ -140,7 +139,7 @@ where

// -- Service implementation

#[derive(Debug)]
//#[derive(Debug)]
pub struct Service<SR, EntryEnricher, DeploymentRegistry> {
// Used for constructing the invoker sender and status reader
input_tx: mpsc::UnboundedSender<InputCommand<SR>>,
Expand All @@ -161,7 +160,7 @@ impl<SR, EE, DMR> Service<SR, EE, DMR> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new<JS>(
options: &InvokerOptions,
deployment_metadata_resolver: DMR,
deployment_metadata_resolver: Live<DMR>,
client: ServiceClient,
entry_enricher: EE,
) -> Service<SR, EE, DMR>
Expand Down Expand Up @@ -202,7 +201,7 @@ impl<SR, EE, DMR> Service<SR, EE, DMR> {
service_client_options: &ServiceClientOptions,
invoker_options: &InvokerOptions,
entry_enricher: EE,
deployment_registry: DMR,
deployment_registry: Live<DMR>,
) -> Result<Service<SR, EE, DMR>, BuildError>
where
SR: JournalReader<JournalStream = JS> + StateReader + Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -235,7 +234,7 @@ where
<SR as JournalReader>::JournalStream: Unpin + Send + 'static,
<SR as StateReader>::StateIter: Send,
EE: EntryEnricher + Clone + Send + Sync + 'static,
EMR: DeploymentResolver + Clone + Send + 'static,
EMR: DeploymentResolver + Clone + Send + Sync + 'static,
{
pub fn handle(&self) -> InvokerHandle<SR> {
InvokerHandle {
Expand Down Expand Up @@ -1158,7 +1157,7 @@ mod tests {
let service = Service::new(
&invoker_options,
// all invocations are unknown leading to immediate retries
MockDeploymentMetadataRegistry::default(),
Live::from_value(MockDeploymentMetadataRegistry::default()),
ServiceClient::from_options(
&ServiceClientOptions::default(),
restate_service_client::AssumeRoleCacheMode::None,
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl Node {
metadata_store_client.clone(),
);
metadata_manager.register_in_message_router(&mut router_builder);
let updating_schema_information = metadata.schema_updateable();
let updating_schema_information = metadata.updateable_schema();
let bifrost = BifrostService::new(metadata.clone());

let tc = task_center();
Expand Down
Loading

0 comments on commit 777b3cc

Please sign in to comment.