Skip to content

Commit

Permalink
Merge pull request redpanda-data#21576 from abhijat/feat/inv-scrub/wi…
Browse files Browse the repository at this point in the history
…re-inv-service

cst/inv: wire in inventory download service
  • Loading branch information
abhijat authored Aug 16, 2024
2 parents ee6bcb2 + 284ddf7 commit 22f5c1d
Show file tree
Hide file tree
Showing 17 changed files with 410 additions and 54 deletions.
3 changes: 2 additions & 1 deletion src/v/cloud_storage/inventory/aws_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ptree destination_node(const aws_report_configuration& cfg) {
ptree destination;
destination.add("Format", cfg.format);
destination.add("Prefix", cfg.prefix);
destination.add("Bucket", fmt::format("arn::aws::s3:::{}", cfg.bucket()));
destination.add("Bucket", fmt::format("arn:aws:s3:::{}", cfg.bucket()));
return destination;
}

Expand All @@ -58,6 +58,7 @@ iobuf to_xml(const aws_report_configuration& cfg) {

inv_cfg.add("IsEnabled", "true");
inv_cfg.add("Id", cfg.inventory_id());
inv_cfg.add("IncludedObjectVersions", "Current");

inv_cfg.add(cfg.frequency_path, cfg.frequency);

Expand Down
6 changes: 5 additions & 1 deletion src/v/cloud_storage/inventory/inv_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ ss::future<> inventory_consumer::flush_ntp_hashes(
}

ntp_hash_path /= fmt::format("{}", file_name);
vlog(
cst_log.trace,
"Writing {} hashe(s) to file {}",
hashes.size(),
ntp_hash_path);
co_return co_await ss::with_file_close_on_failure(
ss::open_file_dma(
ntp_hash_path.string(), ss::open_flags::create | ss::open_flags::wo),
Expand All @@ -223,7 +228,6 @@ ss::future<> inventory_consumer::flush_ntp_hashes(

ss::future<> inventory_consumer::write_hashes_to_file(
ss::file& f, fragmented_vector<uint64_t> hashes) {
vlog(cst_log.trace, "Writing {} hashe(s) to disk", hashes.size());
std::exception_ptr ep;
auto stream = co_await ss::make_file_output_stream(f);
auto res = co_await ss::coroutine::as_future(
Expand Down
7 changes: 7 additions & 0 deletions src/v/cloud_storage/inventory/inv_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ ss::future<inv_ops> make_inv_ops(
cloud_storage_clients::bucket_name bucket,
inventory_config_id inv_cfg_id,
ss::sstring inv_reports_prefix) {
if (inv_cfg_id().empty() || inv_reports_prefix.empty()) {
throw std::runtime_error{fmt::format(
"empty inventory id or report destination prefix: id: {}, prefix: "
"{}",
inv_cfg_id(),
inv_reports_prefix)};
}
co_return inv_ops{aws_ops{bucket, inv_cfg_id, inv_reports_prefix}};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ constexpr auto frequency = csi::report_generation_frequency::daily;
const auto expected_key = fmt::format("?inventory&id={}", id);
const auto expected_xml_payload = fmt::format(
R"({header}
<InventoryConfiguration {ns}><Destination><S3BucketDestination><Format>{format}</Format><Prefix>{prefix}</Prefix><Bucket>arn::aws::s3:::{bucket}</Bucket></S3BucketDestination></Destination><IsEnabled>true</IsEnabled><Id>{id}</Id><Schedule><Frequency>{schedule}</Frequency></Schedule></InventoryConfiguration>)",
<InventoryConfiguration {ns}><Destination><S3BucketDestination><Format>{format}</Format><Prefix>{prefix}</Prefix><Bucket>arn:aws:s3:::{bucket}</Bucket></S3BucketDestination></Destination><IsEnabled>true</IsEnabled><Id>{id}</Id><IncludedObjectVersions>Current</IncludedObjectVersions><Schedule><Frequency>{schedule}</Frequency></Schedule></InventoryConfiguration>)",
fmt::arg("header", R"(<?xml version="1.0" encoding="utf-8"?>)"),
fmt::arg("ns", R"(xmlns="http://s3.amazonaws.com/doc/2006-03-01/")"),
fmt::arg("format", format),
Expand Down
16 changes: 15 additions & 1 deletion src/v/cloud_storage/inventory/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@

#include "cloud_storage/inventory/types.h"

#include "cloud_storage/configuration.h"
#include "config/node_config.h"
#include "model/metadata.h"

namespace {
constexpr auto supported_backends = {model::cloud_storage_backend::aws};
}

namespace cloud_storage::inventory {
std::ostream& operator<<(std::ostream& os, report_generation_frequency rgf) {
switch (rgf) {
Expand All @@ -27,12 +35,18 @@ std::ostream& operator<<(std::ostream& os, report_format rf) {

std::ostream& operator<<(std::ostream& os, inventory_creation_result icr) {
switch (icr) {
using enum cloud_storage::inventory::inventory_creation_result;
using enum inventory_creation_result;
case success:
return os << "success";
case already_exists:
return os << "already-exists";
}
}

bool validate_backend_supported_for_inventory_scrub(
model::cloud_storage_backend backend) {
return std::ranges::find(supported_backends, backend)
!= supported_backends.end();
}

} // namespace cloud_storage::inventory
6 changes: 4 additions & 2 deletions src/v/cloud_storage/inventory/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
#include "base/outcome.h"
#include "cloud_storage_clients/types.h"
#include "model/fundamental.h"
#include "utils/named_type.h"
#include "model/metadata.h"

#include <seastar/core/sharded.hh>

#include <type_traits>
#include <variant>

class retry_chain_node;
Expand Down Expand Up @@ -153,6 +152,9 @@ concept vendor_ops_provider = std::is_base_of_v<base_ops, T>;
template<vendor_ops_provider... Ts>
using inv_ops_variant = std::variant<Ts...>;

bool
validate_backend_supported_for_inventory_scrub(model::cloud_storage_backend);

} // namespace cloud_storage::inventory

namespace std {
Expand Down
123 changes: 84 additions & 39 deletions src/v/cluster/inventory_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

#include "cluster/inventory_service.h"

#include "base/units.h"
#include "cloud_storage/inventory/inv_consumer.h"
#include "cloud_storage/inventory/report_parser.h"
#include "cloud_storage/logger.h"
#include "cloud_storage/remote.h"
#include "config/node_config.h"

#include <seastar/core/file-types.hh>
#include <seastar/core/file.hh>
Expand All @@ -33,7 +33,6 @@ using cloud_storage::cst_log;
namespace csi = cloud_storage::inventory;

namespace {
constexpr auto max_hash_size_in_memory = 128_KiB;
constexpr auto partition_leaders_retry = 5;
constexpr auto sleep_between_get_leaders = 1s;

Expand Down Expand Up @@ -79,6 +78,13 @@ ss::future<> remove_entry(std::filesystem::path parent, ss::directory_entry e) {
co_return co_await ss::recursive_remove_directory(path);
}

retry_chain_node make_rtc(ss::abort_source& as) {
return retry_chain_node{
as,
config::shard_local_cfg().cloud_storage_hydration_timeout_ms(),
config::shard_local_cfg().cloud_storage_initial_backoff_ms()};
}

} // namespace

namespace cluster {
Expand All @@ -100,12 +106,21 @@ default_leaders_provider::default_leaders_provider(
ss::future<absl::node_hash_set<model::ntp>>
default_leaders_provider::ntps(ss::abort_source& as) {
absl::node_hash_set<model::ntp> ntps;
const auto self_node_id = config::node().node_id();
if (!self_node_id.has_value()) {
vlog(cst_log.warn, "node has no id, cannot find leadership");
co_return ntps;
}

for (auto retry : std::ranges::iota_view{0, partition_leaders_retry}) {
std::exception_ptr ep;
try {
co_await _leaders_table.local().for_each_leader(
[&ntps](auto tp_ns, auto pid, auto, auto) mutable {
ntps.insert(model::ntp{tp_ns.ns, tp_ns.tp, pid});
[&ntps,
self_node_id](auto tp_ns, auto pid, auto node_id, auto) mutable {
if (node_id == self_node_id) {
ntps.insert(model::ntp{tp_ns.ns, tp_ns.tp, pid});
}
});
break;
} catch (...) {
Expand All @@ -126,38 +141,31 @@ inventory_service::inventory_service(
std::shared_ptr<leaders_provider> leaders,
std::shared_ptr<remote_provider> remote,
csi::inv_ops inv_ops,
ss::lowres_clock::duration inventory_report_check_interval)
ss::lowres_clock::duration inventory_report_check_interval,
bool should_create_report_config)
: _hash_store_path{std::move(hash_store_path)}
, _leaders{std::move(leaders)}
, _remote{std::move(remote)}
, _ops{std::move(inv_ops)}
, _rtc{_as}
, _inventory_report_check_interval{inventory_report_check_interval} {}
, _inventory_report_check_interval{inventory_report_check_interval}
, _should_create_report_config{should_create_report_config} {}

ss::future<> inventory_service::start() {
if (ss::this_shard_id() != 0) {
if (ss::this_shard_id() != shard_id) {
co_return;
}

auto rtc = retry_chain_node{_as, 60s, 1s};

vlog(cst_log.info, "Attempting to create inventory configuration");
if (const auto res = co_await _ops.maybe_create_inventory_configuration(
_remote->ref(), rtc);
res.has_error()) {
vlog(
cst_log.warn,
"Inventory configuration creation failed, will retry later",
res.error());
// If we failed creating inventory, try again later (on next expiry
// of _inventory_report_check_interval). If another node succeeded in
// creating the inventory, this is not counted as a failure.
_try_creating_inv_config = true;
} else {
vlog(
cst_log.info,
"Inventory configuration creation result: {}",
res.value());
vlog(
cst_log.info,
"starting inventory download service, should create config: {}, check "
"interval: {}",
_should_create_report_config,
_inventory_report_check_interval);

const auto config_created = co_await maybe_create_inventory_config();
if (!config_created && _should_create_report_config) {
_retry_creating_inv_config = true;
}

_report_check_timer.set_callback([this] {
Expand All @@ -175,27 +183,57 @@ ss::future<> inventory_service::start() {
_report_check_timer.arm_periodic(_inventory_report_check_interval);
}

ss::future<bool> inventory_service::maybe_create_inventory_config() {
bool config_created{false};
if (_should_create_report_config) {
vlog(cst_log.info, "Attempting to create inventory configuration");
auto rtc = make_rtc(_as);
if (const auto res = co_await _ops.maybe_create_inventory_configuration(
_remote->ref(), rtc);
res.has_error()) {
vlog(
cst_log.warn,
"Inventory configuration creation failed, will retry later",
res.error());
} else {
// If another node succeeded in creating the inventory, this is not
// counted as a failure. The end goal is that the
// configuration/schedule should exist by the time this call is
// finished.
vlog(
cst_log.info,
"Inventory configuration creation result: {}",
res.value());
config_created = true;
}
}
co_return config_created;
}

ss::future<> inventory_service::check_for_current_inventory() {
auto h = _gate.hold();
if (_try_creating_inv_config) {
auto rtc = retry_chain_node{_as, 60s, 1s};
auto res = co_await _ops.maybe_create_inventory_configuration(
_remote->ref(), rtc);

if (res.has_value()) {
_try_creating_inv_config = false;
if (_retry_creating_inv_config) {
const auto config_created = co_await maybe_create_inventory_config();
if (config_created) {
_retry_creating_inv_config = false;
}

// We either created the inventory just now, or failed again. In either
// case the report will not exist for several hours.
co_return;
}

// TODO use config values from cloud_storage
auto rtc = retry_chain_node{_as, 60s, 1s};
auto rtc = make_rtc(_as);
auto res = co_await _ops.fetch_latest_report_metadata(_remote->ref(), rtc);
if (res.has_error()) {
vlog(cst_log.info, "failed to fetch report metadata: ", res.error());
const auto& error = res.error();
if (
error == cloud_storage::inventory::error_outcome::no_reports_found) {
vlog(cst_log.info, "finished inventory check: {}", res.error());
} else {
vlog(
cst_log.info, "failed to fetch report metadata: {}", res.error());
}
co_return;
}

Expand Down Expand Up @@ -231,9 +269,17 @@ inventory_service::download_and_process_reports(csi::report_paths paths) {
co_return false;
}

for (const auto& ntp : ntps) {
vlog(cst_log.trace, "filtering report for NTP {}", ntp);
}

// Report consumer is expected to process multiple reports while keeping
// state across calls. This is only created once per set of files.
csi::inventory_consumer c{_hash_store_path, ntps, max_hash_size_in_memory};
csi::inventory_consumer c{
_hash_store_path,
ntps,
config::shard_local_cfg()
.cloud_storage_inventory_max_hash_size_during_parse};

for (const auto& path : paths) {
const auto is_path_compressed
Expand All @@ -246,8 +292,7 @@ inventory_service::download_and_process_reports(csi::report_paths paths) {
path(),
is_path_compressed);

// TODO use config values from cloud_storage
auto rtc = retry_chain_node{_as, 60s, 1s};
auto rtc = make_rtc(_as);
if (auto res = co_await _remote->ref().download_stream(
_ops.bucket(),
cloud_storage::remote_segment_path{path},
Expand Down
24 changes: 22 additions & 2 deletions src/v/cluster/inventory_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ struct default_leaders_provider final : leaders_provider {
/// configuration exists, and if not creates it.
class inventory_service : public ss::sharded<inventory_service> {
public:
static constexpr unsigned shard_id{0};
inventory_service(
std::filesystem::path hash_store_path,
std::shared_ptr<leaders_provider> leaders,
std::shared_ptr<remote_provider> remote,
cloud_storage::inventory::inv_ops inv_ops,
ss::lowres_clock::duration inventory_report_check_interval
= std::chrono::hours{6});
= std::chrono::hours{6},
bool should_create_report_config = true);

ss::future<> start();
ss::future<> stop();
Expand All @@ -83,6 +85,13 @@ class inventory_service : public ss::sharded<inventory_service> {
bool can_use_inventory_data() const { return _can_use_inventory_data; }

private:
/// Attempts to create inventory configuration (which schedules the report
/// to be generated). Returns true if the config was created. Note that this
/// also returns true if the config was created by another node, the return
/// value lets us know if the inventory schedule creation should be retried
/// later.
ss::future<bool> maybe_create_inventory_config();

/// Checks for the existence of an inventory report on a set frequency. If a
/// report is found, it's date is checked to compare with the last date
/// processed by this node. If the date is newer (or we have not processed
Expand Down Expand Up @@ -119,14 +128,25 @@ class inventory_service : public ss::sharded<inventory_service> {
ss::gate _gate;
ss::abort_source _as;
retry_chain_node _rtc;
bool _try_creating_inv_config{false};
bool _retry_creating_inv_config{false};

ss::timer<ss::lowres_clock> _report_check_timer;
std::optional<cloud_storage::inventory::report_datetime> _last_fetched;

ss::lowres_clock::duration _inventory_report_check_interval;

bool _can_use_inventory_data{false};

// Controls if redpanda will create the report schedule on which the report
// is generated by cloud storage provider. If this value is false and the
// service is running, the schedule will not be created on service startup,
// but the report will still be checked for by the service in the expected
// path periodically. The expectation then is that the report will be
// manually placed in the bucket by the user. For use in testing and
// deployments where the cloud provider does not support inventory reports
// but reports can be generated externally, and inventory based scrub is
// still desired.
bool _should_create_report_config{true};
};

} // namespace cluster
Loading

0 comments on commit 22f5c1d

Please sign in to comment.