diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs index c35b09c7f5..ff270e44ed 100644 --- a/crates/core/src/metadata/mod.rs +++ b/crates/core/src/metadata/mod.rs @@ -14,7 +14,7 @@ mod manager; pub use manager::MetadataManager; use restate_types::live::{Live, Pinned}; -use restate_types::schema::{Schema, UpdateableSchema}; +use restate_types::schema::Schema; use std::sync::{Arc, OnceLock}; @@ -148,8 +148,12 @@ impl Metadata { self.inner.schema.load().version() } - pub fn schema_updateable(&self) -> UpdateableSchema { - UpdateableSchema::from(Arc::clone(&self.inner.schema)) + pub fn schema_ref(&self) -> Pinned { + Pinned::new(&self.inner.schema) + } + + pub fn updateable_schema(&self) -> Live { + Live::from(self.inner.schema.clone()) } // Returns when the metadata kind is at the provided version (or newer) diff --git a/crates/ingress-http/src/handler/health.rs b/crates/ingress-http/src/handler/health.rs index b04411c51f..757e1890e6 100644 --- a/crates/ingress-http/src/handler/health.rs +++ b/crates/ingress-http/src/handler/health.rs @@ -30,7 +30,7 @@ where Schemas: ServiceMetadataResolver + Send + Sync + 'static, { pub(crate) fn handle_health( - &self, + &mut self, req: Request, ) -> Result>, HandlerError> { if req.method() != Method::GET { @@ -39,6 +39,7 @@ where let response = HealthResponse { services: self .schemas + .pinned() .list_services() .into_iter() .map(|c| c.name) diff --git a/crates/ingress-http/src/handler/invocation.rs b/crates/ingress-http/src/handler/invocation.rs index dddc826851..0f41e09509 100644 --- a/crates/ingress-http/src/handler/invocation.rs +++ b/crates/ingress-http/src/handler/invocation.rs @@ -125,6 +125,7 @@ where response.idempotency_expiry_time.as_deref(), move |invocation_target| { self.schemas + .pinned() .resolve_latest_invocation_target( invocation_target.service_name(), invocation_target.handler_name(), @@ -172,6 +173,7 @@ where None, move |invocation_target| { self.schemas + .pinned() .resolve_latest_invocation_target( invocation_target.service_name(), invocation_target.handler_name(), diff --git a/crates/ingress-http/src/handler/mod.rs b/crates/ingress-http/src/handler/mod.rs index e8885fae9b..9c06914f68 100644 --- a/crates/ingress-http/src/handler/mod.rs +++ b/crates/ingress-http/src/handler/mod.rs @@ -8,7 +8,20 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; +mod awakeables; +mod error; +mod health; +mod invocation; +mod path_parsing; +mod responses; +mod service_handler; +#[cfg(test)] +mod tests; +mod tracing; +mod workflow; + +use std::convert::Infallible; +use std::task::{Context, Poll}; use error::HandlerError; use futures::future::BoxFuture; @@ -17,36 +30,26 @@ use http_body_util::Full; use hyper::http::HeaderValue; use hyper::{Request, Response}; use path_parsing::RequestType; + use restate_ingress_dispatcher::DispatchIngressRequest; +use restate_types::live::Live; use restate_types::schema::invocation_target::InvocationTargetResolver; use restate_types::schema::service::ServiceMetadataResolver; -use std::convert::Infallible; -use std::task::{Context, Poll}; -mod awakeables; -mod error; -mod health; -mod invocation; -mod path_parsing; -mod responses; -mod service_handler; -#[cfg(test)] -mod tests; -mod tracing; -mod workflow; +use super::*; const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json"); #[derive(Clone)] pub(crate) struct Handler { - schemas: Schemas, + schemas: Live, dispatcher: Dispatcher, storage_reader: StorageReader, } impl Handler { pub(crate) fn new( - schemas: Schemas, + schemas: Live, dispatcher: Dispatcher, storage_reader: StorageReader, ) -> Self { @@ -79,7 +82,7 @@ where fn call(&mut self, req: Request) -> Self::Future { let res = self.parse_path(req.uri()); - let this = self.clone(); + let mut this = self.clone(); async move { match res? { RequestType::Health => this.handle_health(req), diff --git a/crates/ingress-http/src/handler/path_parsing.rs b/crates/ingress-http/src/handler/path_parsing.rs index c11a0ba51f..92117317db 100644 --- a/crates/ingress-http/src/handler/path_parsing.rs +++ b/crates/ingress-http/src/handler/path_parsing.rs @@ -229,11 +229,12 @@ where StorageReader: InvocationStorageReader + Clone + Send + Sync + 'static, { /// This function takes care of parsing the path of the request, inferring the correct request type - pub(crate) fn parse_path(&self, uri: &Uri) -> Result { + pub(crate) fn parse_path(&mut self, uri: &Uri) -> Result { let mut path_parts = uri.path().split('/').skip(1); let first_segment = path_parts.next().ok_or(HandlerError::NotFound)?; + let schema = self.schemas.live_load(); match first_segment { "restate" => match path_parts.next().ok_or(HandlerError::NotFound)? { "health" => Ok(RequestType::Health), @@ -241,7 +242,7 @@ where AwakeableRequestType::from_path_chunks(path_parts)?, )), "invocation" => Ok(RequestType::Invocation( - InvocationRequestType::from_path_chunks(path_parts, &self.schemas)?, + InvocationRequestType::from_path_chunks(path_parts, schema)?, )), "workflow" => Ok(RequestType::Workflow( WorkflowRequestType::from_path_chunks(path_parts)?, @@ -252,7 +253,7 @@ where segment => Ok(RequestType::Service(ServiceRequestType::from_path_chunks( path_parts, segment.to_owned(), - &self.schemas, + schema, )?)), } } diff --git a/crates/ingress-http/src/handler/service_handler.rs b/crates/ingress-http/src/handler/service_handler.rs index bd1afb32a0..a3e68aa9f9 100644 --- a/crates/ingress-http/src/handler/service_handler.rs +++ b/crates/ingress-http/src/handler/service_handler.rs @@ -85,6 +85,7 @@ where let invocation_target_meta = if let Some(invocation_target) = self .schemas + .pinned() .resolve_latest_invocation_target(&service_name, &handler_name) { if !invocation_target.public { diff --git a/crates/ingress-http/src/handler/tests.rs b/crates/ingress-http/src/handler/tests.rs index 898abe1b66..2b836942a8 100644 --- a/crates/ingress-http/src/handler/tests.rs +++ b/crates/ingress-http/src/handler/tests.rs @@ -17,6 +17,7 @@ use googletest::prelude::*; use http::StatusCode; use http::{Method, Request, Response}; use http_body_util::{BodyExt, Empty, Full}; +use restate_types::live::Live; use tokio::sync::mpsc; use tower::ServiceExt; use tracing_test::traced_test; @@ -1093,7 +1094,12 @@ where let handler_fut = node_env.tc.run_in_scope( "ingress", None, - Handler::new(schemas, dispatcher, invocation_storage_reader).oneshot(req), + Handler::new( + Live::from_value(schemas), + dispatcher, + invocation_storage_reader, + ) + .oneshot(req), ); // Mock the service invocation receiver diff --git a/crates/ingress-http/src/handler/workflow.rs b/crates/ingress-http/src/handler/workflow.rs index dae2f91b59..a3c11f8e3c 100644 --- a/crates/ingress-http/src/handler/workflow.rs +++ b/crates/ingress-http/src/handler/workflow.rs @@ -99,6 +99,7 @@ where response.idempotency_expiry_time.as_deref(), move |invocation_target| { self.schemas + .pinned() .resolve_latest_invocation_target( invocation_target.service_name(), invocation_target.handler_name(), @@ -146,6 +147,7 @@ where None, move |invocation_target| { self.schemas + .pinned() .resolve_latest_invocation_target( invocation_target.service_name(), invocation_target.handler_name(), diff --git a/crates/ingress-http/src/server.rs b/crates/ingress-http/src/server.rs index 87e3e5fb74..0a9b57673c 100644 --- a/crates/ingress-http/src/server.rs +++ b/crates/ingress-http/src/server.rs @@ -21,6 +21,7 @@ use hyper_util::server::conn::auto; use restate_core::{cancellation_watcher, task_center, TaskKind}; use restate_ingress_dispatcher::{DispatchIngressRequest, IngressDispatcher}; use restate_types::config::IngressOptions; +use restate_types::live::Live; use restate_types::schema::invocation_target::InvocationTargetResolver; use restate_types::schema::service::ServiceMetadataResolver; use std::convert::Infallible; @@ -56,7 +57,7 @@ pub struct HyperServerIngress { concurrency_limit: usize, // Parameters to build the layers - schemas: Schemas, + schemas: Live, dispatcher: Dispatcher, storage_reader: StorageReader, @@ -72,7 +73,7 @@ where pub fn from_options( ingress_options: &IngressOptions, dispatcher: IngressDispatcher, - schemas: Schemas, + schemas: Live, storage_reader: StorageReader, ) -> HyperServerIngress { crate::metric_definitions::describe_metrics(); @@ -97,7 +98,7 @@ where pub(crate) fn new( listening_addr: SocketAddr, concurrency_limit: usize, - schemas: Schemas, + schemas: Live, dispatcher: Dispatcher, storage_reader: StorageReader, ) -> (Self, StartSignal) { @@ -346,7 +347,7 @@ mod tests { let (ingress, start_signal) = HyperServerIngress::new( "0.0.0.0:0".parse().unwrap(), Semaphore::MAX_PERMITS, - mock_schemas(), + Live::from_value(mock_schemas()), MockDispatcher::new(ingress_request_tx), MockStorageReader::default(), ); diff --git a/crates/invoker-api/src/entry_enricher.rs b/crates/invoker-api/src/entry_enricher.rs index ef68d0d3b3..15b7fe571a 100644 --- a/crates/invoker-api/src/entry_enricher.rs +++ b/crates/invoker-api/src/entry_enricher.rs @@ -15,7 +15,7 @@ use restate_types::journal::raw::PlainRawEntry; pub trait EntryEnricher { fn enrich_entry( - &self, + &mut self, entry: PlainRawEntry, current_invocation_target: &InvocationTarget, current_invocation_span_context: &ServiceInvocationSpanContext, @@ -38,7 +38,7 @@ pub mod test_util { impl EntryEnricher for MockEntryEnricher { fn enrich_entry( - &self, + &mut self, entry: PlainRawEntry, _current_invocation_target: &InvocationTarget, current_invocation_span_context: &ServiceInvocationSpanContext, diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index c2c0a5864b..b9db3711f9 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -32,6 +32,7 @@ use restate_types::identifiers::{DeploymentId, EntryIndex, InvocationId, Partiti use restate_types::invocation::InvocationTarget; use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::EntryType; +use restate_types::live::Live; use restate_types::schema::deployment::DeploymentResolver; use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::service_protocol::{MAX_SERVICE_PROTOCOL_VERSION, MIN_SERVICE_PROTOCOL_VERSION}; @@ -261,7 +262,7 @@ pub(super) struct InvocationTask { state_reader: SR, journal_reader: JR, entry_enricher: EE, - deployment_metadata_resolver: DMR, + deployment_metadata_resolver: Live, invoker_tx: mpsc::UnboundedSender, invoker_rx: mpsc::UnboundedReceiver, } @@ -319,7 +320,7 @@ where state_reader: SR, journal_reader: JR, entry_enricher: EE, - deployment_metadata_resolver: DMR, + deployment_metadata_resolver: Live, invoker_tx: mpsc::UnboundedSender, invoker_rx: mpsc::UnboundedReceiver, ) -> Self { @@ -409,6 +410,7 @@ where // deployments have been registered for the same service. let deployment_metadata = shortcircuit!(self .deployment_metadata_resolver + .live_load() .get_deployment(&pinned_deployment.deployment_id) .ok_or_else(|| InvocationTaskError::UnknownDeployment( pinned_deployment.deployment_id @@ -432,6 +434,7 @@ where // of the registered service. let deployment = shortcircuit!(self .deployment_metadata_resolver + .live_load() .resolve_latest_deployment_for_service(self.invocation_target.service_name()) .ok_or(InvocationTaskError::NoDeploymentForService)); diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 5cac2854ca..a04011ccf9 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -36,7 +36,7 @@ use restate_types::identifiers::{EntryIndex, PartitionLeaderEpoch}; use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::raw::PlainRawEntry; use restate_types::journal::Completion; -use restate_types::live::LiveLoad; +use restate_types::live::{Live, LiveLoad}; use restate_types::retries::RetryPolicy; use restate_types::schema::deployment::DeploymentResolver; use status_store::InvocationStatusStore; @@ -88,11 +88,10 @@ trait InvocationTaskRunner { ) -> AbortHandle; } -#[derive(Debug)] struct DefaultInvocationTaskRunner { client: ServiceClient, entry_enricher: EE, - deployment_metadata_resolver: DMR, + deployment_metadata_resolver: Live, } impl InvocationTaskRunner for DefaultInvocationTaskRunner @@ -101,7 +100,7 @@ where ::JournalStream: Unpin + Send + 'static, ::StateIter: Send, EE: EntryEnricher + Clone + Send + Sync + 'static, - DMR: DeploymentResolver + Clone + Send + 'static, + DMR: DeploymentResolver + Clone + Send + Sync + 'static, { fn start_invocation_task( &self, @@ -140,7 +139,7 @@ where // -- Service implementation -#[derive(Debug)] +//#[derive(Debug)] pub struct Service { // Used for constructing the invoker sender and status reader input_tx: mpsc::UnboundedSender>, @@ -161,7 +160,7 @@ impl Service { #[allow(clippy::too_many_arguments)] pub(crate) fn new( options: &InvokerOptions, - deployment_metadata_resolver: DMR, + deployment_metadata_resolver: Live, client: ServiceClient, entry_enricher: EE, ) -> Service @@ -202,7 +201,7 @@ impl Service { service_client_options: &ServiceClientOptions, invoker_options: &InvokerOptions, entry_enricher: EE, - deployment_registry: DMR, + deployment_registry: Live, ) -> Result, BuildError> where SR: JournalReader + StateReader + Clone + Send + Sync + 'static, @@ -235,7 +234,7 @@ where ::JournalStream: Unpin + Send + 'static, ::StateIter: Send, EE: EntryEnricher + Clone + Send + Sync + 'static, - EMR: DeploymentResolver + Clone + Send + 'static, + EMR: DeploymentResolver + Clone + Send + Sync + 'static, { pub fn handle(&self) -> InvokerHandle { InvokerHandle { @@ -1158,7 +1157,7 @@ mod tests { let service = Service::new( &invoker_options, // all invocations are unknown leading to immediate retries - MockDeploymentMetadataRegistry::default(), + Live::from_value(MockDeploymentMetadataRegistry::default()), ServiceClient::from_options( &ServiceClientOptions::default(), restate_service_client::AssumeRoleCacheMode::None, diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index bff1975738..d36002e71a 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -153,7 +153,7 @@ impl Node { metadata_store_client.clone(), ); metadata_manager.register_in_message_router(&mut router_builder); - let updating_schema_information = metadata.schema_updateable(); + let updating_schema_information = metadata.updateable_schema(); let bifrost = BifrostService::new(metadata.clone()); let tc = task_center(); diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index 3f725a75fe..3973ae3634 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -13,14 +13,14 @@ use codederror::CodedError; use restate_bifrost::Bifrost; use restate_core::network::MessageRouterBuilder; use restate_core::network::Networking; -use restate_core::{cancellation_watcher, metadata, task_center, Metadata, MetadataKind}; +use restate_core::{cancellation_watcher, task_center, Metadata, MetadataKind}; use restate_core::{ShutdownError, TaskKind}; use restate_metadata_store::MetadataStoreClient; use restate_storage_query_datafusion::context::QueryContext; use restate_types::config::Configuration; use restate_types::live::Live; use restate_types::schema::subscriptions::SubscriptionResolver; -use restate_types::schema::UpdateableSchema; +use restate_types::schema::Schema; use restate_types::Version; use restate_worker::SubscriptionController; use restate_worker::Worker; @@ -59,6 +59,7 @@ pub enum WorkerRoleBuildError { } pub struct WorkerRole { + metadata: Metadata, worker: Worker, } @@ -70,11 +71,11 @@ impl WorkerRole { networking: Networking, bifrost: Bifrost, metadata_store_client: MetadataStoreClient, - updating_schema_information: UpdateableSchema, + updating_schema_information: Live, ) -> Result { let worker = Worker::create( updateable_config, - metadata, + metadata.clone(), networking, bifrost, router_builder, @@ -83,7 +84,7 @@ impl WorkerRole { ) .await?; - Ok(WorkerRole { worker }) + Ok(WorkerRole { worker, metadata }) } pub fn storage_query_context(&self) -> &QueryContext { @@ -97,7 +98,7 @@ impl WorkerRole { TaskKind::MetadataBackgroundSync, "subscription_controller", None, - Self::watch_subscriptions(self.worker.subscription_controller_handle()), + Self::watch_subscriptions(self.metadata, self.worker.subscription_controller_handle()), )?; tc.spawn_child(TaskKind::RoleRunner, "worker-service", None, async { @@ -107,12 +108,14 @@ impl WorkerRole { Ok(()) } - async fn watch_subscriptions(subscription_controller: SC) -> anyhow::Result<()> + async fn watch_subscriptions( + metadata: Metadata, + subscription_controller: SC, + ) -> anyhow::Result<()> where SC: SubscriptionController + Clone + Send + Sync, { - let metadata = metadata(); - let schema_view = metadata.schema_updateable(); + let schema_view = metadata.updateable_schema(); let mut next_version = Version::MIN; let cancellation_watcher = cancellation_watcher(); tokio::pin!(cancellation_watcher); @@ -128,7 +131,7 @@ impl WorkerRole { // This might return subscriptions belonging to a higher schema version. As a // result we might re-apply the same list of subscriptions. This is not a // problem, since update_subscriptions is idempotent. - let subscriptions = schema_view.list_subscriptions(&[]); + let subscriptions = schema_view.pinned().list_subscriptions(&[]); subscription_controller .update_subscriptions(subscriptions) .await?; diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index adb0f43ecb..0eb2b2e5fd 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -26,6 +26,7 @@ use restate_partition_store::PartitionStoreManager; use restate_types::config::QueryEngineOptions; use restate_types::errors::GenericError; use restate_types::identifiers::PartitionId; +use restate_types::live::Live; use restate_types::schema::deployment::DeploymentResolver; use restate_types::schema::service::ServiceMetadataResolver; @@ -94,13 +95,9 @@ impl QueryContext { partition_selector: impl SelectPartitions + Clone, partition_store_manager: PartitionStoreManager, status: impl StatusHandle + Send + Sync + Debug + Clone + 'static, - schemas: impl DeploymentResolver - + ServiceMetadataResolver - + Send - + Sync - + Debug - + Clone - + 'static, + schemas: Live< + impl DeploymentResolver + ServiceMetadataResolver + Send + Sync + Debug + Clone + 'static, + >, ) -> Result { let ctx = QueryContext::new( options.memory_size.get(), diff --git a/crates/storage-query-datafusion/src/deployment/table.rs b/crates/storage-query-datafusion/src/deployment/table.rs index 5ac3fcf043..a0cc67b526 100644 --- a/crates/storage-query-datafusion/src/deployment/table.rs +++ b/crates/storage-query-datafusion/src/deployment/table.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::fmt::Debug; +use std::fmt; use std::sync::Arc; use datafusion::arrow::datatypes::SchemaRef; @@ -16,6 +16,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::logical_expr::Expr; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; +use restate_types::live::Live; use tokio::sync::mpsc::Sender; use restate_types::identifiers::ServiceRevision; @@ -29,7 +30,7 @@ use crate::table_util::Builder; pub(crate) fn register_self( ctx: &QueryContext, - resolver: impl DeploymentResolver + Send + Sync + Debug + 'static, + resolver: Live, ) -> datafusion::common::Result<()> { let deployment_table = GenericTableProvider::new( SysDeploymentBuilder::schema(), @@ -41,12 +42,16 @@ pub(crate) fn register_self( .map(|_| ()) } -#[derive(Debug, Clone)] -struct DeploymentMetadataScanner(DMR); +#[derive(Clone)] +struct DeploymentMetadataScanner(Live); -impl Scan - for DeploymentMetadataScanner -{ +impl fmt::Debug for DeploymentMetadataScanner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("DeploymentMetadataScanner") + } +} + +impl Scan for DeploymentMetadataScanner { fn scan( &self, projection: SchemaRef, @@ -57,7 +62,7 @@ impl Scan let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); let tx = stream_builder.tx(); - let rows = self.0.get_deployments(); + let rows = self.0.pinned().get_deployments(); stream_builder.spawn(async move { for_each_state(schema, tx, rows).await; Ok(()) diff --git a/crates/storage-query-datafusion/src/mocks.rs b/crates/storage-query-datafusion/src/mocks.rs index 60e2c8d6ec..da16a84d41 100644 --- a/crates/storage-query-datafusion/src/mocks.rs +++ b/crates/storage-query-datafusion/src/mocks.rs @@ -27,7 +27,7 @@ use restate_types::config::{CommonOptions, QueryEngineOptions, WorkerOptions}; use restate_types::errors::GenericError; use restate_types::identifiers::{DeploymentId, PartitionId, PartitionKey, ServiceRevision}; use restate_types::invocation::ServiceType; -use restate_types::live::Constant; +use restate_types::live::{Constant, Live}; use restate_types::schema::deployment::test_util::MockDeploymentMetadataRegistry; use restate_types::schema::deployment::{Deployment, DeploymentResolver}; use restate_types::schema::service::test_util::MockServiceMetadataResolver; @@ -134,7 +134,7 @@ impl MockQueryEngine { MockPartitionSelector, manager, status, - schemas, + Live::from_value(schemas), ) .await .unwrap(), diff --git a/crates/storage-query-datafusion/src/service/table.rs b/crates/storage-query-datafusion/src/service/table.rs index 59dd934ca1..d18d1e1211 100644 --- a/crates/storage-query-datafusion/src/service/table.rs +++ b/crates/storage-query-datafusion/src/service/table.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::fmt::Debug; +use std::fmt; use std::sync::Arc; use datafusion::arrow::datatypes::SchemaRef; @@ -16,6 +16,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::logical_expr::Expr; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; +use restate_types::live::Live; use tokio::sync::mpsc::Sender; use restate_types::schema::service::{ServiceMetadata, ServiceMetadataResolver}; @@ -28,7 +29,7 @@ use crate::table_util::Builder; pub(crate) fn register_self( ctx: &QueryContext, - resolver: impl ServiceMetadataResolver + Send + Sync + Debug + 'static, + resolver: Live, ) -> datafusion::common::Result<()> { let service_table = GenericTableProvider::new( SysServiceBuilder::schema(), @@ -40,12 +41,16 @@ pub(crate) fn register_self( .map(|_| ()) } -#[derive(Debug, Clone)] -struct ServiceMetadataScanner(SMR); +#[derive(Clone)] +struct ServiceMetadataScanner(Live); -impl Scan - for ServiceMetadataScanner -{ +impl fmt::Debug for ServiceMetadataScanner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("ServiceMetadataScanner") + } +} + +impl Scan for ServiceMetadataScanner { fn scan( &self, projection: SchemaRef, @@ -56,7 +61,7 @@ impl Scan let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); let tx = stream_builder.tx(); - let rows = self.0.list_services(); + let rows = self.0.pinned().list_services(); stream_builder.spawn(async move { for_each_state(schema, tx, rows).await; Ok(()) diff --git a/crates/storage-query-datafusion/src/table_providers.rs b/crates/storage-query-datafusion/src/table_providers.rs index 864e397080..9d14dc826b 100644 --- a/crates/storage-query-datafusion/src/table_providers.rs +++ b/crates/storage-query-datafusion/src/table_providers.rs @@ -189,7 +189,7 @@ where // Generic-based table provider that provides node-level or global data rather than // partition-keyed data. -pub(crate) trait Scan: Send + Sync + Debug + 'static { +pub(crate) trait Scan: Debug + Send + Sync + 'static { fn scan( &self, projection: SchemaRef, diff --git a/crates/types/src/live.rs b/crates/types/src/live.rs index fc6cc3eb59..068a866755 100644 --- a/crates/types/src/live.rs +++ b/crates/types/src/live.rs @@ -59,6 +59,9 @@ impl Live { } impl Live { + pub fn from_value(value: T) -> Self { + Self::from(Arc::new(ArcSwap::from_pointee(value))) + } /// Potentially fast access to a snapshot, should be used if using [[Updateable]] /// isn't possible (requires mutablility to call load()). /// Pinned doesn't track updates. diff --git a/crates/types/src/schema/deployment.rs b/crates/types/src/schema/deployment.rs index 5c4ea55365..f6bad00a92 100644 --- a/crates/types/src/schema/deployment.rs +++ b/crates/types/src/schema/deployment.rs @@ -21,7 +21,7 @@ use serde_with::serde_as; use crate::identifiers::{DeploymentId, LambdaARN, ServiceRevision}; use crate::schema::service::ServiceMetadata; -use crate::schema::{Schema, UpdateableSchema}; +use crate::schema::Schema; use crate::time::MillisSinceEpoch; #[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)] @@ -348,29 +348,3 @@ impl DeploymentResolver for Schema { .collect() } } - -impl DeploymentResolver for UpdateableSchema { - fn resolve_latest_deployment_for_service( - &self, - service_name: impl AsRef, - ) -> Option { - self.0 - .load() - .resolve_latest_deployment_for_service(service_name) - } - - fn get_deployment(&self, deployment_id: &DeploymentId) -> Option { - self.0.load().get_deployment(deployment_id) - } - - fn get_deployment_and_services( - &self, - deployment_id: &DeploymentId, - ) -> Option<(Deployment, Vec)> { - self.0.load().get_deployment_and_services(deployment_id) - } - - fn get_deployments(&self) -> Vec<(Deployment, Vec<(String, ServiceRevision)>)> { - self.0.load().get_deployments() - } -} diff --git a/crates/types/src/schema/invocation_target.rs b/crates/types/src/schema/invocation_target.rs index d9257378b8..12de9f5532 100644 --- a/crates/types/src/schema/invocation_target.rs +++ b/crates/types/src/schema/invocation_target.rs @@ -17,7 +17,7 @@ use bytestring::ByteString; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use super::{Schema, UpdateableSchema}; +use super::Schema; use crate::invocation::InvocationTargetType; pub const DEFAULT_IDEMPOTENCY_RETENTION: Duration = Duration::from_secs(60 * 60 * 24); @@ -74,18 +74,6 @@ impl InvocationTargetResolver for Schema { } } -impl InvocationTargetResolver for UpdateableSchema { - fn resolve_latest_invocation_target( - &self, - service_name: impl AsRef, - handler_name: impl AsRef, - ) -> Option { - self.0 - .load() - .resolve_latest_invocation_target(service_name, handler_name) - } -} - // --- Input rules #[derive(Debug, thiserror::Error)] diff --git a/crates/types/src/schema/mod.rs b/crates/types/src/schema/mod.rs index 413d60ca68..b2f22b568f 100644 --- a/crates/types/src/schema/mod.rs +++ b/crates/types/src/schema/mod.rs @@ -14,9 +14,7 @@ pub mod service; pub mod subscriptions; use std::collections::HashMap; -use std::sync::Arc; -use arc_swap::ArcSwap; use serde_with::serde_as; use self::deployment::DeploymentSchemas; @@ -27,13 +25,6 @@ use crate::identifiers::{DeploymentId, SubscriptionId}; use crate::Version; use crate::Versioned; -/// Schema information which automatically loads the latest version when accessing it. -/// -/// Temporary bridge until users are migrated to directly using the metadata -/// provided schema information. -#[derive(Debug, Default, Clone, derive_more::From)] -pub struct UpdateableSchema(Arc>); - /// The schema information #[serde_as] #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/crates/types/src/schema/service.rs b/crates/types/src/schema/service.rs index 318ad259f4..bb7fcf136e 100644 --- a/crates/types/src/schema/service.rs +++ b/crates/types/src/schema/service.rs @@ -17,7 +17,6 @@ use serde_with::serde_as; use super::invocation_target::InvocationTargetMetadata; use super::Schema; -use super::UpdateableSchema; use crate::identifiers::{DeploymentId, ServiceRevision}; use crate::invocation::{ InvocationTargetType, ServiceType, VirtualObjectHandlerType, WorkflowHandlerType, @@ -191,20 +190,6 @@ impl ServiceMetadataResolver for Schema { } } -impl ServiceMetadataResolver for UpdateableSchema { - fn resolve_latest_service(&self, service_name: impl AsRef) -> Option { - self.0.load().resolve_latest_service(service_name) - } - - fn resolve_latest_service_type(&self, service_name: impl AsRef) -> Option { - self.0.load().resolve_latest_service_type(service_name) - } - - fn list_services(&self) -> Vec { - self.0.load().list_services() - } -} - #[cfg(feature = "test-util")] #[allow(dead_code)] pub mod test_util { diff --git a/crates/types/src/schema/subscriptions.rs b/crates/types/src/schema/subscriptions.rs index 5ac5f11f00..4cd7c97d74 100644 --- a/crates/types/src/schema/subscriptions.rs +++ b/crates/types/src/schema/subscriptions.rs @@ -15,7 +15,7 @@ use serde::Deserialize; use serde::Serialize; use tracing::warn; -use super::{Schema, UpdateableSchema}; +use super::Schema; use crate::config::IngressOptions; use crate::errors::GenericError; use crate::identifiers::SubscriptionId; @@ -163,16 +163,6 @@ impl SubscriptionResolver for Schema { } } -impl SubscriptionResolver for UpdateableSchema { - fn get_subscription(&self, id: SubscriptionId) -> Option { - self.0.load().get_subscription(id) - } - - fn list_subscriptions(&self, filters: &[ListSubscriptionFilter]) -> Vec { - self.0.load().list_subscriptions(filters) - } -} - pub trait SubscriptionValidator { type Error: Into; diff --git a/crates/worker/src/invoker_integration.rs b/crates/worker/src/invoker_integration.rs index a6cc93ea02..ed7dd501df 100644 --- a/crates/worker/src/invoker_integration.rs +++ b/crates/worker/src/invoker_integration.rs @@ -28,17 +28,18 @@ use restate_types::journal::enriched::{ use restate_types::journal::raw::{PlainEntryHeader, PlainRawEntry, RawEntry, RawEntryCodec}; use restate_types::journal::{CompleteAwakeableEntry, Entry, InvokeEntry, OneWayCallEntry}; use restate_types::journal::{EntryType, InvokeRequest}; +use restate_types::live::Live; use restate_types::schema::invocation_target::InvocationTargetResolver; -#[derive(Debug, Clone)] +#[derive(Clone)] pub(super) struct EntryEnricher { - schemas: Schemas, + schemas: Live, _codec: PhantomData, } impl EntryEnricher { - pub(super) fn new(schemas: Schemas) -> Self { + pub(super) fn new(schemas: Live) -> Self { Self { schemas, _codec: Default::default(), @@ -52,7 +53,7 @@ where Codec: RawEntryCodec, { fn resolve_service_invocation_target( - &self, + &mut self, entry_type: EntryType, serialized_entry: &Bytes, request_extractor: impl Fn(Entry) -> InvokeRequest, @@ -64,6 +65,7 @@ where let meta = self .schemas + .live_load() .resolve_latest_invocation_target(&request.service_name, &request.handler_name) .ok_or_else(|| { InvocationError::service_handler_not_found( @@ -118,7 +120,7 @@ where Codec: RawEntryCodec, { fn enrich_entry( - &self, + &mut self, entry: PlainRawEntry, current_invocation_target: &InvocationTarget, current_invocation_span_context: &ServiceInvocationSpanContext, diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index cc40c2005b..deb633a82a 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -42,7 +42,7 @@ use restate_storage_query_datafusion::context::QueryContext; use restate_storage_query_postgres::service::PostgresQueryService; use restate_types::config::Configuration; use restate_types::live::Live; -use restate_types::schema::UpdateableSchema; +use restate_types::schema::Schema; pub use self::error::*; pub use self::handle::*; @@ -57,7 +57,7 @@ type PartitionProcessor = partition::PartitionProcessor< >; type ExternalClientIngress = - HyperServerIngress; + HyperServerIngress; #[derive(Debug, thiserror::Error, CodedError)] #[error("failed creating worker: {0}")] @@ -100,8 +100,8 @@ pub struct Worker { #[allow(clippy::type_complexity)] invoker: InvokerService< InvokerStorageReader, - EntryEnricher, - UpdateableSchema, + EntryEnricher, + Schema, >, external_client_ingress: ExternalClientIngress, ingress_kafka: IngressKafkaService, @@ -116,7 +116,7 @@ impl Worker { networking: Networking, bifrost: Bifrost, router_builder: &mut MessageRouterBuilder, - schema_view: UpdateableSchema, + schema: Live, metadata_store_client: MetadataStoreClient, ) -> Result { metric_definitions::describe_metrics(); @@ -145,15 +145,15 @@ impl Worker { let ingress_http = HyperServerIngress::from_options( &config.ingress, ingress_dispatcher.clone(), - schema_view.clone(), + schema.clone(), InvocationStorageReaderImpl::new(partition_store_manager.clone()), ); let invoker = InvokerService::from_options( &config.common.service_client, &config.worker.invoker, - EntryEnricher::new(schema_view.clone()), - schema_view.clone(), + EntryEnricher::new(schema.clone()), + schema.clone(), )?; let partition_processor_manager = PartitionProcessorManager::new( @@ -173,7 +173,7 @@ impl Worker { partition_processor_manager.handle(), partition_store_manager.clone(), invoker.status_reader(), - schema_view.clone(), + schema.clone(), ) .await?;