Skip to content

Commit

Permalink
Allow the multi_write_benchmark and multi_genspec to handle failure m…
Browse files Browse the repository at this point in the history
…ore gracefully.

PiperOrigin-RevId: 683252904
Change-Id: I36c6ecf3ada1a5bf0c3520b4ee57641961f2fe14
  • Loading branch information
laramiel authored and copybara-github committed Oct 7, 2024
1 parent eacec20 commit d37f4e5
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 85 deletions.
118 changes: 76 additions & 42 deletions tensorstore/internal/benchmark/multi_genspec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bazel run -c opt \
"driver": "zarr3",
"kvstore": {
"driver": "ocdbt",
"base": "gs://bucket/path/ocdbt.root/"
"base": "file:///tmp/benchmark"
}}'
--paths=opt_state.0.mu.params.decoder.decoder_norm.scale/,opt_state.0.mu.params.decoder.layers.mlp.wi_0.kernel/
> config.json
Expand Down Expand Up @@ -61,6 +61,8 @@ bazel run -c opt \
#include "tensorstore/internal/json_binding/box.h" // IWYU pragma: keep
#include "tensorstore/internal/json_binding/json_binding.h"
#include "tensorstore/internal/json_binding/std_array.h" // IWYU pragma: keep
#include "tensorstore/kvstore/kvstore.h"
#include "tensorstore/kvstore/operations.h"
#include "tensorstore/kvstore/spec.h"
#include "tensorstore/open.h"
#include "tensorstore/open_mode.h"
Expand Down Expand Up @@ -111,23 +113,30 @@ struct HostSpec {
};
};

ABSL_FLAG(tensorstore::JsonAbslFlag<tensorstore::Context::Spec>, context_spec,
{},
"Context spec for writing data. This can be used to control the "
"number of concurrent write operations of the underlying key-value "
"store. See examples at the start of the source file.");
ABSL_FLAG(
tensorstore::JsonAbslFlag<tensorstore::Context::Spec>, context_spec,
[]() {
return tensorstore::Context::Spec::FromJson(
{
{"file_io_concurrency", {{"limit", 128}}},
})
.value();
}(),
"Context spec for reading data.");

ABSL_FLAG(
tensorstore::JsonAbslFlag<tensorstore::Spec>, base_spec,
[]() {
return tensorstore::Spec::FromJson({
{"driver", "zarr"},
{"kvstore", "memory://"},
})
return tensorstore::Spec::FromJson(
{
{"driver", "zarr"},
{"kvstore",
{{"driver", "ocdbt"}, {"base", "memory:///prefix/"}}},
})
.value();
}(),
"Base TensorStore Spec to use for reading; the kvstore path will be "
"augmented with each specified --path.");
"augmented with each name in --config or --paths.");

ABSL_FLAG(std::string, config, {},
"Paths to all the tensorstore specs to benchmark. The actual spec "
Expand All @@ -142,10 +151,13 @@ ABSL_FLAG(
"HostShard json object to use for reading. This can be used to control "
"the number of partitions of each variable to read.");

ABSL_FLAG(bool, list_kvstore, false,
"List the contents of the kvstore in --base_spec.");

namespace tensorstore {
namespace {

std::vector<tensorstore::TensorStore<>> MultiOpen(
std::vector<std::optional<tensorstore::TensorStore<>>> TryMultiOpen(
Context context, std::vector<tensorstore::Spec> specs) {
ABSL_LOG(INFO) << "Opening " << specs.size() << " tensorstores";
std::vector<Future<tensorstore::TensorStore<>>> pending_open;
Expand All @@ -154,14 +166,18 @@ std::vector<tensorstore::TensorStore<>> MultiOpen(
pending_open.push_back(
tensorstore::Open(spec, context, tensorstore::ReadWriteMode::read));
}
auto wait_all = tensorstore::WaitAllFuture(tensorstore::span(pending_open));
std::vector<tensorstore::TensorStore<>> stores;
for (auto& future : pending_open) {
future.Wait();
}
std::vector<std::optional<tensorstore::TensorStore<>>> stores;
stores.reserve(specs.size());
for (size_t i = 0; i < pending_open.size(); ++i) {
if (pending_open[i].status().ok()) {
stores.push_back(std::move(pending_open[i]).value());
} else {
ABSL_LOG(ERROR) << "Failed to open: " << specs[i];
ABSL_LOG(ERROR) << "Failed to open: " << specs[i] << "\n\n"
<< pending_open[i].status();
stores.push_back(std::nullopt);
}
}
return stores;
Expand Down Expand Up @@ -219,9 +235,7 @@ void SetHostSpecArrayBoxes(ShardVariable& var, const HostSpec& host) {
}
}

bool FillUsingHostSpec(std::vector<ShardVariable>& config,
const std::vector<tensorstore::TensorStore<>>& stores,
HostSpec host) {
bool FillUsingHostSpec(std::vector<ShardVariable>& config, HostSpec host) {
if (host.total_workers == 0) return false;
ABSL_LOG(INFO) << "Attempting to generate config from --host_spec.";

Expand All @@ -238,7 +252,7 @@ bool FillUsingHostSpec(std::vector<ShardVariable>& config,

if (host.total_partitions == 0) {
int64_t total_partitions = 0;
for (size_t i = 0; i < stores.size(); ++i) {
for (size_t i = 0; i < config.size(); ++i) {
total_partitions = std::max(total_partitions, total_chunks(config[i]));
}
ABSL_LOG(INFO) << "total_partitions=" << total_partitions;
Expand All @@ -247,7 +261,7 @@ bool FillUsingHostSpec(std::vector<ShardVariable>& config,
if (host.partitions_per_host == 0) {
host.partitions_per_host = 1;
}
for (size_t i = 0; i < stores.size(); ++i) {
for (size_t i = 0; i < config.size(); ++i) {
SetHostSpecArrayBoxes(config[i], host);
if (config[i].array_boxes.empty() && !config[i].shape.empty()) {
config[i].array_boxes.push_back(Box<>(config[i].shape));
Expand Down Expand Up @@ -303,11 +317,15 @@ void SetExistingChunkArrayBoxes(ShardVariable& var,

bool FillUsingExistingData(
std::vector<ShardVariable>& config,
const std::vector<tensorstore::TensorStore<>>& stores) {
const std::vector<std::optional<tensorstore::TensorStore<>>>& stores) {
ABSL_LOG(INFO)
<< "Attempting to generate config from existing data in --base_spec.";
for (size_t i = 0; i < stores.size(); ++i) {
SetExistingChunkArrayBoxes(config[i], stores[i]);
if (!stores[i].has_value() && config[i].array_boxes.empty()) {
config[i].array_boxes.push_back(Box<>(config[i].shape));
continue;
}
SetExistingChunkArrayBoxes(config[i], *stores[i]);
}
return true;
}
Expand Down Expand Up @@ -415,21 +433,6 @@ void Dump(std::vector<ShardVariable>& config) {
}

void Run() {
// --read_config specifies a file or a json object.
std::vector<ShardVariable> config = [&]() {
if (absl::GetFlag(FLAGS_config).empty()) {
std::vector<ShardVariable> config;
for (const auto& name : absl::GetFlag(FLAGS_paths).elements) {
config.push_back(ShardVariable{name});
}
return config;
}
return ReadFromFileOrFlag(absl::GetFlag(FLAGS_config));
}();

ABSL_QCHECK(!config.empty())
<< "Empty config; supply non-empty --read_config or --paths.";

auto ensure_trailing_slash = [](auto& spec) {
if (!spec.path.empty() && spec.path.back() != '/') {
spec.AppendSuffix("/");
Expand All @@ -445,6 +448,36 @@ void Run() {
return tensorstore::Spec::FromJson(ts_json).value();
}();

Context context(absl::GetFlag(FLAGS_context_spec).value);

if (absl::GetFlag(FLAGS_list_kvstore)) {
TENSORSTORE_CHECK_OK_AND_ASSIGN(
auto kvstore, kvstore::Open(kvstore_spec, context).result());
auto list_future = kvstore::ListFuture(kvstore, {});
if (!list_future.status().ok()) {
ABSL_LOG(ERROR) << "Failed to list kvstore: " << list_future.status();
} else {
for (const auto& entry : list_future.result().value()) {
ABSL_LOG(INFO) << entry.key;
}
}
}

// --read_config specifies a file or a json object.
std::vector<ShardVariable> config = [&]() {
if (absl::GetFlag(FLAGS_config).empty()) {
std::vector<ShardVariable> config;
for (const auto& name : absl::GetFlag(FLAGS_paths).elements) {
config.push_back(ShardVariable{name});
}
return config;
}
return ReadFromFileOrFlag(absl::GetFlag(FLAGS_config));
}();

ABSL_QCHECK(!config.empty())
<< "Empty config; supply non-empty --read_config or --paths.";

// Construct the specs to benchmark by combining the base spec with each
// specified path. Orbax, for example, constructs multiple tensorstores
// within the same ocdbt kvstore, so this can be used to benchmark the read
Expand Down Expand Up @@ -474,18 +507,19 @@ void Run() {
}
specs.push_back(std::move(spec));
}
Context context(absl::GetFlag(FLAGS_context_spec).value);

std::vector<tensorstore::TensorStore<>> stores = MultiOpen(context, specs);
std::vector<std::optional<tensorstore::TensorStore<>>> stores =
TryMultiOpen(context, specs);
ABSL_QCHECK(stores.size() == config.size())
<< "Number of stores and config size mismatch";

for (size_t i = 0; i < stores.size(); ++i) {
FillShardVariableFields(stores[i], config[i]);
if (stores[i].has_value()) {
FillShardVariableFields(*stores[i], config[i]);
}
}

if (!FillUsingHostSpec(config, stores,
absl::GetFlag(FLAGS_host_spec).value)) {
if (!FillUsingHostSpec(config, absl::GetFlag(FLAGS_host_spec).value)) {
if (!FillUsingExistingData(config, stores)) {
ABSL_LOG(FATAL) << "Failed to fill config.";
}
Expand Down
30 changes: 19 additions & 11 deletions tensorstore/internal/benchmark/multi_read_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ bazel run -c opt \
"driver": "zarr3",
"kvstore": {
"driver": "ocdbt",
"base": "gs://bucket/path/ocdbt.root/"
"base": "file:///tmp/benchmark"
}}'
--read_config=config.json
*/
Expand Down Expand Up @@ -80,23 +80,31 @@ bazel run -c opt \
using ::tensorstore::internal_benchmark::ReadFromFileOrFlag;
using ::tensorstore::internal_benchmark::ShardVariable;

ABSL_FLAG(tensorstore::JsonAbslFlag<tensorstore::Context::Spec>, context_spec,
{},
"Context spec for writing data. This can be used to control the "
"number of concurrent write operations of the underlying key-value "
"store. See examples at the start of the source file.");
ABSL_FLAG(
tensorstore::JsonAbslFlag<tensorstore::Context::Spec>, context_spec,
[]() {
return tensorstore::Context::Spec::FromJson(
{
{"file_io_concurrency", {{"limit", 128}}},
})
.value();
}(),
"Context spec for reading data. This can be used to control the "
"number of concurrent read operations, for example.");

ABSL_FLAG(
tensorstore::JsonAbslFlag<tensorstore::Spec>, base_spec,
[]() {
return tensorstore::Spec::FromJson({
{"driver", "zarr"},
{"kvstore", "memory://"},
})
return tensorstore::Spec::FromJson(
{
{"driver", "zarr"},
{"kvstore",
{{"driver", "ocdbt"}, {"base", "memory:///prefix/"}}},
})
.value();
}(),
"Base TensorStore Spec to use for reading; the kvstore path will be "
"augmented with each specified --path.");
"augmented with each name in --read_config or --paths.");

ABSL_FLAG(std::string, read_config, {},
"Paths to all the tensorstore specs to benchmark. The actual spec "
Expand Down
28 changes: 14 additions & 14 deletions tensorstore/internal/benchmark/multi_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include <stddef.h>

#include <cstdint>
#include <string>
#include <vector>

Expand All @@ -38,19 +37,20 @@ struct ShardVariable {
std::string dtype;
std::vector<Box<>> array_boxes;

constexpr static auto default_json_binder =
[](auto is_loading, const auto& options, auto* obj, auto* j) {
namespace jb = tensorstore::internal_json_binding;
using Self = ShardVariable;
return jb::Object(
jb::Member("name", jb::Projection<&Self::name>()),
jb::Member("shape", jb::Projection<&Self::shape>()),
jb::Member("chunks", jb::Projection<&Self::chunks>()),
jb::Member("dtype", jb::Projection<&Self::dtype>()),
jb::Member("array_boxes", jb::Projection<&Self::array_boxes>()),
jb::DiscardExtraMembers /**/
)(is_loading, options, obj, j);
};
constexpr static auto default_json_binder = [](auto is_loading,
const auto& options, auto* obj,
auto* j) {
namespace jb = tensorstore::internal_json_binding;
using Self = ShardVariable;
return jb::Object(
jb::Member("name", jb::Projection<&Self::name>()),
jb::Member("shape", jb::Projection<&Self::shape>()),
jb::Member("chunks", jb::Projection<&Self::chunks>()),
jb::Member("dtype", jb::Projection<&Self::dtype>()),
jb::OptionalMember("array_boxes", jb::Projection<&Self::array_boxes>()),
jb::DiscardExtraMembers /**/
)(is_loading, options, obj, j);
};
};

/// Read a list of ShardVariable from a file or a json object.
Expand Down
Loading

0 comments on commit d37f4e5

Please sign in to comment.