From f3ed94c8e745d0038b33c17a1c6dcb9c3d70f600 Mon Sep 17 00:00:00 2001 From: Jeremy Maitin-Shepard Date: Wed, 30 Aug 2023 23:16:24 -0700 Subject: [PATCH] Reduce flakiness of gcs_testbench_test Previously, gcs_testbench_test could fail to start the storage_testbench for several reasons: 1. Deadline exceeded, especially for AddressSanitizer builds. 2. CURLE_UNSUPPORTED_PROTOCOL error returned from "start_grpc" request, if request is made just while the server is starting. 3. Port is already in use. This commit addresses: - problem (1) by increasing the deadline to 30 seconds; - problem (2) by treating that error as UNAVAILABLE and retrying; - problem (3) by checking if the subprocess has exited and restarting with a new port, and additionally allowing the grpc port to be chosen automatically since the chosen port is returned in the HTTP response. In order to support testing if a process has exited, this adds a non-blocking option to Subprocess::Join. PiperOrigin-RevId: 561557166 Change-Id: I6cd3fb4dfbaf96cb8a2f3e721ec210d56476f832 --- tensorstore/internal/http/curl_wrappers.cc | 4 +- .../internal/http/curl_wrappers_test.cc | 2 +- tensorstore/internal/subprocess.cc | 31 +++++-- tensorstore/internal/subprocess.h | 5 +- tensorstore/internal/subprocess_test.cc | 4 + tensorstore/kvstore/gcs/BUILD | 2 + tensorstore/kvstore/gcs/gcs_testbench.cc | 88 +++++++++++++------ tensorstore/kvstore/gcs/gcs_testbench.h | 5 +- 8 files changed, 101 insertions(+), 40 deletions(-) diff --git a/tensorstore/internal/http/curl_wrappers.cc b/tensorstore/internal/http/curl_wrappers.cc index eb68802d8..36e45b1e2 100644 --- a/tensorstore/internal/http/curl_wrappers.cc +++ b/tensorstore/internal/http/curl_wrappers.cc @@ -65,6 +65,9 @@ absl::Status CurlCodeToStatus(CURLcode code, std::string_view detail, case CURLE_RECV_ERROR: case CURLE_SEND_ERROR: case CURLE_SSL_CONNECT_ERROR: + // Curl sometimes returns `CURLE_UNSUPPORTED_PROTOCOL` in the case of a + // malformed response. 
+ case CURLE_UNSUPPORTED_PROTOCOL: error_code = absl::StatusCode::kUnavailable; break; @@ -93,7 +96,6 @@ absl::Status CurlCodeToStatus(CURLcode code, std::string_view detail, // and almost always indicate a precondition or missing required feature. case CURLE_BAD_FUNCTION_ARGUMENT: case CURLE_OUT_OF_MEMORY: - case CURLE_UNSUPPORTED_PROTOCOL: case CURLE_NOT_BUILT_IN: case CURLE_UNKNOWN_OPTION: case CURLE_BAD_DOWNLOAD_RESUME: diff --git a/tensorstore/internal/http/curl_wrappers_test.cc b/tensorstore/internal/http/curl_wrappers_test.cc index b86f56a56..3f2ba84ae 100644 --- a/tensorstore/internal/http/curl_wrappers_test.cc +++ b/tensorstore/internal/http/curl_wrappers_test.cc @@ -43,7 +43,7 @@ TEST(CurlFactoryTest, CurlCodeToStatus) { {CURLE_HTTP2, absl::StatusCode::kUnavailable}, {CURLE_BAD_DOWNLOAD_RESUME, absl::StatusCode::kInternal}, {CURLE_RANGE_ERROR, absl::StatusCode::kInternal}, - {CURLE_UNSUPPORTED_PROTOCOL, absl::StatusCode::kInternal}, + {CURLE_UNSUPPORTED_PROTOCOL, absl::StatusCode::kUnavailable}, }; for (auto const& t : expected_codes) { diff --git a/tensorstore/internal/subprocess.cc b/tensorstore/internal/subprocess.cc index d019b8bc2..f06b141b1 100644 --- a/tensorstore/internal/subprocess.cc +++ b/tensorstore/internal/subprocess.cc @@ -121,7 +121,7 @@ struct Subprocess::Impl { ~Impl(); absl::Status Kill(int signal); - Result Join(); + Result Join(bool block); PROCESS_INFORMATION pi_; }; @@ -142,14 +142,24 @@ absl::Status Subprocess::Impl::Kill(int signal) { return StatusFromOsError(GetLastErrorCode(), "On Subprocess::Kill"); } -Result Subprocess::Impl::Join() { - // If stdin were a pipe, close it here. 
- DWORD wait_status = WaitForSingleObject(pi_.hProcess, INFINITE); - if (wait_status == WAIT_OBJECT_0) { +Result Subprocess::Impl::Join(bool block) { + if (!block) { DWORD process_exit_code = 0; if (0 != GetExitCodeProcess(pi_.hProcess, &process_exit_code)) { + if (process_exit_code == STILL_ACTIVE) { + return absl::UnavailableError(""); + } return static_cast(process_exit_code); } + } else { + // If stdin were a pipe, close it here. + DWORD wait_status = WaitForSingleObject(pi_.hProcess, INFINITE); + if (wait_status == WAIT_OBJECT_0) { + DWORD process_exit_code = 0; + if (0 != GetExitCodeProcess(pi_.hProcess, &process_exit_code)) { + return static_cast(process_exit_code); + } + } } return StatusFromOsError(GetLastErrorCode(), "Subprocess::Join failed"); } @@ -233,7 +243,7 @@ struct Subprocess::Impl { ~Impl(); absl::Status Kill(int signal); - Result Join(); + Result Join(bool block); std::atomic child_pid_{-1}; std::atomic exit_code_{-1}; @@ -258,7 +268,7 @@ absl::Status Subprocess::Impl::Kill(int signal) { return StatusFromOsError(GetLastErrorCode(), "On Subprocess::Kill"); } -Result Subprocess::Impl::Join() { +Result Subprocess::Impl::Join(bool block) { // If stdin were a pipe, close it here. int status; for (;;) { @@ -269,10 +279,13 @@ Result Subprocess::Impl::Join() { return exit_code_.load(); } - int result = waitpid(pid, &status, 0); + int result = waitpid(pid, &status, block ? 
0 : WNOHANG); if ((result < 0) && !retry(errno)) { return StatusFromOsError(GetLastErrorCode(), "Subprocess::Join failed"); } + if (!block && result == 0) { + return absl::UnavailableError(""); + } if (result != pid) continue; if (WIFEXITED(status)) { int exit_code = WEXITSTATUS(status); @@ -361,7 +374,7 @@ Subprocess::~Subprocess() = default; absl::Status Subprocess::Kill(int signal) const { return impl_->Kill(signal); } -Result Subprocess::Join() const { return impl_->Join(); } +Result Subprocess::Join(bool block) const { return impl_->Join(block); } } // namespace internal } // namespace tensorstore diff --git a/tensorstore/internal/subprocess.h b/tensorstore/internal/subprocess.h index 62e0dcec8..a22da3b14 100644 --- a/tensorstore/internal/subprocess.h +++ b/tensorstore/internal/subprocess.h @@ -54,7 +54,10 @@ class Subprocess { /// Block until the process exits, returning the exit status. If necessary /// file handles to the process may be closed. - Result Join() const; + /// + /// If `block` is `false`, return immediately with + /// `absl::StatusCode::kUnavailable` if the process has not already exited. + Result Join(bool block = true) const; private: friend Result SpawnSubprocess(const SubprocessOptions& options); diff --git a/tensorstore/internal/subprocess_test.cc b/tensorstore/internal/subprocess_test.cc index 4cc7f82e1..0c21985b4 100644 --- a/tensorstore/internal/subprocess_test.cc +++ b/tensorstore/internal/subprocess_test.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include "absl/log/absl_log.h" #include "absl/status/status.h" @@ -58,6 +59,9 @@ TEST(SubprocessTest, Kill) { auto child = SpawnSubprocess(opts); EXPECT_TRUE(child.ok()); + EXPECT_THAT(child->Join(/*block=*/false), + tensorstore::MatchesStatus(absl::StatusCode::kUnavailable)); + child->Kill().IgnoreError(); // exit code on killed process is os dependent. 
diff --git a/tensorstore/kvstore/gcs/BUILD b/tensorstore/kvstore/gcs/BUILD index 567c4c384..2c8b44b9f 100644 --- a/tensorstore/kvstore/gcs/BUILD +++ b/tensorstore/kvstore/gcs/BUILD @@ -76,7 +76,9 @@ tensorstore_cc_library( "@com_github_grpc_grpc//:grpc++", "@com_google_absl//absl/flags:flag", "@com_google_absl//absl/log:absl_check", + "@com_google_absl//absl/log:absl_log", "@com_google_absl//absl/status", + "@com_google_absl//absl/strings", "@com_google_absl//absl/strings:str_format", "@com_google_absl//absl/time", "@com_google_googleapis//google/storage/v2:storage_cc_grpc", diff --git a/tensorstore/kvstore/gcs/gcs_testbench.cc b/tensorstore/kvstore/gcs/gcs_testbench.cc index 58a6f9645..4ac8009f4 100644 --- a/tensorstore/kvstore/gcs/gcs_testbench.cc +++ b/tensorstore/kvstore/gcs/gcs_testbench.cc @@ -17,11 +17,13 @@ #include #include - #include "absl/flags/flag.h" #include "absl/log/absl_check.h" +#include "absl/log/absl_log.h" #include "absl/status/status.h" +#include "absl/strings/numbers.h" #include "absl/strings/str_format.h" +#include "absl/time/clock.h" #include "absl/time/time.h" #include "grpcpp/channel.h" // third_party #include "grpcpp/client_context.h" // third_party @@ -55,12 +57,7 @@ using ::tensorstore::internal_http::HttpRequestBuilder; using ::tensorstore::transport_test_utils::TryPickUnusedPort; using ::google::storage::v2::Storage; -StorageTestbench::StorageTestbench() - : http_port(TryPickUnusedPort().value_or(0)), - grpc_port(TryPickUnusedPort().value_or(0)) { - ABSL_CHECK(http_port > 0); - ABSL_CHECK(grpc_port > 0); -} +StorageTestbench::StorageTestbench() = default; std::string StorageTestbench::http_address() { return absl::StrFormat("localhost:%d", http_port); @@ -72,38 +69,77 @@ std::string StorageTestbench::grpc_address() { void StorageTestbench::SpawnProcess() { if (running) return; - ABSL_LOG(INFO) << "Spawning testbench: http://" << http_address(); - { - SubprocessOptions options{absl::GetFlag(FLAGS_testbench_binary), - 
{absl::StrFormat("--port=%d", http_port)}}; + const auto start_child = [&] { + http_port = TryPickUnusedPort().value_or(0); + ABSL_CHECK(http_port > 0); - /// TODO: getcwd() so that it can be run from anywhere. - TENSORSTORE_CHECK_OK_AND_ASSIGN(child, SpawnSubprocess(options)); - } + ABSL_LOG(INFO) << "Spawning testbench: http://" << http_address(); + + { + SubprocessOptions options{absl::GetFlag(FLAGS_testbench_binary), + {absl::StrFormat("--port=%d", http_port)}}; + + // TODO: getcwd() so that it can be run from anywhere. + TENSORSTORE_CHECK_OK_AND_ASSIGN(child, SpawnSubprocess(options)); + } + }; + + start_child(); - /// Wait for the process to fully start. - for (auto deadline = absl::Now() + absl::Seconds(10);;) { + // Wait for the process to fully start. + for (auto deadline = absl::Now() + absl::Seconds(30);;) { // Once the process is running, start a gRPC server on the provided port. absl::SleepFor(absl::Milliseconds(200)); - auto start_grpc_future = GetDefaultHttpTransport()->IssueRequest( - HttpRequestBuilder( - "GET", absl::StrFormat("http://localhost:%d/start_grpc", http_port)) - .AddQueryParameter("port", absl::StrCat(grpc_port)) - .BuildRequest(), - absl::Cord(), absl::Seconds(15), absl::Seconds(15)); - if (start_grpc_future.status().ok()) break; - if (absl::Now() < deadline && - absl::IsUnavailable(start_grpc_future.status())) { + if (!absl::IsUnavailable(child->Join(/*block=*/false).status())) { + // Child is not running. Assume that it failed due to the port being in + // use. + start_child(); + } + + auto result = + GetDefaultHttpTransport() + ->IssueRequest( + HttpRequestBuilder( + "GET", absl::StrFormat("http://localhost:%d/start_grpc", + http_port)) + .BuildRequest(), + absl::Cord(), absl::Seconds(15), absl::Seconds(15)) + .result(); + + if (result.ok()) { + // Try to parse port number. 
+ if (result->status_code != 200) { + ABSL_LOG(ERROR) << "Failed to start grpc server: " << *result; + } else if (!absl::SimpleAtoi(result->payload.Flatten(), &grpc_port)) { + ABSL_LOG(ERROR) << "Unexpected response from start_grpc: " << *result; + } else { + break; + } + } else { + ABSL_LOG(ERROR) << "Failed to start grpc server: " << result.status(); + } + if (absl::Now() < deadline && absl::IsUnavailable(result.status())) { continue; } // Deadline has expired & there's nothing to show for it. - TENSORSTORE_CHECK_OK(start_grpc_future.status()); + ABSL_LOG(FATAL) << "Failed to start testbench: " << result.status(); } running = true; } +StorageTestbench::~StorageTestbench() { + if (child) { + child->Kill().IgnoreError(); + auto join_result = child->Join(); + if (!join_result.ok()) { + ABSL_LOG(ERROR) << "Joining storage_testbench subprocess failed: " + << join_result.status(); + } + } +} + void StorageTestbench::CreateBucket(std::string bucket) { ABSL_CHECK(running); diff --git a/tensorstore/kvstore/gcs/gcs_testbench.h b/tensorstore/kvstore/gcs/gcs_testbench.h index 1be208cb4..455b9d74d 100644 --- a/tensorstore/kvstore/gcs/gcs_testbench.h +++ b/tensorstore/kvstore/gcs/gcs_testbench.h @@ -26,6 +26,7 @@ namespace gcs_testbench { class StorageTestbench { public: StorageTestbench(); + ~StorageTestbench(); // Spawns the subprocess and returns the grpc address. void SpawnProcess(); @@ -36,8 +37,8 @@ class StorageTestbench { std::string http_address(); std::string grpc_address(); - const int http_port; - const int grpc_port; + int http_port; + int grpc_port; bool running = false; std::optional child;