Skip to content

Commit

Permalink
Reduce flakiness of gcs_testbench_test
Browse files Browse the repository at this point in the history
Previously, gcs_testbench_test could fail to start the
storage_testbench for several reasons:

1. Deadline exceeded, especially for AddressSanitizer builds.
2. CURLE_UNSUPPORTED_PROTOCOL error returned from "start_grpc"
   request, if request is made just while the server is starting.
3. Port is already in use.

This commit addresses:

- problem (1) by increasing the deadling to 30 seconds;

- problem (2) by treating that error as UNAVAILABLE and retrying.

- problem (3) by checking if the subprocess has exited and restarting
  with a new port, and additionally allowing the grpc port to be
  chosen automatically since the chosen port is returned in the HTTP
  response.

In order to support testing if a process has exited, this adds a a
non-blocking option to Subprocess::Join.

PiperOrigin-RevId: 561557166
Change-Id: I6cd3fb4dfbaf96cb8a2f3e721ec210d56476f832
  • Loading branch information
jbms authored and copybara-github committed Aug 31, 2023
1 parent e371f6e commit f3ed94c
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 40 deletions.
4 changes: 3 additions & 1 deletion tensorstore/internal/http/curl_wrappers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ absl::Status CurlCodeToStatus(CURLcode code, std::string_view detail,
case CURLE_RECV_ERROR:
case CURLE_SEND_ERROR:
case CURLE_SSL_CONNECT_ERROR:
// Curl sometimes returns `CURLE_UNSUPPORTED_PROTOCOL` in the case of a
// malformed response.
case CURLE_UNSUPPORTED_PROTOCOL:
error_code = absl::StatusCode::kUnavailable;
break;

Expand Down Expand Up @@ -93,7 +96,6 @@ absl::Status CurlCodeToStatus(CURLcode code, std::string_view detail,
// and almost always indicate a precondition or missing required feature.
case CURLE_BAD_FUNCTION_ARGUMENT:
case CURLE_OUT_OF_MEMORY:
case CURLE_UNSUPPORTED_PROTOCOL:
case CURLE_NOT_BUILT_IN:
case CURLE_UNKNOWN_OPTION:
case CURLE_BAD_DOWNLOAD_RESUME:
Expand Down
2 changes: 1 addition & 1 deletion tensorstore/internal/http/curl_wrappers_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ TEST(CurlFactoryTest, CurlCodeToStatus) {
{CURLE_HTTP2, absl::StatusCode::kUnavailable},
{CURLE_BAD_DOWNLOAD_RESUME, absl::StatusCode::kInternal},
{CURLE_RANGE_ERROR, absl::StatusCode::kInternal},
{CURLE_UNSUPPORTED_PROTOCOL, absl::StatusCode::kInternal},
{CURLE_UNSUPPORTED_PROTOCOL, absl::StatusCode::kUnavailable},
};

for (auto const& t : expected_codes) {
Expand Down
31 changes: 22 additions & 9 deletions tensorstore/internal/subprocess.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ struct Subprocess::Impl {
~Impl();

absl::Status Kill(int signal);
Result<int> Join();
Result<int> Join(bool block);

PROCESS_INFORMATION pi_;
};
Expand All @@ -142,14 +142,24 @@ absl::Status Subprocess::Impl::Kill(int signal) {
return StatusFromOsError(GetLastErrorCode(), "On Subprocess::Kill");
}

Result<int> Subprocess::Impl::Join() {
// If stdin were a pipe, close it here.
DWORD wait_status = WaitForSingleObject(pi_.hProcess, INFINITE);
if (wait_status == WAIT_OBJECT_0) {
Result<int> Subprocess::Impl::Join(bool block) {
if (!block) {
DWORD process_exit_code = 0;
if (0 != GetExitCodeProcess(pi_.hProcess, &process_exit_code)) {
if (process_exit_code == STILL_ACTIVE) {
return absl::UnavailableError("");
}
return static_cast<int>(process_exit_code);
}
} else {
// If stdin were a pipe, close it here.
DWORD wait_status = WaitForSingleObject(pi_.hProcess, INFINITE);
if (wait_status == WAIT_OBJECT_0) {
DWORD process_exit_code = 0;
if (0 != GetExitCodeProcess(pi_.hProcess, &process_exit_code)) {
return static_cast<int>(process_exit_code);
}
}
}
return StatusFromOsError(GetLastErrorCode(), "Subprocess::Join failed");
}
Expand Down Expand Up @@ -233,7 +243,7 @@ struct Subprocess::Impl {
~Impl();

absl::Status Kill(int signal);
Result<int> Join();
Result<int> Join(bool block);

std::atomic<pid_t> child_pid_{-1};
std::atomic<int> exit_code_{-1};
Expand All @@ -258,7 +268,7 @@ absl::Status Subprocess::Impl::Kill(int signal) {
return StatusFromOsError(GetLastErrorCode(), "On Subprocess::Kill");
}

Result<int> Subprocess::Impl::Join() {
Result<int> Subprocess::Impl::Join(bool block) {
// If stdin were a pipe, close it here.
int status;
for (;;) {
Expand All @@ -269,10 +279,13 @@ Result<int> Subprocess::Impl::Join() {
return exit_code_.load();
}

int result = waitpid(pid, &status, 0);
int result = waitpid(pid, &status, block ? 0 : WNOHANG);
if ((result < 0) && !retry(errno)) {
return StatusFromOsError(GetLastErrorCode(), "Subprocess::Join failed");
}
if (!block && result == 0) {
return absl::UnavailableError("");
}
if (result != pid) continue;
if (WIFEXITED(status)) {
int exit_code = WEXITSTATUS(status);
Expand Down Expand Up @@ -361,7 +374,7 @@ Subprocess::~Subprocess() = default;

absl::Status Subprocess::Kill(int signal) const { return impl_->Kill(signal); }

Result<int> Subprocess::Join() const { return impl_->Join(); }
Result<int> Subprocess::Join(bool block) const { return impl_->Join(block); }

} // namespace internal
} // namespace tensorstore
5 changes: 4 additions & 1 deletion tensorstore/internal/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ class Subprocess {

/// Block until the process exits, returning the exit status. If necessary
/// file handles to the process may be closed.
Result<int> Join() const;
///
/// If `block` is `false`, return immediately with
/// `absl::StatusCode::kUnavailable` if the process has not already exited.
Result<int> Join(bool block = true) const;

private:
friend Result<Subprocess> SpawnSubprocess(const SubprocessOptions& options);
Expand Down
4 changes: 4 additions & 0 deletions tensorstore/internal/subprocess_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <string_view>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/log/absl_log.h"
#include "absl/status/status.h"
Expand Down Expand Up @@ -58,6 +59,9 @@ TEST(SubprocessTest, Kill) {
auto child = SpawnSubprocess(opts);
EXPECT_TRUE(child.ok());

EXPECT_THAT(child->Join(/*block=*/false),
tensorstore::MatchesStatus(absl::StatusCode::kUnavailable));

child->Kill().IgnoreError();

// exit code on killed process is os dependent.
Expand Down
2 changes: 2 additions & 0 deletions tensorstore/kvstore/gcs/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ tensorstore_cc_library(
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/log:absl_check",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/time",
"@com_google_googleapis//google/storage/v2:storage_cc_grpc",
Expand Down
88 changes: 62 additions & 26 deletions tensorstore/kvstore/gcs/gcs_testbench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
#include <optional>
#include <string>


#include "absl/flags/flag.h"
#include "absl/log/absl_check.h"
#include "absl/log/absl_log.h"
#include "absl/status/status.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_format.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "grpcpp/channel.h" // third_party
#include "grpcpp/client_context.h" // third_party
Expand Down Expand Up @@ -55,12 +57,7 @@ using ::tensorstore::internal_http::HttpRequestBuilder;
using ::tensorstore::transport_test_utils::TryPickUnusedPort;
using ::google::storage::v2::Storage;

StorageTestbench::StorageTestbench()
: http_port(TryPickUnusedPort().value_or(0)),
grpc_port(TryPickUnusedPort().value_or(0)) {
ABSL_CHECK(http_port > 0);
ABSL_CHECK(grpc_port > 0);
}
StorageTestbench::StorageTestbench() = default;

std::string StorageTestbench::http_address() {
return absl::StrFormat("localhost:%d", http_port);
Expand All @@ -72,38 +69,77 @@ std::string StorageTestbench::grpc_address() {

void StorageTestbench::SpawnProcess() {
if (running) return;
ABSL_LOG(INFO) << "Spawning testbench: http://" << http_address();

{
SubprocessOptions options{absl::GetFlag(FLAGS_testbench_binary),
{absl::StrFormat("--port=%d", http_port)}};
const auto start_child = [&] {
http_port = TryPickUnusedPort().value_or(0);
ABSL_CHECK(http_port > 0);

/// TODO: getcwd() so that it can be run from anywhere.
TENSORSTORE_CHECK_OK_AND_ASSIGN(child, SpawnSubprocess(options));
}
ABSL_LOG(INFO) << "Spawning testbench: http://" << http_address();

{
SubprocessOptions options{absl::GetFlag(FLAGS_testbench_binary),
{absl::StrFormat("--port=%d", http_port)}};

// TODO: getcwd() so that it can be run from anywhere.
TENSORSTORE_CHECK_OK_AND_ASSIGN(child, SpawnSubprocess(options));
}
};

start_child();

/// Wait for the process to fully start.
for (auto deadline = absl::Now() + absl::Seconds(10);;) {
// Wait for the process to fully start.
for (auto deadline = absl::Now() + absl::Seconds(30);;) {
// Once the process is running, start a gRPC server on the provided port.
absl::SleepFor(absl::Milliseconds(200));
auto start_grpc_future = GetDefaultHttpTransport()->IssueRequest(
HttpRequestBuilder(
"GET", absl::StrFormat("http://localhost:%d/start_grpc", http_port))
.AddQueryParameter("port", absl::StrCat(grpc_port))
.BuildRequest(),
absl::Cord(), absl::Seconds(15), absl::Seconds(15));
if (start_grpc_future.status().ok()) break;
if (absl::Now() < deadline &&
absl::IsUnavailable(start_grpc_future.status())) {
if (!absl::IsUnavailable(child->Join(/*block=*/false).status())) {
// Child is not running. Assume that it failed due to the port being in
// use.
start_child();
}

auto result =
GetDefaultHttpTransport()
->IssueRequest(
HttpRequestBuilder(
"GET", absl::StrFormat("http://localhost:%d/start_grpc",
http_port))
.BuildRequest(),
absl::Cord(), absl::Seconds(15), absl::Seconds(15))
.result();

if (result.ok()) {
// Try to parse port number.
if (result->status_code != 200) {
ABSL_LOG(ERROR) << "Failed to start grpc server: " << *result;
} else if (!absl::SimpleAtoi(result->payload.Flatten(), &grpc_port)) {
ABSL_LOG(ERROR) << "Unexpected response from start_grpc: " << *result;
} else {
break;
}
} else {
ABSL_LOG(ERROR) << "Failed to start grpc server: " << result.status();
}
if (absl::Now() < deadline && absl::IsUnavailable(result.status())) {
continue;
}
// Deadline has expired & there's nothing to show for it.
TENSORSTORE_CHECK_OK(start_grpc_future.status());
ABSL_LOG(FATAL) << "Failed to start testbench: " << result.status();
}

running = true;
}

StorageTestbench::~StorageTestbench() {
if (child) {
child->Kill().IgnoreError();
auto join_result = child->Join();
if (!join_result.ok()) {
ABSL_LOG(ERROR) << "Joining storage_testbench subprocess failed: "
<< join_result.status();
}
}
}

void StorageTestbench::CreateBucket(std::string bucket) {
ABSL_CHECK(running);

Expand Down
5 changes: 3 additions & 2 deletions tensorstore/kvstore/gcs/gcs_testbench.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace gcs_testbench {
class StorageTestbench {
public:
StorageTestbench();
~StorageTestbench();

// Spawns the subprocess and returns the grpc address.
void SpawnProcess();
Expand All @@ -36,8 +37,8 @@ class StorageTestbench {
std::string http_address();
std::string grpc_address();

const int http_port;
const int grpc_port;
int http_port;
int grpc_port;
bool running = false;

std::optional<tensorstore::internal::Subprocess> child;
Expand Down

0 comments on commit f3ed94c

Please sign in to comment.