Skip to content

Commit

Permalink
[multi-language part 2] Change the command line arguments to start ra…
Browse files Browse the repository at this point in the history
  • Loading branch information
jovany-wang authored and robertnishihara committed Aug 17, 2018
1 parent a719e08 commit 06a5801
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
21 changes: 12 additions & 9 deletions java/runtime-native/src/main/java/org/ray/runner/RunManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,12 @@ private Process startProcess(String[] cmd, Map<String, String> env, RunInfo.Proc
}

int processIndex = runInfo.allProcesses.get(type.ordinal()).size();

ProcessBuilder builder;
List<String> newCmd = Arrays.stream(cmd).filter(s -> s.length() > 0)
.collect(Collectors.toList());
builder = new ProcessBuilder(newCmd);
List<String> newCommand = Arrays.asList(cmd);
builder = new ProcessBuilder(newCommand);
builder.directory(new File(workDir));

if (redirect) {
String stdoutFile;
String stderrFile;
Expand Down Expand Up @@ -689,10 +690,10 @@ private void startRaylet(String storeName, AddressInfo info, int numWorkers,
String rayletSocketName = "/tmp/raylet" + rpcPort;

String filePath = paths.raylet;
String workerCmd = null;
workerCmd = buildWorkerCommandRaylet(info.storeName, rayletSocketName, UniqueID.nil,
"", workDir + rpcPort, ip, redisAddress);

//Create the worker command that the raylet will use to start workers.
String workerCommand = buildWorkerCommandRaylet(info.storeName, rayletSocketName,
UniqueID.nil, "", workDir + rpcPort, ip, redisAddress);

int sep = redisAddress.indexOf(':');
assert (sep != -1);
Expand All @@ -701,8 +702,10 @@ private void startRaylet(String storeName, AddressInfo info, int numWorkers,

String resourceArgument = ResourceUtil.getResourcesStringFromMap(staticResources);

String[] cmds = new String[]{filePath, rayletSocketName, storeName, ip, gcsIp,
gcsPort, "" + numWorkers, workerCmd, resourceArgument};
// The second-last arugment is the worker command for Python, not needed for Java.
String[] cmds = new String[]{filePath,rayletSocketName, storeName, ip, gcsIp,
gcsPort, "" + numWorkers, resourceArgument,
"", workerCommand};

Process p = startProcess(cmds, null, RunInfo.ProcessType.PT_RAYLET,
workDir + rpcPort, redisAddress, ip, redirect, cleanup);
Expand Down
3 changes: 2 additions & 1 deletion python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,8 +990,9 @@ def start_raylet(redis_address,
gcs_ip_address,
gcs_port,
str(num_workers),
start_worker_command,
resource_argument,
start_worker_command,
"", # Worker command for Java, not needed for Python.
]

if use_valgrind:
Expand Down
17 changes: 14 additions & 3 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@

#ifndef RAYLET_TEST
int main(int argc, char *argv[]) {
RAY_CHECK(argc == 9);
RAY_CHECK(argc == 10);

const std::string raylet_socket_name = std::string(argv[1]);
const std::string store_socket_name = std::string(argv[2]);
const std::string node_ip_address = std::string(argv[3]);
const std::string redis_address = std::string(argv[4]);
int redis_port = std::stoi(argv[5]);
int num_initial_workers = std::stoi(argv[6]);
const std::string worker_command = std::string(argv[7]);
const std::string static_resource_list = std::string(argv[8]);
const std::string static_resource_list = std::string(argv[7]);
const std::string python_worker_command = std::string(argv[8]);
const std::string java_worker_command = std::string(argv[9]);

// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;
Expand All @@ -39,6 +40,16 @@ int main(int argc, char *argv[]) {
RayConfig::instance().num_workers_per_process();
// Use a default worker that can execute empty tasks with dependencies.

std::string worker_command;
if (!python_worker_command.empty()) {
worker_command = python_worker_command;
} else if (!java_worker_command.empty()) {
worker_command = java_worker_command;
} else {
RAY_CHECK(0)
<< "Either Python worker command or Java worker command should be provided.";
}

std::istringstream iss(worker_command);
std::vector<std::string> results(std::istream_iterator<std::string>{iss},
std::istream_iterator<std::string>());
Expand Down

0 comments on commit 06a5801

Please sign in to comment.