diff --git a/Cargo.lock b/Cargo.lock index 9740344616..dc465ce342 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -274,16 +274,6 @@ dependencies = [ "syn 2.0.52", ] -[[package]] -name = "async-attributes" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" -dependencies = [ - "quote", - "syn 1.0.109", -] - [[package]] name = "async-channel" version = "1.9.0" @@ -439,7 +429,6 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" dependencies = [ - "async-attributes", "async-channel", "async-global-executor", "async-io", @@ -5523,12 +5512,12 @@ dependencies = [ name = "zenoh-backend-example" version = "0.11.0-dev" dependencies = [ - "async-std", "async-trait", "const_format", "futures", "git-version", "serde_json", + "tokio", "tracing", "zenoh", "zenoh-plugin-trait", @@ -5942,11 +5931,12 @@ dependencies = [ name = "zenoh-plugin-example" version = "0.11.0-dev" dependencies = [ - "async-std", "const_format", "futures", "git-version", + "lazy_static", "serde_json", + "tokio", "tracing", "zenoh", "zenoh-plugin-trait", @@ -5958,7 +5948,6 @@ name = "zenoh-plugin-rest" version = "0.11.0-dev" dependencies = [ "anyhow", - "async-std", "base64 0.22.1", "clap", "const_format", @@ -5973,6 +5962,7 @@ dependencies = [ "serde", "serde_json", "tide", + "tokio", "tracing", "zenoh", "zenoh-plugin-trait", @@ -5983,7 +5973,6 @@ name = "zenoh-plugin-storage-manager" version = "0.11.0-dev" dependencies = [ "async-global-executor", - "async-std", "async-trait", "const_format", "crc", @@ -5998,6 +5987,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "tokio", "tracing", "urlencoding", "zenoh", @@ -6146,7 +6136,6 @@ dependencies = [ name = "zenoh-util" version = "0.11.0-dev" dependencies = [ - "async-std", "async-trait", "const_format", "flume", @@ -6171,7 +6160,6 @@ dependencies = [ name = "zenoh_backend_traits" version = "0.11.0-dev" dependencies = [ - "async-std", "async-trait", "const_format", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index 047cf35a02..034d059862 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,6 @@ anyhow = { version = "1.0.69", default-features = false } # Default features are async-executor = "1.5.0" async-global-executor = "2.3.1" async-io = "2.3.3" -async-std = { version = "=1.12.0", default-features = false } # Default features are disabled due to some crates' requirements async-trait = "0.1.60" base64 = "0.22.1" bincode = "1.3.3" diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index c4052313d9..893a1930a5 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -493,6 +493,12 @@ // __config__: "./plugins/zenoh-plugin-rest/config.json5", // /// http port to answer to rest requests // http_port: 8000, + // /// The number of worker thread in TOKIO runtime (default: 2) + // /// The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime. + // work_thread_num: 0, + // /// The number of blocking thread in TOKIO runtime (default: 50) + // /// The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime. + // max_block_thread_num: 50, // }, // // /// Configure the storage manager plugin diff --git a/commons/zenoh-util/Cargo.toml b/commons/zenoh-util/Cargo.toml index df99e01385..38694dc1b5 100644 --- a/commons/zenoh-util/Cargo.toml +++ b/commons/zenoh-util/Cargo.toml @@ -35,7 +35,6 @@ maintenance = { status = "actively-developed" } test = [] [dependencies] -async-std = { workspace = true, features = ["default", "unstable"] } tokio = { workspace = true, features = ["time", "net"] } async-trait = { workspace = true } flume = { workspace = true } diff --git a/commons/zenoh-util/src/timer.rs b/commons/zenoh-util/src/timer.rs index 7fd059b0cf..ab52c0c996 100644 --- a/commons/zenoh-util/src/timer.rs +++ b/commons/zenoh-util/src/timer.rs @@ -21,9 +21,9 @@ use std::{ time::{Duration, Instant}, }; -use async_std::{prelude::*, sync::Mutex, task}; use async_trait::async_trait; use flume::{bounded, Receiver, RecvError, Sender}; +use tokio::{runtime::Handle, select, sync::Mutex, task, time}; use zenoh_core::zconfigurable; zconfigurable! { @@ -120,7 +120,7 @@ async fn timer_task( let mut events = events.lock().await; loop { - // Fuuture for adding new events + // Future for adding new events let new = new_event.recv_async(); match events.peek() { @@ -130,12 +130,17 @@ async fn timer_task( let next = next.clone(); let now = Instant::now(); if next.when > now { - task::sleep(next.when - now).await; + time::sleep(next.when - now).await; } Ok((false, next)) }; - match new.race(wait).await { + let result = select! { + result = wait => { result }, + result = new => { result }, + }; + + match result { Ok((is_new, mut ev)) => { if is_new { // A new event has just been added: push it onto the heap @@ -204,14 +209,14 @@ impl Timer { // Start the timer task let c_e = timer.events.clone(); let fut = async move { - let _ = sl_receiver - .recv_async() - .race(timer_task(c_e, ev_receiver)) - .await; + select! { + _ = sl_receiver.recv_async() => {}, + _ = timer_task(c_e, ev_receiver) => {}, + }; tracing::trace!("A - Timer task no longer running..."); }; if spawn_blocking { - task::spawn_blocking(|| task::block_on(fut)); + task::spawn_blocking(|| Handle::current().block_on(fut)); } else { task::spawn(fut); } @@ -234,14 +239,14 @@ impl Timer { // Start the timer task let c_e = self.events.clone(); let fut = async move { - let _ = sl_receiver - .recv_async() - .race(timer_task(c_e, ev_receiver)) - .await; + select! { + _ = sl_receiver.recv_async() => {}, + _ = timer_task(c_e, ev_receiver) => {}, + }; tracing::trace!("A - Timer task no longer running..."); }; if spawn_blocking { - task::spawn_blocking(|| task::block_on(fut)); + task::spawn_blocking(|| Handle::current().block_on(fut)); } else { task::spawn(fut); } @@ -307,8 +312,8 @@ mod tests { time::{Duration, Instant}, }; - use async_std::task; use async_trait::async_trait; + use tokio::{runtime::Runtime, time}; use super::{Timed, TimedEvent, Timer}; @@ -349,7 +354,7 @@ mod tests { timer.add_async(event).await; // Wait for the event to occur - task::sleep(3 * interval).await; + time::sleep(3 * interval).await; // Load and reset the counter value let value = counter.swap(0, Ordering::SeqCst); @@ -368,7 +373,7 @@ mod tests { handle.defuse(); // Wait for the event to occur - task::sleep(3 * interval).await; + time::sleep(3 * interval).await; // Load and reset the counter value let value = counter.swap(0, Ordering::SeqCst); @@ -390,7 +395,7 @@ mod tests { timer.add_async(event).await; // Wait for the events to occur - task::sleep(to_elapse + interval).await; + time::sleep(to_elapse + interval).await; // Load and reset the counter value let value = counter.swap(0, Ordering::SeqCst); @@ -401,7 +406,7 @@ mod tests { handle.defuse(); // Wait a bit more to verify that not more events have been fired - task::sleep(to_elapse).await; + time::sleep(to_elapse).await; // Load and reset the counter value let value = counter.swap(0, Ordering::SeqCst); @@ -416,7 +421,7 @@ mod tests { timer.add_async(event).await; // Wait for the events to occur - task::sleep(to_elapse + interval).await; + time::sleep(to_elapse + interval).await; // Load and reset the counter value let value = counter.swap(0, Ordering::SeqCst); @@ -426,7 +431,7 @@ mod tests { timer.stop_async().await; // Wait some time - task::sleep(to_elapse).await; + time::sleep(to_elapse).await; // Load and reset the counter value let value = counter.swap(0, Ordering::SeqCst); @@ -436,13 +441,14 @@ mod tests { timer.start_async(false).await; // Wait for the events to occur - task::sleep(to_elapse).await; + time::sleep(to_elapse).await; // Load and reset the counter value let value = counter.swap(0, Ordering::SeqCst); assert_eq!(value, amount); } - task::block_on(run()); + let rt = Runtime::new().unwrap(); + rt.block_on(run()); } } diff --git a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs index 6bbd627537..e7b261f292 100644 --- a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs @@ -83,7 +83,7 @@ pub fn get_vsock_addr(address: Address<'_>) -> ZResult { } pub struct LinkUnicastVsock { - // The underlying socket as returned from the async-std library + // The underlying socket as returned from the tokio library socket: UnsafeCell, // The source socket address of this link (address used on the local host) src_addr: VsockAddr, diff --git a/plugins/zenoh-backend-example/Cargo.toml b/plugins/zenoh-backend-example/Cargo.toml index df505bd211..9f548e1187 100644 --- a/plugins/zenoh-backend-example/Cargo.toml +++ b/plugins/zenoh-backend-example/Cargo.toml @@ -29,11 +29,11 @@ name = "zenoh_backend_example" crate-type = ["cdylib"] [dependencies] -async-std = { workspace = true, features = ["default"] } const_format = { workspace = true } futures = { workspace = true } git-version = { workspace = true } tracing = { workspace = true } +tokio = { workspace = true } serde_json = { workspace = true } zenoh = { workspace = true, features = ["default"] } zenoh-plugin-trait = { workspace = true } diff --git a/plugins/zenoh-backend-example/src/lib.rs b/plugins/zenoh-backend-example/src/lib.rs index bd64fd5024..b9e670b799 100644 --- a/plugins/zenoh-backend-example/src/lib.rs +++ b/plugins/zenoh-backend-example/src/lib.rs @@ -13,8 +13,8 @@ // use std::collections::{hash_map::Entry, HashMap}; -use async_std::sync::RwLock; use async_trait::async_trait; +use tokio::sync::RwLock; use zenoh::{internal::Value, key_expr::OwnedKeyExpr, prelude::*, time::Timestamp}; use zenoh_backend_traits::{ config::{StorageConfig, VolumeConfig}, diff --git a/plugins/zenoh-backend-traits/Cargo.toml b/plugins/zenoh-backend-traits/Cargo.toml index 1a574dd118..766f52d609 100644 --- a/plugins/zenoh-backend-traits/Cargo.toml +++ b/plugins/zenoh-backend-traits/Cargo.toml @@ -27,7 +27,6 @@ description = "Zenoh: traits to be implemented by backends libraries" maintenance = { status = "actively-developed" } [dependencies] -async-std = { workspace = true, features = ["default"] } async-trait = { workspace = true } derive_more = { workspace = true } serde_json = { workspace = true } diff --git a/plugins/zenoh-plugin-example/Cargo.toml b/plugins/zenoh-plugin-example/Cargo.toml index bc52ee5fb2..5341adcf8c 100644 --- a/plugins/zenoh-plugin-example/Cargo.toml +++ b/plugins/zenoh-plugin-example/Cargo.toml @@ -34,12 +34,13 @@ name = "zenoh_plugin_example" crate-type = ["cdylib"] [dependencies] -async-std = { workspace = true, features = ["default"] } const_format = { workspace = true } zenoh-util = { workspace = true } futures = { workspace = true } +lazy_static = { workspace = true } git-version = { workspace = true } tracing = { workspace = true } +tokio = { workspace = true } serde_json = { workspace = true } zenoh = { workspace = true, features = [ "default", diff --git a/plugins/zenoh-plugin-example/src/lib.rs b/plugins/zenoh-plugin-example/src/lib.rs index cbd84fb766..b7c494946d 100644 --- a/plugins/zenoh-plugin-example/src/lib.rs +++ b/plugins/zenoh-plugin-example/src/lib.rs @@ -17,6 +17,7 @@ use std::{ borrow::Cow, collections::HashMap, convert::TryFrom, + future::Future, sync::{ atomic::{AtomicBool, Ordering::Relaxed}, Arc, Mutex, @@ -39,6 +40,32 @@ use zenoh::{ }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; +const WORKER_THREAD_NUM: usize = 2; +const MAX_BLOCK_THREAD_NUM: usize = 50; +lazy_static::lazy_static! { + // The global runtime is used in the dynamic plugins, which we can't get the current runtime + static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORKER_THREAD_NUM) + .max_blocking_threads(MAX_BLOCK_THREAD_NUM) + .enable_all() + .build() + .expect("Unable to create runtime"); +} +#[inline(always)] +fn spawn_runtime(task: impl Future + Send + 'static) { + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), spawn on the current runtime + rt.spawn(task); + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), spawn on the global runtime + TOKIO_RUNTIME.spawn(task); + } + } +} + // The struct implementing the ZenohPlugin and ZenohPlugin traits pub struct ExamplePlugin {} @@ -78,8 +105,7 @@ impl Plugin for ExamplePlugin { // a flag to end the plugin's loop when the plugin is removed from the config let flag = Arc::new(AtomicBool::new(true)); - // spawn the task running the plugin's loop - async_std::task::spawn(run(runtime.clone(), selector, flag.clone())); + spawn_runtime(run(runtime.clone(), selector, flag.clone())); // return a RunningPlugin to zenohd Ok(Box::new(RunningPlugin(Arc::new(Mutex::new( RunningPluginInner { @@ -122,11 +148,7 @@ impl RunningPluginTrait for RunningPlugin { match KeyExpr::try_from(selector.clone()) { Err(e) => tracing::error!("{}", e), Ok(selector) => { - async_std::task::spawn(run( - guard.runtime.clone(), - selector, - guard.flag.clone(), - )); + spawn_runtime(run(guard.runtime.clone(), selector, guard.flag.clone())); } } return Ok(None); diff --git a/plugins/zenoh-plugin-rest/Cargo.toml b/plugins/zenoh-plugin-rest/Cargo.toml index 5f36b5bf34..d9a53e9f43 100644 --- a/plugins/zenoh-plugin-rest/Cargo.toml +++ b/plugins/zenoh-plugin-rest/Cargo.toml @@ -33,7 +33,6 @@ crate-type = ["cdylib", "rlib"] [dependencies] anyhow = { workspace = true, features = ["default"] } -async-std = { workspace = true, features = ["default", "attributes"] } base64 = { workspace = true } const_format = { workspace = true } flume = { workspace = true } @@ -46,6 +45,7 @@ schemars = { workspace = true } serde = { workspace = true, features = ["default"] } serde_json = { workspace = true } tide = { workspace = true } +tokio = { workspace = true } zenoh = { workspace = true, features = [ "plugins", "default", diff --git a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs index e3fae4d285..aefdfd4f86 100644 --- a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs +++ b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs @@ -34,7 +34,7 @@ if(typeof(EventSource) !== "undefined") { } "#; -#[async_std::main] +#[tokio::main] async fn main() { // initiate logging zenoh::try_init_log_from_env(); @@ -49,7 +49,7 @@ async fn main() { println!("Declaring Queryable on '{key}'..."); let queryable = session.declare_queryable(key).await.unwrap(); - async_std::task::spawn({ + tokio::task::spawn({ let receiver = queryable.handler().clone(); async move { while let Ok(request) = receiver.recv_async().await { @@ -75,7 +75,7 @@ async fn main() { println!("Data updates are accessible through HTML5 SSE at http://:8000/{key}"); loop { publisher.put(value).await.unwrap(); - async_std::task::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; } } diff --git a/plugins/zenoh-plugin-rest/src/config.rs b/plugins/zenoh-plugin-rest/src/config.rs index a785eec094..d215b8a5a7 100644 --- a/plugins/zenoh-plugin-rest/src/config.rs +++ b/plugins/zenoh-plugin-rest/src/config.rs @@ -21,12 +21,18 @@ use serde::{ }; const DEFAULT_HTTP_INTERFACE: &str = "[::]"; +pub const DEFAULT_WORK_THREAD_NUM: usize = 2; +pub const DEFAULT_MAX_BLOCK_THREAD_NUM: usize = 50; #[derive(JsonSchema, Deserialize, serde::Serialize, Clone, Debug)] #[serde(deny_unknown_fields)] pub struct Config { #[serde(deserialize_with = "deserialize_http_port")] pub http_port: String, + #[serde(default = "default_work_thread_num")] + pub work_thread_num: usize, + #[serde(default = "default_max_block_thread_num")] + pub max_block_thread_num: usize, #[serde(default, deserialize_with = "deserialize_path")] __path__: Option>, __required__: Option, @@ -47,6 +53,14 @@ where deserializer.deserialize_any(HttpPortVisitor) } +fn default_work_thread_num() -> usize { + DEFAULT_WORK_THREAD_NUM +} + +fn default_max_block_thread_num() -> usize { + DEFAULT_MAX_BLOCK_THREAD_NUM +} + struct HttpPortVisitor; impl<'de> Visitor<'de> for HttpPortVisitor { diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 8affec9067..eb65a991d6 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -17,14 +17,24 @@ //! This crate is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../zenoh/index.html) -use std::{borrow::Cow, convert::TryFrom, str::FromStr, sync::Arc}; +use std::{ + borrow::Cow, + convert::TryFrom, + future::Future, + str::FromStr, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; -use async_std::prelude::FutureExt; use base64::Engine; use futures::StreamExt; use http_types::Method; use serde::{Deserialize, Serialize}; use tide::{http::Mime, sse::Sender, Request, Response, Server, StatusCode}; +use tokio::time::timeout; use zenoh::{ bytes::{Encoding, ZBytes}, internal::{ @@ -51,6 +61,32 @@ lazy_static::lazy_static! { } const RAW_KEY: &str = "_raw"; +lazy_static::lazy_static! { + static ref WORKER_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM); + static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM); + // The global runtime is used in the dynamic plugins, which we can't get the current runtime + static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORKER_THREAD_NUM.load(Ordering::SeqCst)) + .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst)) + .enable_all() + .build() + .expect("Unable to create runtime"); +} +#[inline(always)] +pub(crate) fn blockon_runtime(task: F) -> F::Output { + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), use the current runtime + tokio::task::block_in_place(|| rt.block_on(task)) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), reuse the global runtime + tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task)) + } + } +} + #[derive(Serialize, Deserialize)] struct JSONSample { key: String, @@ -246,8 +282,14 @@ impl Plugin for RestPlugin { let conf: Config = serde_json::from_value(plugin_conf.clone()) .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?; - let task = async_std::task::spawn(run(runtime.clone(), conf.clone())); - let task = async_std::task::block_on(task.timeout(std::time::Duration::from_millis(1))); + WORKER_THREAD_NUM.store(conf.work_thread_num, Ordering::SeqCst); + MAX_BLOCK_THREAD_NUM.store(conf.max_block_thread_num, Ordering::SeqCst); + + let task = run(runtime.clone(), conf.clone()); + let task = blockon_runtime(async { + timeout(Duration::from_millis(1), TOKIO_RUNTIME.spawn(task)).await + }); + if let Ok(Err(e)) = task { bail!("REST server failed within 1ms: {e}") } @@ -332,12 +374,8 @@ async fn query(mut req: Request<(Arc, String)>) -> tide::Result, String)>) -> tide::Result {} Ok(Err(e)) => { - tracing::debug!( - "SSE error ({})! Unsubscribe and terminate (task {})", - e, - async_std::task::current().id() - ); + tracing::debug!("SSE error ({})! Unsubscribe and terminate", e); if let Err(e) = sub.undeclare().await { tracing::error!("Error undeclaring subscriber: {}", e); } break; } Err(_) => { - tracing::debug!( - "SSE timeout! Unsubscribe and terminate (task {})", - async_std::task::current().id() - ); + tracing::debug!("SSE timeout! Unsubscribe and terminate",); if let Err(e) = sub.undeclare().await { tracing::error!("Error undeclaring subscriber: {}", e); } diff --git a/plugins/zenoh-plugin-storage-manager/Cargo.toml b/plugins/zenoh-plugin-storage-manager/Cargo.toml index fa7650fcc2..08367f75c3 100644 --- a/plugins/zenoh-plugin-storage-manager/Cargo.toml +++ b/plugins/zenoh-plugin-storage-manager/Cargo.toml @@ -32,7 +32,6 @@ name = "zenoh_plugin_storage_manager" crate-type = ["cdylib", "rlib"] [dependencies] -async-std = { workspace = true, features = ["default"] } async-trait = { workspace = true } crc = { workspace = true } const_format = { workspace = true } @@ -45,6 +44,7 @@ libloading = { workspace = true } tracing = { workspace = true } serde = { workspace = true, features = ["default"] } serde_json = { workspace = true } +tokio = { workspace = true } urlencoding = { workspace = true } zenoh = { workspace = true, features = [ "default", diff --git a/plugins/zenoh-plugin-storage-manager/src/lib.rs b/plugins/zenoh-plugin-storage-manager/src/lib.rs index 4043665c5d..fb578b198d 100644 --- a/plugins/zenoh-plugin-storage-manager/src/lib.rs +++ b/plugins/zenoh-plugin-storage-manager/src/lib.rs @@ -25,7 +25,6 @@ use std::{ sync::{Arc, Mutex}, }; -use async_std::task; use flume::Sender; use memory_backend::MemoryBackend; use storages_mgt::StorageMessage; @@ -56,6 +55,18 @@ mod memory_backend; mod replica; mod storages_mgt; +const WORKER_THREAD_NUM: usize = 2; +const MAX_BLOCK_THREAD_NUM: usize = 50; +lazy_static::lazy_static! { + // The global runtime is used in the zenohd case, which we can't get the current runtime + static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORKER_THREAD_NUM) + .max_blocking_threads(MAX_BLOCK_THREAD_NUM) + .enable_all() + .build() + .expect("Unable to create runtime"); +} + #[cfg(feature = "dynamic_plugin")] zenoh_plugin_trait::declare_plugin!(StoragesPlugin); @@ -194,11 +205,13 @@ impl StorageRuntimeInner { let name = name.as_ref(); tracing::info!("Killing volume '{}'", name); if let Some(storages) = self.storages.remove(name) { - async_std::task::block_on(futures::future::join_all( - storages - .into_values() - .map(|s| async move { s.send(StorageMessage::Stop) }), - )); + tokio::task::block_in_place(|| { + TOKIO_RUNTIME.block_on(futures::future::join_all( + storages + .into_values() + .map(|s| async move { s.send(StorageMessage::Stop) }), + )) + }); } self.plugins_manager .started_plugin_mut(name) @@ -266,12 +279,14 @@ impl StorageRuntimeInner { volume_id, backend.name() ); - let stopper = async_std::task::block_on(create_and_start_storage( - admin_key, - storage.clone(), - backend.instance(), - self.session.clone(), - ))?; + let stopper = tokio::task::block_in_place(|| { + TOKIO_RUNTIME.block_on(create_and_start_storage( + admin_key, + storage.clone(), + backend.instance(), + self.session.clone(), + )) + })?; self.storages .entry(volume_id) .or_default() @@ -359,10 +374,12 @@ impl RunningPluginTrait for StorageRuntime { for (storage, handle) in storages { with_extended_string(key, &[storage], |key| { if keyexpr::new(key.as_str()).unwrap().intersects(key_expr) { - if let Ok(value) = task::block_on(async { - let (tx, rx) = async_std::channel::bounded(1); - let _ = handle.send(StorageMessage::GetStatus(tx)); - rx.recv().await + if let Some(value) = tokio::task::block_in_place(|| { + TOKIO_RUNTIME.block_on(async { + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let _ = handle.send(StorageMessage::GetStatus(tx)); + rx.recv().await + }) }) { responses.push(Response::new(key.clone(), value)) } diff --git a/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs b/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs index 7c74d9f7f9..b056cf7faf 100644 --- a/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs @@ -13,8 +13,8 @@ // use std::{collections::HashMap, sync::Arc}; -use async_std::sync::RwLock; use async_trait::async_trait; +use tokio::sync::RwLock; use zenoh::{internal::Value, key_expr::OwnedKeyExpr, time::Timestamp, Result as ZResult}; use zenoh_backend_traits::{ config::{StorageConfig, VolumeConfig}, diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs index c11a632e41..737ce79144 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs @@ -17,9 +17,9 @@ use std::{ collections::{BTreeSet, HashMap, HashSet}, str, str::FromStr, + sync::Arc, }; -use async_std::sync::Arc; use zenoh::{ internal::Value, key_expr::OwnedKeyExpr, prelude::*, query::Parameters, sample::Sample, time::Timestamp, Session, diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs index 7992053a67..952a72f499 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs @@ -16,10 +16,11 @@ use std::{ borrow::Cow, collections::{HashMap, HashSet}, str, + sync::Arc, }; -use async_std::sync::{Arc, RwLock}; use flume::{Receiver, Sender}; +use tokio::sync::RwLock; use zenoh::{ internal::Value, key_expr::{KeyExpr, OwnedKeyExpr}, diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs index 930b4511a2..ecb8815153 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs @@ -17,15 +17,13 @@ use std::{ collections::{HashMap, HashSet}, str, + sync::Arc, time::{Duration, SystemTime}, }; -use async_std::{ - stream::{interval, StreamExt}, - sync::{Arc, RwLock}, -}; use flume::{Receiver, Sender}; use futures::{pin_mut, select, FutureExt}; +use tokio::{sync::RwLock, time::interval}; use zenoh::{key_expr::keyexpr, prelude::*}; use zenoh_backend_traits::config::{ReplicaConfig, StorageConfig}; @@ -277,7 +275,7 @@ impl Replica { // time it takes to publish. let mut interval = interval(self.replica_config.publication_interval); loop { - let _ = interval.next().await; + let _ = interval.tick().await; let digest = snapshotter.get_digest().await; let digest = digest.compress(); diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/snapshotter.rs b/plugins/zenoh-plugin-storage-manager/src/replica/snapshotter.rs index 190cf6005b..3f00648597 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/snapshotter.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/snapshotter.rs @@ -14,16 +14,16 @@ use std::{ collections::{HashMap, HashSet}, convert::TryFrom, + sync::Arc, time::Duration, }; -use async_std::{ - stream::{interval, StreamExt}, - sync::{Arc, RwLock}, - task::sleep, -}; use flume::Receiver; use futures::join; +use tokio::{ + sync::RwLock, + time::{interval, sleep}, +}; use zenoh::{key_expr::OwnedKeyExpr, time::Timestamp, Session}; use zenoh_backend_traits::config::ReplicaConfig; @@ -126,7 +126,7 @@ impl Snapshotter { let mut interval = interval(self.replica_config.delta); loop { - let _ = interval.next().await; + let _ = interval.tick().await; let mut last_snapshot_time = self.content.last_snapshot_time.write().await; let mut last_interval = self.content.last_interval.write().await; diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index d12b51042c..d3e34f064c 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -14,13 +14,14 @@ use std::{ collections::{HashMap, HashSet}, str::{self, FromStr}, + sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; -use async_std::sync::{Arc, Mutex, RwLock}; use async_trait::async_trait; use flume::{Receiver, Sender}; use futures::select; +use tokio::sync::{Mutex, RwLock}; use zenoh::{ bytes::EncodingBuilderTrait, internal::{ diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs index 4ca39cb093..fcc8425545 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs @@ -11,7 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // -use async_std::sync::Arc; +use std::sync::Arc; + use zenoh::{session::Session, Result as ZResult}; use zenoh_backend_traits::config::StorageConfig; @@ -19,7 +20,7 @@ pub use super::replica::{Replica, StorageService}; pub enum StorageMessage { Stop, - GetStatus(async_std::channel::Sender), + GetStatus(tokio::sync::mpsc::Sender), } pub(crate) async fn start_storage( @@ -38,7 +39,7 @@ pub(crate) async fn start_storage( let (tx, rx) = flume::bounded(1); - async_std::task::spawn(async move { + tokio::task::spawn(async move { // If a configuration for replica is present, we initialize a replica, else only a storage service // A replica contains a storage service and all metadata required for anti-entropy if config.replica_config.is_some() { diff --git a/plugins/zenoh-plugin-storage-manager/tests/operations.rs b/plugins/zenoh-plugin-storage-manager/tests/operations.rs index d8ada83e4c..483b87e223 100644 --- a/plugins/zenoh-plugin-storage-manager/tests/operations.rs +++ b/plugins/zenoh-plugin-storage-manager/tests/operations.rs @@ -18,7 +18,7 @@ use std::{borrow::Cow, str::FromStr, thread::sleep}; -use async_std::task; +use tokio::runtime::Runtime; use zenoh::{ internal::zasync_executor_init, prelude::*, query::Reply, sample::Sample, time::Timestamp, Config, Session, @@ -51,9 +51,10 @@ async fn get_data(session: &Session, key_expr: &str) -> Vec { } async fn test_updates_in_order() { - task::block_on(async { + async { zasync_executor_init!(); - }); + } + .await; let mut config = Config::default(); config .insert_json5( @@ -148,5 +149,6 @@ async fn test_updates_in_order() { #[test] fn updates_test() { - task::block_on(async { test_updates_in_order().await }); + let rt = Runtime::new().unwrap(); + rt.block_on(async { test_updates_in_order().await }); } diff --git a/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs b/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs index d1633a28d4..6a6e36b2fd 100644 --- a/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs +++ b/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs @@ -19,7 +19,7 @@ use std::{borrow::Cow, str::FromStr, thread::sleep}; // use std::collections::HashMap; -use async_std::task; +use tokio::runtime::Runtime; use zenoh::{ internal::zasync_executor_init, prelude::*, query::Reply, sample::Sample, time::Timestamp, Config, Session, @@ -52,9 +52,7 @@ async fn get_data(session: &Session, key_expr: &str) -> Vec { } async fn test_wild_card_in_order() { - task::block_on(async { - zasync_executor_init!(); - }); + zasync_executor_init!(); let mut config = Config::default(); config .insert_json5( @@ -189,12 +187,8 @@ async fn test_wild_card_in_order() { drop(storage); } -// fn test_wild_card_out_of_order() { -// assert_eq!(true, true); -// } - #[test] fn wildcard_test() { - task::block_on(async { test_wild_card_in_order().await }); - // task::block_on(async { test_wild_card_out_of_order() }); + let rt = Runtime::new().unwrap(); + rt.block_on(async { test_wild_card_in_order().await }); }