Skip to content

Commit

Permalink
Revert "Revert "Add ability to specify worker and driver ports (ray-p…
Browse files Browse the repository at this point in the history
…roject#7833)" (ray-project#8069)"

This reverts commit 90ef585.
  • Loading branch information
edoakes committed Apr 17, 2020
1 parent 90ef585 commit 7860678
Show file tree
Hide file tree
Showing 20 changed files with 351 additions and 108 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ matrix:
- PYTHONWARNINGS=ignore
- RAY_DEFAULT_BUILD=1
- RAY_CYTHON_EXAMPLES=1
- RAY_USE_RANDOM_PORTS=1
before_install:
- . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED
install:
Expand All @@ -22,6 +23,7 @@ matrix:
- PYTHONWARNINGS=ignore
- RAY_DEFAULT_BUILD=1
- RAY_CYTHON_EXAMPLES=1
- RAY_USE_RANDOM_PORTS=1
before_install:
- . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED
install:
Expand All @@ -46,6 +48,7 @@ matrix:
- JDK='Oracle JDK 8'
- RAY_INSTALL_JAVA=1
- PYTHON=3.6 PYTHONWARNINGS=ignore
- RAY_USE_RANDOM_PORTS=1
before_install:
- . ./ci/travis/ci.sh init RAY_CI_STREAMING_PYTHON_AFFECTED,RAY_CI_STREAMING_JAVA_AFFECTED
install:
Expand Down Expand Up @@ -83,6 +86,7 @@ matrix:
- RAY_INSTALL_JAVA=1
- RAY_GCS_SERVICE_ENABLED=false
- RAY_CYTHON_EXAMPLES=1
- RAY_USE_RANDOM_PORTS=1
before_install:
- . ./ci/travis/ci.sh init RAY_CI_ONLY_RLLIB_AFFECTED
install:
Expand Down
2 changes: 2 additions & 0 deletions python/ray/cluster_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def add_node(self, **node_args):
"num_cpus": 1,
"num_gpus": 0,
"object_store_memory": 150 * 1024 * 1024, # 150 MiB
"min_worker_port": 0,
"max_worker_port": 0,
}
if "_internal_config" in node_args:
node_args["_internal_config"] = json.loads(
Expand Down
2 changes: 2 additions & 0 deletions python/ray/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ def start_raylet(self, use_valgrind=False, use_profiler=False):
self._temp_dir,
self._session_dir,
self.get_resource_spec(),
self._ray_params.min_worker_port,
self._ray_params.max_worker_port,
self._ray_params.object_manager_port,
self._ray_params.redis_password,
use_valgrind=use_valgrind,
Expand Down
34 changes: 34 additions & 0 deletions python/ray/parameter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os

import numpy as np

Expand Down Expand Up @@ -35,6 +36,10 @@ class RayParams:
node_ip_address (str): The IP address of the node that we are on.
raylet_ip_address (str): The IP address of the raylet that this node
connects to.
min_worker_port (int): The lowest port number that workers will bind
on. If not set or set to 0, random ports will be chosen.
max_worker_port (int): The highest port number that workers will bind
on. If set, min_worker_port must also be set.
object_id_seed (int): Used to seed the deterministic generation of
object IDs. The same value can be used across multiple runs of the
same job in order to generate the object IDs in a consistent
Expand Down Expand Up @@ -98,6 +103,8 @@ def __init__(self,
node_manager_port=None,
node_ip_address=None,
raylet_ip_address=None,
min_worker_port=None,
max_worker_port=None,
object_id_seed=None,
driver_mode=None,
redirect_worker_output=None,
Expand Down Expand Up @@ -135,6 +142,8 @@ def __init__(self,
self.node_manager_port = node_manager_port
self.node_ip_address = node_ip_address
self.raylet_ip_address = raylet_ip_address
self.min_worker_port = min_worker_port
self.max_worker_port = max_worker_port
self.driver_mode = driver_mode
self.redirect_worker_output = redirect_worker_output
self.redirect_output = redirect_output
Expand Down Expand Up @@ -189,6 +198,31 @@ def update_if_absent(self, **kwargs):
self._check_usage()

def _check_usage(self):
# Test-only override: when RAY_USE_RANDOM_PORTS is set in the environment and
# the caller specified neither bound of the worker port range, set both to 0
# so that workers bind on random ports.
if os.environ.get("RAY_USE_RANDOM_PORTS", False):
    # Both bounds must be unset. Checking min_worker_port twice would
    # silently overwrite an explicitly supplied max_worker_port with 0 and
    # skip the "min_worker_port must also be set" validation below.
    if self.min_worker_port is None and self.max_worker_port is None:
        self.min_worker_port = 0
        self.max_worker_port = 0

if self.min_worker_port is not None:
if self.min_worker_port != 0 and (self.min_worker_port < 1024
or self.min_worker_port > 65535):
raise ValueError("min_worker_port must be 0 or an integer "
"between 1024 and 65535.")

if self.max_worker_port is not None:
if self.min_worker_port is None:
raise ValueError("If max_worker_port is set, min_worker_port "
"must also be set.")
elif self.max_worker_port != 0:
if self.max_worker_port < 1024 or self.max_worker_port > 65535:
raise ValueError(
"max_worker_port must be 0 or an integer between "
"1024 and 65535.")
elif self.max_worker_port <= self.min_worker_port:
raise ValueError("max_worker_port must be higher than "
"min_worker_port.")

if self.resources is not None:
assert "CPU" not in self.resources, (
"'CPU' should not be included in the resource dictionary. Use "
Expand Down
25 changes: 21 additions & 4 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,20 @@ def dashboard(cluster_config_file, cluster_name, port):
required=False,
type=int,
help="the port to use for starting the node manager")
@click.option(
"--min-worker-port",
required=False,
type=int,
default=10000,
help="the lowest port number that workers will bind on. If not set, "
"random ports will be chosen.")
@click.option(
"--max-worker-port",
required=False,
type=int,
default=10999,
help="the highest port number that workers will bind on. If set, "
"'--min-worker-port' must also be set.")
@click.option(
"--memory",
required=False,
Expand Down Expand Up @@ -277,10 +291,11 @@ def dashboard(cluster_config_file, cluster_name, port):
help="Specify whether load code from local file or GCS serialization.")
def start(node_ip_address, redis_address, address, redis_port,
num_redis_shards, redis_max_clients, redis_password,
redis_shard_ports, object_manager_port, node_manager_port, memory,
object_store_memory, redis_max_memory, num_cpus, num_gpus, resources,
head, include_webui, webui_host, block, plasma_directory, huge_pages,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
redis_shard_ports, object_manager_port, node_manager_port,
min_worker_port, max_worker_port, memory, object_store_memory,
redis_max_memory, num_cpus, num_gpus, resources, head, include_webui,
webui_host, block, plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir, include_java,
java_worker_options, load_code_from_local, internal_config):
if redis_address is not None:
Expand Down Expand Up @@ -308,6 +323,8 @@ def start(node_ip_address, redis_address, address, redis_port,
redirect_output = None if not no_redirect_output else True
ray_params = ray.parameter.RayParams(
node_ip_address=node_ip_address,
min_worker_port=min_worker_port,
max_worker_port=max_worker_port,
object_manager_port=object_manager_port,
node_manager_port=node_manager_port,
memory=memory,
Expand Down
14 changes: 14 additions & 0 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,8 @@ def start_raylet(redis_address,
temp_dir,
session_dir,
resource_spec,
min_worker_port=None,
max_worker_port=None,
object_manager_port=None,
redis_password=None,
use_valgrind=False,
Expand Down Expand Up @@ -1251,6 +1253,10 @@ def start_raylet(redis_address,
resource_spec (ResourceSpec): Resources for this raylet.
object_manager_port: The port to use for the object manager. If this is
None, then the object manager will choose its own port.
min_worker_port (int): The lowest port number that workers will bind
on. If not set, random ports will be chosen.
max_worker_port (int): The highest port number that workers will bind
on. If set, min_worker_port must also be set.
redis_password: The password to use when connecting to Redis.
use_valgrind (bool): True if the raylet should be started inside
of valgrind. If this is True, use_profiler must be False.
Expand Down Expand Up @@ -1328,6 +1334,12 @@ def start_raylet(redis_address,
if object_manager_port is None:
object_manager_port = 0

if min_worker_port is None:
min_worker_port = 0

if max_worker_port is None:
max_worker_port = 0

if load_code_from_local:
start_worker_command += ["--load-code-from-local"]

Expand All @@ -1336,6 +1348,8 @@ def start_raylet(redis_address,
"--raylet_socket_name={}".format(raylet_name),
"--store_socket_name={}".format(plasma_store_name),
"--object_manager_port={}".format(object_manager_port),
"--min_worker_port={}".format(min_worker_port),
"--max_worker_port={}".format(max_worker_port),
"--node_manager_port={}".format(node_manager_port),
"--node_ip_address={}".format(node_ip_address),
"--redis_address={}".format(gcs_ip_address),
Expand Down
7 changes: 7 additions & 0 deletions python/ray/tests/test_multi_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,13 @@ def test_calling_start_ray_head(call_ray_stop_only):
])
subprocess.check_output(["ray", "stop"])

# Test starting Ray with the worker port range specified.
subprocess.check_output([
"ray", "start", "--head", "--min-worker-port", "12345",
"--max-worker-port", "12346"
])
subprocess.check_output(["ray", "stop"])

# Test starting Ray with the number of CPUs specified.
subprocess.check_output(["ray", "start", "--head", "--num-cpus", "2"])
subprocess.check_output(["ray", "stop"])
Expand Down
26 changes: 18 additions & 8 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
client_call_manager_(new rpc::ClientCallManager(io_service_)),
death_check_timer_(io_service_),
internal_timer_(io_service_),
core_worker_server_(WorkerTypeString(options_.worker_type),
0 /* let grpc choose a port */),
task_queue_length_(0),
num_executed_tasks_(0),
task_execution_service_work_(task_execution_service_),
Expand Down Expand Up @@ -303,10 +301,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
[this] { return local_raylet_client_->TaskDone(); }));
}

// Start RPC server after all the task receivers are properly initialized.
core_worker_server_.RegisterService(grpc_service_);
core_worker_server_.Run();

// Initialize raylet client.
// TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot
// connect to Raylet after a number of retries, this can be changed later
Expand All @@ -315,17 +309,33 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
auto grpc_client = rpc::NodeManagerWorkerClient::make(
options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_);
ClientID local_raylet_id;
// Start at the -1 failure sentinel so the RAY_CHECK below fires deterministically
// (instead of reading an indeterminate value) if the raylet client never writes
// this out-parameter.
int assigned_port = -1;
local_raylet_client_ = std::shared_ptr<raylet::RayletClient>(new raylet::RayletClient(
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
(options_.worker_type == ray::WorkerType::WORKER),
worker_context_.GetCurrentJobID(), options_.language, &local_raylet_id,
options_.node_ip_address, core_worker_server_.GetPort()));
options_.node_ip_address, &assigned_port));
connected_ = true;

RAY_CHECK(assigned_port != -1)
<< "Failed to allocate a port for the worker. Please specify a wider port range "
"using the '--min-worker-port' and '--max-worker-port' arguments to 'ray "
"start'.";

// Start RPC server after all the task receivers are properly initialized and we have
// our assigned port from the raylet.
core_worker_server_ = std::unique_ptr<rpc::GrpcServer>(
new rpc::GrpcServer(WorkerTypeString(options_.worker_type), assigned_port));
core_worker_server_->RegisterService(grpc_service_);
core_worker_server_->Run();

// Tell the raylet the port that we are listening on.
RAY_CHECK_OK(local_raylet_client_->AnnounceWorkerPort(core_worker_server_->GetPort()));

// Set our own address.
RAY_CHECK(!local_raylet_id.IsNil());
rpc_address_.set_ip_address(options_.node_ip_address);
rpc_address_.set_port(core_worker_server_.GetPort());
rpc_address_.set_port(core_worker_server_->GetPort());
rpc_address_.set_raylet_id(local_raylet_id.Binary());
rpc_address_.set_worker_id(worker_context_.GetWorkerID().Binary());
RAY_LOG(INFO) << "Initializing worker at address: " << rpc_address_.ip_address() << ":"
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
boost::asio::steady_timer internal_timer_;

/// RPC server used to receive tasks to execute.
rpc::GrpcServer core_worker_server_;
std::unique_ptr<rpc::GrpcServer> core_worker_server_;

/// Address of our RPC server.
rpc::Address rpc_address_;
Expand Down
9 changes: 9 additions & 0 deletions src/ray/raylet/format/node_manager.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ enum MessageType:int {
// Send a reply confirming the successful registration of a worker or driver.
// This is sent from the raylet to a worker or driver.
RegisterClientReply,
// Announce the port that the worker's gRPC server is listening on.
// This is sent from a worker to a raylet.
AnnounceWorkerPort,
// Notify the raylet that this client is disconnecting unexpectedly.
// This is sent from a worker to a raylet.
DisconnectClient,
Expand Down Expand Up @@ -160,6 +162,13 @@ table RegisterClientRequest {
table RegisterClientReply {
// GCS ClientID of the local node manager.
raylet_id: string;
// Port that this worker should listen on.
port: int;
}

table AnnounceWorkerPort {
// Port that this worker is listening on.
port: int;
}

table RegisterNodeManagerRequest {
Expand Down
8 changes: 8 additions & 0 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ DEFINE_int32(node_manager_port, -1, "The port of node manager.");
DEFINE_string(node_ip_address, "", "The ip address of this node.");
DEFINE_string(redis_address, "", "The ip address of redis server.");
DEFINE_int32(redis_port, -1, "The port of redis server.");
DEFINE_int32(min_worker_port, 0,
"The lowest port that workers' gRPC servers will bind on.");
DEFINE_int32(max_worker_port, 0,
"The highest port that workers' gRPC servers will bind on.");
DEFINE_int32(num_initial_workers, 0, "Number of initial workers.");
DEFINE_int32(maximum_startup_concurrency, 1, "Maximum startup concurrency");
DEFINE_string(static_resource_list, "", "The static resource list of this node.");
Expand Down Expand Up @@ -62,6 +66,8 @@ int main(int argc, char *argv[]) {
const std::string node_ip_address = FLAGS_node_ip_address;
const std::string redis_address = FLAGS_redis_address;
const int redis_port = static_cast<int>(FLAGS_redis_port);
const int min_worker_port = static_cast<int>(FLAGS_min_worker_port);
const int max_worker_port = static_cast<int>(FLAGS_max_worker_port);
const int num_initial_workers = static_cast<int>(FLAGS_num_initial_workers);
const int maximum_startup_concurrency =
static_cast<int>(FLAGS_maximum_startup_concurrency);
Expand Down Expand Up @@ -121,6 +127,8 @@ int main(int argc, char *argv[]) {
node_manager_config.node_manager_port = node_manager_port;
node_manager_config.num_initial_workers = num_initial_workers;
node_manager_config.maximum_startup_concurrency = maximum_startup_concurrency;
node_manager_config.min_worker_port = min_worker_port;
node_manager_config.max_worker_port = max_worker_port;

if (!python_worker_command.empty()) {
node_manager_config.worker_commands.emplace(
Expand Down
Loading

0 comments on commit 7860678

Please sign in to comment.