Expose resource based auto-tuner options #559
@@ -1,3 +1,4 @@
+use anyhow::Context;
 use prost::Message;
 use pyo3::exceptions::{PyException, PyRuntimeError, PyValueError};
 use pyo3::prelude::*;
@@ -34,9 +35,7 @@ pub struct WorkerConfig {
     build_id: String,
     identity_override: Option<String>,
     max_cached_workflows: usize,
-    max_outstanding_workflow_tasks: usize,
-    max_outstanding_activities: usize,
-    max_outstanding_local_activities: usize,
+    tuner: TunerHolder,
     max_concurrent_workflow_task_polls: usize,
     nonsticky_to_sticky_poll_ratio: f32,
     max_concurrent_activity_task_polls: usize,
@@ -52,6 +51,39 @@ pub struct WorkerConfig {
     nondeterminism_as_workflow_fail_for_types: HashSet<String>,
 }
 
+#[derive(FromPyObject)]
+pub struct TunerHolder {
+    workflow_slot_supplier: SlotSupplier,
+    activity_slot_supplier: SlotSupplier,
+    local_activity_slot_supplier: SlotSupplier,
+}
+
+#[derive(FromPyObject)]
+pub enum SlotSupplier {
+    FixedSize(FixedSizeSlotSupplier),
+    ResourceBased(ResourceBasedSlotSupplier),
+}
+
+#[derive(FromPyObject)]
+pub struct FixedSizeSlotSupplier {
+    num_slots: usize,
+}
+
+#[derive(FromPyObject)]
+pub struct ResourceBasedSlotSupplier {
+    minimum_slots: usize,
+    maximum_slots: usize,
+    // Need pyo3 0.21+ for this to be std Duration
+    ramp_throttle_ms: u64,

Comment on lines +76 to +77:

Suggested change …

We do this in plenty of other places, it's fine for the bridge (we usually use …)

Sure, it'd be nicer if it just worked though

+    tuner_config: ResourceBasedTunerConfig,
+}
+
+#[derive(FromPyObject, Clone, Copy, PartialEq)]
+pub struct ResourceBasedTunerConfig {
+    target_memory_usage: f64,
+    target_cpu_usage: f64,
+}
+
 macro_rules! enter_sync {
     ($runtime:expr) => {
         if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() {
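
For readers less familiar with pyo3's derive: the structs above are populated from the Python objects the SDK passes across the bridge, and for the `SlotSupplier` enum the derived `FromPyObject` tries each variant in declaration order. Below is a rough hand-written equivalent, not the actual macro expansion, with a made-up error message:

```rust
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;

// Sketch of what #[derive(FromPyObject)] effectively does for the enum:
// try FixedSize first, then ResourceBased, and fail with a TypeError if
// neither inner struct can be extracted from the Python object.
impl<'source> FromPyObject<'source> for SlotSupplier {
    fn extract(obj: &'source PyAny) -> PyResult<Self> {
        if let Ok(fixed) = obj.extract::<FixedSizeSlotSupplier>() {
            return Ok(SlotSupplier::FixedSize(fixed));
        }
        if let Ok(resource) = obj.extract::<ResourceBasedSlotSupplier>() {
            return Ok(SlotSupplier::ResourceBased(resource));
        }
        Err(PyTypeError::new_err(
            "expected a fixed-size or resource-based slot supplier",
        ))
    }
}
```
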
@@ -73,7 +105,7 @@ pub fn new_worker(
         config,
         client.retry_client.clone().into_inner(),
     )
-    .map_err(|err| PyValueError::new_err(format!("Failed creating worker: {}", err)))?;
+    .context("Failed creating worker")?;

Just to understand, can you confirm with these …

Yeah, I'll double check what it looks like, but I doubt it's materially different

     Ok(WorkerRef {
         worker: Some(Arc::new(worker)),
         runtime: runtime_ref.runtime.clone(),
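
On the question above about what these `context` errors end up looking like: assuming the conversion to a Python exception goes through pyo3's optional `anyhow` integration (or an equivalent local `From<anyhow::Error> for PyErr` impl), the message content is just anyhow's own formatting. A standalone sketch with made-up error text:

```rust
use anyhow::Context;

fn connect() -> anyhow::Result<()> {
    // Made-up underlying failure, standing in for whatever core returns
    Err(anyhow::anyhow!("transport error: connection refused"))
}

fn create_worker() -> anyhow::Result<()> {
    connect().context("Failed creating worker")
}

fn main() {
    let err = create_worker().unwrap_err();
    // Display shows only the outermost context message
    println!("{}", err); // Failed creating worker
    // The alternate form flattens the context chain onto one line
    println!("{:#}", err); // Failed creating worker: transport error: connection refused
    // Debug ({:?}) prints the message plus a multi-line "Caused by:" list
    println!("{:?}", err);
}
```
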
@@ -107,9 +139,11 @@ impl WorkerRef {
     fn validate<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
         let worker = self.worker.as_ref().unwrap().clone();
         self.runtime.future_into_py(py, async move {
-            worker.validate().await.map_err(|err| {
-                PyRuntimeError::new_err(format!("Worker validation failed: {}", err))
-            })
+            worker
+                .validate()
+                .await
+                .context("Worker validation failed")
+                .map_err(Into::into)
         })
     }
@@ -151,10 +185,8 @@ impl WorkerRef {
             worker
                 .complete_workflow_activation(completion)
                 .await
-                .map_err(|err| {
-                    // TODO(cretz): More error types
-                    PyRuntimeError::new_err(format!("Completion failure: {}", err))
-                })
+                .context("Completion failure")
+                .map_err(Into::into)
         })
     }
@@ -166,10 +198,8 @@ impl WorkerRef {
             worker
                 .complete_activity_task(completion)
                 .await
-                .map_err(|err| {
-                    // TODO(cretz): More error types
-                    PyRuntimeError::new_err(format!("Completion failure: {}", err))
-                })
+                .context("Completion failure")
+                .map_err(Into::into)
         })
     }
@@ -226,16 +256,15 @@ impl TryFrom<WorkerConfig> for temporal_sdk_core::WorkerConfig {
     type Error = PyErr;
 
     fn try_from(conf: WorkerConfig) -> PyResult<Self> {
+        let converted_tuner: temporal_sdk_core::TunerHolder = conf.tuner.try_into()?;
         temporal_sdk_core::WorkerConfigBuilder::default()
             .namespace(conf.namespace)
             .task_queue(conf.task_queue)
             .worker_build_id(conf.build_id)
             .client_identity_override(conf.identity_override)
             .max_cached_workflows(conf.max_cached_workflows)
-            .max_outstanding_workflow_tasks(conf.max_outstanding_workflow_tasks)
-            .max_outstanding_activities(conf.max_outstanding_activities)
-            .max_outstanding_local_activities(conf.max_outstanding_local_activities)
             .max_concurrent_wft_polls(conf.max_concurrent_workflow_task_polls)
+            .tuner(Arc::new(converted_tuner))
             .nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio)
             .max_concurrent_at_polls(conf.max_concurrent_activity_task_polls)
             .no_remote_activities(conf.no_remote_activities)
@@ -276,6 +305,90 @@ impl TryFrom<WorkerConfig> for temporal_sdk_core::WorkerConfig {
     }
 }
 
+impl TryFrom<TunerHolder> for temporal_sdk_core::TunerHolder {
+    type Error = PyErr;
+
+    fn try_from(holder: TunerHolder) -> PyResult<Self> {
+        // Verify all resource-based options are the same if any are set
+        let maybe_wf_resource_opts =
+            if let SlotSupplier::ResourceBased(ref ss) = holder.workflow_slot_supplier {
+                Some(&ss.tuner_config)
+            } else {
+                None
+            };
+        let maybe_act_resource_opts =
+            if let SlotSupplier::ResourceBased(ref ss) = holder.activity_slot_supplier {
+                Some(&ss.tuner_config)
+            } else {
+                None
+            };
+        let maybe_local_act_resource_opts =
+            if let SlotSupplier::ResourceBased(ref ss) = holder.local_activity_slot_supplier {
+                Some(&ss.tuner_config)
+            } else {
+                None
+            };
+        let all_resource_opts = [
+            maybe_wf_resource_opts,
+            maybe_act_resource_opts,
+            maybe_local_act_resource_opts,
+        ];
+        let mut set_resource_opts = all_resource_opts.iter().flatten();
+        let first = set_resource_opts.next();
+        let all_are_same = if let Some(first) = first {
+            set_resource_opts.all(|elem| elem == first)
+        } else {
+            true
+        };
+        if !all_are_same {
+            return Err(PyValueError::new_err(
+                "All resource-based slot suppliers must have the same ResourceBasedTunerOptions",
+            ));
+        }

Comment on lines +343 to +347:

Is this a temporary limitation?

No. It always needs to be this way, otherwise whatever slot type has lower targets could be completely starved out by the one with higher targets. Since you're targeting system resource use, it doesn't really make sense to have different targets within the process.

This makes sense, I just wanted to confirm before some of my other suggestions in this PR.

+        let mut options = temporal_sdk_core::TunerHolderOptionsBuilder::default();
+        if let Some(first) = first {
+            options.resource_based_options(
+                temporal_sdk_core::ResourceBasedSlotsOptionsBuilder::default()
+                    .target_mem_usage(first.target_memory_usage)
+                    .target_cpu_usage(first.target_cpu_usage)
+                    .build()
+                    .expect("Building ResourceBasedSlotsOptions is infallible"),
+            );
+        };
+        options
+            .workflow_slot_options(holder.workflow_slot_supplier.try_into()?)
+            .activity_slot_options(holder.activity_slot_supplier.try_into()?)
+            .local_activity_slot_options(holder.local_activity_slot_supplier.try_into()?);
+        Ok(options
+            .build()
+            .map_err(|e| PyValueError::new_err(format!("Invalid tuner holder options: {}", e)))?
+            .build_tuner_holder()
+            .context("Failed building tuner holder")?)
+    }
+}
+
+impl TryFrom<SlotSupplier> for temporal_sdk_core::SlotSupplierOptions {
+    type Error = PyErr;
+
+    fn try_from(supplier: SlotSupplier) -> PyResult<temporal_sdk_core::SlotSupplierOptions> {
+        Ok(match supplier {
+            SlotSupplier::FixedSize(fs) => temporal_sdk_core::SlotSupplierOptions::FixedSize {
+                slots: fs.num_slots,
+            },
+            SlotSupplier::ResourceBased(ss) => {
+                temporal_sdk_core::SlotSupplierOptions::ResourceBased(
+                    temporal_sdk_core::ResourceSlotOptions::new(
+                        ss.minimum_slots,
+                        ss.maximum_slots,
+                        Duration::from_millis(ss.ramp_throttle_ms),
+                    ),
+                )
+            }
+        })
+    }
+}
+
 /// For feeding histories into core during replay
 #[pyclass]
 pub struct HistoryPusher {
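
To make the constraint discussed in the thread above concrete, here is a minimal sketch, not part of the PR, of a test that could sit next to these impls; the slot counts and targets are made up. It builds a `TunerHolder` whose workflow and activity suppliers carry different `ResourceBasedTunerConfig` values and checks that the conversion is rejected:

```rust
#[cfg(test)]
mod tuner_config_tests {
    use super::*;
    use std::convert::TryFrom;

    #[test]
    fn mismatched_resource_targets_are_rejected() {
        let holder = TunerHolder {
            workflow_slot_supplier: SlotSupplier::ResourceBased(ResourceBasedSlotSupplier {
                minimum_slots: 5,
                maximum_slots: 100,
                ramp_throttle_ms: 0,
                tuner_config: ResourceBasedTunerConfig {
                    target_memory_usage: 0.5,
                    target_cpu_usage: 0.5,
                },
            }),
            activity_slot_supplier: SlotSupplier::ResourceBased(ResourceBasedSlotSupplier {
                minimum_slots: 1,
                maximum_slots: 50,
                ramp_throttle_ms: 50,
                // Different targets from the workflow supplier, so this must fail
                tuner_config: ResourceBasedTunerConfig {
                    target_memory_usage: 0.9,
                    target_cpu_usage: 0.9,
                },
            }),
            local_activity_slot_supplier: SlotSupplier::FixedSize(FixedSizeSlotSupplier {
                num_slots: 10,
            }),
        };
        assert!(temporal_sdk_core::TunerHolder::try_from(holder).is_err());
    }
}
```
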
What was the impetus for this upgrade? Is it just for the enum? Not that I disagree, just want to understand. Also, why not upgrade to latest version? (I am assuming latest involves more refactoring?)
I wanted to get `Duration` conversion but that's only in latest, I kept what I could. There is no `pyo3-asyncio` that supports latest.

Why update at all if you couldn't update enough to get the feature you wanted? (not that I disagree w/ an update for update's sake, just checking if there was something else in 0.20 you're leveraging)