Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/kyron-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct MacroArgs {
worker_thread_parameters: Option<ThreadParams>,
safety_worker: Option<bool>,
safety_worker_thread_parameters: Option<ThreadParams>,
safety_worker_task_queue_size: Option<Expr>,
dedicated_workers: Vec<DedicatedWorker>,
}

Expand All @@ -34,6 +35,7 @@ impl MacroArgs {
worker_thread_parameters: None,
safety_worker: None,
safety_worker_thread_parameters: None,
safety_worker_task_queue_size: None,
dedicated_workers: Vec::new(),
}
}
Expand Down Expand Up @@ -103,6 +105,10 @@ impl Parse for MacroArgs {
let tp: ThreadParams = parse_braced_thread_params(&input)?;
args.safety_worker_thread_parameters = Some(tp);
}
"safety_worker_task_queue_size" => {
let expr: Expr = input.parse()?;
args.safety_worker_task_queue_size = Some(expr);
}
"dedicated_workers" => {
let inner;
bracketed!(inner in input);
Expand Down Expand Up @@ -247,6 +253,7 @@ fn expr_to_usize(expr: &Expr) -> Result<usize> {
/// priority = 20,
/// scheduler_type = "RoundRobin"
/// },
/// safety_worker_task_queue_size = 32, // Optional, must be power of two, default: 64
/// dedicated_workers = [ // Optional, list of dedicated workers
/// {
/// id = "dedicated1", // Required, unique id
Expand All @@ -268,7 +275,7 @@ fn expr_to_usize(expr: &Expr) -> Result<usize> {
///
/// ## Notes:
/// - All parameters are optional unless otherwise noted.
/// - `task_queue_size` must be a power of two.
/// - `task_queue_size` and `safety_worker_task_queue_size` must be a power of two.
/// - `worker_threads` must be between 1 and 128.
/// - `priority` must be between 0 and 255.
/// - `scheduler_type` must be one of "Fifo", "RoundRobin", "Other".
Expand Down Expand Up @@ -324,6 +331,18 @@ pub fn main(attr: TokenStream, item: TokenStream) -> TokenStream {
// safety worker token if enabled
let sw_enabled = args.safety_worker.unwrap_or(false);
let safety_worker_tokens = if sw_enabled {
let safety_worker_task_queue_size_ts = match args.safety_worker_task_queue_size {
Some(e) => {
let queue_size = expr_to_usize(&e).unwrap();
if !queue_size.is_power_of_two() {
return syn::Error::new_spanned(e, "'safety_worker_task_queue_size' must be a power of two.")
.to_compile_error()
.into();
}
quote! { #e }
}
None => quote! { 64 }, // default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this 64 is sued here and in other places. We shall use single source const value in all cases for default to keep it consistent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as before SAFETY_QUEUE_SIZE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not fixed to avoid circular dependency because kyron has dependency on kyron-macros.
Note:
If SAFETY_QUEUE_SIZE is defined in kyron and if this dependency is added in kyron-macros, it results in circular dependency.

};
match args.safety_worker_thread_parameters {
Some(tp) => {
if tp.priority.is_none() ^ tp.scheduler_type.is_none() {
Expand All @@ -335,13 +354,23 @@ pub fn main(attr: TokenStream, item: TokenStream) -> TokenStream {
ThreadParameters::new()
#tp_tokens
)
.safety_worker_task_queue_size(#safety_worker_task_queue_size_ts)
}
}
None => quote! {
.enable_safety_worker(ThreadParameters::default())
.safety_worker_task_queue_size(#safety_worker_task_queue_size_ts)
},
}
} else {
if args.safety_worker_task_queue_size.is_some() {
return syn::Error::new_spanned(
args.safety_worker_task_queue_size.unwrap(),
"Safety worker is not enabled, but queue size is configured.",
)
.to_compile_error()
.into();
}
quote! {/* safety worker not enabled */}
};

Expand Down
19 changes: 18 additions & 1 deletion src/kyron/src/scheduler/execution_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ pub struct ExecutionEngineBuilder {

dedicated_workers_ids: GrowableVec<(UniqueWorkerId, ThreadParameters)>,
with_safe_worker: (bool, ThreadParameters), //enabled, params
safety_worker_queue_size: u32,
}

impl Default for ExecutionEngineBuilder {
Expand All @@ -242,6 +243,7 @@ impl ExecutionEngineBuilder {
queue_size: 256,
dedicated_workers_ids: GrowableVec::new(2),
with_safe_worker: (false, ThreadParameters::default()),
safety_worker_queue_size: 64,
thread_params: ThreadParameters::default(),
}
}
Expand Down Expand Up @@ -311,6 +313,17 @@ impl ExecutionEngineBuilder {
self
}

///
/// Configure safety worker task queue size with `size`.
/// >ATTENTION: `size` has to be power of two and safety worker shall be enabled prior to queue size configuration.
///
pub fn safety_worker_task_queue_size(mut self, size: u32) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shall fail if enable_safety_worker was not called before or ? Otherwise user may thing it works.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

assert!(size.is_power_of_two(), "Safety worker task queue size ({}) must be power of two", size);
assert!(self.with_safe_worker.0, "Enable safety worker prior to configuring its queue size.");
self.safety_worker_queue_size = size;
self
}

///
/// Adds new dedicated worker identified by `id` to the engine with given thread parameters `params`.
/// If priority or scheduler type is `None`, then both attributes will be inherited from parent thread.
Expand Down Expand Up @@ -341,7 +354,11 @@ impl ExecutionEngineBuilder {
let safety_worker_queue;
let safety_worker = {
if self.with_safe_worker.0 {
let w = SafetyWorker::new(WorkerId::new("SafetyWorker".into(), 0, 0, WorkerType::Dedicated), self.with_safe_worker.1);
let w = SafetyWorker::new(
WorkerId::new("SafetyWorker".into(), 0, 0, WorkerType::Dedicated),
self.with_safe_worker.1,
self.safety_worker_queue_size,
);
safety_worker_queue = Some(w.get_queue());
Some(w)
} else {
Expand Down
6 changes: 2 additions & 4 deletions src/kyron/src/scheduler/workers/safety_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ use super::{spawn_thread, worker_types::WorkerId, ThreadParameters};
///
const LOCAL_STORAGE_SIZE_REDUCTION: usize = 8;

const SAFETY_QUEUE_SIZE: usize = 64; // For now hardcoded

pub(crate) struct SafetyWorker {
thread_handle: Option<Thread>,
id: WorkerId,
Expand All @@ -49,11 +47,11 @@ pub(crate) struct SafetyWorker {
}

impl SafetyWorker {
pub(crate) fn new(id: WorkerId, thread_params: ThreadParameters) -> Self {
pub(crate) fn new(id: WorkerId, thread_params: ThreadParameters, safety_queue_size: u32) -> Self {
SafetyWorker {
id,
thread_handle: None,
queue: Arc::new(TriggerQueue::new(SAFETY_QUEUE_SIZE)),
queue: Arc::new(TriggerQueue::new(safety_queue_size as usize)),
stop_signal: Arc::new(FoundationAtomicBool::new(false)),
thread_params,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ def test_regular_worker_execution(
assert len(failing_tasks) == task_count


@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/396")
class TestSafeWorkerHeavyLoad(CitScenario):
@pytest.fixture(scope="class")
def scenario_name(self) -> str:
Expand All @@ -331,6 +330,7 @@ def test_config(self, successful_task_count: int, fail_task_count: int) -> dict[
"task_queue_size": 2048,
"workers": 4,
"safety_worker": {},
"safety_worker_task_queue_size": 512,
},
"test": {
"successful_tasks": successful_task_count,
Expand Down
4 changes: 4 additions & 0 deletions tests/test_scenarios/rust/src/internals/runtime_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct DedicatedWorkerConfig {
pub struct ExecEngineConfig {
pub task_queue_size: u32,
pub workers: usize,
pub safety_worker_task_queue_size: Option<u32>,
#[serde(flatten)]
pub thread_parameters: ThreadParameters,
pub dedicated_workers: Option<Vec<DedicatedWorkerConfig>>,
Expand Down Expand Up @@ -135,6 +136,9 @@ impl Runtime {
if let Some(safety_worker) = &exec_engine.safety_worker {
let safety_worker_thread_params = self.set_thread_params(safety_worker);
exec_engine_builder = exec_engine_builder.enable_safety_worker(safety_worker_thread_params);
if let Some(queue_size) = exec_engine.safety_worker_task_queue_size {
exec_engine_builder = exec_engine_builder.safety_worker_task_queue_size(queue_size);
}
}

let (builder, _) = async_rt_builder.with_engine(exec_engine_builder);
Expand Down
Loading