Expose resource based auto-tuner options #559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged: 11 commits, Jun 26, 2024
Changes from all commits
332 changes: 209 additions & 123 deletions temporalio/bridge/Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions temporalio/bridge/Cargo.toml
@@ -8,14 +8,15 @@ name = "temporal_sdk_bridge"
crate-type = ["cdylib"]

[dependencies]
+anyhow = "1.0"
futures = "0.3"
log = "0.4"
once_cell = "1.16"
prost = "0.12"
prost-types = "0.12"
-pyo3 = { version = "0.19", features = ["extension-module", "abi3-py38"] }
-pyo3-asyncio = { version = "0.19", features = ["tokio-runtime"] }
-pythonize = "0.19"
+pyo3 = { version = "0.20", features = ["extension-module", "abi3-py38", "anyhow"] }
+pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] }
+pythonize = "0.20"
Comment on lines +17 to +19
@cretz (Member) commented on Jun 25, 2024:

What was the impetus for this upgrade? Is it just for the enum? Not that I disagree, just want to understand. Also, why not upgrade to the latest version? (I am assuming the latest involves more refactoring?)

Member Author:

I wanted to get the Duration conversion, but that's only in the latest version, so I kept what I could. There is no pyo3-asyncio release that supports the latest pyo3.

Member:

Why update at all if you couldn't update enough to get the feature you wanted? (not that I disagree w/ an update for update's sake, just checking if there was something else in 0.20 you're leveraging)

temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] }
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
1 change: 0 additions & 1 deletion temporalio/bridge/src/runtime.rs
@@ -1,7 +1,6 @@
use futures::channel::mpsc::Receiver;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
-use pyo3::AsPyPointer;
use pythonize::pythonize;
use std::collections::HashMap;
use std::future::Future;
149 changes: 131 additions & 18 deletions temporalio/bridge/src/worker.rs
@@ -1,3 +1,4 @@
+use anyhow::Context;
use prost::Message;
use pyo3::exceptions::{PyException, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
@@ -34,9 +35,7 @@ pub struct WorkerConfig {
    build_id: String,
    identity_override: Option<String>,
    max_cached_workflows: usize,
-   max_outstanding_workflow_tasks: usize,
-   max_outstanding_activities: usize,
-   max_outstanding_local_activities: usize,
+   tuner: TunerHolder,
    max_concurrent_workflow_task_polls: usize,
    nonsticky_to_sticky_poll_ratio: f32,
    max_concurrent_activity_task_polls: usize,
@@ -52,6 +51,39 @@ pub struct WorkerConfig {
    nondeterminism_as_workflow_fail_for_types: HashSet<String>,
}

#[derive(FromPyObject)]
pub struct TunerHolder {
    workflow_slot_supplier: SlotSupplier,
    activity_slot_supplier: SlotSupplier,
    local_activity_slot_supplier: SlotSupplier,
}

#[derive(FromPyObject)]
pub enum SlotSupplier {
    FixedSize(FixedSizeSlotSupplier),
    ResourceBased(ResourceBasedSlotSupplier),
}

#[derive(FromPyObject)]
pub struct FixedSizeSlotSupplier {
    num_slots: usize,
}

#[derive(FromPyObject)]
pub struct ResourceBasedSlotSupplier {
    minimum_slots: usize,
    maximum_slots: usize,
    // Need pyo3 0.21+ for this to be std Duration
    ramp_throttle_ms: u64,
Comment on lines +76 to +77
Member:

Suggested change:
-    // Need pyo3 0.21+ for this to be std Duration
-    ramp_throttle_ms: u64,
+    ramp_throttle_millis: u64,

We do this in plenty of other places, it's fine for the bridge (we usually use _millis suffix)

Member Author:

Sure, it'd be nicer if it just worked though
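
For context, the bridge keeps raw milliseconds here, so a user-facing layer can still accept a timedelta and convert at the boundary. A minimal sketch of that conversion, with a hypothetical helper name:

from datetime import timedelta

def to_bridge_ramp_throttle_ms(ramp_throttle: timedelta) -> int:
    # Convert a user-facing timedelta to the integer milliseconds that the
    # bridge's ResourceBasedSlotSupplier.ramp_throttle_ms field expects.
    return int(ramp_throttle.total_seconds() * 1000)

# A 1-second ramp throttle becomes 1000 ms at the bridge boundary.
assert to_bridge_ramp_throttle_ms(timedelta(seconds=1)) == 1000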

    tuner_config: ResourceBasedTunerConfig,
}

#[derive(FromPyObject, Clone, Copy, PartialEq)]
pub struct ResourceBasedTunerConfig {
    target_memory_usage: f64,
    target_cpu_usage: f64,
}

macro_rules! enter_sync {
    ($runtime:expr) => {
        if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() {
@@ -73,7 +105,7 @@ pub fn new_worker(
        config,
        client.retry_client.clone().into_inner(),
    )
-   .map_err(|err| PyValueError::new_err(format!("Failed creating worker: {}", err)))?;
+   .context("Failed creating worker")?;
Member:
Just to understand, can you confirm with these context changes what the error string looks like to Python users now? I just want to see exactly what the concatenation looks like since it's not explicit anymore.

Member Author:

Yeah, I'll double check what it looks like, but I doubt it's materially different
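
For reference, a rough sketch of how such an error likely surfaces to Python users, assuming pyo3's anyhow integration renders the context string followed by a "Caused by:" chain (the exact text is not confirmed in this thread):

import temporalio.bridge.worker

async def check(worker: temporalio.bridge.worker.Worker) -> None:
    try:
        await worker.validate()
    except RuntimeError as err:
        # Assumed shape, roughly:
        #   Worker validation failed
        #
        #   Caused by:
        #       <underlying core/transport error>
        print(str(err))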

    Ok(WorkerRef {
        worker: Some(Arc::new(worker)),
        runtime: runtime_ref.runtime.clone(),
@@ -107,9 +139,11 @@ impl WorkerRef {
    fn validate<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
        let worker = self.worker.as_ref().unwrap().clone();
        self.runtime.future_into_py(py, async move {
-           worker.validate().await.map_err(|err| {
-               PyRuntimeError::new_err(format!("Worker validation failed: {}", err))
-           })
+           worker
+               .validate()
+               .await
+               .context("Worker validation failed")
+               .map_err(Into::into)
        })
    }

@@ -151,10 +185,8 @@ impl WorkerRef {
            worker
                .complete_workflow_activation(completion)
                .await
-               .map_err(|err| {
-                   // TODO(cretz): More error types
-                   PyRuntimeError::new_err(format!("Completion failure: {}", err))
-               })
+               .context("Completion failure")
+               .map_err(Into::into)
        })
    }

@@ -166,10 +198,8 @@ impl WorkerRef {
            worker
                .complete_activity_task(completion)
                .await
-               .map_err(|err| {
-                   // TODO(cretz): More error types
-                   PyRuntimeError::new_err(format!("Completion failure: {}", err))
-               })
+               .context("Completion failure")
+               .map_err(Into::into)
        })
    }

@@ -226,16 +256,15 @@ impl TryFrom<WorkerConfig> for temporal_sdk_core::WorkerConfig {
    type Error = PyErr;

    fn try_from(conf: WorkerConfig) -> PyResult<Self> {
+       let converted_tuner: temporal_sdk_core::TunerHolder = conf.tuner.try_into()?;
        temporal_sdk_core::WorkerConfigBuilder::default()
            .namespace(conf.namespace)
            .task_queue(conf.task_queue)
            .worker_build_id(conf.build_id)
            .client_identity_override(conf.identity_override)
            .max_cached_workflows(conf.max_cached_workflows)
-           .max_outstanding_workflow_tasks(conf.max_outstanding_workflow_tasks)
-           .max_outstanding_activities(conf.max_outstanding_activities)
-           .max_outstanding_local_activities(conf.max_outstanding_local_activities)
            .max_concurrent_wft_polls(conf.max_concurrent_workflow_task_polls)
+           .tuner(Arc::new(converted_tuner))
            .nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio)
            .max_concurrent_at_polls(conf.max_concurrent_activity_task_polls)
            .no_remote_activities(conf.no_remote_activities)
@@ -276,6 +305,90 @@ impl TryFrom<WorkerConfig> for temporal_sdk_core::WorkerConfig {
    }
}

impl TryFrom<TunerHolder> for temporal_sdk_core::TunerHolder {
    type Error = PyErr;

    fn try_from(holder: TunerHolder) -> PyResult<Self> {
        // Verify all resource-based options are the same if any are set
        let maybe_wf_resource_opts =
            if let SlotSupplier::ResourceBased(ref ss) = holder.workflow_slot_supplier {
                Some(&ss.tuner_config)
            } else {
                None
            };
        let maybe_act_resource_opts =
            if let SlotSupplier::ResourceBased(ref ss) = holder.activity_slot_supplier {
                Some(&ss.tuner_config)
            } else {
                None
            };
        let maybe_local_act_resource_opts =
            if let SlotSupplier::ResourceBased(ref ss) = holder.local_activity_slot_supplier {
                Some(&ss.tuner_config)
            } else {
                None
            };
        let all_resource_opts = [
            maybe_wf_resource_opts,
            maybe_act_resource_opts,
            maybe_local_act_resource_opts,
        ];
        let mut set_resource_opts = all_resource_opts.iter().flatten();
        let first = set_resource_opts.next();
        let all_are_same = if let Some(first) = first {
            set_resource_opts.all(|elem| elem == first)
        } else {
            true
        };
        if !all_are_same {
            return Err(PyValueError::new_err(
                "All resource-based slot suppliers must have the same ResourceBasedTunerOptions",
            ));
        }
Comment on lines +343 to +347
Member:
Is this a temporary limitation?

Member Author:

No. It always needs to be this way, otherwise whatever slot type has lower targets could be completely starved out by the one with higher targets. Since you're targeting system resource use, it doesn't really make sense to have different targets within the process.

Member:

This makes sense, I just wanted to confirm before some of my other suggestions in this PR
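
Concretely, with the bridge dataclasses added in temporalio/bridge/worker.py below, a holder like this sketch is rejected with the ValueError above, because its two resource-based suppliers carry different targets (per-supplier minimum/maximum slots may still differ, and fixed-size suppliers do not participate in the check):

from temporalio.bridge.worker import (
    FixedSizeSlotSupplier,
    ResourceBasedSlotSupplier,
    ResourceBasedTunerConfig,
    TunerHolder,
)

rejected = TunerHolder(
    workflow_slot_supplier=ResourceBasedSlotSupplier(
        minimum_slots=5,
        maximum_slots=500,
        ramp_throttle_ms=0,
        tuner_config=ResourceBasedTunerConfig(0.5, 0.5),
    ),
    activity_slot_supplier=ResourceBasedSlotSupplier(
        minimum_slots=1,
        maximum_slots=200,
        ramp_throttle_ms=50,
        tuner_config=ResourceBasedTunerConfig(0.8, 0.9),  # mismatched targets
    ),
    local_activity_slot_supplier=FixedSizeSlotSupplier(10),
)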


        let mut options = temporal_sdk_core::TunerHolderOptionsBuilder::default();
        if let Some(first) = first {
            options.resource_based_options(
                temporal_sdk_core::ResourceBasedSlotsOptionsBuilder::default()
                    .target_mem_usage(first.target_memory_usage)
                    .target_cpu_usage(first.target_cpu_usage)
                    .build()
                    .expect("Building ResourceBasedSlotsOptions is infallible"),
            );
        };
        options
            .workflow_slot_options(holder.workflow_slot_supplier.try_into()?)
            .activity_slot_options(holder.activity_slot_supplier.try_into()?)
            .local_activity_slot_options(holder.local_activity_slot_supplier.try_into()?);
        Ok(options
            .build()
            .map_err(|e| PyValueError::new_err(format!("Invalid tuner holder options: {}", e)))?
            .build_tuner_holder()
            .context("Failed building tuner holder")?)
    }
}

impl TryFrom<SlotSupplier> for temporal_sdk_core::SlotSupplierOptions {
    type Error = PyErr;

    fn try_from(supplier: SlotSupplier) -> PyResult<temporal_sdk_core::SlotSupplierOptions> {
        Ok(match supplier {
            SlotSupplier::FixedSize(fs) => temporal_sdk_core::SlotSupplierOptions::FixedSize {
                slots: fs.num_slots,
            },
            SlotSupplier::ResourceBased(ss) => {
                temporal_sdk_core::SlotSupplierOptions::ResourceBased(
                    temporal_sdk_core::ResourceSlotOptions::new(
                        ss.minimum_slots,
                        ss.maximum_slots,
                        Duration::from_millis(ss.ramp_throttle_ms),
                    ),
                )
            }
        })
    }
}

/// For feeding histories into core during replay
#[pyclass]
pub struct HistoryPusher {
43 changes: 40 additions & 3 deletions temporalio/bridge/worker.py
@@ -6,6 +6,7 @@
from __future__ import annotations

from dataclasses import dataclass
+from datetime import timedelta
from typing import (
    TYPE_CHECKING,
    Awaitable,
@@ -15,6 +16,7 @@
    Sequence,
    Set,
    Tuple,
+   Union,
)

import google.protobuf.internal.containers
@@ -43,9 +45,7 @@ class WorkerConfig:
    build_id: str
    identity_override: Optional[str]
    max_cached_workflows: int
-   max_outstanding_workflow_tasks: int
-   max_outstanding_activities: int
-   max_outstanding_local_activities: int
+   tuner: TunerHolder
    max_concurrent_workflow_task_polls: int
    nonsticky_to_sticky_poll_ratio: float
    max_concurrent_activity_task_polls: int
@@ -61,6 +61,43 @@
    nondeterminism_as_workflow_fail_for_types: Set[str]


@dataclass
class ResourceBasedTunerConfig:
    """Python representation of the Rust struct for configuring a resource-based tuner."""

    target_memory_usage: float
    target_cpu_usage: float


@dataclass
class ResourceBasedSlotSupplier:
    """Python representation of the Rust struct for a resource-based slot supplier."""

    minimum_slots: int
    maximum_slots: int
    ramp_throttle_ms: int
    tuner_config: ResourceBasedTunerConfig


@dataclass(frozen=True)
class FixedSizeSlotSupplier:
    """Python representation of the Rust struct for a fixed-size slot supplier."""

    num_slots: int


SlotSupplier: TypeAlias = Union[FixedSizeSlotSupplier, ResourceBasedSlotSupplier]


@dataclass
class TunerHolder:
    """Python representation of the Rust struct for a tuner holder."""

    workflow_slot_supplier: SlotSupplier
    activity_slot_supplier: SlotSupplier
    local_activity_slot_supplier: SlotSupplier
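
For illustration, a valid construction: one fixed-size supplier plus two resource-based suppliers sharing a single ResourceBasedTunerConfig, as the Rust-side conversion requires (values here are illustrative):

shared = ResourceBasedTunerConfig(target_memory_usage=0.8, target_cpu_usage=0.9)

holder = TunerHolder(
    workflow_slot_supplier=FixedSizeSlotSupplier(num_slots=100),
    activity_slot_supplier=ResourceBasedSlotSupplier(
        minimum_slots=1, maximum_slots=200, ramp_throttle_ms=50, tuner_config=shared
    ),
    local_activity_slot_supplier=ResourceBasedSlotSupplier(
        minimum_slots=1, maximum_slots=200, ramp_throttle_ms=50, tuner_config=shared
    ),
)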


class Worker:
"""SDK Core worker."""

13 changes: 13 additions & 0 deletions temporalio/worker/__init__.py
@@ -26,6 +26,13 @@
    WorkflowReplayResult,
    WorkflowReplayResults,
)
+from ._tuning import (
+    FixedSizeSlotSupplier,
+    ResourceBasedSlotConfig,
+    ResourceBasedSlotSupplier,
+    ResourceBasedTunerConfig,
+    WorkerTuner,
+)
from ._worker import Worker, WorkerConfig
from ._workflow_instance import (
    UnsandboxedWorkflowRunner,
@@ -69,4 +76,10 @@
    "WorkflowInstance",
    "WorkflowInstanceDetails",
    "UnsandboxedWorkflowRunner",
+   # Tuning types
+   "WorkerTuner",
+   "FixedSizeSlotSupplier",
+   "ResourceBasedSlotSupplier",
+   "ResourceBasedTunerConfig",
+   "ResourceBasedSlotConfig",
]
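
These exports form the user-facing tuning API. A sketch of intended usage, assuming WorkerTuner exposes a resource-based factory; the factory name and signature are assumptions, since _tuning.py is not part of this diff excerpt:

from temporalio.worker import WorkerTuner

# Assumed factory: one tuner targeting 80% memory / 90% CPU usage for all
# three slot types, translated internally into the bridge TunerHolder above.
tuner = WorkerTuner.create_resource_based(
    target_memory_usage=0.8,
    target_cpu_usage=0.9,
)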
14 changes: 11 additions & 3 deletions temporalio/worker/_replayer.py
@@ -223,9 +223,17 @@ def on_eviction_hook(
                nondeterminism_as_workflow_fail_for_types=workflow_worker.nondeterminism_as_workflow_fail_for_types(),
                # All values below are ignored but required by Core
                max_cached_workflows=2,
-               max_outstanding_workflow_tasks=2,
-               max_outstanding_activities=1,
-               max_outstanding_local_activities=1,
+               tuner=temporalio.bridge.worker.TunerHolder(
+                   workflow_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
+                       2
+                   ),
+                   activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
+                       1
+                   ),
+                   local_activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
+                       1
+                   ),
+               ),
                max_concurrent_workflow_task_polls=1,
                nonsticky_to_sticky_poll_ratio=1,
                max_concurrent_activity_task_polls=1,