26 changes: 13 additions & 13 deletions README.md
@@ -240,8 +240,8 @@ The following table maps each Scaler command to its corresponding section name i
| `scaler_object_storage_server` | `[object_storage_server]` |
| `scaler_ui` | `[webui]` |
| `scaler_top` | `[top]` |
| `scaler_worker_adapter_native` | `[native_worker_adapter]` |
| `scaler_worker_adapter_symphony` | `[symphony_worker_adapter]` |
| `scaler_worker_manager_native` | `[worker_manager_native]` |
| `scaler_worker_manager_symphony` | `[worker_manager_symphony]` |

### Practical Scenarios & Examples

@@ -376,7 +376,7 @@ might be added in the future.
A Scaler scheduler can interface with IBM Spectrum Symphony to provide distributed computing across Symphony clusters.

```bash
$ scaler_worker_adapter_symphony tcp://127.0.0.1:2345 --service-name ScalerService --base-concurrency 4 --host 127.0.0.1 --port 8080
$ scaler_worker_manager_symphony tcp://127.0.0.1:2345 --service-name ScalerService --base-concurrency 4 --host 127.0.0.1 --port 8080
```

This will start a Scaler worker that connects to the Scaler scheduler at `tcp://127.0.0.1:2345` and uses the Symphony
@@ -461,31 +461,31 @@ where `deepest_nesting_level` is the deepest nesting level a task has in your wo
workload that has
a base task that calls a nested task that calls another nested task, then the deepest nesting level is 2.
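
For instance, the following minimal sketch has a deepest nesting level of 2: the base task submits a nested task, which in turn submits a leaf task. This is an illustration only; it assumes the `Client` object can be passed into task functions for nested submission, and exact API details may differ.

```python
from scaler import Client

def leaf(x):
    # nesting level 2: submitted by the nested task
    return x * 2

def nested(client, x):
    # nesting level 1: submitted by the base task, submits leaf itself
    return client.submit(leaf, x).result()

def base(client, x):
    # nesting level 0: the base task
    return client.submit(nested, client, x).result()

with Client(address="tcp://127.0.0.1:2345") as client:
    # deepest_nesting_level for this workload is 2
    print(client.submit(base, client, 21).result())
```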

## Worker Adapter usage
## Worker Manager usage

> **Note**: This feature is experimental and may change in future releases.

Scaler provides a Worker Adapter webhook interface to integrate with other job schedulers or resource managers. The
Worker Adapter allows external systems to request the creation and termination of Scaler workers dynamically.
Scaler provides a Worker Manager webhook interface to integrate with other job schedulers or resource managers. The
Worker Manager allows external systems to request the creation and termination of Scaler workers dynamically.

Please check the OpenGRIS standard for more details on the Worker Adapter
Please check the OpenGRIS standard for more details on the Worker Manager
specification [here](https://github.com/finos/opengris).

### Starting the Native Worker Adapter
### Starting the Native Worker Manager

Starting a Native Worker Adapter server at `http://127.0.0.1:8080`:
Starting a Native Worker Manager server at `http://127.0.0.1:8080`:

```bash
$ scaler_worker_adapter_native tcp://127.0.0.1:2345 --host 127.0.0.1 --port 8080
$ scaler_worker_manager_native tcp://127.0.0.1:2345 --host 127.0.0.1 --port 8080
```

Pass the `--adapter-webhook-url` argument to the Scaler scheduler to connect to the Worker Adapter:
Pass the `--manager-webhook-url` argument to the Scaler scheduler to connect to the Worker Manager:

```bash
$ scaler_scheduler tcp://127.0.0.1:2345 --adapter-webhook-url http://127.0.0.1:8080
$ scaler_scheduler tcp://127.0.0.1:2345 --manager-webhook-url http://127.0.0.1:8080
```

To check that the Worker Adapter is working, you can bring up `scaler_top` to see workers spawning and terminating as
To check that the Worker Manager is working, you can bring up `scaler_top` to see workers spawning and terminating as
the task load changes.
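
As a rough illustration of this webhook interface, the scheduler POSTs JSON actions to the configured Worker Manager URL. The shutdown payload below mirrors the one used in `scaling_controller.py`; other action names and any endpoint paths are assumptions not shown here.

```bash
# Hypothetical request for illustration; the full set of actions is defined
# by the OpenGRIS Worker Manager specification.
$ curl -X POST http://127.0.0.1:8080 \
    -H "Content-Type: application/json" \
    -d '{"action": "shutdown_worker_group", "worker_group_id": "example-group"}'
```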

## Performance
8 changes: 4 additions & 4 deletions docs/source/tutorials/configuration.rst
@@ -179,10 +179,10 @@ The following table maps each Scaler command to its corresponding section name i
- ``[webui]``
* - ``scaler_top``
- ``[top]``
* - ``scaler_worker_adapter_native``
- ``[native_worker_adapter]``
* - ``scaler_worker_adapter_symphony``
- ``[symphony_worker_adapter]``
* - ``scaler_worker_manager_native``
- ``[worker_manager_native]``
* - ``scaler_worker_manager_symphony``
- ``[worker_manager_symphony]``
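
As a sketch of how the renamed section might appear in a TOML file (key names are taken from the `WorkerManagerNativeConfig` dataclass in this change; the complete key set is an assumption):

```toml
# Hypothetical configuration sketch; keys mirror the WorkerManagerNativeConfig
# fields shown in this diff and may not be exhaustive.
[worker_manager_native]
scheduler_address = "tcp://127.0.0.1:2345"
manager_web_host = "127.0.0.1"
manager_web_port = 8080
```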


Practical Scenarios & Examples
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -76,8 +76,8 @@ scaler_cluster = "scaler.entry_points.cluster:main"
scaler_top = "scaler.entry_points.top:main"
scaler_ui = "scaler.entry_points.webui:main"
scaler_object_storage_server = "scaler.entry_points.object_storage_server:main"
scaler_worker_adapter_native = "scaler.entry_points.worker_adapter_native:main"
scaler_worker_adapter_symphony = "scaler.entry_points.worker_adapter_symphony:main"
scaler_worker_manager_native = "scaler.entry_points.worker_manager_native:main"
scaler_worker_manager_symphony = "scaler.entry_points.worker_manager_symphony:main"

[tool.scikit-build]
cmake.source-dir = "."
@@ -1,4 +1,4 @@
from scaler.entry_points.worker_adapter_native import main
from scaler.entry_points.worker_manager_native import main
from scaler.utility.debug import pdb_wrapped

if __name__ == "__main__":
@@ -1,4 +1,4 @@
from scaler.entry_points.worker_adapter_symphony import main
from scaler.entry_points.worker_manager_symphony import main
from scaler.utility.debug import pdb_wrapped

if __name__ == "__main__":
2 changes: 1 addition & 1 deletion scaler/cluster/combo.py
@@ -110,7 +110,7 @@ def __init__(
io_threads=scheduler_io_threads,
max_number_of_tasks_waiting=max_number_of_tasks_waiting,
client_timeout_seconds=client_timeout_seconds,
adapter_webhook_url=None,
manager_webhook_url=None,
worker_timeout_seconds=worker_timeout_seconds,
object_retention_seconds=object_retention_seconds,
load_balance_seconds=load_balance_seconds,
4 changes: 2 additions & 2 deletions scaler/cluster/scheduler.py
@@ -19,7 +19,7 @@ def __init__(
address: ZMQConfig,
storage_address: Optional[ObjectStorageConfig],
monitor_address: Optional[ZMQConfig],
adapter_webhook_url: Optional[str],
manager_webhook_url: Optional[str],
io_threads: int,
max_number_of_tasks_waiting: int,
client_timeout_seconds: int,
@@ -40,7 +40,7 @@ def __init__(
scheduler_address=address,
object_storage_address=storage_address,
monitor_address=monitor_address,
adapter_webhook_url=adapter_webhook_url,
manager_webhook_url=manager_webhook_url,
io_threads=io_threads,
max_number_of_tasks_waiting=max_number_of_tasks_waiting,
client_timeout_seconds=client_timeout_seconds,
8 changes: 4 additions & 4 deletions scaler/config/section/scheduler.py
@@ -15,7 +15,7 @@ class SchedulerConfig:
scheduler_address: ZMQConfig = dataclasses.field()
object_storage_address: Optional[ObjectStorageConfig] = None
monitor_address: Optional[ZMQConfig] = None
adapter_webhook_url: Optional[str] = None
manager_webhook_url: Optional[str] = None
protected: bool = True
allocate_policy: AllocatePolicy = AllocatePolicy.even
event_loop: str = "builtin"
@@ -44,10 +44,10 @@ def __post_init__(self):
raise ValueError("All timeout/retention/balance second values must be positive.")
if self.load_balance_trigger_times <= 0:
raise ValueError("load_balance_trigger_times must be a positive integer.")
if self.adapter_webhook_url:
parsed_url = urlparse(self.adapter_webhook_url)
if self.manager_webhook_url:
parsed_url = urlparse(self.manager_webhook_url)
if not all([parsed_url.scheme, parsed_url.netloc]):
raise ValueError(f"adapter_webhook_url '{self.adapter_webhook_url}' is not a valid URL.")
raise ValueError(f"manager_webhook_url '{self.manager_webhook_url}' is not a valid URL.")
valid_levels = {level.name for level in LoggingLevel}
if self.logging_level.upper() not in valid_levels:
raise ValueError(f"logging_level must be one of {valid_levels}, but got '{self.logging_level}'")
@@ -9,11 +9,11 @@


@dataclasses.dataclass
class NativeWorkerAdapterConfig:
class WorkerManagerNativeConfig:
scheduler_address: ZMQConfig
storage_address: Optional[ObjectStorageConfig] = None
adapter_web_host: str = "localhost"
adapter_web_port: int = 8080
manager_web_host: str = "localhost"
manager_web_port: int = 8080
per_worker_capabilities: WorkerCapabilities = dataclasses.field(
default_factory=lambda: WorkerCapabilities.from_string("")
)
@@ -32,10 +32,10 @@ class NativeWorkerAdapterConfig:
logging_config_file: Optional[str] = None

def __post_init__(self):
if not isinstance(self.adapter_web_host, str):
raise TypeError(f"adapter_web_host should be string, given {self.adapter_web_host}")
if not isinstance(self.adapter_web_port, int):
raise TypeError(f"adapter_web_port must be between 1 and 65535, but got {self.adapter_web_port}")
if not isinstance(self.manager_web_host, str):
raise TypeError(f"manager_web_host should be string, given {self.manager_web_host}")
if not isinstance(self.manager_web_port, int):
raise TypeError(f"manager_web_port must be between 1 and 65535, but got {self.manager_web_port}")
if self.io_threads <= 0:
raise ValueError("io_threads must be a positive integer.")
if self.worker_task_queue_size <= 0:
@@ -9,7 +9,7 @@


@dataclasses.dataclass
class SymphonyWorkerConfig:
class WorkerManagerSymphonyConfig:
scheduler_address: ZMQConfig
object_storage_address: Optional[ObjectStorageConfig]
service_name: str
6 changes: 3 additions & 3 deletions scaler/entry_points/scheduler.py
@@ -77,10 +77,10 @@ def get_args():
"tcp://localhost:2347",
)
parser.add_argument(
"--adapter-webhook-url",
"--manager-webhook-url",
"-awu",
type=str,
help="specify the adapter webhook url, if not specified, the adapter will not be used",
help="specify the worker manager webhook url, if not specified, the worker manager will not be used",
)
parser.add_argument(
"scheduler_address", nargs="?", type=str, help="scheduler address to connect to, e.g.: `tcp://localhost:6378`"
@@ -113,7 +113,7 @@ def main():
address=scheduler_config.scheduler_address,
storage_address=object_storage_address,
monitor_address=scheduler_config.monitor_address,
adapter_webhook_url=scheduler_config.adapter_webhook_url,
manager_webhook_url=scheduler_config.manager_webhook_url,
io_threads=scheduler_config.io_threads,
max_number_of_tasks_waiting=scheduler_config.max_number_of_tasks_waiting,
client_timeout_seconds=scheduler_config.client_timeout_seconds,
@@ -3,22 +3,22 @@
from aiohttp import web

from scaler.config.loader import load_config
from scaler.config.section.native_worker_adapter import NativeWorkerAdapterConfig
from scaler.config.section.worker_manager_native import WorkerManagerNativeConfig
from scaler.utility.event_loop import EventLoopType, register_event_loop
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.native import NativeWorkerAdapter
from scaler.worker_manager.native import WorkerManagerNative


def get_args():
parser = argparse.ArgumentParser(
"scaler_native_worker_adapter", formatter_class=argparse.ArgumentDefaultsHelpFormatter
"scaler_native_worker_manager", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)

parser.add_argument("--config", "-c", type=str, default=None, help="Path to the TOML configuration file.")

# Server configuration
parser.add_argument("--adapter-web-host", type=str, help="Host for the native worker adapter HTTP server.")
parser.add_argument("--adapter-web-port", "-p", type=int, help="Port for the native worker adapter HTTP server.")
parser.add_argument("--manager-web-host", type=str, help="Host for the native worker manager HTTP server.")
parser.add_argument("--manager-web-port", "-p", type=int, help="Port for the native worker manager HTTP server.")

# Worker configuration
parser.add_argument("--io-threads", type=int, help="number of io threads for zmq")
@@ -98,39 +98,41 @@ def get_args():

def main():
args = get_args()
native_adapter_config = load_config(
NativeWorkerAdapterConfig, args.config, args, section_name="native_worker_adapter"
worker_manager_native_config = load_config(
WorkerManagerNativeConfig, args.config, args, section_name="worker_manager_native"
)

register_event_loop(native_adapter_config.event_loop)
register_event_loop(worker_manager_native_config.event_loop)

setup_logger(
native_adapter_config.logging_paths,
native_adapter_config.logging_config_file,
native_adapter_config.logging_level,
)

native_worker_adapter = NativeWorkerAdapter(
address=native_adapter_config.scheduler_address,
storage_address=native_adapter_config.storage_address,
capabilities=native_adapter_config.per_worker_capabilities.capabilities,
io_threads=native_adapter_config.io_threads,
task_queue_size=native_adapter_config.worker_task_queue_size,
max_workers=native_adapter_config.max_workers,
heartbeat_interval_seconds=native_adapter_config.heartbeat_interval_seconds,
task_timeout_seconds=native_adapter_config.task_timeout_seconds,
death_timeout_seconds=native_adapter_config.death_timeout_seconds,
garbage_collect_interval_seconds=native_adapter_config.garbage_collect_interval_seconds,
trim_memory_threshold_bytes=native_adapter_config.trim_memory_threshold_bytes,
hard_processor_suspend=native_adapter_config.hard_processor_suspend,
event_loop=native_adapter_config.event_loop,
logging_paths=native_adapter_config.logging_paths,
logging_level=native_adapter_config.logging_level,
logging_config_file=native_adapter_config.logging_config_file,
)

app = native_worker_adapter.create_app()
web.run_app(app, host=native_adapter_config.adapter_web_host, port=native_adapter_config.adapter_web_port)
worker_manager_native_config.logging_paths,
worker_manager_native_config.logging_config_file,
worker_manager_native_config.logging_level,
)

worker_manager_native = WorkerManagerNative(
address=worker_manager_native_config.scheduler_address,
storage_address=worker_manager_native_config.storage_address,
capabilities=worker_manager_native_config.per_worker_capabilities.capabilities,
io_threads=worker_manager_native_config.io_threads,
task_queue_size=worker_manager_native_config.worker_task_queue_size,
max_workers=worker_manager_native_config.max_workers,
heartbeat_interval_seconds=worker_manager_native_config.heartbeat_interval_seconds,
task_timeout_seconds=worker_manager_native_config.task_timeout_seconds,
death_timeout_seconds=worker_manager_native_config.death_timeout_seconds,
garbage_collect_interval_seconds=worker_manager_native_config.garbage_collect_interval_seconds,
trim_memory_threshold_bytes=worker_manager_native_config.trim_memory_threshold_bytes,
hard_processor_suspend=worker_manager_native_config.hard_processor_suspend,
event_loop=worker_manager_native_config.event_loop,
logging_paths=worker_manager_native_config.logging_paths,
logging_level=worker_manager_native_config.logging_level,
logging_config_file=worker_manager_native_config.logging_config_file,
)

app = worker_manager_native.create_app()
web.run_app(
app, host=worker_manager_native_config.manager_web_host, port=worker_manager_native_config.manager_web_port
)


if __name__ == "__main__":
@@ -3,24 +3,24 @@
from aiohttp import web

from scaler.config.loader import load_config
from scaler.config.section.symphony_worker_adapter import SymphonyWorkerConfig
from scaler.config.section.worker_manager_symphony import WorkerManagerSymphonyConfig
from scaler.utility.event_loop import EventLoopType, register_event_loop
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.symphony.worker_adapter import SymphonyWorkerAdapter
from scaler.worker_manager.symphony.worker_manager import WorkerManagerSymphony


def get_args():
parser = argparse.ArgumentParser(
"scaler Symphony worker adapter", formatter_class=argparse.ArgumentDefaultsHelpFormatter
"scaler_symphony_worker_manager", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("--config", "-c", type=str, default=None, help="Path to the TOML configuration file.")

# Server configuration
parser.add_argument(
"--server-http-host", "-h", type=str, help="host address for the native worker adapter HTTP server"
"--server-http-host", "-h", type=str, help="host address for the native Worker Manager HTTP server"
)
parser.add_argument(
"--server-http-port", "-p", type=int, required=True, help="port for the native worker adapter HTTP server"
"--server-http-port", "-p", type=int, required=True, help="port for the native Worker Manager HTTP server"
)

# Symphony configuration
@@ -73,12 +73,14 @@ def get_args():

def main():
args = get_args()
symphony_config = load_config(SymphonyWorkerConfig, args.config, args, section_name="symphony_worker_adapter")
symphony_config = load_config(
WorkerManagerSymphonyConfig, args.config, args, section_name="worker_manager_symphony"
)
register_event_loop(symphony_config.event_loop)

setup_logger(symphony_config.logging_paths, symphony_config.logging_config_file, symphony_config.logging_level)

symphony_worker_adapter = SymphonyWorkerAdapter(
worker_manager_symphony = WorkerManagerSymphony(
address=symphony_config.scheduler_address,
storage_address=symphony_config.object_storage_address,
capabilities=symphony_config.worker_capabilities.capabilities,
@@ -94,7 +96,7 @@ def main():
logging_config_file=symphony_config.logging_config_file,
)

app = symphony_worker_adapter.create_app()
app = worker_manager_symphony.create_app()
web.run_app(app, host=symphony_config.server_http_host, port=symphony_config.server_http_port)


8 changes: 4 additions & 4 deletions scaler/scheduler/controllers/scaling_controller.py
@@ -21,8 +21,8 @@ async def on_snapshot(self, information_snapshot: InformationSnapshot):


class VanillaScalingController(ScalingController):
def __init__(self, adapter_webhook_url: str, lower_task_ratio: float = 1, upper_task_ratio: float = 10):
self._adapter_webhook_url = adapter_webhook_url
def __init__(self, manager_webhook_url: str, lower_task_ratio: float = 1, upper_task_ratio: float = 10):
self._manager_webhook_url = manager_webhook_url
self._lower_task_ratio = lower_task_ratio
self._upper_task_ratio = upper_task_ratio
assert upper_task_ratio >= lower_task_ratio
@@ -71,7 +71,7 @@ async def _shutdown_worker_group(self, worker_group_id: WorkerGroupID):
{"action": "shutdown_worker_group", "worker_group_id": worker_group_id.decode()}
)
if status == web.HTTPNotFound.status_code:
logging.error(f"Worker group with ID {worker_group_id.decode()} not found in adapter.")
logging.error(f"Worker group with ID {worker_group_id.decode()} not found in worker manager.")
return
if status == web.HTTPInternalServerError.status_code:
logging.error(f"Failed to shutdown worker group: {response.get('error', 'Unknown error')}")
@@ -82,5 +82,5 @@

async def _make_request(self, payload):
async with aiohttp.ClientSession() as session:
async with session.post(self._adapter_webhook_url, json=payload) as response:
async with session.post(self._manager_webhook_url, json=payload) as response:
return await response.json(), response.status
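
For context, a minimal Worker Manager-side handler for this request shape could look like the sketch below. This is not the actual `WorkerManagerNative` implementation; the route path and response fields are assumptions.

```python
# Minimal aiohttp sketch of a webhook endpoint that accepts the scheduler's
# JSON actions; the real app is produced by WorkerManagerNative.create_app().
from aiohttp import web


async def handle_action(request: web.Request) -> web.Response:
    payload = await request.json()
    if payload.get("action") == "shutdown_worker_group":
        group_id = payload["worker_group_id"]
        # ... look up and terminate the workers in this group here ...
        return web.json_response({"status": "ok", "worker_group_id": group_id})
    # Unknown action or missing worker group: the scheduler checks 404/500 statuses.
    return web.json_response({"error": "unknown action"}, status=404)


def create_app() -> web.Application:
    app = web.Application()
    app.add_routes([web.post("/", handle_action)])
    return app


if __name__ == "__main__":
    web.run_app(create_app(), host="127.0.0.1", port=8080)
```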