From b3ded09418aac568fb389ee1a87722b7744adef7 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Thu, 12 Sep 2024 17:55:31 +0200 Subject: [PATCH] feat!: make callback an opaque type (#1405) * feat!: make callback an opaque type It may allow special case for channel handlers later, in order to use async methods, or change the return type of the callback in order to support async callbacks. For the record I've tested quickly with an enum inside `Callback`, and there was a sensitive cost in performance of 1% when using callbacks. * Retrigger CI * Retrigger CI * Retrigger CI --- zenoh-ext/src/querying_subscriber.rs | 35 ++++++----- zenoh/src/api/admin.rs | 6 +- zenoh/src/api/handlers/callback.rs | 71 ++++++++++++++++------- zenoh/src/api/handlers/fifo.rs | 21 ++++--- zenoh/src/api/handlers/mod.rs | 13 ++--- zenoh/src/api/handlers/ring.rs | 14 +++-- zenoh/src/api/liveliness.rs | 16 +++--- zenoh/src/api/publisher.rs | 10 ++-- zenoh/src/api/query.rs | 12 ++-- zenoh/src/api/queryable.rs | 11 ++-- zenoh/src/api/scouting.rs | 12 ++-- zenoh/src/api/session.rs | 86 ++++++++++++++-------------- zenoh/src/api/subscriber.rs | 10 ++-- 13 files changed, 173 insertions(+), 144 deletions(-) diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 9796109510..625494a757 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -23,7 +23,7 @@ use std::{ #[cfg(feature = "unstable")] use zenoh::pubsub::Reliability; use zenoh::{ - handlers::{locked, DefaultHandler, IntoHandler}, + handlers::{locked, Callback, DefaultHandler, IntoHandler}, internal::zlock, key_expr::KeyExpr, prelude::Wait, @@ -97,7 +97,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle handler: Handler, ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, { let QueryingSubscriberBuilder { session, @@ -230,7 +230,7 @@ impl QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> { impl Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, Handler::Handler: Send, { type To = ZResult>; @@ -239,7 +239,7 @@ where impl Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> where KeySpace: Into + Clone, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -288,7 +288,7 @@ where impl IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> where KeySpace: Into + Clone, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -469,7 +469,7 @@ where handler: Handler, ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, { let FetchingSubscriberBuilder { session, @@ -574,7 +574,7 @@ impl< TryIntoSample, > Resolvable for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, Handler::Handler: Send, TryIntoSample: ExtractSample, { @@ -589,7 +589,7 @@ impl< > Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample> where KeySpace: Into, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, TryIntoSample: ExtractSample + Send + Sync, { @@ -606,7 +606,7 @@ impl< > IntoFuture for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample> where KeySpace: Into, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, TryIntoSample: ExtractSample + Send + Sync, { @@ -651,7 +651,7 @@ where /// ``` pub struct FetchingSubscriber { subscriber: Subscriber<()>, - callback: Arc, + callback: Callback, state: Arc>, handler: Handler, } @@ -681,7 +681,7 @@ impl FetchingSubscriber { ) -> ZResult where KeySpace: Into, - InputHandler: IntoHandler<'static, Sample, Handler = Handler> + Send, + InputHandler: IntoHandler + Send, TryIntoSample: ExtractSample + Send + Sync, { let session_id = conf.session.zid(); @@ -698,7 +698,7 @@ impl FetchingSubscriber { move |s| { let state = &mut zlock!(state); if state.pending_fetches == 0 { - callback(s); + callback.call(s); } else { tracing::trace!( "Sample received while fetch in progress: push it to merge_queue" @@ -823,7 +823,7 @@ impl FetchingSubscriber { struct RepliesHandler { state: Arc>, - callback: Arc, + callback: Callback, } impl Drop for RepliesHandler { @@ -840,7 +840,7 @@ impl Drop for RepliesHandler { state.merge_queue.len() ); for s in state.merge_queue.drain() { - (self.callback)(s); + self.callback.call(s); } } } @@ -888,7 +888,7 @@ pub struct FetchBuilder< fetch: Fetch, phantom: std::marker::PhantomData, state: Arc>, - callback: Arc, + callback: Callback, } impl) -> ZResult<()>, TryIntoSample> @@ -923,10 +923,7 @@ where } } -fn register_handler( - state: Arc>, - callback: Arc, -) -> RepliesHandler { +fn register_handler(state: Arc>, callback: Callback) -> RepliesHandler { zlock!(state).pending_fetches += 1; // pending fetches will be decremented in RepliesHandler drop() RepliesHandler { state, callback } diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index f061779142..b96fc75dd2 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -34,7 +34,7 @@ use super::{ sample::{DataInfo, Locality, SampleKind}, subscriber::SubscriberKind, }; -use crate::api::session::WeakSession; +use crate::{api::session::WeakSession, handlers::Callback}; lazy_static::lazy_static!( static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") }; @@ -54,10 +54,10 @@ pub(crate) fn init(session: WeakSession) { &admin_key, true, Locality::SessionLocal, - Arc::new({ + Callback::new(Arc::new({ let session = session.clone(); move |q| on_admin_query(&session, q) - }), + })), ); } } diff --git a/zenoh/src/api/handlers/callback.rs b/zenoh/src/api/handlers/callback.rs index 7e609949ef..23c0454de5 100644 --- a/zenoh/src/api/handlers/callback.rs +++ b/zenoh/src/api/handlers/callback.rs @@ -13,7 +13,10 @@ // //! Callback handler trait. -use super::{Dyn, IntoHandler}; + +use std::sync::Arc; + +use crate::api::handlers::IntoHandler; /// A function that can transform a [`FnMut`]`(T)` to /// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex). @@ -22,50 +25,76 @@ pub fn locked(fnmut: impl FnMut(T)) -> impl Fn(T) { move |x| zlock!(lock)(x) } -/// An immutable callback function. -pub type Callback<'a, T> = Dyn; +/// Callback type used by zenoh entities. +pub struct Callback(Arc); + +impl Clone for Callback { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl Callback { + /// Instantiate a `Callback` from a callback function. + pub fn new(cb: Arc) -> Self { + Self(cb) + } + + /// Call the inner callback. + #[inline] + pub fn call(&self, arg: T) { + self.0(arg) + } +} + +impl IntoHandler for Callback { + type Handler = (); + fn into_handler(self) -> (Callback, Self::Handler) { + (self, ()) + } +} -impl<'a, T, F> IntoHandler<'a, T> for F +impl IntoHandler for F where - F: Fn(T) + Send + Sync + 'a, + F: Fn(T) + Send + Sync + 'static, { type Handler = (); - fn into_handler(self) -> (Callback<'a, T>, Self::Handler) { - (Dyn::from(self), ()) + fn into_handler(self) -> (Callback, Self::Handler) { + (Callback::new(Arc::new(self)), ()) } } -impl<'a, T, F, H> IntoHandler<'a, T> for (F, H) +impl IntoHandler for (F, H) where - F: Fn(T) + Send + Sync + 'a, + F: Fn(T) + Send + Sync + 'static, { type Handler = H; - fn into_handler(self) -> (Callback<'a, T>, Self::Handler) { - (Dyn::from(self.0), self.1) + fn into_handler(self) -> (Callback, Self::Handler) { + (self.0.into_handler().0, self.1) } } -impl<'a, T, H> IntoHandler<'a, T> for (Callback<'static, T>, H) { +impl IntoHandler for (Callback, H) { type Handler = H; - fn into_handler(self) -> (Callback<'a, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { self } } -impl IntoHandler<'static, T> for (flume::Sender, flume::Receiver) { +impl IntoHandler for (flume::Sender, flume::Receiver) { type Handler = flume::Receiver; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { let (sender, receiver) = self; ( - Dyn::new(move |t| { + Callback::new(Arc::new(move |t| { if let Err(e) = sender.send(t) { tracing::error!("{}", e) } - }), + })), receiver, ) } @@ -97,14 +126,14 @@ where } } -impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop +impl IntoHandler for CallbackDrop where - OnEvent: Fn(Event) + Send + Sync + 'a, + OnEvent: Fn(Event) + Send + Sync + 'static, DropFn: FnMut() + Send + Sync + 'static, { type Handler = (); - fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) { - (Dyn::from(move |evt| (self.callback)(evt)), ()) + fn into_handler(self) -> (Callback, Self::Handler) { + (move |evt| (self.callback)(evt)).into_handler() } } diff --git a/zenoh/src/api/handlers/fifo.rs b/zenoh/src/api/handlers/fifo.rs index f0ae1a5257..db73aae0ef 100644 --- a/zenoh/src/api/handlers/fifo.rs +++ b/zenoh/src/api/handlers/fifo.rs @@ -13,7 +13,10 @@ // //! Callback handler trait. -use super::{callback::Callback, Dyn, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE}; + +use std::sync::Arc; + +use crate::api::handlers::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE}; /// The default handler in Zenoh is a FIFO queue. @@ -34,27 +37,27 @@ impl Default for FifoChannel { } } -impl IntoHandler<'static, T> for FifoChannel { +impl IntoHandler for FifoChannel { type Handler = flume::Receiver; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { flume::bounded(self.capacity).into_handler() } } -impl IntoHandler<'static, T> +impl IntoHandler for (std::sync::mpsc::SyncSender, std::sync::mpsc::Receiver) { type Handler = std::sync::mpsc::Receiver; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { let (sender, receiver) = self; ( - Dyn::new(move |t| { - if let Err(e) = sender.send(t) { - tracing::error!("{}", e) + Callback::new(Arc::new(move |t| { + if let Err(error) = sender.send(t.clone()) { + tracing::error!(%error) } - }), + })), receiver, ) } diff --git a/zenoh/src/api/handlers/mod.rs b/zenoh/src/api/handlers/mod.rs index 60ec658a4d..e4a706f172 100644 --- a/zenoh/src/api/handlers/mod.rs +++ b/zenoh/src/api/handlers/mod.rs @@ -23,19 +23,16 @@ pub use ring::*; use crate::api::session::API_DATA_RECEPTION_CHANNEL_SIZE; -/// An alias for `Arc`. -pub type Dyn = std::sync::Arc; - /// A type that can be converted into a [`Callback`]-Handler pair. /// /// When Zenoh functions accept types that implement these, it intends to use the [`Callback`] as just that, /// while granting you access to the handler through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`]. /// /// Any closure that accepts `T` can be converted into a pair of itself and `()`. -pub trait IntoHandler<'a, T> { +pub trait IntoHandler { type Handler; - fn into_handler(self) -> (Callback<'a, T>, Self::Handler); + fn into_handler(self) -> (Callback, Self::Handler); } /// The default handler in Zenoh is a FIFO queue. @@ -43,10 +40,10 @@ pub trait IntoHandler<'a, T> { #[derive(Default)] pub struct DefaultHandler(FifoChannel); -impl IntoHandler<'static, T> for DefaultHandler { - type Handler = >::Handler; +impl IntoHandler for DefaultHandler { + type Handler = >::Handler; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { self.0.into_handler() } } diff --git a/zenoh/src/api/handlers/ring.rs b/zenoh/src/api/handlers/ring.rs index 7b058d1905..87bb860213 100644 --- a/zenoh/src/api/handlers/ring.rs +++ b/zenoh/src/api/handlers/ring.rs @@ -21,8 +21,10 @@ use std::{ use zenoh_collections::RingBuffer; use zenoh_result::ZResult; -use super::{callback::Callback, Dyn, IntoHandler}; -use crate::api::session::API_DATA_RECEPTION_CHANNEL_SIZE; +use crate::api::{ + handlers::{callback::Callback, IntoHandler}, + session::API_DATA_RECEPTION_CHANNEL_SIZE, +}; /// A synchronous ring channel with a limited size that allows users to keep the last N data. pub struct RingChannel { @@ -140,10 +142,10 @@ impl RingChannelHandler { } } -impl IntoHandler<'static, T> for RingChannel { +impl IntoHandler for RingChannel { type Handler = RingChannelHandler; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { let (sender, receiver) = flume::bounded(1); let inner = Arc::new(RingChannelInner { ring: std::sync::Mutex::new(RingBuffer::new(self.capacity)), @@ -153,7 +155,7 @@ impl IntoHandler<'static, T> for RingChannel { ring: Arc::downgrade(&inner), }; ( - Dyn::new(move |t| match inner.ring.lock() { + Callback::new(Arc::new(move |t| match inner.ring.lock() { Ok(mut g) => { // Eventually drop the oldest element. g.push_force(t); @@ -161,7 +163,7 @@ impl IntoHandler<'static, T> for RingChannel { let _ = sender.try_send(()); } Err(e) => tracing::error!("{}", e), - }), + })), receiver, ) } diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index 02fd2bb482..c8a0b6f194 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -525,7 +525,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { #[zenoh_macros::unstable] pub fn with(self, handler: Handler) -> LivelinessSubscriberBuilder<'a, 'b, Handler> where - Handler: crate::handlers::IntoHandler<'static, Sample>, + Handler: crate::handlers::IntoHandler, { let LivelinessSubscriberBuilder { session, @@ -568,7 +568,7 @@ impl LivelinessSubscriberBuilder<'_, '_, Handler> { #[zenoh_macros::unstable] impl<'a, Handler> Resolvable for LivelinessSubscriberBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -577,7 +577,7 @@ where #[zenoh_macros::unstable] impl Wait for LivelinessSubscriberBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { #[zenoh_macros::unstable] @@ -611,7 +611,7 @@ where #[zenoh_macros::unstable] impl IntoFuture for LivelinessSubscriberBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -732,7 +732,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> LivelinessGetBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Reply>, + Handler: IntoHandler, { let LivelinessGetBuilder { session, @@ -760,7 +760,7 @@ impl LivelinessGetBuilder<'_, '_, Handler> { impl Resolvable for LivelinessGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult; @@ -768,7 +768,7 @@ where impl Wait for LivelinessGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -782,7 +782,7 @@ where impl IntoFuture for LivelinessGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 3f5a6ca912..dfaf2deb2e 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -723,7 +723,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { #[zenoh_macros::unstable] pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, MatchingStatus>, + Handler: IntoHandler, { let MatchingListenerBuilder { publisher, @@ -756,7 +756,7 @@ impl MatchingListenerBuilder<'_, '_, Handler> { #[zenoh_macros::unstable] impl Resolvable for MatchingListenerBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, MatchingStatus> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -765,7 +765,7 @@ where #[zenoh_macros::unstable] impl Wait for MatchingListenerBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, MatchingStatus> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { #[zenoh_macros::unstable] @@ -791,7 +791,7 @@ where #[zenoh_macros::unstable] impl IntoFuture for MatchingListenerBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, MatchingStatus> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -809,7 +809,7 @@ pub(crate) struct MatchingListenerState { pub(crate) current: Mutex, pub(crate) key_expr: KeyExpr<'static>, pub(crate) destination: Locality, - pub(crate) callback: Callback<'static, MatchingStatus>, + pub(crate) callback: Callback, } #[zenoh_macros::unstable] diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index ae55d5ab8d..7cb8d5eb84 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -154,7 +154,7 @@ impl From for Result { #[cfg(feature = "unstable")] pub(crate) struct LivelinessQueryState { - pub(crate) callback: Callback<'static, Reply>, + pub(crate) callback: Callback, } pub(crate) struct QueryState { @@ -163,7 +163,7 @@ pub(crate) struct QueryState { pub(crate) parameters: Parameters<'static>, pub(crate) reception_mode: ConsolidationMode, pub(crate) replies: Option>, - pub(crate) callback: Callback<'static, Reply>, + pub(crate) callback: Callback, } impl QueryState { @@ -333,7 +333,7 @@ impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> SessionGetBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Reply>, + Handler: IntoHandler, { let SessionGetBuilder { session, @@ -444,7 +444,7 @@ pub enum ReplyKeyExpr { impl Resolvable for SessionGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult; @@ -452,7 +452,7 @@ where impl Wait for SessionGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -483,7 +483,7 @@ where impl IntoFuture for SessionGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 46cbb8ccd7..97675336b7 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -53,6 +53,7 @@ use crate::{ value::Value, Id, }, + handlers::Callback, net::primitives::Primitives, Session, }; @@ -530,7 +531,7 @@ pub(crate) struct QueryableState { pub(crate) key_expr: WireExpr<'static>, pub(crate) complete: bool, pub(crate) origin: Locality, - pub(crate) callback: Arc, + pub(crate) callback: Callback, } impl fmt::Debug for QueryableState { @@ -690,7 +691,7 @@ impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> QueryableBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Query>, + Handler: IntoHandler, { let QueryableBuilder { session, @@ -903,7 +904,7 @@ impl DerefMut for Queryable { impl Resolvable for QueryableBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Query> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -911,7 +912,7 @@ where impl Wait for QueryableBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Query> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -938,7 +939,7 @@ where impl IntoFuture for QueryableBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Query> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; diff --git a/zenoh/src/api/scouting.rs b/zenoh/src/api/scouting.rs index 0a41294548..73c0afcbdf 100644 --- a/zenoh/src/api/scouting.rs +++ b/zenoh/src/api/scouting.rs @@ -127,7 +127,7 @@ impl ScoutBuilder { #[inline] pub fn with(self, handler: Handler) -> ScoutBuilder where - Handler: IntoHandler<'static, Hello>, + Handler: IntoHandler, { let ScoutBuilder { what, @@ -144,7 +144,7 @@ impl ScoutBuilder { impl Resolvable for ScoutBuilder where - Handler: IntoHandler<'static, Hello> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -152,7 +152,7 @@ where impl Wait for ScoutBuilder where - Handler: IntoHandler<'static, Hello> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -163,7 +163,7 @@ where impl IntoFuture for ScoutBuilder where - Handler: IntoHandler<'static, Hello> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -286,7 +286,7 @@ impl Scout { fn _scout( what: WhatAmIMatcher, config: zenoh_config::Config, - callback: Callback<'static, Hello>, + callback: Callback, ) -> ZResult { tracing::trace!("scout({}, {})", what, &config); let default_addr = SocketAddr::from(zenoh_config::defaults::scouting::multicast::address); @@ -316,7 +316,7 @@ fn _scout( let scout = Runtime::scout(&sockets, what, &addr, move |hello| { let callback = callback.clone(); async move { - callback(hello.into()); + callback.call(hello.into()); Loop::Continue } }); diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 6a939e9fa3..4dbad4e6b2 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1241,7 +1241,7 @@ impl SessionInner { self: &Arc, key_expr: &KeyExpr, origin: Locality, - callback: Callback<'static, Sample>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); tracing::trace!("declare_subscriber({:?})", key_expr); @@ -1449,7 +1449,7 @@ impl SessionInner { key_expr: &WireExpr, complete: bool, origin: Locality, - callback: Callback<'static, Query>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); tracing::trace!("declare_queryable({:?})", key_expr); @@ -1549,7 +1549,7 @@ impl SessionInner { key_expr: &KeyExpr, origin: Locality, history: bool, - callback: Callback<'static, Sample>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); trace!("declare_liveliness_subscriber({:?})", key_expr); @@ -1645,7 +1645,7 @@ impl SessionInner { pub(crate) fn declare_matches_listener_inner( &self, publisher: &Publisher, - callback: Callback<'static, MatchingStatus>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); let id = self.runtime.next_id(); @@ -1667,7 +1667,9 @@ impl SessionInner { .unwrap_or(true) { *current = true; - (listener_state.callback)(MatchingStatus { matching: true }); + listener_state + .callback + .call(MatchingStatus { matching: true }); } } Err(e) => tracing::error!("Error trying to acquire MathginListener lock: {}", e), @@ -1732,7 +1734,7 @@ impl SessionInner { if status.matching_subscribers() { *current = true; let callback = msub.callback.clone(); - (callback)(status) + callback.call(status) } } } @@ -1770,7 +1772,7 @@ impl SessionInner { if !status.matching_subscribers() { *current = false; let callback = msub.callback.clone(); - (callback)(status) + callback.call(status) } } } @@ -1857,26 +1859,22 @@ impl SessionInner { } }; drop(state); + let mut sample = info.clone().into_sample( + // SAFETY: the keyexpr is valid + unsafe { KeyExpr::from_str_unchecked("dummy") }, + payload, + #[cfg(feature = "unstable")] + reliability, + attachment, + ); let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); for (cb, key_expr) in drain { - let sample = info.clone().into_sample( - key_expr, - payload.clone(), - #[cfg(feature = "unstable")] - reliability, - attachment.clone(), - ); - cb(sample); + sample.key_expr = key_expr; + cb.call(sample.clone()); } if let Some((cb, key_expr)) = last { - let sample = info.into_sample( - key_expr, - payload, - #[cfg(feature = "unstable")] - reliability, - attachment.clone(), - ); - cb(sample); + sample.key_expr = key_expr; + cb.call(sample); } } @@ -1893,7 +1891,7 @@ impl SessionInner { value: Option, attachment: Option, #[cfg(feature = "unstable")] source: SourceInfo, - callback: Callback<'static, Reply>, + callback: Callback, ) -> ZResult<()> { tracing::trace!( "get({}, {:?}, {:?})", @@ -1929,10 +1927,10 @@ impl SessionInner { tracing::debug!("Timeout on query {}! Send error and close.", qid); if query.reception_mode == ConsolidationMode::Latest { for (_, reply) in query.replies.unwrap().into_iter() { - (query.callback)(reply); + query.callback.call(reply); } } - (query.callback)(Reply { + query.callback.call(Reply { result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()), #[cfg(feature = "unstable")] replier_id: Some(zid.into()), @@ -2015,7 +2013,7 @@ impl SessionInner { self: &Arc, key_expr: &KeyExpr<'_>, timeout: Duration, - callback: Callback<'static, Reply>, + callback: Callback, ) -> ZResult<()> { tracing::trace!("liveliness.get({}, {:?})", key_expr, timeout); let mut state = zwrite!(self.state); @@ -2032,7 +2030,7 @@ impl SessionInner { if let Some(query) = state.liveliness_queries.remove(&id) { std::mem::drop(state); tracing::debug!("Timeout on liveliness query {}! Send error and close.", id); - (query.callback)(Reply { + query.callback.call(Reply { result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()), #[cfg(feature = "unstable")] replier_id: Some(zid.into()), @@ -2107,7 +2105,7 @@ impl SessionInner { } ) .map(|(id, qable)| (*id, qable.callback.clone())) - .collect::)>>(); + .collect::)>>(); (primitives, key_expr.into_owned(), queryables) } Err(err) => { @@ -2130,16 +2128,18 @@ impl SessionInner { primitives }, }); - for (eid, callback) in queryables { - callback(Query { - inner: query_inner.clone(), - eid, - value: body.as_ref().map(|b| Value { - payload: b.payload.clone().into(), - encoding: b.encoding.clone().into(), - }), - attachment: attachment.clone(), - }); + let mut query = Query { + inner: query_inner, + eid: 0, + value: body.map(|b| Value { + payload: b.payload.into(), + encoding: b.encoding.into(), + }), + attachment, + }; + for (eid, cb) in queryables { + query.eid = eid; + cb.call(query.clone()); } } } @@ -2251,7 +2251,7 @@ impl Primitives for WeakSession { replier_id: None, }; - (query.callback)(reply); + query.callback.call(reply); } } else { state.remote_tokens.insert(m.id, key_expr.clone()); @@ -2428,7 +2428,7 @@ impl Primitives for WeakSession { #[cfg(feature = "unstable")] replier_id: e.ext_sinfo.map(|info| info.id.zid), }; - callback(new_reply); + callback.call(new_reply); } None => { tracing::warn!("Received ReplyData for unknown Query: {}", msg.rid); @@ -2598,7 +2598,7 @@ impl Primitives for WeakSession { }; std::mem::drop(state); if let Some((callback, new_reply)) = callback { - callback(new_reply); + callback.call(new_reply); } } None => { @@ -2620,7 +2620,7 @@ impl Primitives for WeakSession { std::mem::drop(state); if query.reception_mode == ConsolidationMode::Latest { for (_, reply) in query.replies.unwrap().into_iter() { - (query.callback)(reply); + query.callback.call(reply); } } trace!("Close query {}", msg.rid); diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index a2a3024cf7..3cbe16044e 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -41,7 +41,7 @@ pub(crate) struct SubscriberState { pub(crate) remote_id: Id, pub(crate) key_expr: KeyExpr<'static>, pub(crate) origin: Locality, - pub(crate) callback: Callback<'static, Sample>, + pub(crate) callback: Callback, } impl fmt::Debug for SubscriberState { @@ -226,7 +226,7 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> SubscriberBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, { let SubscriberBuilder { session, @@ -304,7 +304,7 @@ impl SubscriberBuilder<'_, '_, Handler> { // Push mode impl Resolvable for SubscriberBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -312,7 +312,7 @@ where impl Wait for SubscriberBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -337,7 +337,7 @@ where impl IntoFuture for SubscriberBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To;