Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions core/src/worker/tuner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ impl TunerHolderOptions {
}
None => {}
}
match self.nexus_slot_options {
Some(SlotSupplierOptions::FixedSize { slots }) => {
builder.nexus_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(slots)));
}
Some(SlotSupplierOptions::ResourceBased(rso)) => {
builder.nexus_slot_supplier(
rb_tuner
.as_mut()
.unwrap()
.with_nexus_slots_options(rso)
.nexus_task_slot_supplier(),
);
}
Some(SlotSupplierOptions::Custom(ss)) => {
builder.nexus_slot_supplier(ss);
}
None => {}
}
Ok(builder.build())
}
}
Expand Down Expand Up @@ -144,6 +162,9 @@ impl TunerHolderOptionsBuilder {
) || matches!(
self.local_activity_slot_options,
Some(Some(SlotSupplierOptions::ResourceBased(_)))
) || matches!(
self.nexus_slot_options,
Some(Some(SlotSupplierOptions::ResourceBased(_)))
);
if any_is_resource_based && matches!(self.resource_based_options, None | Some(None)) {
return Err(
Expand Down Expand Up @@ -270,3 +291,141 @@ impl WorkerTuner for TunerHolder {
self.nexus_supplier.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use temporal_sdk_core_api::worker::{
SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplierPermit,
};

struct TestSlotSupplier;
#[async_trait::async_trait]
impl SlotSupplier for TestSlotSupplier {
type SlotKind = NexusSlotKind;
async fn reserve_slot(&self, _: &dyn SlotReservationContext) -> SlotSupplierPermit {
SlotSupplierPermit::default()
}
fn try_reserve_slot(&self, _: &dyn SlotReservationContext) -> Option<SlotSupplierPermit> {
Some(SlotSupplierPermit::default())
}
fn mark_slot_used(&self, _: &dyn SlotMarkUsedContext<SlotKind = Self::SlotKind>) {}
fn release_slot(&self, _: &dyn SlotReleaseContext<SlotKind = Self::SlotKind>) {}
}

#[test]
fn tuner_holder_options_nexus_fixed_size() {
let options = TunerHolderOptions {
workflow_slot_options: None,
activity_slot_options: None,
local_activity_slot_options: None,
nexus_slot_options: Some(SlotSupplierOptions::FixedSize { slots: 50 }),
resource_based_options: None,
};

let tuner = options.build_tuner_holder().unwrap();
// The tuner is built successfully with fixed size nexus slots
let _ = tuner.nexus_task_slot_supplier();
}

#[test]
fn tuner_holder_options_nexus_resource_based() {
let resource_opts = ResourceBasedSlotsOptionsBuilder::default()
.target_mem_usage(0.8)
.target_cpu_usage(0.9)
.build()
.unwrap();

let options = TunerHolderOptions {
workflow_slot_options: None,
activity_slot_options: None,
local_activity_slot_options: None,
nexus_slot_options: Some(SlotSupplierOptions::ResourceBased(
ResourceSlotOptions::new(5, 100, Duration::from_millis(100)),
)),
resource_based_options: Some(resource_opts),
};

let tuner = options.build_tuner_holder().unwrap();
// The tuner is built successfully with resource-based nexus slots
let _ = tuner.nexus_task_slot_supplier();
}

#[test]
fn tuner_holder_options_nexus_custom() {
let custom_supplier: Arc<dyn SlotSupplier<SlotKind = NexusSlotKind> + Send + Sync> =
Arc::new(TestSlotSupplier);

let options = TunerHolderOptions {
workflow_slot_options: None,
activity_slot_options: None,
local_activity_slot_options: None,
nexus_slot_options: Some(SlotSupplierOptions::Custom(custom_supplier.clone())),
resource_based_options: None,
};

let tuner = options.build_tuner_holder().unwrap();
// The tuner is built successfully with custom nexus slots
let _ = tuner.nexus_task_slot_supplier();
}

#[test]
fn tuner_builder_with_nexus_slot_supplier() {
let mut builder = TunerBuilder::default();
let custom_supplier: Arc<dyn SlotSupplier<SlotKind = NexusSlotKind> + Send + Sync> =
Arc::new(FixedSizeSlotSupplier::new(25));

builder.nexus_slot_supplier(custom_supplier.clone());
let tuner = builder.build();

// The tuner is built successfully with the custom nexus slot supplier
let _ = tuner.nexus_task_slot_supplier();
}

#[test]
fn tuner_holder_options_builder_validates_resource_based_requirements() {
// Should fail when nexus uses ResourceBased but resource_based_options is not set
let result = TunerHolderOptionsBuilder::default()
.nexus_slot_options(SlotSupplierOptions::ResourceBased(
ResourceSlotOptions::new(5, 100, Duration::from_millis(100)),
))
.build();

assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("resource_based_options")
);
}

#[test]
fn tuner_holder_options_all_slot_types() {
let resource_opts = ResourceBasedSlotsOptionsBuilder::default()
.target_mem_usage(0.8)
.target_cpu_usage(0.9)
.build()
.unwrap();

let options = TunerHolderOptions {
workflow_slot_options: Some(SlotSupplierOptions::FixedSize { slots: 10 }),
activity_slot_options: Some(SlotSupplierOptions::FixedSize { slots: 20 }),
local_activity_slot_options: Some(SlotSupplierOptions::ResourceBased(
ResourceSlotOptions::new(2, 50, Duration::from_millis(100)),
)),
nexus_slot_options: Some(SlotSupplierOptions::ResourceBased(
ResourceSlotOptions::new(5, 100, Duration::from_millis(100)),
)),
resource_based_options: Some(resource_opts),
};

let tuner = options.build_tuner_holder().unwrap();
// All suppliers should be successfully configured
let _ = tuner.workflow_task_slot_supplier();
let _ = tuner.activity_task_slot_supplier();
let _ = tuner.local_activity_slot_supplier();
let _ = tuner.nexus_task_slot_supplier();
}
}
Loading