diff --git a/tensorstore/internal/benchmark/multi_genspec.cc b/tensorstore/internal/benchmark/multi_genspec.cc
index 6797ea78..f8c436e1 100644
--- a/tensorstore/internal/benchmark/multi_genspec.cc
+++ b/tensorstore/internal/benchmark/multi_genspec.cc
@@ -23,7 +23,7 @@ bazel run -c opt \
     "driver": "zarr3",
     "kvstore": {
       "driver": "ocdbt",
-      "base": "gs://bucket/path/ocdbt.root/"
+      "base": "file:///tmp/benchmark"
     }}'
   --paths=opt_state.0.mu.params.decoder.decoder_norm.scale/,opt_state.0.mu.params.decoder.layers.mlp.wi_0.kernel/
   > config.json
@@ -61,6 +61,8 @@ bazel run -c opt \
 #include "tensorstore/internal/json_binding/box.h"  // IWYU pragma: keep
 #include "tensorstore/internal/json_binding/json_binding.h"
 #include "tensorstore/internal/json_binding/std_array.h"  // IWYU pragma: keep
+#include "tensorstore/kvstore/kvstore.h"
+#include "tensorstore/kvstore/operations.h"
 #include "tensorstore/kvstore/spec.h"
 #include "tensorstore/open.h"
 #include "tensorstore/open_mode.h"
@@ -111,23 +113,30 @@ struct HostSpec {
   };
 };
 
-ABSL_FLAG(tensorstore::JsonAbslFlag<tensorstore::Context::Spec>, context_spec,
-          {},
-          "Context spec for writing data. This can be used to control the "
-          "number of concurrent write operations of the underlying key-value "
-          "store. See examples at the start of the source file.");
+ABSL_FLAG(
+    tensorstore::JsonAbslFlag<tensorstore::Context::Spec>, context_spec,
+    []() {
+      return tensorstore::Context::Spec::FromJson(
+                 {
+                     {"file_io_concurrency", {{"limit", 128}}},
+                 })
+          .value();
+    }(),
+    "Context spec for reading data.");
 
 ABSL_FLAG(
     tensorstore::JsonAbslFlag<tensorstore::Spec>, base_spec,
     []() {
-      return tensorstore::Spec::FromJson({
-                 {"driver", "zarr"},
-                 {"kvstore", "memory://"},
-             })
+      return tensorstore::Spec::FromJson(
+                 {
+                     {"driver", "zarr"},
+                     {"kvstore",
+                      {{"driver", "ocdbt"}, {"base", "memory:///prefix/"}}},
+                 })
           .value();
     }(),
     "Base TensorStore Spec to use for reading; the kvstore path will be "
-    "augmented with each specified --path.");
+    "augmented with each name in --config or --paths.");
 
 ABSL_FLAG(std::string, config, {},
           "Paths to all the tensorstore specs to benchmark. The actual spec "
@@ -142,10 +151,13 @@ ABSL_FLAG(
     "HostShard json object to use for reading. This can be used to control "
     "the number of partitions of each variable to read.");
 
+ABSL_FLAG(bool, list_kvstore, false,
+          "List the contents of the kvstore in --base_spec.");
+
 namespace tensorstore {
 namespace {
 
-std::vector<tensorstore::TensorStore<>> MultiOpen(
+std::vector<std::optional<tensorstore::TensorStore<>>> TryMultiOpen(
     Context context, std::vector<tensorstore::Spec> specs) {
   ABSL_LOG(INFO) << "Opening " << specs.size() << " tensorstores";
   std::vector<Future<tensorstore::TensorStore<>>> pending_open;
@@ -154,14 +166,18 @@ std::vector<tensorstore::TensorStore<>> MultiOpen(
     pending_open.push_back(
        tensorstore::Open(spec, context, tensorstore::ReadWriteMode::read));
   }
-  auto wait_all = tensorstore::WaitAllFuture(tensorstore::span(pending_open));
-  std::vector<tensorstore::TensorStore<>> stores;
+  for (auto& future : pending_open) {
+    future.Wait();
+  }
+  std::vector<std::optional<tensorstore::TensorStore<>>> stores;
   stores.reserve(specs.size());
   for (size_t i = 0; i < pending_open.size(); ++i) {
     if (pending_open[i].status().ok()) {
       stores.push_back(std::move(pending_open[i]).value());
     } else {
-      ABSL_LOG(ERROR) << "Failed to open: " << specs[i];
+      ABSL_LOG(ERROR) << "Failed to open: " << specs[i] << "\n\n"
+                      << pending_open[i].status();
+      stores.push_back(std::nullopt);
     }
   }
   return stores;
@@ -219,9 +235,7 @@ void SetHostSpecArrayBoxes(ShardVariable& var, const HostSpec& host) {
   }
 }
 
-bool FillUsingHostSpec(std::vector<ShardVariable>& config,
-                       const std::vector<tensorstore::TensorStore<>>& stores,
-                       HostSpec host) {
+bool FillUsingHostSpec(std::vector<ShardVariable>& config, HostSpec host) {
   if (host.total_workers == 0) return false;
 
   ABSL_LOG(INFO) << "Attempting to generate config from --host_spec.";
@@ -238,7 +252,7 @@ bool FillUsingHostSpec(std::vector<ShardVariable>& config,
 
   if (host.total_partitions == 0) {
     int64_t total_partitions = 0;
-    for (size_t i = 0; i < stores.size(); ++i) {
+    for (size_t i = 0; i < config.size(); ++i) {
      total_partitions = std::max(total_partitions, total_chunks(config[i]));
     }
     ABSL_LOG(INFO) << "total_partitions=" << total_partitions;
@@ -247,7 +261,7 @@ bool FillUsingHostSpec(std::vector<ShardVariable>& config,
   if (host.partitions_per_host == 0) {
     host.partitions_per_host = 1;
   }
-  for (size_t i = 0; i < stores.size(); ++i) {
+  for (size_t i = 0; i < config.size(); ++i) {
     SetHostSpecArrayBoxes(config[i], host);
     if (config[i].array_boxes.empty() && !config[i].shape.empty()) {
       config[i].array_boxes.push_back(Box<>(config[i].shape));
@@ -303,11 +317,15 @@ void SetExistingChunkArrayBoxes(ShardVariable& var,
 
 bool FillUsingExistingData(
     std::vector<ShardVariable>& config,
-    const std::vector<tensorstore::TensorStore<>>& stores) {
+    const std::vector<std::optional<tensorstore::TensorStore<>>>& stores) {
   ABSL_LOG(INFO)
       << "Attempting to generate config from existing data in --base_spec.";
   for (size_t i = 0; i < stores.size(); ++i) {
-    SetExistingChunkArrayBoxes(config[i], stores[i]);
+    if (!stores[i].has_value() && config[i].array_boxes.empty()) {
+      config[i].array_boxes.push_back(Box<>(config[i].shape));
+      continue;
+    }
+    SetExistingChunkArrayBoxes(config[i], *stores[i]);
   }
   return true;
 }
@@ -415,21 +433,6 @@ void Dump(std::vector<ShardVariable>& config) {
 }
 
 void Run() {
-  // --read_config specifies a file or a json object.
-  std::vector<ShardVariable> config = [&]() {
-    if (absl::GetFlag(FLAGS_config).empty()) {
-      std::vector<ShardVariable> config;
-      for (const auto& name : absl::GetFlag(FLAGS_paths).elements) {
-        config.push_back(ShardVariable{name});
-      }
-      return config;
-    }
-    return ReadFromFileOrFlag(absl::GetFlag(FLAGS_config));
-  }();
-
-  ABSL_QCHECK(!config.empty())
-      << "Empty config; supply non-empty --read_config or --paths.";
-
   auto ensure_trailing_slash = [](auto& spec) {
     if (!spec.path.empty() && spec.path.back() != '/') {
       spec.AppendSuffix("/");
     }
@@ -445,6 +448,36 @@ void Run() {
     return tensorstore::Spec::FromJson(ts_json).value();
   }();
 
+  Context context(absl::GetFlag(FLAGS_context_spec).value);
+
+  if (absl::GetFlag(FLAGS_list_kvstore)) {
+    TENSORSTORE_CHECK_OK_AND_ASSIGN(
+        auto kvstore, kvstore::Open(kvstore_spec, context).result());
+    auto list_future = kvstore::ListFuture(kvstore, {});
+    if (!list_future.status().ok()) {
+      ABSL_LOG(ERROR) << "Failed to list kvstore: " << list_future.status();
+    } else {
+      for (const auto& entry : list_future.result().value()) {
+        ABSL_LOG(INFO) << entry.key;
+      }
+    }
+  }
+
+  // --read_config specifies a file or a json object.
+  std::vector<ShardVariable> config = [&]() {
+    if (absl::GetFlag(FLAGS_config).empty()) {
+      std::vector<ShardVariable> config;
+      for (const auto& name : absl::GetFlag(FLAGS_paths).elements) {
+        config.push_back(ShardVariable{name});
+      }
+      return config;
+    }
+    return ReadFromFileOrFlag(absl::GetFlag(FLAGS_config));
+  }();
+
+  ABSL_QCHECK(!config.empty())
+      << "Empty config; supply non-empty --read_config or --paths.";
+
   // Construct the specs to benchmark by combining the base spec with each
   // specified path. Orbax, for example, constructs multiple tensorstores
   // within the same ocdbt kvstore, so this can be used to benchmark the read
@@ -474,18 +507,19 @@ void Run() {
     }
     specs.push_back(std::move(spec));
   }
-  Context context(absl::GetFlag(FLAGS_context_spec).value);
 
-  std::vector<tensorstore::TensorStore<>> stores = MultiOpen(context, specs);
+  std::vector<std::optional<tensorstore::TensorStore<>>> stores =
+      TryMultiOpen(context, specs);
   ABSL_QCHECK(stores.size() == config.size())
       << "Number of stores and config size mismatch";
 
   for (size_t i = 0; i < stores.size(); ++i) {
-    FillShardVariableFields(stores[i], config[i]);
+    if (stores[i].has_value()) {
+      FillShardVariableFields(*stores[i], config[i]);
+    }
   }
 
-  if (!FillUsingHostSpec(config, stores,
-                         absl::GetFlag(FLAGS_host_spec).value)) {
+  if (!FillUsingHostSpec(config, absl::GetFlag(FLAGS_host_spec).value)) {
     if (!FillUsingExistingData(config, stores)) {
       ABSL_LOG(FATAL) << "Failed to fill config.";
     }
diff --git a/tensorstore/internal/benchmark/multi_read_benchmark.cc b/tensorstore/internal/benchmark/multi_read_benchmark.cc
index 576e185c..10b6a9df 100644
--- a/tensorstore/internal/benchmark/multi_read_benchmark.cc
+++ b/tensorstore/internal/benchmark/multi_read_benchmark.cc
@@ -29,7 +29,7 @@ bazel run -c opt \
     "driver": "zarr3",
     "kvstore": {
       "driver": "ocdbt",
-      "base": "gs://bucket/path/ocdbt.root/"
+      "base": "file:///tmp/benchmark"
     }}'
   --read_config=config.json
 */
@@ -80,23 +80,31 @@ bazel run -c opt \
 using ::tensorstore::internal_benchmark::ReadFromFileOrFlag;
 using ::tensorstore::internal_benchmark::ShardVariable;
 
-ABSL_FLAG(tensorstore::JsonAbslFlag<tensorstore::Context::Spec>, context_spec,
-          {},
-          "Context spec for writing data. This can be used to control the "
-          "number of concurrent write operations of the underlying key-value "
-          "store. See examples at the start of the source file.");
+ABSL_FLAG(
+    tensorstore::JsonAbslFlag<tensorstore::Context::Spec>, context_spec,
+    []() {
+      return tensorstore::Context::Spec::FromJson(
+                 {
+                     {"file_io_concurrency", {{"limit", 128}}},
+                 })
+          .value();
+    }(),
+    "Context spec for reading data. This can be used to control the "
+    "number of concurrent read operations, for example.");
 
 ABSL_FLAG(
     tensorstore::JsonAbslFlag<tensorstore::Spec>, base_spec,
     []() {
-      return tensorstore::Spec::FromJson({
-                 {"driver", "zarr"},
-                 {"kvstore", "memory://"},
-             })
+      return tensorstore::Spec::FromJson(
+                 {
+                     {"driver", "zarr"},
+                     {"kvstore",
+                      {{"driver", "ocdbt"}, {"base", "memory:///prefix/"}}},
+                 })
           .value();
     }(),
     "Base TensorStore Spec to use for reading; the kvstore path will be "
-    "augmented with each specified --path.");
+    "augmented with each name in --read_config or --paths.");
 
 ABSL_FLAG(std::string, read_config, {},
           "Paths to all the tensorstore specs to benchmark. The actual spec "
diff --git a/tensorstore/internal/benchmark/multi_spec.h b/tensorstore/internal/benchmark/multi_spec.h
index b440ea0d..27832602 100644
--- a/tensorstore/internal/benchmark/multi_spec.h
+++ b/tensorstore/internal/benchmark/multi_spec.h
@@ -17,7 +17,6 @@
 #include 
 
-#include 
 #include 
 #include 
 
@@ -38,19 +37,20 @@ struct ShardVariable {
   std::string dtype;
   std::vector<Box<>> array_boxes;
 
-  constexpr static auto default_json_binder =
-      [](auto is_loading, const auto& options, auto* obj, auto* j) {
-        namespace jb = tensorstore::internal_json_binding;
-        using Self = ShardVariable;
-        return jb::Object(
-            jb::Member("name", jb::Projection<&Self::name>()),
-            jb::Member("shape", jb::Projection<&Self::shape>()),
-            jb::Member("chunks", jb::Projection<&Self::chunks>()),
-            jb::Member("dtype", jb::Projection<&Self::dtype>()),
-            jb::Member("array_boxes", jb::Projection<&Self::array_boxes>()),
-            jb::DiscardExtraMembers /**/
-            )(is_loading, options, obj, j);
-      };
+  constexpr static auto default_json_binder = [](auto is_loading,
+                                                 const auto& options, auto* obj,
+                                                 auto* j) {
+    namespace jb = tensorstore::internal_json_binding;
+    using Self = ShardVariable;
+    return jb::Object(
+        jb::Member("name", jb::Projection<&Self::name>()),
+        jb::Member("shape", jb::Projection<&Self::shape>()),
+        jb::Member("chunks", jb::Projection<&Self::chunks>()),
+        jb::Member("dtype", jb::Projection<&Self::dtype>()),
+        jb::OptionalMember("array_boxes", jb::Projection<&Self::array_boxes>()),
+        jb::DiscardExtraMembers /**/
+        )(is_loading, options, obj, j);
+  };
 };
 
 /// Read a list of ShardVariable from a file or a json object.
diff --git a/tensorstore/internal/benchmark/multi_write_benchmark.cc b/tensorstore/internal/benchmark/multi_write_benchmark.cc
index c7cc4f79..ae19c213 100644
--- a/tensorstore/internal/benchmark/multi_write_benchmark.cc
+++ b/tensorstore/internal/benchmark/multi_write_benchmark.cc
@@ -29,7 +29,7 @@ bazel run -c opt \
     "driver": "zarr3",
     "kvstore": {
       "driver": "ocdbt",
-      "base": "gs://bucket/path/ocdbt.root/"
+      "base": "file:///tmp/benchmark"
    }}'
   --write_config=config.json
 */
@@ -91,22 +91,30 @@ bazel run -c opt \
 using ::tensorstore::internal_benchmark::ReadFromFileOrFlag;
 using ::tensorstore::internal_benchmark::ShardVariable;
 
-ABSL_FLAG(tensorstore::JsonAbslFlag<tensorstore::Context::Spec>, context_spec,
-          {},
-          "Context spec for writing data. This can be used to control the "
-          "number of concurrent write operations of the underlying key-value "
-          "store. See examples at the start of the source file.");
+ABSL_FLAG(
+    tensorstore::JsonAbslFlag<tensorstore::Context::Spec>, context_spec,
+    []() {
+      return tensorstore::Context::Spec::FromJson(
+                 {
+                     {"file_io_concurrency", {{"limit", 128}}},
+                 })
+          .value();
+    }(),
+    "Context spec for writing data. This can be used to control the "
+    "number of concurrent write operations, for example.");
 
 ABSL_FLAG(
     tensorstore::JsonAbslFlag<tensorstore::Spec>, base_spec,
     []() {
-      return tensorstore::Spec::FromJson({
-                 {"driver", "zarr"},
-                 {"kvstore", "memory://"},
-             })
+      return tensorstore::Spec::FromJson(
+                 {
+                     {"driver", "zarr"},
+                     {"kvstore",
+                      {{"driver", "ocdbt"}, {"base", "memory:///prefix/"}}},
+                 })
           .value();
     }(),
-    "Base TensorStore Spec to use for reading; the kvstore path will be "
+    "Base TensorStore Spec to use for writing; the kvstore path will be "
     "augmented with each name in --write_config.");
 
 ABSL_FLAG(std::string, write_config, {},
           "Paths to all the tensorstore specs to benchmark. The actual spec "
@@ -195,12 +203,14 @@ Stats DoSinglePass(tensorstore::Context context,
         [&, var_i = i, copy_promise = copy_promise,
          commit_promise = commit_promise](
             Promise<void> open_promise,
            ReadyFuture<tensorstore::TensorStore<>> future) {
+          const auto& var = config[var_i];
           if (!future.status().ok()) {
-            open_promise.SetResult(future.status());
+            // If one tensorstore fails to open, keep going with the rest.
+            ABSL_LOG(ERROR) << "Failed to open: " << var.name;
+            ABSL_LOG(ERROR) << future.status();
             return;
           }
-          const auto& var = config[var_i];
           KeyType key(std::string_view(var.dtype),
                       tensorstore::span(var.chunks));
           auto it = chunk_arrays.find(key);
@@ -263,17 +273,13 @@ Stats DoSinglePass(tensorstore::Context context,
 
   // Wait until all opens complete.
   open_future.Wait();
-  if (!open_future.result().ok()) {
-    // write test failed, return to write_before
-    ABSL_LOG(FATAL) << "Failed to open:" << open_future.status();
-  }
   stats.open_time = absl::Now();
 
   // Wait until all copies complete.
   copy_future.Wait();
   if (!copy_future.result().ok()) {
     // write test failed, return to write_before
-    ABSL_LOG(FATAL) << "Failed to copy:" << copy_future.status();
+    ABSL_LOG(WARNING) << "Failed to copy:" << copy_future.status();
   }
   stats.copy_time = absl::Now();
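For context, the central behavioral change in multi_genspec.cc is that TryMultiOpen tolerates individual open failures instead of aborting the whole batch. Below is a minimal standalone sketch of that "open what you can" pattern; the helper name OpenAllOrSome is illustrative and not part of the patch, and only TensorStore calls already used in the patch (tensorstore::Open, Future::Wait/status/value) are assumed.

// Sketch of the skip-on-failure open pattern introduced by TryMultiOpen.
// OpenAllOrSome is a hypothetical name used only for illustration.
#include <stddef.h>

#include <optional>
#include <utility>
#include <vector>

#include "absl/log/absl_log.h"
#include "tensorstore/context.h"
#include "tensorstore/open.h"
#include "tensorstore/open_mode.h"
#include "tensorstore/spec.h"
#include "tensorstore/tensorstore.h"
#include "tensorstore/util/future.h"

std::vector<std::optional<tensorstore::TensorStore<>>> OpenAllOrSome(
    tensorstore::Context context, std::vector<tensorstore::Spec> specs) {
  // Issue every open up front so they proceed concurrently.
  std::vector<tensorstore::Future<tensorstore::TensorStore<>>> pending;
  pending.reserve(specs.size());
  for (const auto& spec : specs) {
    pending.push_back(
        tensorstore::Open(spec, context, tensorstore::ReadWriteMode::read));
  }
  // Wait on each open individually and record nullopt for failures instead of
  // failing the whole batch.
  std::vector<std::optional<tensorstore::TensorStore<>>> stores;
  stores.reserve(specs.size());
  for (size_t i = 0; i < pending.size(); ++i) {
    pending[i].Wait();
    if (pending[i].status().ok()) {
      stores.push_back(std::move(pending[i]).value());
    } else {
      ABSL_LOG(ERROR) << "Failed to open: " << specs[i] << "\n"
                      << pending[i].status();
      stores.push_back(std::nullopt);
    }
  }
  return stores;
}

Callers are then expected to check has_value() per entry before using a store, as Run() now does before FillShardVariableFields and as FillUsingExistingData does before SetExistingChunkArrayBoxes.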
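Similarly, a hedged sketch of what the new --list_kvstore flag does in multi_genspec.cc: open the kvstore described by --base_spec and log every key it contains. The free function name ListBaseKvstore is hypothetical; kvstore::Open, kvstore::ListFuture, and the entry.key field are used exactly as in the patch.

// Sketch of the --list_kvstore listing path (helper name is illustrative).
#include "absl/log/absl_log.h"
#include "tensorstore/context.h"
#include "tensorstore/kvstore/kvstore.h"
#include "tensorstore/kvstore/operations.h"
#include "tensorstore/kvstore/spec.h"

void ListBaseKvstore(tensorstore::kvstore::Spec kvstore_spec,
                     tensorstore::Context context) {
  // Open the kvstore from --base_spec.
  auto kvstore = tensorstore::kvstore::Open(kvstore_spec, context).result();
  if (!kvstore.ok()) {
    ABSL_LOG(ERROR) << "Failed to open kvstore: " << kvstore.status();
    return;
  }
  // List all keys and log them.
  auto list_future = tensorstore::kvstore::ListFuture(*kvstore, {});
  if (!list_future.status().ok()) {
    ABSL_LOG(ERROR) << "Failed to list kvstore: " << list_future.status();
    return;
  }
  for (const auto& entry : list_future.result().value()) {
    ABSL_LOG(INFO) << entry.key;
  }
}

This gives a quick way to confirm that --base_spec points at populated data before generating a config from it.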