diff --git a/tensorstore/internal/cache/kvs_backed_cache.h b/tensorstore/internal/cache/kvs_backed_cache.h index 642ead5f6..332b375f7 100644 --- a/tensorstore/internal/cache/kvs_backed_cache.h +++ b/tensorstore/internal/cache/kvs_backed_cache.h @@ -19,7 +19,8 @@ /// /// Integrates `AsyncCache` with `kvstore::Driver`. -#include +#include + #include #include #include @@ -265,9 +266,8 @@ class KvsBackedCache : public Parent { read_state.stamp.time >= options.staleness_bound) { ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG) << *this << "KvsWriteback: skipping because condition is satisfied"; - kvstore::ReadResult read_result; - read_result.stamp = std::move(read_state.stamp); - return execution::set_value(receiver, std::move(read_result)); + return execution::set_value(receiver, kvstore::ReadResult::Unspecified( + std::move(read_state.stamp))); } struct EncodeReceiverImpl { TransactionNode* self_; @@ -280,14 +280,10 @@ class KvsBackedCache : public Parent { } void set_cancel() { ABSL_UNREACHABLE(); } // COV_NF_LINE void set_value(std::optional value) { - kvstore::ReadResult read_result; - read_result.stamp = std::move(update_.stamp); - if (value) { - read_result.state = kvstore::ReadResult::kValue; - read_result.value = std::move(*value); - } else { - read_result.state = kvstore::ReadResult::kMissing; - } + kvstore::ReadResult read_result = + value ? kvstore::ReadResult::Value(std::move(*value), + std::move(update_.stamp)) + : kvstore::ReadResult::Missing(std::move(update_.stamp)); // FIXME: only save if committing, also could do this inside // ApplyReceiverImpl diff --git a/tensorstore/internal/http/http_request.h b/tensorstore/internal/http/http_request.h index 9fa57c5f0..4ac804aa7 100644 --- a/tensorstore/internal/http/http_request.h +++ b/tensorstore/internal/http/http_request.h @@ -64,13 +64,6 @@ std::optional FormatCacheControlMaxAgeHeader( std::optional FormatStalenessBoundCacheControlHeader( absl::Time staleness_bound); -/// `strptime`-compatible format string for the HTTP date header. -/// -/// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Date -/// -/// Note that the time zone is always UTC and is specified as "GMT". -constexpr const char kHttpTimeFormat[] = "%a, %d %b %E4Y %H:%M:%S GMT"; - /// Implements the builder pattern for HttpRequest. class HttpRequestBuilder { public: diff --git a/tensorstore/internal/http/http_response.h b/tensorstore/internal/http/http_response.h index fd5fafb96..85e896676 100644 --- a/tensorstore/internal/http/http_response.h +++ b/tensorstore/internal/http/http_response.h @@ -73,6 +73,13 @@ absl::Status HttpResponseCodeToStatus( Result> ParseContentRangeHeader( const HttpResponse& response); +/// `strptime`-compatible format string for the HTTP date header. +/// +/// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Date +/// +/// Note that the time zone is always UTC and is specified as "GMT". +constexpr const char kHttpTimeFormat[] = "%a, %d %b %E4Y %H:%M:%S GMT"; + /// Attempts to parse a header using SimpleAtoi. template std::optional TryParseIntHeader( diff --git a/tensorstore/kvstore/file/BUILD b/tensorstore/kvstore/file/BUILD index c542eac7c..2ffea5ead 100644 --- a/tensorstore/kvstore/file/BUILD +++ b/tensorstore/kvstore/file/BUILD @@ -24,6 +24,7 @@ tensorstore_cc_library( "//tensorstore/internal:context_binding", "//tensorstore/internal:file_io_concurrency_resource", "//tensorstore/internal:flat_cord_builder", + "//tensorstore/internal:intrusive_ptr", "//tensorstore/internal:os_error_code", "//tensorstore/internal:uri_utils", "//tensorstore/internal/cache_key", @@ -43,6 +44,7 @@ tensorstore_cc_library( "//tensorstore/util/execution", "//tensorstore/util/execution:any_receiver", "//tensorstore/util/execution:sender", + "//tensorstore/util/garbage_collection", "@com_github_nlohmann_json//:nlohmann_json", "@com_google_absl//absl/functional:function_ref", "@com_google_absl//absl/status", diff --git a/tensorstore/kvstore/file/file_key_value_store.cc b/tensorstore/kvstore/file/file_key_value_store.cc index 6a9fb3fd7..0af04b728 100644 --- a/tensorstore/kvstore/file/file_key_value_store.cc +++ b/tensorstore/kvstore/file/file_key_value_store.cc @@ -106,6 +106,7 @@ #include "tensorstore/internal/context_binding.h" #include "tensorstore/internal/file_io_concurrency_resource.h" #include "tensorstore/internal/flat_cord_builder.h" +#include "tensorstore/internal/intrusive_ptr.h" #include "tensorstore/internal/json_binding/bindable.h" #include "tensorstore/internal/json_binding/json_binding.h" #include "tensorstore/internal/metrics/counter.h" @@ -116,13 +117,18 @@ #include "tensorstore/kvstore/file/util.h" #include "tensorstore/kvstore/generation.h" #include "tensorstore/kvstore/key_range.h" +#include "tensorstore/kvstore/operations.h" +#include "tensorstore/kvstore/read_result.h" #include "tensorstore/kvstore/registry.h" +#include "tensorstore/kvstore/spec.h" +#include "tensorstore/kvstore/supported_features.h" #include "tensorstore/kvstore/url_registry.h" #include "tensorstore/util/execution/any_receiver.h" #include "tensorstore/util/execution/execution.h" #include "tensorstore/util/execution/sender.h" #include "tensorstore/util/executor.h" #include "tensorstore/util/future.h" +#include "tensorstore/util/garbage_collection/fwd.h" #include "tensorstore/util/quote_string.h" #include "tensorstore/util/result.h" #include "tensorstore/util/status.h" @@ -614,24 +620,21 @@ struct ReadTask { kvstore::ReadOptions options; Result operator()() const { - ReadResult read_result; - read_result.stamp.time = absl::Now(); + TimestampedStorageGeneration stamp; + stamp.time = absl::Now(); int64_t size; TENSORSTORE_ASSIGN_OR_RETURN( - auto fd, - OpenValueFile(full_path.c_str(), &read_result.stamp.generation, &size)); + auto fd, OpenValueFile(full_path.c_str(), &stamp.generation, &size)); if (!fd.valid()) { - read_result.state = ReadResult::kMissing; - return read_result; + return kvstore::ReadResult::Missing(stamp.time); } - if (read_result.stamp.generation == options.if_not_equal || + if (stamp.generation == options.if_not_equal || (!StorageGeneration::IsUnknown(options.if_equal) && - read_result.stamp.generation != options.if_equal)) { - return read_result; + stamp.generation != options.if_equal)) { + return kvstore::ReadResult::Unspecified(std::move(stamp)); } TENSORSTORE_ASSIGN_OR_RETURN(auto byte_range, options.byte_range.Validate(size)); - read_result.state = ReadResult::kValue; internal::FlatCordBuilder buffer(byte_range.size()); size_t offset = 0; while (offset < buffer.size()) { @@ -649,8 +652,8 @@ struct ReadTask { } return StatusFromErrno("Error reading file: ", full_path); } - read_result.value = std::move(buffer).Build(); - return read_result; + return kvstore::ReadResult::Value(std::move(buffer).Build(), + std::move(stamp)); } }; diff --git a/tensorstore/kvstore/gcs_grpc/BUILD b/tensorstore/kvstore/gcs_grpc/BUILD index 212b6630a..c54763e23 100644 --- a/tensorstore/kvstore/gcs_grpc/BUILD +++ b/tensorstore/kvstore/gcs_grpc/BUILD @@ -38,7 +38,6 @@ tensorstore_cc_library( "//tensorstore/internal:intrusive_ptr", "//tensorstore/internal:retry", "//tensorstore/internal:schedule_at", - "//tensorstore/internal:type_traits", "//tensorstore/internal:uri_utils", "//tensorstore/internal/cache_key", "//tensorstore/internal/grpc:utils", diff --git a/tensorstore/kvstore/gcs_grpc/gcs_grpc.cc b/tensorstore/kvstore/gcs_grpc/gcs_grpc.cc index 13de0b334..b5487f109 100644 --- a/tensorstore/kvstore/gcs_grpc/gcs_grpc.cc +++ b/tensorstore/kvstore/gcs_grpc/gcs_grpc.cc @@ -28,7 +28,6 @@ #include #include #include -#include #include #include "absl/base/thread_annotations.h" @@ -54,7 +53,6 @@ #include "tensorstore/internal/metrics/histogram.h" #include "tensorstore/internal/retry.h" #include "tensorstore/internal/schedule_at.h" -#include "tensorstore/internal/type_traits.h" #include "tensorstore/internal/uri_utils.h" #include "tensorstore/kvstore/byte_range.h" #include "tensorstore/kvstore/driver.h" @@ -346,7 +344,8 @@ struct ReadTask : public internal::AtomicReferenceCount, ReadObjectRequest request_; ReadObjectResponse response_; std::optional crc32c_; - kvstore::ReadResult read_result_; + TimestampedStorageGeneration storage_generation_; + absl::Cord value_; int attempt_ = 0; absl::Mutex mutex_; @@ -395,9 +394,8 @@ struct ReadTask : public internal::AtomicReferenceCount, if (!promise_.result_needed()) { return; } - read_result_.stamp.time = absl::Now(); - read_result_.stamp.generation = StorageGeneration::Unknown(); - read_result_.value.Clear(); + storage_generation_ = + TimestampedStorageGeneration{StorageGeneration::Unknown(), absl::Now()}; ABSL_LOG_IF(INFO, TENSORSTORE_GCS_GRPC_DEBUG) << "Read: " << this << " " << ConciseDebugString(request_); @@ -426,7 +424,7 @@ struct ReadTask : public internal::AtomicReferenceCount, return; } if (response_.has_metadata()) { - read_result_.stamp.generation = + storage_generation_.generation = StorageGeneration::FromUint64(response_.metadata().generation()); } if (response_.has_object_checksums() && @@ -472,7 +470,7 @@ struct ReadTask : public internal::AtomicReferenceCount, } } if (response_.has_checksummed_data()) { - read_result_.value.Append(response_.checksummed_data().content()); + value_.Append(response_.checksummed_data().content()); } // Issue next request, if necessary. @@ -514,31 +512,31 @@ struct ReadTask : public internal::AtomicReferenceCount, tensorstore::StrCat("All retry attempts failed: ", status)); } - auto latency = absl::Now() - read_result_.stamp.time; + auto latency = absl::Now() - storage_generation_.time; gcs_grpc_read_latency_ms.Observe(absl::ToInt64Milliseconds(latency)); if (!status.ok()) { if (absl::IsFailedPrecondition(status) || absl::IsAborted(status)) { // Failed precondition is set when either the if_generation_match or // the if_generation_not_match fails. - read_result_.value.Clear(); if (!StorageGeneration::IsUnknown(options_.if_equal)) { - read_result_.stamp.generation = StorageGeneration::Unknown(); + storage_generation_.generation = StorageGeneration::Unknown(); } else { - read_result_.stamp.generation = options_.if_not_equal; + storage_generation_.generation = options_.if_not_equal; } - promise_.SetResult(std::move(read_result_)); - } else if (absl::IsNotFound(status)) { - read_result_.stamp.generation = StorageGeneration::NoValue(); - read_result_.state = kvstore::ReadResult::kMissing; - read_result_.value.Clear(); - promise_.SetResult(std::move(read_result_)); - } else { - promise_.SetResult(std::move(status)); + promise_.SetResult( + kvstore::ReadResult::Unspecified(std::move(storage_generation_))); + return; } + if (absl::IsNotFound(status)) { + promise_.SetResult( + kvstore::ReadResult::Missing(storage_generation_.time)); + return; + } + promise_.SetResult(std::move(status)); return; } - if (StorageGeneration::IsUnknown(read_result_.stamp.generation)) { + if (StorageGeneration::IsUnknown(storage_generation_.generation)) { // Bad metadata was returned by BlobService; this is unexpected, and // usually indicates a bug in our testing. promise_.SetResult( @@ -546,15 +544,14 @@ struct ReadTask : public internal::AtomicReferenceCount, return; } if (options_.byte_range.size() == 0) { - read_result_.value.Clear(); - } else if (crc32c_.has_value() && - ComputeCrc32c(read_result_.value) != *crc32c_) { + value_.Clear(); + } else if (crc32c_.has_value() && ComputeCrc32c(value_) != *crc32c_) { promise_.SetResult( absl::DataLossError("Object crc32c does not match expected crc32c")); return; } - read_result_.state = kvstore::ReadResult::kValue; - promise_.SetResult(std::move(read_result_)); + promise_.SetResult(kvstore::ReadResult::Value( + std::move(value_), std::move(storage_generation_))); } }; diff --git a/tensorstore/kvstore/gcs_http/gcs_key_value_store.cc b/tensorstore/kvstore/gcs_http/gcs_key_value_store.cc index 7858ff2b8..752968772 100644 --- a/tensorstore/kvstore/gcs_http/gcs_key_value_store.cc +++ b/tensorstore/kvstore/gcs_http/gcs_key_value_store.cc @@ -588,30 +588,25 @@ struct ReadTask : public RateLimiterNode, // Parse `Date` header from response to correctly handle cached responses. // The GCS servers always send a `date` header. - kvstore::ReadResult read_result; - read_result.stamp.time = start_time_; - switch (httpresponse.status_code) { case 204: case 404: // Object not found. - read_result.stamp.generation = StorageGeneration::NoValue(); - read_result.state = kvstore::ReadResult::kMissing; - return read_result; + return kvstore::ReadResult::Missing(start_time_); case 412: // "Failed precondition": indicates the ifGenerationMatch condition // did not hold. // NOTE: This is returned even when the object does not exist. - read_result.stamp.generation = StorageGeneration::Unknown(); - return read_result; + return kvstore::ReadResult::Unspecified(TimestampedStorageGeneration{ + StorageGeneration::Unknown(), start_time_}); case 304: // "Not modified": indicates that the ifGenerationNotMatch condition // did not hold. - read_result.stamp.generation = options.if_not_equal; - return read_result; + return kvstore::ReadResult::Unspecified( + TimestampedStorageGeneration{options.if_not_equal, start_time_}); } - read_result.state = kvstore::ReadResult::kValue; + absl::Cord value; ObjectMetadata metadata; if (options.byte_range.size() != 0) { if (httpresponse.status_code != 206) { @@ -620,10 +615,9 @@ struct ReadTask : public RateLimiterNode, auto byte_range, options.byte_range.Validate(httpresponse.payload.size())); - read_result.value = - internal::GetSubCord(httpresponse.payload, byte_range); + value = internal::GetSubCord(httpresponse.payload, byte_range); } else { - read_result.value = httpresponse.payload; + value = httpresponse.payload; // Server should return a parseable content-range header. TENSORSTORE_ASSIGN_OR_RETURN(auto content_range_tuple, @@ -633,7 +627,7 @@ struct ReadTask : public RateLimiterNode, (options.byte_range.inclusive_min != -1 && options.byte_range.inclusive_min != std::get<0>(content_range_tuple)) || - (request_size >= 0 && request_size != read_result.value.size())) { + (request_size >= 0 && request_size != value.size())) { // Return an error when the response does not start at the requested // offset of when the response is smaller than the desired size. return absl::OutOfRangeError( @@ -653,9 +647,10 @@ struct ReadTask : public RateLimiterNode, ParseObjectMetadata(cord.Flatten())); } - read_result.stamp.generation = - StorageGeneration::FromUint64(metadata.generation); - return read_result; + auto generation = StorageGeneration::FromUint64(metadata.generation); + return kvstore::ReadResult::Value( + std::move(value), + TimestampedStorageGeneration{std::move(generation), start_time_}); } }; diff --git a/tensorstore/kvstore/http/BUILD b/tensorstore/kvstore/http/BUILD index e7de4dc7c..e0f91a197 100644 --- a/tensorstore/kvstore/http/BUILD +++ b/tensorstore/kvstore/http/BUILD @@ -30,7 +30,6 @@ tensorstore_cc_library( "//tensorstore/internal/http:curl_transport", "//tensorstore/internal/http:http_header", "//tensorstore/internal/json_binding", - "//tensorstore/internal/json_binding:bindable", "//tensorstore/internal/metrics", "//tensorstore/kvstore", "//tensorstore/kvstore:byte_range", @@ -43,9 +42,9 @@ tensorstore_cc_library( "//tensorstore/util:status", "//tensorstore/util:str_cat", "//tensorstore/util/garbage_collection", - "@com_github_nlohmann_json//:nlohmann_json", "@com_google_absl//absl/status", "@com_google_absl//absl/strings", + "@com_google_absl//absl/strings:cord", "@com_google_absl//absl/time", ], alwayslink = 1, diff --git a/tensorstore/kvstore/http/driver.cc b/tensorstore/kvstore/http/driver.cc index f7172fa7f..cd97a3aeb 100644 --- a/tensorstore/kvstore/http/driver.cc +++ b/tensorstore/kvstore/http/driver.cc @@ -23,10 +23,10 @@ #include #include "absl/status/status.h" +#include "absl/strings/cord.h" #include "absl/strings/match.h" #include "absl/time/clock.h" #include "absl/time/time.h" -#include #include "tensorstore/context.h" #include "tensorstore/context_resource_provider.h" #include "tensorstore/internal/concurrency_resource.h" @@ -37,7 +37,6 @@ #include "tensorstore/internal/http/http_response.h" #include "tensorstore/internal/http/http_transport.h" #include "tensorstore/internal/intrusive_ptr.h" -#include "tensorstore/internal/json_binding/bindable.h" #include "tensorstore/internal/json_binding/json_binding.h" #include "tensorstore/internal/metrics/counter.h" #include "tensorstore/internal/path.h" @@ -247,8 +246,7 @@ struct ReadTask { kvstore::ReadOptions options; Result operator()() { - kvstore::ReadResult read_result; - + absl::Time start_time; HttpResponse httpresponse; auto retry_status = owner->RetryRequestWithBackoff([&] { HttpRequestBuilder request_builder( @@ -274,7 +272,7 @@ struct ReadTask { "if-none-match: \"", StorageGeneration::DecodeString(options.if_not_equal), "\"")); } - read_result.stamp.time = absl::Now(); + start_time = absl::Now(); auto response = owner->transport_->IssueRequest(request_builder.BuildRequest(), {}) .result(); @@ -307,16 +305,16 @@ struct ReadTask { tensorstore::StrCat("Invalid \"date\" response header: ", tensorstore::QuoteString(date_it->second))); } - if (response_date < read_result.stamp.time) { - if (options.staleness_bound < read_result.stamp.time && + if (response_date < start_time) { + if (options.staleness_bound < start_time && response_date < options.staleness_bound) { // `response_date` does not satisfy the `staleness_bound` // requirement, possibly due to time skew. Due to the way we // compute `max-age` in the request header, in the case of time skew // it is correct to just use `staleness_bound` instead. - read_result.stamp.time = options.staleness_bound; + start_time = options.staleness_bound; } else { - read_result.stamp.time = response_date; + start_time = response_date; } } } @@ -326,33 +324,29 @@ struct ReadTask { case 204: case 404: // Object not found. - read_result.stamp.generation = StorageGeneration::NoValue(); - read_result.state = kvstore::ReadResult::kMissing; - return read_result; + return kvstore::ReadResult::Missing(start_time); case 412: // "Failed precondition": indicates the If-Match condition did // not hold. - read_result.stamp.generation = StorageGeneration::Unknown(); - return read_result; + return kvstore::ReadResult::Unspecified(TimestampedStorageGeneration{ + StorageGeneration::Unknown(), start_time}); case 304: // "Not modified": indicates that the If-None-Match condition did // not hold. - read_result.stamp.generation = options.if_not_equal; - return read_result; + return kvstore::ReadResult::Unspecified( + TimestampedStorageGeneration{options.if_not_equal, start_time}); } - read_result.state = kvstore::ReadResult::kValue; + absl::Cord value; if (options.byte_range.size() != 0) { if (httpresponse.status_code != 206) { // This may or may not have been a range request; attempt to validate. TENSORSTORE_ASSIGN_OR_RETURN( auto byte_range, options.byte_range.Validate(httpresponse.payload.size())); - read_result.value = - internal::GetSubCord(httpresponse.payload, byte_range); + value = internal::GetSubCord(httpresponse.payload, byte_range); } else { - read_result.value = httpresponse.payload; - + value = httpresponse.payload; // Server should return a parseable content-range header. TENSORSTORE_ASSIGN_OR_RETURN(auto content_range_tuple, ParseContentRangeHeader(httpresponse)); @@ -361,7 +355,7 @@ struct ReadTask { (options.byte_range.inclusive_min != -1 && options.byte_range.inclusive_min != std::get<0>(content_range_tuple)) || - (request_size >= 0 && request_size != read_result.value.size())) { + (request_size >= 0 && request_size != value.size())) { // Return an error when the response does not start at the requested // offset of when the response is smaller than the desired size. return absl::OutOfRangeError(tensorstore::StrCat( @@ -373,6 +367,7 @@ struct ReadTask { } // Parse `ETag` header from response. + StorageGeneration generation = StorageGeneration::Invalid(); { auto it = httpresponse.headers.find("etag"); if (it != httpresponse.headers.end() && it->second.size() > 2 && @@ -381,14 +376,13 @@ struct ReadTask { std::string_view etag(it->second); etag.remove_prefix(1); etag.remove_suffix(1); - read_result.stamp.generation = StorageGeneration::FromString(etag); - } else { - // No ETag available. - read_result.stamp.generation = StorageGeneration::Invalid(); + generation = StorageGeneration::FromString(etag); } } - return read_result; + return kvstore::ReadResult::Value( + std::move(value), + TimestampedStorageGeneration{std::move(generation), start_time}); } }; diff --git a/tensorstore/kvstore/memory/memory_key_value_store.cc b/tensorstore/kvstore/memory/memory_key_value_store.cc index 2726bb345..a6c69d8eb 100644 --- a/tensorstore/kvstore/memory/memory_key_value_store.cc +++ b/tensorstore/kvstore/memory/memory_key_value_store.cc @@ -378,7 +378,6 @@ class MemoryDriver::TransactionNode Future MemoryDriver::Read(Key key, ReadOptions options) { auto& data = this->data(); absl::ReaderMutexLock lock(&data.mutex); - ReadResult read_result; auto& values = data.values; auto it = values.find(key); if (it == values.end()) { diff --git a/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded.cc b/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded.cc index b25da0ebb..257b354ed 100644 --- a/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded.cc +++ b/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded.cc @@ -64,7 +64,6 @@ #include "tensorstore/transaction.h" #include "tensorstore/util/execution/any_receiver.h" #include "tensorstore/util/execution/execution.h" -#include "tensorstore/util/execution/result_sender.h" #include "tensorstore/util/executor.h" #include "tensorstore/util/future.h" #include "tensorstore/util/garbage_collection/fwd.h" @@ -77,6 +76,7 @@ // specializations #include "tensorstore/internal/estimate_heap_usage/std_vector.h" // IWYU pragma: keep +#include "tensorstore/util/execution/result_sender.h" // IWYU pragma: keep namespace tensorstore { namespace neuroglancer_uint64_sharded { @@ -536,43 +536,41 @@ class ShardedKeyValueStoreWriteCache internal_kvstore::ReadModifyWriteEntry& entry, const StorageGeneration& if_not_equal) { auto& self = static_cast(entry.multi_phase()); - kvstore::ReadResult read_result; + TimestampedStorageGeneration stamp; std::shared_ptr encoded_chunks; { AsyncCache::ReadLock lock{self}; - read_result.stamp = lock.stamp(); + stamp = lock.stamp(); encoded_chunks = lock.shared_data(); } - if (!StorageGeneration::IsUnknown(read_result.stamp.generation) && - read_result.stamp.generation == if_not_equal) { - read_result.state = kvstore::ReadResult::kUnspecified; + if (!StorageGeneration::IsUnknown(stamp.generation) && + stamp.generation == if_not_equal) { + return kvstore::ReadResult::Unspecified(std::move(stamp)); + } + if (StorageGeneration::IsDirty(stamp.generation)) { + // Add layer to generation in order to make it possible to + // distinguish: + // + // 1. the shard being modified by a predecessor + // `ReadModifyWrite` operation on the underlying + // KeyValueStore. + // + // 2. the chunk being modified by a `ReadModifyWrite` + // operation attached to this transaction node. + stamp.generation = + StorageGeneration::AddLayer(std::move(stamp.generation)); + } + auto* chunk = + FindChunk(*encoded_chunks, GetMinishardAndChunkId(entry.key_)); + if (!chunk) { + return kvstore::ReadResult::Missing(std::move(stamp)); } else { - auto* chunk = - FindChunk(*encoded_chunks, GetMinishardAndChunkId(entry.key_)); - if (!chunk) { - read_result.state = kvstore::ReadResult::kMissing; - } else { - read_result.state = kvstore::ReadResult::kValue; - TENSORSTORE_ASSIGN_OR_RETURN( - read_result.value, - DecodeData(chunk->encoded_data, - GetOwningCache(self).sharding_spec().data_encoding)); - } - if (StorageGeneration::IsDirty(read_result.stamp.generation)) { - // Add layer to generation in order to make it possible to - // distinguish: - // - // 1. the shard being modified by a predecessor - // `ReadModifyWrite` operation on the underlying - // KeyValueStore. - // - // 2. the chunk being modified by a `ReadModifyWrite` - // operation attached to this transaction node. - read_result.stamp.generation = StorageGeneration::AddLayer( - std::move(read_result.stamp.generation)); - } + TENSORSTORE_ASSIGN_OR_RETURN( + absl::Cord value, + DecodeData(chunk->encoded_data, + GetOwningCache(self).sharding_spec().data_encoding)); + return kvstore::ReadResult::Value(std::move(value), std::move(stamp)); } - return read_result; } void Writeback(internal_kvstore::ReadModifyWriteEntry& entry, diff --git a/tensorstore/kvstore/ocdbt/non_distributed/read.cc b/tensorstore/kvstore/ocdbt/non_distributed/read.cc index 25cfd5b48..09c49d593 100644 --- a/tensorstore/kvstore/ocdbt/non_distributed/read.cc +++ b/tensorstore/kvstore/ocdbt/non_distributed/read.cc @@ -285,12 +285,10 @@ struct ReadOperation : public internal::AtomicReferenceCount { LinkValue( [op = std::move(op)](Promise promise, ReadyFuture read_future) { - kvstore::ReadResult read_result; - read_result.state = kvstore::ReadResult::kValue; - read_result.value = std::move(read_future.result()->value); - read_result.stamp.time = op->time; - read_result.stamp.generation = std::move(op->generation); - promise.SetResult(std::move(read_result)); + promise.SetResult(kvstore::ReadResult::Value( + std::move(read_future.result()->value), + TimestampedStorageGeneration{std::move(op->generation), + op->time})); }, std::move(promise), std::move(read_future)); } diff --git a/tensorstore/kvstore/s3/s3_key_value_store.cc b/tensorstore/kvstore/s3/s3_key_value_store.cc index a5596360c..bace53d34 100644 --- a/tensorstore/kvstore/s3/s3_key_value_store.cc +++ b/tensorstore/kvstore/s3/s3_key_value_store.cc @@ -561,51 +561,42 @@ struct ReadTask : public RateLimiterNode, auto latency = absl::Now() - start_time_; s3_read_latency_ms.Observe(absl::ToInt64Milliseconds(latency)); - // Parse `Date` header from response to correctly handle cached responses. - // The GCS servers always send a `date` header. - kvstore::ReadResult read_result; - read_result.stamp.time = start_time_; - switch (httpresponse.status_code) { case 204: case 404: // Object not found. - read_result.stamp.generation = StorageGeneration::NoValue(); - read_result.state = kvstore::ReadResult::kMissing; - return read_result; + return kvstore::ReadResult::Missing(start_time_); case 412: // "Failed precondition": indicates the ifGenerationMatch condition // did not hold. // NOTE: This is returned even when the object does not exist. - read_result.stamp.generation = StorageGeneration::Unknown(); - return read_result; + return kvstore::ReadResult::Unspecified(TimestampedStorageGeneration{ + StorageGeneration::Unknown(), start_time_}); case 304: // "Not modified": indicates that the ifGenerationNotMatch condition // did not hold. - read_result.stamp.generation = options.if_not_equal; - return read_result; + return kvstore::ReadResult::Unspecified( + TimestampedStorageGeneration{options.if_not_equal, start_time_}); } - read_result.state = kvstore::ReadResult::kValue; + absl::Cord value; if (options.byte_range.size() != 0) { if (httpresponse.status_code != 206) { // This may or may not have been a range request; attempt to validate. TENSORSTORE_ASSIGN_OR_RETURN( auto byte_range, options.byte_range.Validate(httpresponse.payload.size())); - read_result.value = - internal::GetSubCord(httpresponse.payload, byte_range); + value = internal::GetSubCord(httpresponse.payload, byte_range); } else { - read_result.value = httpresponse.payload; + value = httpresponse.payload; // Server should return a parseable content-range header. TENSORSTORE_ASSIGN_OR_RETURN(auto content_range_tuple, ParseContentRangeHeader(httpresponse)); - if (auto request_size = options.byte_range.size(); (options.byte_range.inclusive_min != -1 && options.byte_range.inclusive_min != std::get<0>(content_range_tuple)) || - (request_size >= 0 && request_size != read_result.value.size())) { + (request_size >= 0 && request_size != value.size())) { // Return an error when the response does not start at the requested // offset of when the response is smaller than the desired size. return absl::OutOfRangeError( @@ -617,9 +608,11 @@ struct ReadTask : public RateLimiterNode, } TENSORSTORE_ASSIGN_OR_RETURN( - read_result.stamp.generation, - StorageGenerationFromHeaders(httpresponse.headers)); - return read_result; + auto generation, StorageGenerationFromHeaders(httpresponse.headers)); + + return kvstore::ReadResult::Value( + std::move(value), + TimestampedStorageGeneration{std::move(generation), start_time_}); } }; diff --git a/tensorstore/kvstore/transaction.cc b/tensorstore/kvstore/transaction.cc index c323eea85..4f47ca361 100644 --- a/tensorstore/kvstore/transaction.cc +++ b/tensorstore/kvstore/transaction.cc @@ -1569,29 +1569,27 @@ class WriteViaExistingTransactionNode : public internal::TransactionState::Node, Future WriteViaExistingTransaction( Driver* driver, internal::OpenTransactionPtr& transaction, size_t& phase, Key key, std::optional value, WriteOptions options) { - ReadResult read_result; - if (value) { - read_result.state = ReadResult::kValue; - read_result.value = std::move(*value); - } else { - read_result.state = ReadResult::kMissing; - } + TimestampedStorageGeneration stamp; if (StorageGeneration::IsUnknown(options.if_equal)) { - read_result.stamp.time = absl::InfiniteFuture(); + stamp.time = absl::InfiniteFuture(); } else { assert(StorageGeneration::IsClean(options.if_equal)); - read_result.stamp.time = absl::Time(); + stamp.time = absl::Time(); } bool if_equal_no_value = StorageGeneration::IsNoValue(options.if_equal); - read_result.stamp.generation = std::move(options.if_equal); - read_result.stamp.generation.MarkDirty(); + stamp.generation = std::move(options.if_equal); + stamp.generation.MarkDirty(); + auto [promise, future] = PromiseFuturePair::Make(); using Node = WriteViaExistingTransactionNode; internal::WeakTransactionNodePtr node; node.reset(new Node); node->promise_ = promise; - node->read_result_ = std::move(read_result); + node->read_result_ = + value ? ReadResult::Value(std::move(*value), std::move(stamp)) + : ReadResult::Missing(std::move(stamp)); + node->if_equal_no_value_ = if_equal_no_value; TENSORSTORE_RETURN_IF_ERROR( driver->ReadModifyWrite(transaction, phase, std::move(key), *node)); diff --git a/tensorstore/kvstore/zarr3_sharding_indexed/zarr3_sharding_indexed.cc b/tensorstore/kvstore/zarr3_sharding_indexed/zarr3_sharding_indexed.cc index 6117aa0db..3a8aa612d 100644 --- a/tensorstore/kvstore/zarr3_sharding_indexed/zarr3_sharding_indexed.cc +++ b/tensorstore/kvstore/zarr3_sharding_indexed/zarr3_sharding_indexed.cc @@ -14,10 +14,10 @@ #include "tensorstore/kvstore/zarr3_sharding_indexed/zarr3_sharding_indexed.h" +#include #include #include -#include #include #include #include @@ -41,11 +41,8 @@ #include "tensorstore/internal/cache/cache_pool_resource.h" #include "tensorstore/internal/cache/kvs_backed_cache.h" #include "tensorstore/internal/cache_key/cache_key.h" -#include "tensorstore/internal/cache_key/std_vector.h" // IWYU pragma: keep #include "tensorstore/internal/data_copy_concurrency_resource.h" #include "tensorstore/internal/estimate_heap_usage/estimate_heap_usage.h" -#include "tensorstore/internal/estimate_heap_usage/std_optional.h" // IWYU pragma: keep -#include "tensorstore/internal/estimate_heap_usage/std_vector.h" // IWYU pragma: keep #include "tensorstore/internal/intrusive_ptr.h" #include "tensorstore/internal/json_binding/bindable.h" #include "tensorstore/internal/json_binding/dimension_indexed.h" @@ -66,21 +63,26 @@ #include "tensorstore/kvstore/transaction.h" #include "tensorstore/kvstore/zarr3_sharding_indexed/key.h" #include "tensorstore/kvstore/zarr3_sharding_indexed/shard_format.h" -#include "tensorstore/serialization/std_vector.h" // IWYU pragma: keep #include "tensorstore/transaction.h" #include "tensorstore/util/execution/any_receiver.h" #include "tensorstore/util/execution/execution.h" -#include "tensorstore/util/execution/result_sender.h" // IWYU pragma: keep #include "tensorstore/util/executor.h" #include "tensorstore/util/future.h" #include "tensorstore/util/garbage_collection/fwd.h" #include "tensorstore/util/garbage_collection/garbage_collection.h" -#include "tensorstore/util/garbage_collection/std_vector.h" // IWYU pragma: keep #include "tensorstore/util/result.h" #include "tensorstore/util/span.h" #include "tensorstore/util/status.h" #include "tensorstore/util/str_cat.h" +// specializations +#include "tensorstore/internal/cache_key/std_vector.h" // IWYU pragma: keep +#include "tensorstore/internal/estimate_heap_usage/std_optional.h" // IWYU pragma: keep +#include "tensorstore/internal/estimate_heap_usage/std_vector.h" // IWYU pragma: keep +#include "tensorstore/serialization/std_vector.h" // IWYU pragma: keep +#include "tensorstore/util/execution/result_sender.h" // IWYU pragma: keep +#include "tensorstore/util/garbage_collection/std_vector.h" // IWYU pragma: keep + namespace tensorstore { namespace zarr3_sharding_indexed { @@ -414,40 +416,38 @@ class ShardedKeyValueStoreWriteCache internal_kvstore::ReadModifyWriteEntry& entry, const StorageGeneration& if_not_equal) { auto& self = static_cast(entry.multi_phase()); - kvstore::ReadResult read_result; + TimestampedStorageGeneration stamp; std::shared_ptr entries; { AsyncCache::ReadLock lock{self}; - read_result.stamp = lock.stamp(); + stamp = lock.stamp(); entries = lock.shared_data(); } - if (!StorageGeneration::IsUnknown(read_result.stamp.generation) && - read_result.stamp.generation == if_not_equal) { - read_result.state = kvstore::ReadResult::kUnspecified; + if (!StorageGeneration::IsUnknown(stamp.generation) && + stamp.generation == if_not_equal) { + return kvstore::ReadResult::Unspecified(std::move(stamp)); + } + if (StorageGeneration::IsDirty(stamp.generation)) { + // Add layer to generation in order to make it possible to + // distinguish: + // + // 1. the shard being modified by a predecessor + // `ReadModifyWrite` operation on the underlying + // KeyValueStore. + // + // 2. the chunk being modified by a `ReadModifyWrite` + // operation attached to this transaction node. + stamp.generation = + StorageGeneration::AddLayer(std::move(stamp.generation)); + } + + auto entry_id = InternalKeyToEntryId(entry.key_); + const auto& shard_entry = entries->entries[entry_id]; + if (!shard_entry) { + return kvstore::ReadResult::Missing(std::move(stamp)); } else { - auto entry_id = InternalKeyToEntryId(entry.key_); - const auto& entry = entries->entries[entry_id]; - if (!entry) { - read_result.state = kvstore::ReadResult::kMissing; - } else { - read_result.state = kvstore::ReadResult::kValue; - read_result.value = *entry; - } - if (StorageGeneration::IsDirty(read_result.stamp.generation)) { - // Add layer to generation in order to make it possible to - // distinguish: - // - // 1. the shard being modified by a predecessor - // `ReadModifyWrite` operation on the underlying - // KeyValueStore. - // - // 2. the chunk being modified by a `ReadModifyWrite` - // operation attached to this transaction node. - read_result.stamp.generation = StorageGeneration::AddLayer( - std::move(read_result.stamp.generation)); - } + return kvstore::ReadResult::Value(*shard_entry, std::move(stamp)); } - return read_result; } // Staleness bound for the current pending call to `DoApply`. diff --git a/tensorstore/kvstore/zip/zip_key_value_store.cc b/tensorstore/kvstore/zip/zip_key_value_store.cc index ad4929822..5271fb44d 100644 --- a/tensorstore/kvstore/zip/zip_key_value_store.cc +++ b/tensorstore/kvstore/zip/zip_key_value_store.cc @@ -76,7 +76,6 @@ namespace jb = tensorstore::internal_json_binding; using ::tensorstore::internal_zip_kvstore::Directory; using ::tensorstore::internal_zip_kvstore::ZipDirectoryCache; -using ::tensorstore::kvstore::ReadResult; // ----------------------------------------------------------------------------- @@ -190,8 +189,8 @@ struct ReadState : public internal::AtomicReferenceCount { kvstore::ReadOptions options_; // The cache read has completed, so the zip directory entries are available. - void OnDirectoryReady(Promise promise) { - ReadResult result; + void OnDirectoryReady(Promise promise) { + TimestampedStorageGeneration stamp; // Set options for the entry request. kvstore::ReadOptions options; @@ -202,7 +201,7 @@ struct ReadState : public internal::AtomicReferenceCount { { ZipDirectoryCache::ReadLock lock( *(owner_->cache_entry_)); - result.stamp = lock.stamp(); + stamp = lock.stamp(); // Find key in the directory. assert(lock.data()); @@ -215,19 +214,17 @@ struct ReadState : public internal::AtomicReferenceCount { if (it == dir.entries.end() || it->filename != key_) { // Missing value. - result.state = kvstore::ReadResult::kMissing; - promise.SetResult(std::move(result)); + promise.SetResult(kvstore::ReadResult::Missing(std::move(stamp))); return; } // Check if_equal and if_not_equal conditions. // This happens after searching the directory in order to correctly handle // IsNoValue matches, above. - if (options_.if_not_equal == result.stamp.generation || + if (options_.if_not_equal == stamp.generation || (!StorageGeneration::IsUnknown(options_.if_equal) && - options_.if_equal != result.stamp.generation)) { - result.state = kvstore::ReadResult::kUnspecified; - promise.SetResult(std::move(result)); + options_.if_equal != stamp.generation)) { + promise.SetResult(kvstore::ReadResult::Unspecified(std::move(stamp))); return; } @@ -242,7 +239,7 @@ struct ReadState : public internal::AtomicReferenceCount { } } - options.if_equal = result.stamp.generation; + options.if_equal = stamp.generation; Link(WithExecutor(owner_->executor(), [self = internal::IntrusivePtr(this), seek_pos](Promise promise, @@ -254,8 +251,8 @@ struct ReadState : public internal::AtomicReferenceCount { kvstore::Read(owner_->base_, {}, std::move(options))); } - void OnValueRead(Promise promise, ReadyFuture ready, - size_t seek_pos) { + void OnValueRead(Promise promise, + ReadyFuture ready, size_t seek_pos) { if (!promise.result_needed()) return; if (!ready.status().ok()) { promise.SetResult(ready.status()); @@ -263,7 +260,7 @@ struct ReadState : public internal::AtomicReferenceCount { } internal_zip::ZipEntry local_header{}; - auto result = [&]() -> Result { + auto result = [&]() -> Result { kvstore::ReadResult read_result = std::move(ready.value()); if (!read_result.has_value()) { return read_result; @@ -309,13 +306,13 @@ struct ReadState : public internal::AtomicReferenceCount { } }; -Future ZipKvStore::Read(Key key, ReadOptions options) { +Future ZipKvStore::Read(Key key, ReadOptions options) { auto state = internal::MakeIntrusivePtr(); state->owner_ = internal::IntrusivePtr(this); state->key_ = std::move(key); state->options_ = options; - return PromiseFuturePair::LinkValue( + return PromiseFuturePair::LinkValue( WithExecutor( executor(), [state = std::move(state)](Promise promise,