Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions src/kyron/examples/safety_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//
// Copyright (c) 2025 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache License Version 2.0 which is available at
// <https://www.apache.org/licenses/LICENSE-2.0>
//
// SPDX-License-Identifier: Apache-2.0
//

use kyron::prelude::*;
use kyron::safety;
use kyron::scheduler::task::task_context::TaskContext;
use kyron::spawn_on_dedicated;
use kyron_foundation::prelude::*;

async fn failing_safety_task() -> Result<(), String> {
info!(
"Worker-N: failing_safety_task. Worker ID: {:?}, Task ID: {:?}",
TaskContext::worker_id(),
TaskContext::task_id().unwrap()
);
Err("Intentional failure".to_string())
}

async fn passing_safety_task() -> Result<(), String> {
info!(
"Worker-N: passing_safety_task. Worker ID: {:?}, Task ID: {:?}",
TaskContext::worker_id(),
TaskContext::task_id().unwrap()
);
Ok(())
}

async fn passing_non_safety_task() -> Result<(), String> {
info!(
"Dedicated worker (dw1): passing_non_safety_task. Worker ID: {:?}, Task ID: {:?}",
TaskContext::worker_id(),
TaskContext::task_id().unwrap()
);
Ok(())
}

fn main() {
tracing_subscriber::fmt()
.with_target(false) // Optional: Remove module path
.with_max_level(Level::DEBUG)
.with_thread_ids(true)
.with_thread_names(true)
.init();

// Create runtime
let (builder, _engine_id) = kyron::runtime::RuntimeBuilder::new().with_engine(
ExecutionEngineBuilder::new()
.task_queue_size(256)
.enable_safety_worker(ThreadParameters::default())
.with_dedicated_worker("dw1".into(), ThreadParameters::default())
.workers(2),
);

let mut runtime = builder.build().unwrap();
// Put programs into runtime and run them
runtime.block_on(async move {
info!(
"Parent task. Worker ID: {:?}, Task Id: {:?}",
TaskContext::worker_id(),
TaskContext::task_id().unwrap()
);
let handle1 = safety::spawn(failing_safety_task());
let handle2 = safety::spawn(passing_safety_task());
let handle3 = spawn_on_dedicated(passing_non_safety_task(), "dw1".into());

info!("=============================== Spawned all tasks ===============================");

let _ = handle1.await;
info!("Since safety task fails, safety worker may execute parent task from this statement onwards.");

info!(
"Parent task. Worker ID: {:?}, Task Id: {:?}",
TaskContext::worker_id(),
TaskContext::task_id().unwrap()
);
let _ = handle2.await;
let _ = handle3.await;

info!("Program finished running.");
});

info!("Exit.");
}
33 changes: 19 additions & 14 deletions src/kyron/src/core/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
// SPDX-License-Identifier: Apache-2.0
//

use ::core::cell::Cell;
use ::core::future::Future;
use ::core::pin::Pin;
use ::core::sync::atomic::AtomicU64;
use std::sync::Arc;

use crate::scheduler::workers::worker_types::WorkerId;

// Used to Box Futures
pub(crate) type BoxCustom<T> = Box<T>; // TODO: We shall replace Global allocator with our own. Since Allocator API is not stable, we shall provide own Box impl (only for internal purpose handling)

Expand All @@ -30,27 +32,30 @@ pub(crate) type BoxInternal<T> = Box<T>; // TODO: Use mempool allocator, for now
pub(crate) type ArcInternal<T> = Arc<T>; // TODO: Use mempool allocator, for now we keep default impl

///
/// TaskId encodes the worker on which it was created and it's number local to the worker.
/// TaskId encodes the worker on which it was created and it is global to the process.
/// This id cannot be used to infer task order creation or anything like that, it's only for identification purpose.
///
#[derive(Copy, Clone, Debug)]
pub(crate) struct TaskId(pub(crate) u32);

thread_local! {
static TASK_COUNTER: Cell<u32> = const {Cell::new(0)};
}
pub struct TaskId(pub(crate) u64);

#[allow(dead_code)]
impl TaskId {
pub(crate) fn new(worker_id: u8) -> Self {
let val = (TASK_COUNTER.get()) % 0x00FFFFFF; //TODO: Fix it later or change algo
TASK_COUNTER.set(val + 1);
pub(crate) fn new(worker_id: &WorkerId) -> Self {
let engine_id = worker_id.engine_id();
let worker_id = worker_id.worker_id();
static TASK_COUNTER: AtomicU64 = const { AtomicU64::new(0) };
// Just increment the global counter, it wraps around on overflow. Only lower 48 bits are used for the TaskId.
let val = TASK_COUNTER.fetch_add(1, ::core::sync::atomic::Ordering::Relaxed);
Self((val << 16) | ((engine_id as u64) << 8) | worker_id as u64)
}

Self((val << 8) | worker_id as u32)
/// Get the worker id that created this task
pub fn worker(&self) -> u8 {
(self.0 & 0xFF_u64) as u8
}

pub(crate) fn worker(&self) -> u8 {
(self.0 & 0xFF_u32) as u8
/// Get the engine id that created this task
pub fn engine(&self) -> u8 {
((self.0 >> 8) & 0xFF_u64) as u8
}
}

Expand Down
1 change: 1 addition & 0 deletions src/kyron/src/runtime/runtime_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl RuntimeBuilder {
///
pub fn with_engine(mut self, builder: ExecutionEngineBuilder) -> (Self, usize) {
let id = self.next_id;
let builder = builder.set_engine_id(id as u8);
self.engine_builders.push(builder);
self.next_id += 1;
(self, id)
Expand Down
120 changes: 64 additions & 56 deletions src/kyron/src/scheduler/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,26 +165,26 @@ impl Handler {
fn internal<T>(
&self,
boxed: FutureBox<T>,
c: impl Fn(FutureBox<T>, u8, Arc<AsyncScheduler>) -> Arc<AsyncTask<T, BoxCustom<dyn Future<Output = T> + Send>, Arc<AsyncScheduler>>>,
c: impl Fn(FutureBox<T>, &WorkerId, Arc<AsyncScheduler>) -> Arc<AsyncTask<T, BoxCustom<dyn Future<Output = T> + Send>, Arc<AsyncScheduler>>>,
) -> JoinHandle<T>
where
T: Send + 'static,
{
let task_ref;
let handle;

let worker_id = ctx_get_worker_id().worker_id();
let worker_id = ctx_get_worker_id();

match self.inner {
HandlerImpl::Async(ref async_inner) => {
let task = c(boxed, worker_id, async_inner.scheduler.clone());
let task = c(boxed, &worker_id, async_inner.scheduler.clone());
task_ref = TaskRef::new(task.clone());
handle = JoinHandle::new(task_ref.clone());

async_inner.scheduler.spawn_from_runtime(task_ref, &async_inner.prod_con);
}
HandlerImpl::Dedicated(ref dedicated_inner) => {
let task = c(boxed, worker_id, dedicated_inner.scheduler.clone());
let task = c(boxed, &worker_id, dedicated_inner.scheduler.clone());
task_ref = TaskRef::new(task.clone());
handle = JoinHandle::new(task_ref.clone());

Expand All @@ -198,26 +198,26 @@ impl Handler {
fn reusable_safety_internal<T>(
&self,
reusable: ReusableBoxFuture<T>,
c: impl Fn(Pin<ReusableBoxFuture<T>>, u8, Arc<AsyncScheduler>) -> Arc<AsyncTask<T, ReusableBoxFuture<T>, Arc<AsyncScheduler>>>,
c: impl Fn(Pin<ReusableBoxFuture<T>>, &WorkerId, Arc<AsyncScheduler>) -> Arc<AsyncTask<T, ReusableBoxFuture<T>, Arc<AsyncScheduler>>>,
) -> JoinHandle<T>
where
T: Send + 'static,
{
let task_ref;
let handle;

let worker_id = ctx_get_worker_id().worker_id();
let worker_id = ctx_get_worker_id();

match self.inner {
HandlerImpl::Async(ref async_inner) => {
let task = c(reusable.into_pin(), worker_id, async_inner.scheduler.clone());
let task = c(reusable.into_pin(), &worker_id, async_inner.scheduler.clone());
task_ref = TaskRef::new(task.clone());
handle = JoinHandle::new(task_ref.clone());

async_inner.scheduler.spawn_from_runtime(task_ref, &async_inner.prod_con);
}
HandlerImpl::Dedicated(ref dedicated_inner) => {
let task = c(reusable.into_pin(), worker_id, dedicated_inner.scheduler.clone());
let task = c(reusable.into_pin(), &worker_id, dedicated_inner.scheduler.clone());
task_ref = TaskRef::new(task.clone());
handle = JoinHandle::new(task_ref.clone());

Expand All @@ -232,7 +232,11 @@ impl Handler {
&self,
boxed: FutureBox<T>,
worker_id: UniqueWorkerId,
c: impl Fn(FutureBox<T>, u8, DedicatedSchedulerLocal) -> Arc<AsyncTask<T, BoxCustom<dyn Future<Output = T> + Send>, DedicatedSchedulerLocal>>,
c: impl Fn(
FutureBox<T>,
&WorkerId,
DedicatedSchedulerLocal,
) -> Arc<AsyncTask<T, BoxCustom<dyn Future<Output = T> + Send>, DedicatedSchedulerLocal>>,
) -> JoinHandle<T>
where
T: Send + 'static,
Expand All @@ -242,11 +246,7 @@ impl Handler {
HandlerImpl::Dedicated(ref dedicated_inner) => &dedicated_inner.dedicated_scheduler,
};

let task = c(
boxed,
ctx_get_worker_id().worker_id(),
DedicatedSchedulerLocal::new(worker_id, scheduler.clone()),
);
let task = c(boxed, &ctx_get_worker_id(), DedicatedSchedulerLocal::new(worker_id, scheduler.clone()));

let task_ref = TaskRef::new(task.clone());
let handle = JoinHandle::new(task_ref.clone());
Expand All @@ -261,7 +261,7 @@ impl Handler {
&self,
reusable: ReusableBoxFuture<T>,
worker_id: UniqueWorkerId,
c: impl Fn(Pin<ReusableBoxFuture<T>>, u8, DedicatedSchedulerLocal) -> Arc<AsyncTask<T, ReusableBoxFuture<T>, DedicatedSchedulerLocal>>,
c: impl Fn(Pin<ReusableBoxFuture<T>>, &WorkerId, DedicatedSchedulerLocal) -> Arc<AsyncTask<T, ReusableBoxFuture<T>, DedicatedSchedulerLocal>>,
) -> JoinHandle<T>
where
T: Send + 'static,
Expand All @@ -273,7 +273,7 @@ impl Handler {

let task = c(
reusable.into_pin(),
ctx_get_worker_id().worker_id(),
&ctx_get_worker_id(),
DedicatedSchedulerLocal::new(worker_id, scheduler.clone()),
);

Expand Down Expand Up @@ -303,9 +303,8 @@ impl Handler {
/// This is an entry point for public API that is filled by each worker once it's created
///
pub(crate) struct WorkerContext {
#[allow(dead_code)] // used in the tests
/// The ID of task that is currently run by worker
running_task_id: Cell<Option<TaskId>>,
/// Task that is currently run by worker
running_task: RefCell<Option<TaskRef>>,

/// WorkerID and EngineID
worker_id: Cell<WorkerId>,
Expand Down Expand Up @@ -395,7 +394,7 @@ impl ContextBuilder {

pub(crate) fn build(self) -> WorkerContext {
WorkerContext {
running_task_id: Cell::new(None),
running_task: RefCell::new(None),
worker_id: Cell::new(self.worker_id.expect("Worker type must be set in context builder!")),
handler: RefCell::new(Some(Rc::new(self.handle.expect("Handler type must be set in context builder!")))),
is_safety_enabled: self.is_with_safety,
Expand Down Expand Up @@ -482,43 +481,51 @@ pub(crate) fn ctx_get_drivers() -> Drivers {
.unwrap()
}

#[cfg(test)]
mod tests {
use super::*;
///
/// Sets currently running `task`
///
pub(super) fn ctx_set_running_task(task: TaskRef) {
let _ = CTX
.try_with(|ctx| {
ctx.borrow().as_ref().expect("Called before CTX init?").running_task.replace(Some(task));
})
.map_err(|e| {
panic!("Something is really bad here, error {}!", e);
});
}

///
/// Sets currently running `task id`
///
pub(crate) fn ctx_set_running_task_id(id: TaskId) {
let _ = CTX
.try_with(|ctx| {
ctx.borrow().as_ref().expect("Called before CTX init?").running_task_id.replace(Some(id));
})
.map_err(|e| {
panic!("Something is really bad here, error {}!", e);
});
}
///
/// Clears currently running `task`
///
pub(super) fn ctx_unset_running_task() {
let _ = CTX
.try_with(|ctx| {
ctx.borrow().as_ref().expect("Called before CTX init?").running_task.replace(None);
})
.map_err(|_| {});
}

///
/// Clears currently running `task id`
///
pub(crate) fn ctx_unset_running_task_id() {
let _ = CTX
.try_with(|ctx| {
ctx.borrow().as_ref().expect("Called before CTX init?").running_task_id.replace(None);
})
.map_err(|_| {});
}
///
/// Gets currently running `task id`
///
pub(crate) fn ctx_get_running_task_id() -> Option<TaskId> {
CTX.try_with(|ctx| {
ctx.borrow()
.as_ref()
.expect("Called before CTX init?")
.running_task
.borrow()
.as_ref()
.map(|task| task.id())
})
.unwrap_or_else(|e| {
panic!("Something is really bad here, error {}!", e);
})
}

///
/// Gets currently running `task id`
///
pub(crate) fn ctx_get_running_task_id() -> Option<TaskId> {
CTX.try_with(|ctx| ctx.borrow().as_ref().expect("Called before CTX init?").running_task_id.get())
.unwrap_or_else(|e| {
panic!("Something is really bad here, error {}!", e);
})
}
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_context_no_init_panic_handler() {
Expand All @@ -540,12 +547,13 @@ mod tests {
#[test]
#[should_panic]
fn test_context_no_init_panic_set_task_id() {
ctx_set_running_task_id(TaskId::new(1));
let (_, task) = crate::testing::get_dummy_task_waker();
ctx_set_running_task(TaskRef::new(task));
}

#[test]
#[should_panic]
fn test_context_no_init_panic_unset_task_id() {
ctx_unset_running_task_id();
ctx_unset_running_task();
}
}
Loading
Loading