Skip to content

Commit

Permalink
Merge "Fold validation compaction into scrub" from Botond
Browse files Browse the repository at this point in the history
"
Validation compaction -- although I still maintain that it is a good
descriptive name -- was an unfortunate choice for the underlying
functionality because Origin has burned the name already as it uses it
for a compaction type used during repair. This opens the door for
confusion for users coming from Cassandra who will associate Validation
compaction with the purpose it is used for in Origin.
Additionally, since Origin's validation compaction was not user
initiated, it didn't have a corresponding `nodetool` command to start
it. Adding such a command would create an operational difference between
us and Origin.

To avoid all this we fold validation compaction into scrub compaction,
under a new "validation" mode. I decided against using the also
suggested `--dry-mode` flag as I feel that a new mode is a more natural
choice, we don't have to define how it interacts with all the other
modes, unlike with a `--dry-mode` flag.

Fixes: scylladb#7736

Tests: unit(dev), manual(REST API)
"

* 'scrub-validation-mode/v2' of https://github.com/denesb/scylla:
  compaction/compaction_descriptor: add comment to Validation compaction type
  compaction/compaction_descriptor: compaction_options: remove validate
  api: storage_service: validate_keyspace -> scrub_keyspace (validate mode)
  compaction/compaction_manager: hide perform_sstable_validation()
  compaction: validation compaction -> scrub compaction (validate mode)
  compaction/compaction_descriptor: compaction_options: add options() accessor
  compaction/compaction_descriptor: compaction_options::scrub::mode: add validate
  • Loading branch information
avikivity committed Aug 10, 2021
2 parents 4ae6eae + 76f2790 commit a7ef826
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 94 deletions.
37 changes: 3 additions & 34 deletions api/api-doc/storage_service.json
Original file line number Diff line number Diff line change
Expand Up @@ -765,44 +765,12 @@
}
]
},
{
"path":"/storage_service/keyspace_validate/{keyspace}",
"operations":[
{
"method":"POST",
"summary":"Trigger a validation of sstables on a single keyspace",
"type": "long",
"nickname":"validate",
"produces":[
"application/json"
],
"parameters":[
{
"name":"keyspace",
"description":"The keyspace to validate",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
},
{
"name":"cf",
"description":"Comma seperated column family names",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/keyspace_scrub/{keyspace}",
"operations":[
{
"method":"GET",
"summary":"Scrub (deserialize + reserialize at the latest version, resolving corruptions if any) the given keyspace. If columnFamilies array is empty, all CFs are scrubbed. Scrubbed CFs will be snapshotted first, if disableSnapshot is false. Scrub has the following modes: Abort (default) - abort scrub if corruption is detected; Skip (same as `skip_corrupted=true`) skip over corrupt data, omitting them from the output; Segregate - segregate data into multiple sstables if needed, such that each sstable contains data with valid order.",
"summary":"Scrub (deserialize + reserialize at the latest version, resolving corruptions if any) the given keyspace. If columnFamilies array is empty, all CFs are scrubbed. Scrubbed CFs will be snapshotted first, if disableSnapshot is false. Scrub has the following modes: Abort (default) - abort scrub if corruption is detected; Skip (same as `skip_corrupted=true`) skip over corrupt data, omitting them from the output; Segregate - segregate data into multiple sstables if needed, such that each sstable contains data with valid order; Validate - read (no rewrite) and validate data, logging any problems found.",
"type": "long",
"nickname":"scrub",
"produces":[
Expand Down Expand Up @@ -834,7 +802,8 @@
"enum":[
"ABORT",
"SKIP",
"SEGREGATE"
"SEGREGATE",
"VALIDATE"
],
"paramType":"query"
},
Expand Down
15 changes: 2 additions & 13 deletions api/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -541,19 +541,6 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});

ss::validate.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families ) {
return ctx.db.invoke_on_all([=] (database& db) {
return do_for_each(column_families, [=, &db](sstring cfname) {
auto& cm = db.get_compaction_manager();
auto& cf = db.find_column_family(keyspace, cfname);
return cm.perform_sstable_validation(&cf);
});
}).then([]{
return make_ready_future<json::json_return_type>(0);
});

}));

ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families) {
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);

Expand Down Expand Up @@ -1276,6 +1263,8 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
scrub_mode = sstables::compaction_options::scrub::mode::skip;
} else if (scrub_mode_str == "SEGREGATE") {
scrub_mode = sstables::compaction_options::scrub::mode::segregate;
} else if (scrub_mode_str == "VALIDATE") {
scrub_mode = sstables::compaction_options::scrub::mode::validate;
} else {
throw std::invalid_argument(fmt::format("Unknown argument for 'scrub_mode' parameter: {}", scrub_mode_str));
}
Expand Down
33 changes: 16 additions & 17 deletions compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ std::string_view to_string(compaction_options::scrub::mode scrub_mode) {
return "skip";
case compaction_options::scrub::mode::segregate:
return "segregate";
case compaction_options::scrub::mode::validate:
return "validate";
}
on_internal_error_noexcept(clogger, format("Invalid scrub mode {}", int(scrub_mode)));
return "(invalid)";
Expand Down Expand Up @@ -1602,7 +1604,6 @@ compaction_type compaction_options::type() const {
static const compaction_type index_to_type[] = {
compaction_type::Compaction,
compaction_type::Cleanup,
compaction_type::Validation,
compaction_type::Upgrade,
compaction_type::Scrub,
compaction_type::Reshard,
Expand All @@ -1629,9 +1630,6 @@ static std::unique_ptr<compaction> make_compaction(column_family& cf, sstables::
std::unique_ptr<compaction> operator()(compaction_options::cleanup options) {
return std::make_unique<cleanup_compaction>(cf, std::move(descriptor), std::move(options));
}
std::unique_ptr<compaction> operator()(compaction_options::validation) {
return nullptr; // this compaction doesn't go through the regular path
}
std::unique_ptr<compaction> operator()(compaction_options::upgrade options) {
return std::make_unique<cleanup_compaction>(cf, std::move(descriptor), std::move(options));
}
Expand All @@ -1643,7 +1641,7 @@ static std::unique_ptr<compaction> make_compaction(column_family& cf, sstables::
return descriptor.options.visit(visitor_factory);
}

future<bool> validate_compaction_validate_reader(flat_mutation_reader reader, const compaction_info& info) {
future<bool> scrub_validate_mode_validate_reader(flat_mutation_reader reader, const compaction_info& info) {
auto schema = reader.schema();

bool valid = true;
Expand All @@ -1663,25 +1661,25 @@ future<bool> validate_compaction_validate_reader(flat_mutation_reader reader, co
if (mf.is_partition_start()) {
const auto& ps = mf.as_partition_start();
if (!validator(mf)) {
scrub_compaction::report_invalid_partition_start(compaction_type::Validation, validator, ps.key());
scrub_compaction::report_invalid_partition_start(compaction_type::Scrub, validator, ps.key());
validator.reset(mf);
valid = false;
}
if (!validator(ps.key())) {
scrub_compaction::report_invalid_partition(compaction_type::Validation, validator, ps.key());
scrub_compaction::report_invalid_partition(compaction_type::Scrub, validator, ps.key());
validator.reset(ps.key());
valid = false;
}
} else {
if (!validator(mf)) {
scrub_compaction::report_invalid_mutation_fragment(compaction_type::Validation, validator, mf);
scrub_compaction::report_invalid_mutation_fragment(compaction_type::Scrub, validator, mf);
validator.reset(mf);
valid = false;
}
}
}
if (!validator.on_end_of_stream()) {
scrub_compaction::report_invalid_end_of_stream(compaction_type::Validation, validator);
scrub_compaction::report_invalid_end_of_stream(compaction_type::Scrub, validator);
valid = false;
}
} catch (...) {
Expand All @@ -1697,7 +1695,7 @@ future<bool> validate_compaction_validate_reader(flat_mutation_reader reader, co
co_return valid;
}

static future<compaction_info> validate_sstables(sstables::compaction_descriptor descriptor, column_family& cf) {
static future<compaction_info> scrub_sstables_validate_mode(sstables::compaction_descriptor descriptor, column_family& cf) {
auto schema = cf.schema();

formatted_sstables_list sstables_list_msg;
Expand All @@ -1714,15 +1712,15 @@ static future<compaction_info> validate_sstables(sstables::compaction_descriptor
cf.get_compaction_manager().deregister_compaction(info);
});

clogger.info("Validating {}", sstables_list_msg);
clogger.info("Scrubbing in validate mode {}", sstables_list_msg);

auto permit = cf.compaction_concurrency_semaphore().make_tracking_only_permit(schema.get(), "Validation");
auto permit = cf.compaction_concurrency_semaphore().make_tracking_only_permit(schema.get(), "scrub:validate");
auto reader = sstables->make_local_shard_sstable_reader(schema, permit, query::full_partition_range, schema->full_slice(), descriptor.io_priority,
tracing::trace_state_ptr(), ::streamed_mutation::forwarding::no, ::mutation_reader::forwarding::no, default_read_monitor_generator());

const auto valid = co_await validate_compaction_validate_reader(std::move(reader), *info);
const auto valid = co_await scrub_validate_mode_validate_reader(std::move(reader), *info);

clogger.info("Validated {} - sstable(s) are {}", sstables_list_msg, valid ? "valid" : "invalid");
clogger.info("Finished scrubbing in validate mode {} - sstable(s) are {}", sstables_list_msg, valid ? "valid" : "invalid");

co_return *info;
}
Expand All @@ -1733,9 +1731,10 @@ compact_sstables(sstables::compaction_descriptor descriptor, column_family& cf)
return make_exception_future<compaction_info>(std::runtime_error(format("Called {} compaction with empty set on behalf of {}.{}",
compaction_name(descriptor.options.type()), cf.schema()->ks_name(), cf.schema()->cf_name())));
}
if (descriptor.options.type() == compaction_type::Validation) {
// Bypass the usual compaction machinery for validation compaction
return validate_sstables(std::move(descriptor), cf);
if (descriptor.options.type() == compaction_type::Scrub
&& std::get<compaction_options::scrub>(descriptor.options.options()).operation_mode == compaction_options::scrub::mode::validate) {
// Bypass the usual compaction machinery for dry-mode scrub
return scrub_sstables_validate_mode(std::move(descriptor), cf);
}
auto c = make_compaction(cf, std::move(descriptor));
if (c->enable_garbage_collected_sstable_writer()) {
Expand Down
2 changes: 1 addition & 1 deletion compaction/compaction.hh
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,5 @@ namespace sstables {
flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode);

// For tests, can drop after we virtualize sstables.
future<bool> validate_compaction_validate_reader(flat_mutation_reader rd, const compaction_info& info);
future<bool> scrub_validate_mode_validate_reader(flat_mutation_reader rd, const compaction_info& info);
}
13 changes: 5 additions & 8 deletions compaction/compaction_descriptor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace sstables {
enum class compaction_type {
Compaction = 0,
Cleanup = 1,
Validation = 2,
Validation = 2, // Origin uses this for a compaction that is used exclusively for repair
Scrub = 3,
Index_build = 4,
Reshard = 5,
Expand Down Expand Up @@ -69,8 +69,6 @@ public:
struct cleanup {
std::reference_wrapper<database> db;
};
struct validation {
};
struct upgrade {
std::reference_wrapper<database> db;
};
Expand All @@ -79,6 +77,7 @@ public:
abort, // abort scrub on the first sign of corruption
skip, // skip corrupt data, including range of rows and/or partitions that are out-of-order
segregate, // segregate out-of-order data into streams that all contain data with correct order
validate, // validate data, printing all errors found (sstables are only read, not rewritten)
};
mode operation_mode = mode::abort;
};
Expand All @@ -87,7 +86,7 @@ public:
struct reshape {
};
private:
using options_variant = std::variant<regular, cleanup, validation, upgrade, scrub, reshard, reshape>;
using options_variant = std::variant<regular, cleanup, upgrade, scrub, reshard, reshape>;

private:
options_variant _options;
Expand All @@ -113,10 +112,6 @@ public:
return compaction_options(cleanup{db});
}

static compaction_options make_validation() {
return compaction_options(validation{});
}

static compaction_options make_upgrade(database& db) {
return compaction_options(upgrade{db});
}
Expand All @@ -130,6 +125,8 @@ public:
return std::visit(std::forward<Visitor>(visitor)..., _options);
}

const options_variant& options() const { return _options; }

compaction_type type() const;
};

Expand Down
19 changes: 14 additions & 5 deletions compaction/compaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -754,8 +754,8 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
return task->compaction_done.get_future().then([task] {});
}

future<> compaction_manager::perform_sstable_validation(column_family* cf) {
return run_custom_job(cf, sstables::compaction_type::Validation, [this, &cf = *cf, sstables = get_candidates(*cf)] () mutable -> future<> {
future<> compaction_manager::perform_sstable_scrub_validate_mode(column_family* cf) {
return run_custom_job(cf, sstables::compaction_type::Scrub, [this, &cf = *cf, sstables = get_candidates(*cf)] () mutable -> future<> {
class pending_tasks {
compaction_manager::stats& _stats;
size_t _n;
Expand All @@ -775,8 +775,14 @@ future<> compaction_manager::perform_sstable_validation(column_family* cf) {

try {
co_await with_scheduling_group(_maintenance_sg.cpu, [&] () {
auto desc = sstables::compaction_descriptor({ sst }, {}, _maintenance_sg.io, sst->get_sstable_level(),
sstables::compaction_descriptor::default_max_sstable_bytes, sst->run_identifier(), sstables::compaction_options::make_validation());
auto desc = sstables::compaction_descriptor(
{ sst },
{},
_maintenance_sg.io,
sst->get_sstable_level(),
sstables::compaction_descriptor::default_max_sstable_bytes,
sst->run_identifier(),
sstables::compaction_options::make_scrub(sstables::compaction_options::scrub::mode::validate));
return compact_sstables(std::move(desc), cf);
});
} catch (sstables::compaction_stop_exception&) {
Expand All @@ -788,7 +794,7 @@ future<> compaction_manager::perform_sstable_validation(column_family* cf) {
// expected, just continue with the other sstables when seeing
// one.
_stats.errors++;
cmlog.error("Validating {} failed due to {}, continuing.", sst->get_filename(), std::current_exception());
cmlog.error("Scrubbing in validate mode {} failed due to {}, continuing.", sst->get_filename(), std::current_exception());
}

pending--;
Expand Down Expand Up @@ -878,6 +884,9 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family

// Submit a column family to be scrubbed and wait for its termination.
future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables::compaction_options::scrub::mode scrub_mode) {
if (scrub_mode == sstables::compaction_options::scrub::mode::validate) {
return perform_sstable_scrub_validate_mode(cf);
}
return rewrite_sstables(cf, sstables::compaction_options::make_scrub(scrub_mode), [this] (const table& cf) {
return get_candidates(cf);
});
Expand Down
9 changes: 2 additions & 7 deletions compaction/compaction_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ private:
// similar-sized compaction.
void postpone_compaction_for_column_family(column_family* cf);

future<> perform_sstable_scrub_validate_mode(column_family* cf);

compaction_controller _compaction_controller;
compaction_backlog_manager _backlog_manager;
maintenance_scheduling_group _maintenance_sg;
Expand Down Expand Up @@ -220,13 +222,6 @@ public:
// Submit a column family to be scrubbed and wait for its termination.
future<> perform_sstable_scrub(column_family* cf, sstables::compaction_options::scrub::mode scrub_mode);

// Submit a column family to be validated and wait for its termination.
//
// Validation compaction reads each sstable individually, passing the
// fragment stream through mutation fragment stream validator, logging any
// errors found.
future<> perform_sstable_validation(column_family* cf);

// Submit a column family for major compaction.
future<> submit_major_compaction(column_family* cf);

Expand Down
Loading

0 comments on commit a7ef826

Please sign in to comment.