Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions bazel/BUILD.arrow
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ PROPAGATED_WINDOWS_DEFINES = ["ARROW_STATIC"]

COPTS = [] + select({
"@bazel_tools//src/conditions:windows": [
"-D" + "WIN32_REPLACE_FD_APIS",
"/FI" + "win32fd.h",
] + ["-D" + define for define in PROPAGATED_WINDOWS_DEFINES],
"-D" + define
for define in PROPAGATED_WINDOWS_DEFINES
],
"//conditions:default": [
"-DARROW_USE_GLOG",
],
Expand Down Expand Up @@ -91,7 +91,6 @@ cc_library(
strip_include_prefix = "cpp/src",
visibility = ["//visibility:public"],
deps = [
"@//:platform_shims",
"@boost//:filesystem",
"@com_github_google_glog//:glog",
],
Expand Down
1 change: 0 additions & 1 deletion bazel/ray_deps_setup.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ def ray_deps_setup():
sha256 = "2f0aaa50053792aa274b402f2530e63c1542085021cfef83beee9281412c12f6",
patches = [
"//thirdparty/patches:arrow-windows-export.patch",
"//thirdparty/patches:arrow-windows-nonstdc.patch",
],
)

Expand Down
16 changes: 6 additions & 10 deletions src/ray/common/test/client_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"

#if !defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
#include <sys/socket.h>
#include <sys/types.h>
#endif

#include "ray/common/client_connection.h"

namespace ray {
Expand All @@ -40,11 +35,12 @@ class ClientConnectionTest : public ::testing::Test {
in_ = std::move(input);
out_ = std::move(output);
#else
boost::asio::detail::socket_type pair[2] = {boost::asio::detail::invalid_socket,
boost::asio::detail::invalid_socket};
RAY_CHECK(socketpair(boost::asio::ip::tcp::v4().family(), SOCK_STREAM, 0, pair) == 0);
in_.assign(boost::asio::ip::tcp::v4(), pair[0]);
out_.assign(boost::asio::ip::tcp::v4(), pair[1]);
// Choose a free port.
auto endpoint = ParseUrlEndpoint("tcp://127.0.0.1:65437");
boost::asio::basic_socket_acceptor<local_stream_protocol> acceptor(io_service_,
endpoint);
out_.connect(endpoint);
acceptor.accept(in_);
#endif
}

Expand Down
2 changes: 0 additions & 2 deletions src/ray/common/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#pragma once

#include <unistd.h>

#include <boost/optional.hpp>
#include <functional>
#include <string>
Expand Down
5 changes: 3 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/core_worker/transport/raylet_transport.h"
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
#include "ray/util/process.h"
#include "ray/util/util.h"

namespace {
Expand Down Expand Up @@ -664,13 +665,13 @@ void CoreWorker::RegisterToGcs() {

RAY_CHECK_OK(gcs_client_->Workers().AsyncAdd(worker_data, nullptr));
}

void CoreWorker::CheckForRayletFailure(const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return;
}

// If the raylet fails, we will be reassigned to init (PID=1).
if (getppid() == 1) {
if (!IsParentProcessAlive()) {
RAY_LOG(ERROR) << "Raylet failed. Shutting down.";
Shutdown();
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/service_based_gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

#include "ray/gcs/gcs_client/service_based_gcs_client.h"
#include <unistd.h>

#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_client/service_based_accessor.h"

Expand Down
1 change: 0 additions & 1 deletion src/ray/gcs/redis_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include "ray/gcs/redis_client.h"

#include <unistd.h>
#include "ray/common/ray_config.h"
#include "ray/gcs/redis_context.h"

Expand Down
2 changes: 0 additions & 2 deletions src/ray/gcs/redis_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#include "ray/gcs/redis_context.h"

#include <unistd.h>

#include <sstream>

#include "ray/stats/stats.h"
Expand Down
2 changes: 0 additions & 2 deletions src/ray/gcs/redis_gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#include "ray/gcs/redis_gcs_client.h"

#include <unistd.h>

#include "ray/common/ray_config.h"
#include "ray/gcs/redis_accessor.h"
#include "ray/gcs/redis_context.h"
Expand Down
4 changes: 4 additions & 0 deletions src/ray/object_manager/plasma/fling.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

#include "ray/object_manager/plasma/fling.h"

#ifdef _WIN32
#error "This file does not supposed to be compiled under windows."
#endif

#include <errno.h>
#include <string.h>

Expand Down
2 changes: 0 additions & 2 deletions src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <unistd.h>

#include <chrono>
#include <iostream>
#include <random>
Expand Down
2 changes: 0 additions & 2 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#include "ray/object_manager/object_manager.h"

#include <unistd.h>

#include <iostream>
#include <thread>

Expand Down
2 changes: 0 additions & 2 deletions src/ray/rpc/gcs_server/gcs_rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#pragma once

#include <unistd.h>

#include "ray/common/network_util.h"
#include "ray/rpc/grpc_client.h"
#include "src/ray/protobuf/gcs_service.grpc.pb.h"
Expand Down
1 change: 0 additions & 1 deletion src/ray/rpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "ray/rpc/grpc_server.h"

#include <grpcpp/impl/service_type.h>
#include <unistd.h>

#include <boost/asio/detail/socket_holder.hpp>

Expand Down
65 changes: 65 additions & 0 deletions src/ray/util/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
#include "ray/util/process.h"

#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN 1
#endif
#include <Windows.h>
#include <Winternl.h>
#include <process.h>
#else
#include <poll.h>
Expand All @@ -26,6 +31,7 @@
#endif

#include <algorithm>
#include <atomic>
#include <fstream>
#include <string>
#include <vector>
Expand Down Expand Up @@ -439,6 +445,65 @@ void Process::Kill() {
}
}

#ifdef _WIN32
#ifndef STATUS_BUFFER_OVERFLOW
#define STATUS_BUFFER_OVERFLOW ((NTSTATUS)0x80000005L)
#endif
typedef LONG NTSTATUS;
typedef NTSTATUS WINAPI NtQueryInformationProcess_t(HANDLE ProcessHandle,
ULONG ProcessInformationClass,
PVOID ProcessInformation,
ULONG ProcessInformationLength,
ULONG *ReturnLength);

static std::atomic<NtQueryInformationProcess_t *> NtQueryInformationProcess_ =
ATOMIC_VAR_INIT(NULL);

pid_t GetParentPID() {
NtQueryInformationProcess_t *NtQueryInformationProcess = NtQueryInformationProcess_;
if (!NtQueryInformationProcess) {
NtQueryInformationProcess = reinterpret_cast<NtQueryInformationProcess_t *>(
GetProcAddress(GetModuleHandle(TEXT("ntdll.dll")),
_CRT_STRINGIZE(NtQueryInformationProcess)));
NtQueryInformationProcess_ = NtQueryInformationProcess;
}
DWORD ppid = 0;
PROCESS_BASIC_INFORMATION info;
ULONG cb = sizeof(info);
NTSTATUS status = NtQueryInformationProcess(GetCurrentProcess(), 0, &info, cb, &cb);
if ((status >= 0 || status == STATUS_BUFFER_OVERFLOW) && cb >= sizeof(info)) {
ppid = static_cast<DWORD>(reinterpret_cast<uintptr_t>(info.Reserved3));
}
pid_t result = 0;
if (ppid > 0) {
// For now, assume PPID = 1 (simulating the reassignment to "init" on Linux)
result = 1;
if (HANDLE parent = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, ppid)) {
long long me_created, parent_created;
FILETIME unused;
if (GetProcessTimes(GetCurrentProcess(), reinterpret_cast<FILETIME *>(&me_created),
&unused, &unused, &unused) &&
GetProcessTimes(parent, reinterpret_cast<FILETIME *>(&parent_created), &unused,
&unused, &unused)) {
if (me_created >= parent_created) {
// We verified the child is younger than the parent, so we know the parent
// is still alive.
// (Note that the parent can still die by the time this function returns,
// but that race condition exists on POSIX too, which we're emulating here.)
result = static_cast<pid_t>(ppid);
}
}
CloseHandle(parent);
}
}
return result;
}
#else
pid_t GetParentPID() { return getppid(); }
#endif // #ifdef _WIN32

bool IsParentProcessAlive() { return GetParentPID() != 1; }

} // namespace ray

namespace std {
Expand Down
6 changes: 6 additions & 0 deletions src/ray/util/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ class Process {
int Wait() const;
};

// Get the Process ID of the parent process. If the parent process exits, the PID
// will be 1 (this simulates POSIX getppid()).
pid_t GetParentPID();

bool IsParentProcessAlive();

} // namespace ray

// We only define operators required by the standard library (==, hash):
Expand Down
12 changes: 0 additions & 12 deletions src/shims/windows/msg.cc

This file was deleted.

7 changes: 0 additions & 7 deletions src/shims/windows/poll.h

This file was deleted.

Loading