Skip to content

Commit

Permalink
Use kvstore::ReadResult named constructors in more places.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 572689002
Change-Id: I9d9b1847f3d6dac6a0fbc597f95220af887c3050
  • Loading branch information
laramiel authored and copybara-github committed Oct 11, 2023
1 parent fd8241b commit eb69315
Show file tree
Hide file tree
Showing 17 changed files with 194 additions and 226 deletions.
20 changes: 8 additions & 12 deletions tensorstore/internal/cache/kvs_backed_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
///
/// Integrates `AsyncCache` with `kvstore::Driver`.

#include <cstddef>
#include <stddef.h>

#include <memory>
#include <optional>
#include <string>
Expand Down Expand Up @@ -265,9 +266,8 @@ class KvsBackedCache : public Parent {
read_state.stamp.time >= options.staleness_bound) {
ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
<< *this << "KvsWriteback: skipping because condition is satisfied";
kvstore::ReadResult read_result;
read_result.stamp = std::move(read_state.stamp);
return execution::set_value(receiver, std::move(read_result));
return execution::set_value(receiver, kvstore::ReadResult::Unspecified(
std::move(read_state.stamp)));
}
struct EncodeReceiverImpl {
TransactionNode* self_;
Expand All @@ -280,14 +280,10 @@ class KvsBackedCache : public Parent {
}
void set_cancel() { ABSL_UNREACHABLE(); } // COV_NF_LINE
void set_value(std::optional<absl::Cord> value) {
kvstore::ReadResult read_result;
read_result.stamp = std::move(update_.stamp);
if (value) {
read_result.state = kvstore::ReadResult::kValue;
read_result.value = std::move(*value);
} else {
read_result.state = kvstore::ReadResult::kMissing;
}
kvstore::ReadResult read_result =
value ? kvstore::ReadResult::Value(std::move(*value),
std::move(update_.stamp))
: kvstore::ReadResult::Missing(std::move(update_.stamp));

// FIXME: only save if committing, also could do this inside
// ApplyReceiverImpl
Expand Down
7 changes: 0 additions & 7 deletions tensorstore/internal/http/http_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ std::optional<std::string> FormatCacheControlMaxAgeHeader(
std::optional<std::string> FormatStalenessBoundCacheControlHeader(
absl::Time staleness_bound);

/// `strptime`-compatible format string for the HTTP date header.
///
/// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Date
///
/// Note that the time zone is always UTC and is specified as "GMT".
constexpr const char kHttpTimeFormat[] = "%a, %d %b %E4Y %H:%M:%S GMT";

/// Implements the builder pattern for HttpRequest.
class HttpRequestBuilder {
public:
Expand Down
7 changes: 7 additions & 0 deletions tensorstore/internal/http/http_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ absl::Status HttpResponseCodeToStatus(
Result<std::tuple<size_t, size_t, size_t>> ParseContentRangeHeader(
const HttpResponse& response);

/// `strptime`-compatible format string for the HTTP date header.
///
/// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Date
///
/// Note that the time zone is always UTC and is specified as "GMT".
constexpr const char kHttpTimeFormat[] = "%a, %d %b %E4Y %H:%M:%S GMT";

/// Attempts to parse a header using SimpleAtoi.
template <typename T>
std::optional<T> TryParseIntHeader(
Expand Down
2 changes: 2 additions & 0 deletions tensorstore/kvstore/file/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tensorstore_cc_library(
"//tensorstore/internal:context_binding",
"//tensorstore/internal:file_io_concurrency_resource",
"//tensorstore/internal:flat_cord_builder",
"//tensorstore/internal:intrusive_ptr",
"//tensorstore/internal:os_error_code",
"//tensorstore/internal:uri_utils",
"//tensorstore/internal/cache_key",
Expand All @@ -43,6 +44,7 @@ tensorstore_cc_library(
"//tensorstore/util/execution",
"//tensorstore/util/execution:any_receiver",
"//tensorstore/util/execution:sender",
"//tensorstore/util/garbage_collection",
"@com_github_nlohmann_json//:nlohmann_json",
"@com_google_absl//absl/functional:function_ref",
"@com_google_absl//absl/status",
Expand Down
27 changes: 15 additions & 12 deletions tensorstore/kvstore/file/file_key_value_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
#include "tensorstore/internal/context_binding.h"
#include "tensorstore/internal/file_io_concurrency_resource.h"
#include "tensorstore/internal/flat_cord_builder.h"
#include "tensorstore/internal/intrusive_ptr.h"
#include "tensorstore/internal/json_binding/bindable.h"
#include "tensorstore/internal/json_binding/json_binding.h"
#include "tensorstore/internal/metrics/counter.h"
Expand All @@ -116,13 +117,18 @@
#include "tensorstore/kvstore/file/util.h"
#include "tensorstore/kvstore/generation.h"
#include "tensorstore/kvstore/key_range.h"
#include "tensorstore/kvstore/operations.h"
#include "tensorstore/kvstore/read_result.h"
#include "tensorstore/kvstore/registry.h"
#include "tensorstore/kvstore/spec.h"
#include "tensorstore/kvstore/supported_features.h"
#include "tensorstore/kvstore/url_registry.h"
#include "tensorstore/util/execution/any_receiver.h"
#include "tensorstore/util/execution/execution.h"
#include "tensorstore/util/execution/sender.h"
#include "tensorstore/util/executor.h"
#include "tensorstore/util/future.h"
#include "tensorstore/util/garbage_collection/fwd.h"
#include "tensorstore/util/quote_string.h"
#include "tensorstore/util/result.h"
#include "tensorstore/util/status.h"
Expand Down Expand Up @@ -614,24 +620,21 @@ struct ReadTask {
kvstore::ReadOptions options;

Result<ReadResult> operator()() const {
ReadResult read_result;
read_result.stamp.time = absl::Now();
TimestampedStorageGeneration stamp;
stamp.time = absl::Now();
int64_t size;
TENSORSTORE_ASSIGN_OR_RETURN(
auto fd,
OpenValueFile(full_path.c_str(), &read_result.stamp.generation, &size));
auto fd, OpenValueFile(full_path.c_str(), &stamp.generation, &size));
if (!fd.valid()) {
read_result.state = ReadResult::kMissing;
return read_result;
return kvstore::ReadResult::Missing(stamp.time);
}
if (read_result.stamp.generation == options.if_not_equal ||
if (stamp.generation == options.if_not_equal ||
(!StorageGeneration::IsUnknown(options.if_equal) &&
read_result.stamp.generation != options.if_equal)) {
return read_result;
stamp.generation != options.if_equal)) {
return kvstore::ReadResult::Unspecified(std::move(stamp));
}
TENSORSTORE_ASSIGN_OR_RETURN(auto byte_range,
options.byte_range.Validate(size));
read_result.state = ReadResult::kValue;
internal::FlatCordBuilder buffer(byte_range.size());
size_t offset = 0;
while (offset < buffer.size()) {
Expand All @@ -649,8 +652,8 @@ struct ReadTask {
}
return StatusFromErrno("Error reading file: ", full_path);
}
read_result.value = std::move(buffer).Build();
return read_result;
return kvstore::ReadResult::Value(std::move(buffer).Build(),
std::move(stamp));
}
};

Expand Down
1 change: 0 additions & 1 deletion tensorstore/kvstore/gcs_grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ tensorstore_cc_library(
"//tensorstore/internal:intrusive_ptr",
"//tensorstore/internal:retry",
"//tensorstore/internal:schedule_at",
"//tensorstore/internal:type_traits",
"//tensorstore/internal:uri_utils",
"//tensorstore/internal/cache_key",
"//tensorstore/internal/grpc:utils",
Expand Down
49 changes: 23 additions & 26 deletions tensorstore/kvstore/gcs_grpc/gcs_grpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include <optional>
#include <string>
#include <string_view>
#include <type_traits>
#include <utility>

#include "absl/base/thread_annotations.h"
Expand All @@ -54,7 +53,6 @@
#include "tensorstore/internal/metrics/histogram.h"
#include "tensorstore/internal/retry.h"
#include "tensorstore/internal/schedule_at.h"
#include "tensorstore/internal/type_traits.h"
#include "tensorstore/internal/uri_utils.h"
#include "tensorstore/kvstore/byte_range.h"
#include "tensorstore/kvstore/driver.h"
Expand Down Expand Up @@ -346,7 +344,8 @@ struct ReadTask : public internal::AtomicReferenceCount<ReadTask>,
ReadObjectRequest request_;
ReadObjectResponse response_;
std::optional<absl::crc32c_t> crc32c_;
kvstore::ReadResult read_result_;
TimestampedStorageGeneration storage_generation_;
absl::Cord value_;

int attempt_ = 0;
absl::Mutex mutex_;
Expand Down Expand Up @@ -395,9 +394,8 @@ struct ReadTask : public internal::AtomicReferenceCount<ReadTask>,
if (!promise_.result_needed()) {
return;
}
read_result_.stamp.time = absl::Now();
read_result_.stamp.generation = StorageGeneration::Unknown();
read_result_.value.Clear();
storage_generation_ =
TimestampedStorageGeneration{StorageGeneration::Unknown(), absl::Now()};

ABSL_LOG_IF(INFO, TENSORSTORE_GCS_GRPC_DEBUG)
<< "Read: " << this << " " << ConciseDebugString(request_);
Expand Down Expand Up @@ -426,7 +424,7 @@ struct ReadTask : public internal::AtomicReferenceCount<ReadTask>,
return;
}
if (response_.has_metadata()) {
read_result_.stamp.generation =
storage_generation_.generation =
StorageGeneration::FromUint64(response_.metadata().generation());
}
if (response_.has_object_checksums() &&
Expand Down Expand Up @@ -472,7 +470,7 @@ struct ReadTask : public internal::AtomicReferenceCount<ReadTask>,
}
}
if (response_.has_checksummed_data()) {
read_result_.value.Append(response_.checksummed_data().content());
value_.Append(response_.checksummed_data().content());
}

// Issue next request, if necessary.
Expand Down Expand Up @@ -514,47 +512,46 @@ struct ReadTask : public internal::AtomicReferenceCount<ReadTask>,
tensorstore::StrCat("All retry attempts failed: ", status));
}

auto latency = absl::Now() - read_result_.stamp.time;
auto latency = absl::Now() - storage_generation_.time;
gcs_grpc_read_latency_ms.Observe(absl::ToInt64Milliseconds(latency));

if (!status.ok()) {
if (absl::IsFailedPrecondition(status) || absl::IsAborted(status)) {
// Failed precondition is set when either the if_generation_match or
// the if_generation_not_match fails.
read_result_.value.Clear();
if (!StorageGeneration::IsUnknown(options_.if_equal)) {
read_result_.stamp.generation = StorageGeneration::Unknown();
storage_generation_.generation = StorageGeneration::Unknown();
} else {
read_result_.stamp.generation = options_.if_not_equal;
storage_generation_.generation = options_.if_not_equal;
}
promise_.SetResult(std::move(read_result_));
} else if (absl::IsNotFound(status)) {
read_result_.stamp.generation = StorageGeneration::NoValue();
read_result_.state = kvstore::ReadResult::kMissing;
read_result_.value.Clear();
promise_.SetResult(std::move(read_result_));
} else {
promise_.SetResult(std::move(status));
promise_.SetResult(
kvstore::ReadResult::Unspecified(std::move(storage_generation_)));
return;
}
if (absl::IsNotFound(status)) {
promise_.SetResult(
kvstore::ReadResult::Missing(storage_generation_.time));
return;
}
promise_.SetResult(std::move(status));
return;
}
if (StorageGeneration::IsUnknown(read_result_.stamp.generation)) {
if (StorageGeneration::IsUnknown(storage_generation_.generation)) {
// Bad metadata was returned by BlobService; this is unexpected, and
// usually indicates a bug in our testing.
promise_.SetResult(
absl::InternalError("Object missing a valid generation"));
return;
}
if (options_.byte_range.size() == 0) {
read_result_.value.Clear();
} else if (crc32c_.has_value() &&
ComputeCrc32c(read_result_.value) != *crc32c_) {
value_.Clear();
} else if (crc32c_.has_value() && ComputeCrc32c(value_) != *crc32c_) {
promise_.SetResult(
absl::DataLossError("Object crc32c does not match expected crc32c"));
return;
}
read_result_.state = kvstore::ReadResult::kValue;
promise_.SetResult(std::move(read_result_));
promise_.SetResult(kvstore::ReadResult::Value(
std::move(value_), std::move(storage_generation_)));
}
};

Expand Down
31 changes: 13 additions & 18 deletions tensorstore/kvstore/gcs_http/gcs_key_value_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -588,30 +588,25 @@ struct ReadTask : public RateLimiterNode,

// Parse `Date` header from response to correctly handle cached responses.
// The GCS servers always send a `date` header.
kvstore::ReadResult read_result;
read_result.stamp.time = start_time_;

switch (httpresponse.status_code) {
case 204:
case 404:
// Object not found.
read_result.stamp.generation = StorageGeneration::NoValue();
read_result.state = kvstore::ReadResult::kMissing;
return read_result;
return kvstore::ReadResult::Missing(start_time_);
case 412:
// "Failed precondition": indicates the ifGenerationMatch condition
// did not hold.
// NOTE: This is returned even when the object does not exist.
read_result.stamp.generation = StorageGeneration::Unknown();
return read_result;
return kvstore::ReadResult::Unspecified(TimestampedStorageGeneration{
StorageGeneration::Unknown(), start_time_});
case 304:
// "Not modified": indicates that the ifGenerationNotMatch condition
// did not hold.
read_result.stamp.generation = options.if_not_equal;
return read_result;
return kvstore::ReadResult::Unspecified(
TimestampedStorageGeneration{options.if_not_equal, start_time_});
}

read_result.state = kvstore::ReadResult::kValue;
absl::Cord value;
ObjectMetadata metadata;
if (options.byte_range.size() != 0) {
if (httpresponse.status_code != 206) {
Expand All @@ -620,10 +615,9 @@ struct ReadTask : public RateLimiterNode,
auto byte_range,
options.byte_range.Validate(httpresponse.payload.size()));

read_result.value =
internal::GetSubCord(httpresponse.payload, byte_range);
value = internal::GetSubCord(httpresponse.payload, byte_range);
} else {
read_result.value = httpresponse.payload;
value = httpresponse.payload;

// Server should return a parseable content-range header.
TENSORSTORE_ASSIGN_OR_RETURN(auto content_range_tuple,
Expand All @@ -633,7 +627,7 @@ struct ReadTask : public RateLimiterNode,
(options.byte_range.inclusive_min != -1 &&
options.byte_range.inclusive_min !=
std::get<0>(content_range_tuple)) ||
(request_size >= 0 && request_size != read_result.value.size())) {
(request_size >= 0 && request_size != value.size())) {
// Return an error when the response does not start at the requested
// offset of when the response is smaller than the desired size.
return absl::OutOfRangeError(
Expand All @@ -653,9 +647,10 @@ struct ReadTask : public RateLimiterNode,
ParseObjectMetadata(cord.Flatten()));
}

read_result.stamp.generation =
StorageGeneration::FromUint64(metadata.generation);
return read_result;
auto generation = StorageGeneration::FromUint64(metadata.generation);
return kvstore::ReadResult::Value(
std::move(value),
TimestampedStorageGeneration{std::move(generation), start_time_});
}
};

Expand Down
3 changes: 1 addition & 2 deletions tensorstore/kvstore/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ tensorstore_cc_library(
"//tensorstore/internal/http:curl_transport",
"//tensorstore/internal/http:http_header",
"//tensorstore/internal/json_binding",
"//tensorstore/internal/json_binding:bindable",
"//tensorstore/internal/metrics",
"//tensorstore/kvstore",
"//tensorstore/kvstore:byte_range",
Expand All @@ -43,9 +42,9 @@ tensorstore_cc_library(
"//tensorstore/util:status",
"//tensorstore/util:str_cat",
"//tensorstore/util/garbage_collection",
"@com_github_nlohmann_json//:nlohmann_json",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:cord",
"@com_google_absl//absl/time",
],
alwayslink = 1,
Expand Down
Loading

0 comments on commit eb69315

Please sign in to comment.