From 92295d721ffa68ca0af9ed595fc1a0582f454d82 Mon Sep 17 00:00:00 2001 From: Klim Tsoutsman Date: Tue, 5 Dec 2023 16:10:51 +1100 Subject: [PATCH] Rename `async_channel` to `sync_channel` (#1078) The previous `async_channel` was asynchronous in the sense that it had an internal buffer that allowed a sender and receiver to not have to synchronously rendezvous in order to exchange a message/data. However, it was not asynchronous in the sense of language-level async/await, so the naming would be confusing. We rename that channel implementation to `sync_channel`, which is more accurate in the sense that Senders and Receivers interact with it synchronously (not using async/await), which means the current task may be blocked when waiting on a send or receive operation. This is in preparation for an upcoming PR that will introduce a channel that supports language-level async/await syntax. Signed-off-by: Klimenty Tsoutsman --- Cargo.lock | 40 ++--- applications/bm/Cargo.toml | 4 +- applications/bm/src/lib.rs | 150 +++++++++--------- applications/test_channel/Cargo.toml | 4 +- applications/test_channel/src/lib.rs | 64 ++++---- kernel/console/Cargo.toml | 2 +- kernel/console/src/lib.rs | 6 +- kernel/serial_port/Cargo.toml | 2 +- kernel/serial_port/src/lib.rs | 4 +- .../Cargo.toml | 2 +- .../src/lib.rs | 24 +-- kernel/tty/Cargo.toml | 2 +- kernel/tty/src/channel.rs | 4 +- kernel/tty/src/discipline.rs | 2 +- kernel/unified_channel/Cargo.toml | 4 +- kernel/unified_channel/src/lib.rs | 34 ++-- 16 files changed, 174 insertions(+), 174 deletions(-) rename kernel/{async_channel => sync_channel}/Cargo.toml (96%) rename kernel/{async_channel => sync_channel}/src/lib.rs (95%) diff --git a/Cargo.lock b/Cargo.lock index 35c2f6ea2c..c05769a28f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -159,20 +159,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" -[[package]] -name = "async_channel" -version = "0.1.0" -dependencies = [ - "core2", - "crossbeam-utils", - "debugit", - "log", - "mpmc", - "sync", - "sync_spin", - "wait_queue", -] - [[package]] name = "ata" version = "0.1.0" @@ -320,7 +306,6 @@ version = "0.1.0" dependencies = [ "apic", "app_io", - "async_channel", "cpu", "fs_node", "getopts", @@ -336,6 +321,7 @@ dependencies = [ "scheduler", "simple_ipc", "spawn", + "sync_channel", "task", ] @@ -529,7 +515,6 @@ name = "console" version = "0.1.0" dependencies = [ "app_io", - "async_channel", "core2", "hull", "io", @@ -538,6 +523,7 @@ dependencies = [ "path", "serial_port", "spawn", + "sync_channel", "sync_irq", "task", "tty", @@ -3267,13 +3253,13 @@ dependencies = [ name = "serial_port" version = "0.1.0" dependencies = [ - "async_channel", "core2", "deferred_interrupt_tasks", "interrupts", "log", "serial_port_basic", "spin 0.9.4", + "sync_channel", "sync_irq", ] @@ -3624,6 +3610,20 @@ dependencies = [ "wait_queue", ] +[[package]] +name = "sync_channel" +version = "0.1.0" +dependencies = [ + "core2", + "crossbeam-utils", + "debugit", + "log", + "mpmc", + "sync", + "sync_spin", + "wait_queue", +] + [[package]] name = "sync_irq" version = "0.1.0" @@ -3763,7 +3763,6 @@ name = "test_channel" version = "0.1.0" dependencies = [ "app_io", - "async_channel", "cpu", "getopts", "log", @@ -3771,6 +3770,7 @@ dependencies = [ "scheduler", "spawn", "spin 0.9.4", + "sync_channel", "task", ] @@ -4128,9 +4128,9 @@ dependencies = [ name = "tty" version = "0.1.0" dependencies = [ - "async_channel", "core2", "sync_block", + "sync_channel", ] [[package]] @@ -4182,9 +4182,9 @@ checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" name = "unified_channel" version = "0.1.0" dependencies = [ - "async_channel", "cfg-if 0.1.10", "rendezvous", + "sync_channel", ] [[package]] diff --git a/applications/bm/Cargo.toml b/applications/bm/Cargo.toml index b18fa0d227..f88b37dd8e 100644 --- a/applications/bm/Cargo.toml +++ b/applications/bm/Cargo.toml @@ -53,8 +53,8 @@ path = "../../kernel/memory" [dependencies.rendezvous] path = "../../kernel/rendezvous" -[dependencies.async_channel] -path = "../../kernel/async_channel" +[dependencies.sync_channel] +path = "../../kernel/sync_channel" [dependencies.simple_ipc] path = "../../kernel/simple_ipc" diff --git a/applications/bm/src/lib.rs b/applications/bm/src/lib.rs index a75331dd34..907048e532 100644 --- a/applications/bm/src/lib.rs +++ b/applications/bm/src/lib.rs @@ -1,4 +1,4 @@ -//! A collection of micro-benchmarks for Theseus. +//! A collection of micro-benchmarks for Theseus. //! They include null syscall, context switching, process creation, memory mapping, IPC and file system benchmarks. //! //! To run the memory mapping benchmark, Theseus should be compiled with the "bm_map" configuration option. @@ -23,7 +23,7 @@ extern crate scheduler; extern crate libtest; extern crate memory; extern crate rendezvous; -extern crate async_channel; +extern crate sync_channel; extern crate simple_ipc; extern crate getopts; extern crate pmu_x86; @@ -89,10 +89,10 @@ pub fn main(args: Vec) -> isize { opts.optflag("", "fs_delete", "file delete"); opts.optflag("", "fs", "test code for checking FS' ability"); - opts.optflag("a", "async", "Run IPC bm for the async channel"); + opts.optflag("a", "sync", "Run IPC bm for the sync channel"); opts.optflag("r", "rendezvous", "Run IPC bm for the rendezvous channel"); opts.optflag("p", "pinned", "Sender and Receiver should be pinned to the same core in the IPC bm"); - opts.optflag("b", "blocking", "Sender and Receiver should use blocking versions in the async IPC bm"); + opts.optflag("b", "blocking", "Sender and Receiver should use blocking versions in the sync IPC bm"); opts.optflag("c", "cycles", "Measure the IPC times in reference cycles (need to have a PMU for this option)"); @@ -101,7 +101,7 @@ pub fn main(args: Vec) -> isize { Err(_f) => { println!("{}", _f); print_usage(opts); - return -1; + return -1; } }; @@ -157,8 +157,8 @@ pub fn main(args: Vec) -> isize { println!("RENDEZVOUS IPC"); do_ipc_rendezvous(pinned, cycles) } else if matches.opt_present("a") { - println!("ASYNC IPC"); - do_ipc_async(pinned, blocking, cycles) + println!("SYNC IPC"); + do_ipc_sync(pinned, blocking, cycles) } else { Err("Specify channel type to use") } @@ -196,8 +196,8 @@ pub fn main(args: Vec) -> isize { } -/// Measures the time for null syscall. -/// Calls `do_null_inner` multiple times and averages the value. +/// Measures the time for null syscall. +/// Calls `do_null_inner` multiple times and averages the value. fn do_null() -> Result<(), &'static str> { let mut tries: u64 = 0; let mut max: u64 = core::u64::MIN; @@ -216,7 +216,7 @@ fn do_null() -> Result<(), &'static str> { if lat > max {max = lat;} if lat < min {min = lat;} } - + let lat = tries / TRIES as u64; // We expect the maximum and minimum to be within 10*THRESHOLD_ERROR_RATIO % of the mean value let err = (lat * 10 * THRESHOLD_ERROR_RATIO) / 100; @@ -224,7 +224,7 @@ fn do_null() -> Result<(), &'static str> { printlnwarn!("null_test diff is too big: {} ({} - {}) {}", max-min, max, min, T_UNIT); } let stats = calculate_stats(&vec).ok_or("couldn't calculate stats")?; - + printlninfo!("NULL result: ({})", T_UNIT); printlninfo!("{:?}", stats); printlninfo!("This test is equivalent to `lat_syscall null` in LMBench"); @@ -232,7 +232,7 @@ fn do_null() -> Result<(), &'static str> { } /// Internal function that actually calculates the time for null syscall. -/// Measures this by calling `get_my_current_task_id` of the current task. +/// Measures this by calling `get_my_current_task_id` of the current task. fn do_null_inner(overhead_ct: u64, th: usize, nr: usize) -> Result { let start_hpet: u64; let end_hpet: u64; @@ -263,12 +263,12 @@ fn do_null_inner(overhead_ct: u64, th: usize, nr: usize) -> Result Result<(), &'static str>{ let child_core = match pick_free_core() { - Ok(child_core) => { - printlninfo!("core_{} is idle, so my children will play on it.", child_core); + Ok(child_core) => { + printlninfo!("core_{} is idle, so my children will play on it.", child_core); child_core } _ => { @@ -284,7 +284,7 @@ fn do_spawn() -> Result<(), &'static str>{ let overhead_ct = hpet_timing_overhead()?; print_header(TRIES, ITERATIONS); - + for i in 0..TRIES { let lat = do_spawn_inner(overhead_ct, i+1, TRIES, child_core)?; @@ -328,8 +328,8 @@ fn do_spawn_inner(overhead_ct: u64, th: usize, nr: usize, on_cpu: CpuId) -> Resu .ok_or("Could not find the application 'hello'")?; let crate_name = crate_name_from_path(&app_path).ok_or("invalid app path")?.to_string(); - // here we are taking the time at every iteration. - // otherwise the crate is not fully unloaded from the namespace before the next iteration starts + // here we are taking the time at every iteration. + // otherwise the crate is not fully unloaded from the namespace before the next iteration starts // so it cannot be loaded again and we are returned an error. let iterations = 100; for _ in 0..iterations{ @@ -342,7 +342,7 @@ fn do_spawn_inner(overhead_ct: u64, th: usize, nr: usize, on_cpu: CpuId) -> Resu child.join()?; end_hpet = hpet.get_counter(); - delta_hpet += end_hpet - start_hpet - overhead_ct; + delta_hpet += end_hpet - start_hpet - overhead_ct; } let delta_time = hpet_2_time("", delta_hpet); @@ -354,12 +354,12 @@ fn do_spawn_inner(overhead_ct: u64, th: usize, nr: usize, on_cpu: CpuId) -> Resu } -/// Measures the time to switch between two kernel threads. +/// Measures the time to switch between two kernel threads. /// Calls `do_ctx_inner` multiple times to perform the actual operation fn do_ctx() -> Result<(), &'static str> { let child_core = match pick_free_core() { - Ok(child_core) => { - printlninfo!("core_{} is idle, so my children will play on it.", child_core); + Ok(child_core) => { + printlninfo!("core_{} is idle, so my children will play on it.", child_core); child_core } _ => { @@ -372,12 +372,12 @@ fn do_ctx() -> Result<(), &'static str> { let mut max: u64 = core::u64::MIN; let mut min: u64 = core::u64::MAX; let mut vec = Vec::with_capacity(TRIES); - + print_header(TRIES, ITERATIONS*1000*2); for i in 0..TRIES { let lat = do_ctx_inner(i+1, TRIES, child_core)?; - + tries += lat; vec.push(lat); @@ -458,7 +458,7 @@ fn do_ctx_inner(th: usize, nr: usize, child_core: CpuId) -> Result Result<(), &'static str> { let mut tries: u64 = 0; @@ -478,7 +478,7 @@ fn do_memory_map() -> Result<(), &'static str> { if lat > max {max = lat;} if lat < min {min = lat;} } - + let lat = tries / TRIES as u64; // We expect the maximum and minimum to be within 10*THRESHOLD_ERROR_RATIO % of the mean value let err = (lat * 10 * THRESHOLD_ERROR_RATIO) / 100; @@ -486,7 +486,7 @@ fn do_memory_map() -> Result<(), &'static str> { printlnwarn!("memory_map_test diff is too big: {} ({} - {}) {}", max-min, max, min, T_UNIT); } let stats = calculate_stats(&vec).ok_or("couldn't calculate stats")?; - + printlninfo!("MEMORY MAP result: ({})", T_UNIT); printlninfo!("{:?}", stats); printlninfo!("This test is equivalent to `lat_mmap` in LMBench"); @@ -523,7 +523,7 @@ fn do_memory_map_inner(overhead_ct: u64, th: usize, nr: usize) -> Result Result<(), &'static str> { let child_core = if pinned { @@ -541,9 +541,9 @@ fn do_ipc_rendezvous(pinned: bool, cycles: bool) -> Result<(), &'static str> { for i in 0..TRIES { let lat = if cycles { - do_ipc_rendezvous_inner_cycles(i+1, TRIES, child_core)? + do_ipc_rendezvous_inner_cycles(i+1, TRIES, child_core)? } else { - do_ipc_rendezvous_inner(i+1, TRIES, child_core)? + do_ipc_rendezvous_inner(i+1, TRIES, child_core)? }; tries += lat; vec.push(lat); @@ -586,7 +586,7 @@ fn do_ipc_rendezvous_inner(th: usize, nr: usize, child_core: Option) -> R let taskref3; - if let Some(core) = child_core { + if let Some(core) = child_core { taskref3 = spawn::new_task_builder(overhead_task ,1) .name(String::from("overhead_task_1")) .pin_on_cpu(core) @@ -596,7 +596,7 @@ fn do_ipc_rendezvous_inner(th: usize, nr: usize, child_core: Option) -> R .name(String::from("overhead_task_1")) .spawn()?; } - + taskref3.join()?; let overhead = hpet.get_counter(); @@ -604,11 +604,11 @@ fn do_ipc_rendezvous_inner(th: usize, nr: usize, child_core: Option) -> R // we then create the sender and receiver endpoints for the 2 tasks let (sender1, receiver1) = rendezvous::new_channel(); let (sender2, receiver2) = rendezvous::new_channel(); - + let taskref1; //then we spawn the child task - if let Some(core) = child_core { + if let Some(core) = child_core { taskref1 = spawn::new_task_builder(rendezvous_task_sender, (sender1, receiver2)) .name(String::from("sender")) .pin_on_cpu(core) @@ -651,7 +651,7 @@ fn do_ipc_rendezvous_inner_cycles(th: usize, nr: usize, child_core: Option, rendezv } } -/// Measures the round trip time to send a 1-byte message on an async channel. -/// Calls `do_ipc_async_inner` multiple times to perform the actual operation -fn do_ipc_async(pinned: bool, blocking: bool, cycles: bool) -> Result<(), &'static str> { +/// Measures the round trip time to send a 1-byte message on an sync channel. +/// Calls `do_ipc_sync_inner` multiple times to perform the actual operation +fn do_ipc_sync(pinned: bool, blocking: bool, cycles: bool) -> Result<(), &'static str> { let child_core = if pinned { Some(CPU_ID!()) } else { @@ -738,9 +738,9 @@ fn do_ipc_async(pinned: bool, blocking: bool, cycles: bool) -> Result<(), &'stat for i in 0..TRIES { let lat = if cycles { - do_ipc_async_inner_cycles(i+1, TRIES, child_core, blocking)? + do_ipc_sync_inner_cycles(i+1, TRIES, child_core, blocking)? } else { - do_ipc_async_inner(i+1, TRIES, child_core, blocking)? + do_ipc_sync_inner(i+1, TRIES, child_core, blocking)? }; tries += lat; vec.push(lat); @@ -754,14 +754,14 @@ fn do_ipc_async(pinned: bool, blocking: bool, cycles: bool) -> Result<(), &'stat // We expect the maximum and minimum to be within 10*THRESHOLD_ERROR_RATIO % of the mean value let err = (lat * 10 * THRESHOLD_ERROR_RATIO) / 100; if max - lat > err || lat - min > err { - printlnwarn!("ipc_async_test diff is too big: {} ({} - {})", max-min, max, min); + printlnwarn!("ipc_sync_test diff is too big: {} ({} - {})", max-min, max, min); } let stats = calculate_stats(&vec).ok_or("couldn't calculate stats")?; if cycles { - printlninfo!("IPC ASYNC result: Round Trip Time: (cycles)",); + printlninfo!("IPC SYNC result: Round Trip Time: (cycles)",); } else { - printlninfo!("IPC ASYNC result: Round Trip Time: ({})", T_UNIT); + printlninfo!("IPC SYNC result: Round Trip Time: ({})", T_UNIT); } printlninfo!("{:?}", stats); printlninfo!("This test is equivalent to `lat_pipe` in LMBench when run with the pinned flag enabled"); @@ -772,13 +772,13 @@ fn do_ipc_async(pinned: bool, blocking: bool, cycles: bool) -> Result<(), &'stat /// Internal function that actually calculates the round trip time to send a message between two threads. /// This is measured by creating a child task, and sending messages between the parent and child. /// Overhead is measured by creating a task that just returns. -fn do_ipc_async_inner(th: usize, nr: usize, child_core: Option, blocking: bool) -> Result { +fn do_ipc_sync_inner(th: usize, nr: usize, child_core: Option, blocking: bool) -> Result { let hpet = get_hpet().ok_or("Could not retrieve hpet counter")?; - let (sender_task, receiver_task): (fn((async_channel::Sender, async_channel::Receiver)), fn((async_channel::Sender, async_channel::Receiver))) = if blocking { - (async_task_sender, async_task_receiver) + let (sender_task, receiver_task): (fn((sync_channel::Sender, sync_channel::Receiver)), fn((sync_channel::Sender, sync_channel::Receiver))) = if blocking { + (sync_task_sender, sync_task_receiver) } else { - (async_task_sender_nonblocking, async_task_receiver_nonblocking) + (sync_task_sender_nonblocking, sync_task_receiver_nonblocking) }; // we first spawn one task to get the overhead of creating and joining the task @@ -788,7 +788,7 @@ fn do_ipc_async_inner(th: usize, nr: usize, child_core: Option, blocking: let taskref3; - if let Some(core) = child_core { + if let Some(core) = child_core { taskref3 = spawn::new_task_builder(overhead_task ,1) .name(String::from("overhead_task_1")) .pin_on_cpu(core) @@ -798,7 +798,7 @@ fn do_ipc_async_inner(th: usize, nr: usize, child_core: Option, blocking: .name(String::from("overhead_task_1")) .spawn()?; } - + taskref3.join()?; let overhead = hpet.get_counter(); @@ -808,12 +808,12 @@ fn do_ipc_async_inner(th: usize, nr: usize, child_core: Option, blocking: // which is 16 4 KiB-pages, or 65,536 bytes. const CAPACITY: usize = 65536; - let (sender1, receiver1) = async_channel::new_channel(CAPACITY); - let (sender2, receiver2) = async_channel::new_channel(CAPACITY); - + let (sender1, receiver1) = sync_channel::new_channel(CAPACITY); + let (sender2, receiver2) = sync_channel::new_channel(CAPACITY); + let taskref1; - if let Some(core) = child_core { + if let Some(core) = child_core { taskref1 = spawn::new_task_builder(sender_task, (sender1, receiver2)) .name(String::from("sender")) .pin_on_cpu(core) @@ -845,14 +845,14 @@ fn do_ipc_async_inner(th: usize, nr: usize, child_core: Option, blocking: /// Internal function that actually calculates the round trip time to send a message between two threads. /// This is measured by creating a child task, and sending messages between the parent and child. /// Overhead is measured by creating a task that just returns. -fn do_ipc_async_inner_cycles(th: usize, nr: usize, child_core: Option, blocking: bool) -> Result { +fn do_ipc_sync_inner_cycles(th: usize, nr: usize, child_core: Option, blocking: bool) -> Result { pmu_x86::init()?; let mut counter = start_counting_reference_cycles()?; - let (sender_task, receiver_task): (fn((async_channel::Sender, async_channel::Receiver)), fn((async_channel::Sender, async_channel::Receiver))) = if blocking { - (async_task_sender, async_task_receiver) + let (sender_task, receiver_task): (fn((sync_channel::Sender, sync_channel::Receiver)), fn((sync_channel::Sender, sync_channel::Receiver))) = if blocking { + (sync_task_sender, sync_task_receiver) } else { - (async_task_sender_nonblocking, async_task_receiver_nonblocking) + (sync_task_sender_nonblocking, sync_task_receiver_nonblocking) }; // we first spawn one task to get the overhead of creating and joining the task @@ -862,7 +862,7 @@ fn do_ipc_async_inner_cycles(th: usize, nr: usize, child_core: Option, bl let taskref3; - if let Some(core) = child_core { + if let Some(core) = child_core { taskref3 = spawn::new_task_builder(overhead_task ,1) .name(String::from("overhead_task_1")) .pin_on_cpu(core) @@ -872,7 +872,7 @@ fn do_ipc_async_inner_cycles(th: usize, nr: usize, child_core: Option, bl .name(String::from("overhead_task_1")) .spawn()?; } - + taskref3.join()?; let overhead = counter.diff(); @@ -883,12 +883,12 @@ fn do_ipc_async_inner_cycles(th: usize, nr: usize, child_core: Option, bl // which is 16 4 KiB-pages, or 65,536 bytes. const CAPACITY: usize = 65536; - let (sender1, receiver1) = async_channel::new_channel(CAPACITY); - let (sender2, receiver2) = async_channel::new_channel(CAPACITY); - + let (sender1, receiver1) = sync_channel::new_channel(CAPACITY); + let (sender2, receiver2) = sync_channel::new_channel(CAPACITY); + let taskref1; - if let Some(core) = child_core { + if let Some(core) = child_core { taskref1 = spawn::new_task_builder(sender_task, (sender1, receiver2)) .name(String::from("sender")) .pin_on_cpu(core) @@ -916,25 +916,25 @@ fn do_ipc_async_inner_cycles(th: usize, nr: usize, child_core: Option, bl } /// A task which sends and then receives a message for a number of iterations -fn async_task_sender((sender, receiver): (async_channel::Sender, async_channel::Receiver)) { +fn sync_task_sender((sender, receiver): (sync_channel::Sender, sync_channel::Receiver)) { let mut msg = 0; for _ in 0..ITERATIONS{ - sender.send(msg).expect("async channel task: could not send message!"); - msg = receiver.receive().expect("async channel task: could not receive message"); + sender.send(msg).expect("sync channel task: could not send message!"); + msg = receiver.receive().expect("sync channel task: could not receive message"); } } /// A task which receives and then sends a message for a number of iterations -fn async_task_receiver((sender, receiver): (async_channel::Sender, async_channel::Receiver)) { +fn sync_task_receiver((sender, receiver): (sync_channel::Sender, sync_channel::Receiver)) { let mut msg; for _ in 0..ITERATIONS{ - msg = receiver.receive().expect("async channel task: could not receive message"); - sender.send(msg).expect("async channel task: could not send message!"); + msg = receiver.receive().expect("sync channel task: could not receive message"); + sender.send(msg).expect("sync channel task: could not send message!"); } } /// A task which sends and then receives a message for a number of iterations -fn async_task_sender_nonblocking((sender, receiver): (async_channel::Sender, async_channel::Receiver)) { +fn sync_task_sender_nonblocking((sender, receiver): (sync_channel::Sender, sync_channel::Receiver)) { let mut msg = Ok(0); for _ in 0..ITERATIONS{ while sender.try_send(*msg.as_ref().unwrap()).is_err() {} @@ -946,7 +946,7 @@ fn async_task_sender_nonblocking((sender, receiver): (async_channel::Sender, } /// A task which receives and then sends a message for a number of iterations -fn async_task_receiver_nonblocking((sender, receiver): (async_channel::Sender, async_channel::Receiver)) { +fn sync_task_receiver_nonblocking((sender, receiver): (sync_channel::Sender, sync_channel::Receiver)) { let mut msg; for _ in 0..ITERATIONS{ msg = receiver.try_receive(); diff --git a/applications/test_channel/Cargo.toml b/applications/test_channel/Cargo.toml index 5b6e97354b..f49e7ceab7 100644 --- a/applications/test_channel/Cargo.toml +++ b/applications/test_channel/Cargo.toml @@ -29,5 +29,5 @@ path = "../../kernel/spawn" [dependencies.rendezvous] path = "../../kernel/rendezvous" -[dependencies.async_channel] -path = "../../kernel/async_channel" +[dependencies.sync_channel] +path = "../../kernel/sync_channel" diff --git a/applications/test_channel/src/lib.rs b/applications/test_channel/src/lib.rs index b93cd9a261..06d086c024 100644 --- a/applications/test_channel/src/lib.rs +++ b/applications/test_channel/src/lib.rs @@ -9,7 +9,7 @@ extern crate task; extern crate spawn; extern crate scheduler; extern crate rendezvous; -extern crate async_channel; +extern crate sync_channel; extern crate cpu; @@ -117,13 +117,13 @@ fn rmain(matches: Matches) -> Result<(), &'static str> { println!("Running rendezvous channel test in multiple mode."); rendezvous_test_multiple(send_count!(), receive_count!(), send_panic_point, receive_panic_point)?; - println!("Running asynchronous channel test in oneshot mode."); + println!("Running sync channel test in oneshot mode."); for _i in 0 .. iterations!() { - asynchronous_test_oneshot()?; + sync_test_oneshot()?; } - println!("Running asynchronous channel test in multiple mode."); - asynchronous_test_multiple(send_count!(), receive_count!(), send_panic_point, receive_panic_point)?; + println!("Running sync channel test in multiple mode."); + sync_test_multiple(send_count!(), receive_count!(), send_panic_point, receive_panic_point)?; Ok(()) } @@ -250,84 +250,84 @@ fn rendezvous_sender_task ((sender, iterations, panic_point): (rendezvous::Sende /// A simple test that spawns a sender & receiver task to send a single message /// Optionally can set panics at `send_panic` and `receive_panic` locations -fn asynchronous_test_oneshot() -> Result<(), &'static str> { +fn sync_test_oneshot() -> Result<(), &'static str> { let my_cpu = cpu::current_cpu(); - let (sender, receiver) = async_channel::new_channel(2); + let (sender, receiver) = sync_channel::new_channel(2); let t1 = spawn::new_task_builder(|_: ()| -> Result<(), &'static str> { - warn!("asynchronous_test_oneshot(): Entered sender task!"); + warn!("sync_test_oneshot(): Entered sender task!"); sender.send("hello").map_err(|error| { warn!("Sender task failed due to : {:?}", error); return "Sender task failed"; })?; Ok(()) }, ()) - .name(String::from("sender_task_asynchronous_oneshot")) + .name(String::from("sender_task_sync_oneshot")) .block(); let t1 = pin_task!(t1, my_cpu).spawn()?; let t2 = spawn::new_task_builder(|_: ()| -> Result<(), &'static str> { - warn!("asynchronous_test_oneshot(): Entered receiver task!"); + warn!("sync_test_oneshot(): Entered receiver task!"); let msg = receiver.receive().map_err(|error| { warn!("Receiver task failed due to : {:?}", error); return "Receiver task failed" })?; - warn!("asynchronous_test_oneshot(): Receiver got msg: {:?}", msg); + warn!("sync_test_oneshot(): Receiver got msg: {:?}", msg); Ok(()) }, ()) - .name(String::from("receiver_task_asynchronous_oneshot")) + .name(String::from("receiver_task_sync_oneshot")) .block(); let t2 = pin_task!(t2, my_cpu).spawn()?; - warn!("asynchronous_test_oneshot(): Finished spawning the sender and receiver tasks"); + warn!("sync_test_oneshot(): Finished spawning the sender and receiver tasks"); t2.unblock().unwrap(); t1.unblock().unwrap(); t1.join()?; t2.join()?; - warn!("asynchronous_test_oneshot(): Joined the sender and receiver tasks."); + warn!("sync_test_oneshot(): Joined the sender and receiver tasks."); Ok(()) } /// A simple test that spawns a sender & receiver task to send `send_count` and receive `receive_count` messages. -fn asynchronous_test_multiple(send_count: usize, receive_count: usize, send_panic: Option, receive_panic: Option) -> Result<(), &'static str> { +fn sync_test_multiple(send_count: usize, receive_count: usize, send_panic: Option, receive_panic: Option) -> Result<(), &'static str> { let my_cpu = cpu::current_cpu(); - let (sender, receiver) = async_channel::new_channel(2); + let (sender, receiver) = sync_channel::new_channel(2); - let t1 = spawn::new_task_builder(asynchronous_sender_task, (sender, send_count, send_panic)) - .name(String::from("sender_task_asynchronous")) + let t1 = spawn::new_task_builder(sync_sender_task, (sender, send_count, send_panic)) + .name(String::from("sender_task_sync")) .block(); let t1 = pin_task!(t1, my_cpu).spawn()?; - let t2 = spawn::new_task_builder(asynchronous_receiver_task, (receiver, receive_count, receive_panic)) - .name(String::from("receiver_task_asynchronous")) + let t2 = spawn::new_task_builder(sync_receiver_task, (receiver, receive_count, receive_panic)) + .name(String::from("receiver_task_sync")) .block(); let t2 = pin_task!(t2, my_cpu).spawn()?; - warn!("asynchronous_test_multiple(): Finished spawning the sender and receiver tasks"); + warn!("sync_test_multiple(): Finished spawning the sender and receiver tasks"); t2.unblock().unwrap(); t1.unblock().unwrap(); t1.join()?; t2.join()?; - warn!("asynchronous_test_multiple(): Joined the sender and receiver tasks."); + warn!("sync_test_multiple(): Joined the sender and receiver tasks."); Ok(()) } /// A simple receiver receiving `iterations` messages /// Optionally may panic after sending `panic_pont` messages -fn asynchronous_receiver_task ((receiver, iterations, panic_point): (async_channel::Receiver, usize, Option)) -> Result<(), &'static str> { - warn!("asynchronous_test(): Entered receiver task! Expecting to receive {} messages", iterations); +fn sync_receiver_task ((receiver, iterations, panic_point): (sync_channel::Receiver, usize, Option)) -> Result<(), &'static str> { + warn!("sync_test(): Entered receiver task! Expecting to receive {} messages", iterations); if panic_point.is_some(){ - warn!("asynchronous_test(): Panic will occur in receiver task at message {}",panic_point.unwrap()); + warn!("sync_test(): Panic will occur in receiver task at message {}",panic_point.unwrap()); } for i in 0..iterations { @@ -335,23 +335,23 @@ fn asynchronous_receiver_task ((receiver, iterations, panic_point): (async_chann warn!("Receiver task failed due to : {:?}", error); return "Receiver task failed" })?; - warn!("asynchronous_test(): Receiver got {:?} ({:03})", msg, i); + warn!("sync_test(): Receiver got {:?} ({:03})", msg, i); if panic_point == Some(i) { panic!("rendezvous_test() : User specified panic in receiver"); } } - warn!("asynchronous_test(): Done receiver task!"); + warn!("sync_test(): Done receiver task!"); Ok(()) } /// A simple sender sending `iterations` messages /// Optionally may panic after sending `panic_pont` messages -fn asynchronous_sender_task ((sender, iterations, panic_point): (async_channel::Sender, usize, Option)) -> Result<(), &'static str> { - warn!("asynchronous_test(): Entered sender task! Expecting to send {} messages", iterations); +fn sync_sender_task ((sender, iterations, panic_point): (sync_channel::Sender, usize, Option)) -> Result<(), &'static str> { + warn!("sync_test(): Entered sender task! Expecting to send {} messages", iterations); if panic_point.is_some(){ - warn!("asynchronous_test(): Panic will occur in sender task at message {}",panic_point.unwrap()); + warn!("sync_test(): Panic will occur in sender task at message {}",panic_point.unwrap()); } for i in 0..iterations { @@ -359,14 +359,14 @@ fn asynchronous_sender_task ((sender, iterations, panic_point): (async_channel:: warn!("Sender task failed due to : {:?}", error); return "Sender task failed"; })?; - warn!("asynchronous_test(): Sender sent message {:03}", i); + warn!("sync_test(): Sender sent message {:03}", i); if panic_point == Some(i) { panic!("rendezvous_test() : User specified panic in receiver"); } } - warn!("asynchronous_test(): Done sender task!"); + warn!("sync_test(): Done sender task!"); Ok(()) } diff --git a/kernel/console/Cargo.toml b/kernel/console/Cargo.toml index a5b18ce8d4..4feaadbeff 100644 --- a/kernel/console/Cargo.toml +++ b/kernel/console/Cargo.toml @@ -10,7 +10,7 @@ log = "0.4.8" core2 = { version = "0.4.0", default-features = false, features = ["alloc", "nightly"] } app_io = { path = "../app_io" } -async_channel = { path = "../async_channel" } +sync_channel = { path = "../sync_channel" } io = { path = "../io" } mod_mgmt = { path = "../mod_mgmt" } path = { path = "../path" } diff --git a/kernel/console/src/lib.rs b/kernel/console/src/lib.rs index 723e628fca..455b93a0de 100644 --- a/kernel/console/src/lib.rs +++ b/kernel/console/src/lib.rs @@ -5,7 +5,7 @@ extern crate alloc; use alloc::{format, sync::Arc}; -use async_channel::Receiver; +use sync_channel::Receiver; use core::sync::atomic::{AtomicU16, Ordering}; use core2::io::Write; use sync_irq::IrqSafeMutex; @@ -31,7 +31,7 @@ pub fn ignore_serial_port_input(serial_port_address: u16) { /// /// Returns the newly-spawned detection task. pub fn start_connection_detection() -> Result { - let (sender, receiver) = async_channel::new_channel(4); + let (sender, receiver) = sync_channel::new_channel(4); serial_port::set_connection_listener(sender); spawn::new_task_builder(console_connection_detector, receiver) @@ -69,7 +69,7 @@ fn console_connection_detector( } }; - let (sender, receiver) = async_channel::new_channel(16); + let (sender, receiver) = sync_channel::new_channel(16); if serial_port.lock().set_data_sender(sender).is_err() { warn!( "Serial port {:?} already had a data sender, skipping console connection request", diff --git a/kernel/serial_port/Cargo.toml b/kernel/serial_port/Cargo.toml index 326278dff9..7bea25df9c 100644 --- a/kernel/serial_port/Cargo.toml +++ b/kernel/serial_port/Cargo.toml @@ -16,7 +16,7 @@ interrupts = { path = "../interrupts" } deferred_interrupt_tasks = { path = "../deferred_interrupt_tasks" } # Dependencies below here are temporary, for console creation testing. -async_channel = { path = "../async_channel" } +sync_channel = { path = "../sync_channel" } [lib] crate-type = ["rlib"] diff --git a/kernel/serial_port/src/lib.rs b/kernel/serial_port/src/lib.rs index 00416bd6b9..ec0e025ab1 100644 --- a/kernel/serial_port/src/lib.rs +++ b/kernel/serial_port/src/lib.rs @@ -39,8 +39,8 @@ use interrupts::{PL011_RX_SPI, init_pl011_rx_interrupt}; // Dependencies below here are temporary and will be removed // after we have support for separate interrupt handling tasks. -extern crate async_channel; -use async_channel::Sender; +extern crate sync_channel; +use sync_channel::Sender; /// A temporary hack to allow the serial port interrupt handler /// to inform a listener on the other end of this channel diff --git a/kernel/async_channel/Cargo.toml b/kernel/sync_channel/Cargo.toml similarity index 96% rename from kernel/async_channel/Cargo.toml rename to kernel/sync_channel/Cargo.toml index 8c2867dba3..1647710914 100644 --- a/kernel/async_channel/Cargo.toml +++ b/kernel/sync_channel/Cargo.toml @@ -1,6 +1,6 @@ [package] authors = ["Kevin Boos "] -name = "async_channel" +name = "sync_channel" description = "Channel for asynchronous Inter-Task Communication via a bounded buffer" version = "0.1.0" diff --git a/kernel/async_channel/src/lib.rs b/kernel/sync_channel/src/lib.rs similarity index 95% rename from kernel/async_channel/src/lib.rs rename to kernel/sync_channel/src/lib.rs index ae82aca872..823689018f 100644 --- a/kernel/async_channel/src/lib.rs +++ b/kernel/sync_channel/src/lib.rs @@ -1,8 +1,8 @@ -//! An asynchronous channel for Inter-Task Communication (ITC) with an internal queue for buffering messages. +//! A blocking channel for Inter-Task Communication (ITC) with an internal queue for buffering messages. //! -//! This crate offers an asynchronous channel that allows multiple tasks -//! to exchange messages through the use of a bounded-capacity intermediate buffer. -//! Unlike the `rendezvous` channel, the sender and receiver do not need to rendezvous to send or receive data. +//! This crate offers a blocking channel that allows multiple tasks to exchange messages through the +//! use of a bounded-capacity intermediate buffer. Unlike the `rendezvous` channel, the sender and +//! receiver do not need to rendezvous to send or receive data. //! //! Only `Send` types can be sent or received through the channel. //! @@ -49,11 +49,11 @@ pub fn new_channel(minimum_capacity: usize) -> (Sender, Receiver) new_channel_with(minimum_capacity) } -/// Creates a new asynchronous channel with the specified deadlock prevention method. +/// Creates a new blocking channel with the specified deadlock prevention method. /// /// See [`new_channel()`] for more details. /// -/// The asynchronous channel uses a wait queue internally and hence exposes a +/// The blocking channel uses a wait queue internally and hence exposes a /// deadlock prevention type parameter `P` that is [`Spin`] by default. /// See [`WaitQueue`]'s documentation for more info on setting this type parameter. pub fn new_channel_with( @@ -109,7 +109,7 @@ impl From for core2::io::Error { /// The inner channel for asynchronous communication between `Sender`s and `Receiver`s. /// -/// This struct is effectively a wrapper around a MPMC queue +/// This struct is effectively a wrapper around an MPMC queue /// with waitqueues for senders (producers) and receivers (consumers). /// /// This channel object is not Send/Sync or cloneable itself; @@ -184,7 +184,7 @@ impl Sender { /// otherwise returns an [`Error`]. pub fn send(&self, msg: T) -> Result<(), Error> { #[cfg(trace_channel)] - trace!("async_channel: sending msg: {:?}", debugit!(msg)); + trace!("sync_channel: sending msg: {:?}", debugit!(msg)); // Fast path: attempt to send the message, assuming the buffer isn't full let msg = match self.try_send(msg) { // if successful return ok @@ -365,7 +365,7 @@ impl Receiver { /// /// Returns the message if it was received properly, otherwise returns an [`Error`]. pub fn receive(&self) -> Result { - // trace!("async_channel: receive() entry"); + // trace!("sync_channel: receive() entry"); // Fast path: attempt to receive a message, assuming the buffer isn't empty // The code progresses beyond this match only if try_receive fails due to // empty channel @@ -373,7 +373,7 @@ impl Receiver { Err(Error::WouldBlock) => {}, x => { #[cfg(trace_channel)] - trace!("async_channel: received msg: {:?}", debugit!(x)); + trace!("sync_channel: received msg: {:?}", debugit!(x)); return x; } }; @@ -408,12 +408,12 @@ impl Receiver { // If we successfully received a message, we need to notify any waiting senders. // As stated above, to avoid deadlock, this must be done here rather than in the above closure. if let Ok(ref _msg) = res { - // trace!("async_channel: successful receive() is notifying senders."); + // trace!("sync_channel: successful receive() is notifying senders."); self.channel.waiting_senders.notify_one(); } #[cfg(trace_channel)] - trace!("async_channel: received msg: {:?}", debugit!(res)); + trace!("sync_channel: received msg: {:?}", debugit!(res)); res } diff --git a/kernel/tty/Cargo.toml b/kernel/tty/Cargo.toml index f1a5fa6ad6..c7b86b27d8 100644 --- a/kernel/tty/Cargo.toml +++ b/kernel/tty/Cargo.toml @@ -6,7 +6,7 @@ description = "TTY abstractions" edition = "2021" [dependencies] -async_channel = { path = "../async_channel" } +sync_channel = { path = "../sync_channel" } sync_block = { path = "../sync_block" } [dependencies.core2] diff --git a/kernel/tty/src/channel.rs b/kernel/tty/src/channel.rs index 28ed06a185..841c1e91b0 100644 --- a/kernel/tty/src/channel.rs +++ b/kernel/tty/src/channel.rs @@ -1,4 +1,4 @@ -use async_channel::{Receiver, Sender}; +use sync_channel::{Receiver, Sender}; use core2::io::Result; #[derive(Clone)] @@ -9,7 +9,7 @@ pub(crate) struct Channel { impl Channel { pub(crate) fn new() -> Self { - let (sender, receiver) = async_channel::new_channel(256); + let (sender, receiver) = sync_channel::new_channel(256); Self { sender, receiver } } diff --git a/kernel/tty/src/discipline.rs b/kernel/tty/src/discipline.rs index 36e52f992b..7b245d3e3a 100644 --- a/kernel/tty/src/discipline.rs +++ b/kernel/tty/src/discipline.rs @@ -2,7 +2,7 @@ use core::sync::atomic::{AtomicBool, Ordering}; use crate::Channel; use alloc::vec::Vec; -use async_channel::{new_channel, Receiver, Sender}; +use sync_channel::{new_channel, Receiver, Sender}; use core2::io::Result; use sync_block::Mutex; diff --git a/kernel/unified_channel/Cargo.toml b/kernel/unified_channel/Cargo.toml index 417ef04d1a..5702d42168 100644 --- a/kernel/unified_channel/Cargo.toml +++ b/kernel/unified_channel/Cargo.toml @@ -7,8 +7,8 @@ authors = ["Kevin Boos "] [dependencies] cfg-if = "0.1.6" -[dependencies.async_channel] -path = "../async_channel" +[dependencies.sync_channel] +path = "../sync_channel" [dependencies.rendezvous] path = "../rendezvous" diff --git a/kernel/unified_channel/src/lib.rs b/kernel/unified_channel/src/lib.rs index b5d9d3ed82..af5b1490e6 100644 --- a/kernel/unified_channel/src/lib.rs +++ b/kernel/unified_channel/src/lib.rs @@ -1,22 +1,22 @@ //! A cfg-based wrapper that unifies rendezvous channels and async channels, for evaluation purposes. //! -//! The cfg option is `use_async_channel`, the default is to use the rendezvous channel. +//! The cfg option is `use_sync_channel`, the default is to use the rendezvous channel. #![no_std] extern crate alloc; -extern crate async_channel; +extern crate sync_channel; extern crate rendezvous; use alloc::string::String; pub fn new_string_channel(_minimum_capacity: usize) -> (StringSender, StringReceiver) { - #[cfg(use_async_channel)] { - let (sender, receiver) = async_channel::new_channel::(_minimum_capacity); + #[cfg(use_sync_channel)] { + let (sender, receiver) = sync_channel::new_channel::(_minimum_capacity); return (StringSender { sender }, StringReceiver { receiver }); } - #[cfg(not(use_async_channel))] { + #[cfg(not(use_sync_channel))] { let (sender, receiver) = rendezvous::new_channel::(); return (StringSender { sender }, StringReceiver { receiver }); } @@ -24,18 +24,18 @@ pub fn new_string_channel(_minimum_capacity: usize) -> (StringSender, StringRece #[derive(Clone)] pub struct StringSender { - #[cfg(use_async_channel)] - sender: async_channel::Sender, - #[cfg(not(use_async_channel))] + #[cfg(use_sync_channel)] + sender: sync_channel::Sender, + #[cfg(not(use_sync_channel))] sender: rendezvous::Sender, } impl StringSender { - #[cfg(use_async_channel)] + #[cfg(use_sync_channel)] pub fn send(&self, msg: String) -> Result<(), &'static str> { - self.sender.send(msg).map_err(|_e| "async channel send error") + self.sender.send(msg).map_err(|_e| "mpmc channel send error") } - #[cfg(not(use_async_channel))] + #[cfg(not(use_sync_channel))] pub fn send(&self, msg: String) -> Result<(), &'static str> { self.sender.send(msg) } @@ -43,18 +43,18 @@ impl StringSender { #[derive(Clone)] pub struct StringReceiver { - #[cfg(use_async_channel)] - receiver: async_channel::Receiver, - #[cfg(not(use_async_channel))] + #[cfg(use_sync_channel)] + receiver: sync_channel::Receiver, + #[cfg(not(use_sync_channel))] receiver: rendezvous::Receiver, } impl StringReceiver { - #[cfg(use_async_channel)] + #[cfg(use_sync_channel)] pub fn receive(&self) -> Result { - self.receiver.receive().map_err(|_e| "async channel receive error") + self.receiver.receive().map_err(|_e| "mpmc channel receive error") } - #[cfg(not(use_async_channel))] + #[cfg(not(use_sync_channel))] pub fn receive(&self) -> Result { self.receiver.receive() }