Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ pub struct TunerHolder {
workflow_slot_supplier: SlotSupplier,
activity_slot_supplier: SlotSupplier,
local_activity_slot_supplier: SlotSupplier,
nexus_slot_supplier: SlotSupplier,
}

#[derive(FromPyObject)]
Expand Down Expand Up @@ -745,10 +746,17 @@ fn convert_tuner_holder(
} else {
None
};
let maybe_nexus_resource_opts =
if let SlotSupplier::ResourceBased(ref ss) = holder.nexus_slot_supplier {
Some(&ss.tuner_config)
} else {
None
};
let all_resource_opts = [
maybe_wf_resource_opts,
maybe_act_resource_opts,
maybe_local_act_resource_opts,
maybe_nexus_resource_opts,
];
let mut set_resource_opts = all_resource_opts.iter().flatten();
let first = set_resource_opts.next();
Expand Down Expand Up @@ -784,6 +792,10 @@ fn convert_tuner_holder(
)?)
.local_activity_slot_options(convert_slot_supplier(
holder.local_activity_slot_supplier,
task_locals.clone(),
)?)
.nexus_slot_options(convert_slot_supplier(
holder.nexus_slot_supplier,
task_locals,
)?);
Ok(options
Expand Down
1 change: 1 addition & 0 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class TunerHolder:
workflow_slot_supplier: SlotSupplier
activity_slot_supplier: SlotSupplier
local_activity_slot_supplier: SlotSupplier
nexus_slot_supplier: SlotSupplier


class Worker:
Expand Down
2 changes: 2 additions & 0 deletions temporalio/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
CustomSlotSupplier,
FixedSizeSlotSupplier,
LocalActivitySlotInfo,
NexusSlotInfo,
ResourceBasedSlotConfig,
ResourceBasedSlotSupplier,
ResourceBasedTunerConfig,
Expand Down Expand Up @@ -117,4 +118,5 @@
"SlotReleaseContext",
"SlotReserveContext",
"WorkflowSlotInfo",
"NexusSlotInfo",
]
3 changes: 3 additions & 0 deletions temporalio/worker/_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ def on_eviction_hook(
local_activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
1
),
nexus_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
1
),
),
nonsticky_to_sticky_poll_ratio=1,
no_remote_activities=True,
Expand Down
81 changes: 65 additions & 16 deletions temporalio/worker/_tuning.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import asyncio
import logging
from abc import ABC, abstractmethod
Expand All @@ -10,7 +12,7 @@
import temporalio.bridge.worker
from temporalio.common import WorkerDeploymentVersion

_DEFAULT_RESOURCE_ACTIVITY_MAX = 500
_DEFAULT_RESOURCE_SLOTS_MAX = 500

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -150,7 +152,22 @@ class LocalActivitySlotInfo(Protocol):
activity_type: str


SlotInfo: TypeAlias = Union[WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo]
# WARNING: This must match Rust worker::NexusSlotInfo
@runtime_checkable
class NexusSlotInfo(Protocol):
"""Info about a nexus task slot usage.

.. warning::
Custom slot suppliers are currently experimental.
"""

service: str
operation: str


SlotInfo: TypeAlias = Union[
WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo
]


# WARNING: This must match Rust worker::SlotMarkUsedCtx
Expand Down Expand Up @@ -303,13 +320,14 @@ def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:


def _to_bridge_slot_supplier(
slot_supplier: SlotSupplier, kind: Literal["workflow", "activity", "local_activity"]
slot_supplier: SlotSupplier,
kind: Literal["workflow", "activity", "local_activity", "nexus"],
) -> temporalio.bridge.worker.SlotSupplier:
if isinstance(slot_supplier, FixedSizeSlotSupplier):
return temporalio.bridge.worker.FixedSizeSlotSupplier(slot_supplier.num_slots)
elif isinstance(slot_supplier, ResourceBasedSlotSupplier):
min_slots = 5 if kind == "workflow" else 1
max_slots = _DEFAULT_RESOURCE_ACTIVITY_MAX
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a bug fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we're now using the constant for both activities and nexus operation so I renamed it.

max_slots = _DEFAULT_RESOURCE_SLOTS_MAX
ramp_throttle = (
timedelta(seconds=0) if kind == "workflow" else timedelta(milliseconds=50)
)
Expand Down Expand Up @@ -347,7 +365,8 @@ def create_resource_based(
workflow_config: Optional[ResourceBasedSlotConfig] = None,
activity_config: Optional[ResourceBasedSlotConfig] = None,
local_activity_config: Optional[ResourceBasedSlotConfig] = None,
) -> "WorkerTuner":
nexus_config: Optional[ResourceBasedSlotConfig] = None,
) -> WorkerTuner:
"""Create a resource-based tuner with the provided options."""
resource_cfg = ResourceBasedTunerConfig(target_memory_usage, target_cpu_usage)
wf = ResourceBasedSlotSupplier(
Expand All @@ -359,26 +378,35 @@ def create_resource_based(
local_act = ResourceBasedSlotSupplier(
local_activity_config or ResourceBasedSlotConfig(), resource_cfg
)
nexus = ResourceBasedSlotSupplier(
nexus_config or ResourceBasedSlotConfig(), resource_cfg
)
return _CompositeTuner(
wf,
act,
local_act,
nexus,
)

@staticmethod
def create_fixed(
*,
workflow_slots: Optional[int],
activity_slots: Optional[int],
local_activity_slots: Optional[int],
) -> "WorkerTuner":
"""Create a fixed-size tuner with the provided number of slots. Any unspecified slots will default to 100."""
workflow_slots: Optional[int] = None,
activity_slots: Optional[int] = None,
local_activity_slots: Optional[int] = None,
nexus_slots: Optional[int] = None,
) -> WorkerTuner:
"""Create a fixed-size tuner with the provided number of slots.

Any unspecified slot numbers will default to 100.
"""
return _CompositeTuner(
FixedSizeSlotSupplier(workflow_slots if workflow_slots else 100),
FixedSizeSlotSupplier(activity_slots if activity_slots else 100),
FixedSizeSlotSupplier(
local_activity_slots if local_activity_slots else 100
),
FixedSizeSlotSupplier(nexus_slots if nexus_slots else 100),
)

@staticmethod
Expand All @@ -387,12 +415,14 @@ def create_composite(
workflow_supplier: SlotSupplier,
activity_supplier: SlotSupplier,
local_activity_supplier: SlotSupplier,
) -> "WorkerTuner":
nexus_supplier: SlotSupplier,
) -> WorkerTuner:
"""Create a tuner composed of the provided slot suppliers."""
return _CompositeTuner(
workflow_supplier,
activity_supplier,
local_activity_supplier,
nexus_supplier,
)

@abstractmethod
Expand All @@ -407,6 +437,10 @@ def _get_activity_task_slot_supplier(self) -> SlotSupplier:
def _get_local_activity_task_slot_supplier(self) -> SlotSupplier:
raise NotImplementedError

@abstractmethod
def _get_nexus_slot_supplier(self) -> SlotSupplier:
raise NotImplementedError

def _to_bridge_tuner(self) -> temporalio.bridge.worker.TunerHolder:
return temporalio.bridge.worker.TunerHolder(
_to_bridge_slot_supplier(
Expand All @@ -418,14 +452,25 @@ def _to_bridge_tuner(self) -> temporalio.bridge.worker.TunerHolder:
_to_bridge_slot_supplier(
self._get_local_activity_task_slot_supplier(), "local_activity"
),
_to_bridge_slot_supplier(self._get_nexus_slot_supplier(), "nexus"),
)

def _get_activities_max(self) -> Optional[int]:
ss = self._get_activity_task_slot_supplier()
if isinstance(ss, FixedSizeSlotSupplier):
return ss.num_slots
elif isinstance(ss, ResourceBasedSlotSupplier):
return ss.slot_config.maximum_slots or _DEFAULT_RESOURCE_ACTIVITY_MAX
return WorkerTuner._get_slot_supplier_max(
self._get_activity_task_slot_supplier()
)

def _get_nexus_tasks_max(self) -> Optional[int]:
return WorkerTuner._get_slot_supplier_max(self._get_nexus_slot_supplier())

@staticmethod
def _get_slot_supplier_max(slot_supplier: SlotSupplier) -> Optional[int]:
if isinstance(slot_supplier, FixedSizeSlotSupplier):
return slot_supplier.num_slots
elif isinstance(slot_supplier, ResourceBasedSlotSupplier):
return (
slot_supplier.slot_config.maximum_slots or _DEFAULT_RESOURCE_SLOTS_MAX
)
return None


Expand All @@ -436,6 +481,7 @@ class _CompositeTuner(WorkerTuner):
workflow_slot_supplier: SlotSupplier
activity_slot_supplier: SlotSupplier
local_activity_slot_supplier: SlotSupplier
nexus_slot_supplier: SlotSupplier

def _get_workflow_task_slot_supplier(self) -> SlotSupplier:
return self.workflow_slot_supplier
Expand All @@ -445,3 +491,6 @@ def _get_activity_task_slot_supplier(self) -> SlotSupplier:

def _get_local_activity_task_slot_supplier(self) -> SlotSupplier:
return self.local_activity_slot_supplier

def _get_nexus_slot_supplier(self) -> SlotSupplier:
return self.nexus_slot_supplier
Loading
Loading