Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/host_config_schema/cchost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,11 @@
"type": "string",
"default": "1000ms",
"description": "Interval (time string) between retries to fetch a recent snapshot from the target node"
},
"fetch_snapshot_max_size": {
"type": "string",
"default": "10GB",
"description": "Maximum size of snapshot this node is willing to fetch"
}
},
"required": ["target_rpc_address"],
Expand Down
5 changes: 5 additions & 0 deletions include/ccf/ds/unit_strings.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ namespace ccf::ds
{
return value;
}

size_t count_bytes() const
{
return value;
}
};

inline void from_json(const nlohmann::json& j, SizeString& s)
Expand Down
1 change: 1 addition & 0 deletions include/ccf/http_consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace ccf
static constexpr auto AUTHORIZATION = "authorization";
static constexpr auto CACHE_CONTROL = "cache-control";
static constexpr auto CONTENT_LENGTH = "content-length";
static constexpr auto CONTENT_RANGE = "content-range";
static constexpr auto CONTENT_TYPE = "content-type";
static constexpr auto DATE = "date";
static constexpr auto DIGEST = "digest";
Expand Down
4 changes: 3 additions & 1 deletion src/host/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ namespace host
bool fetch_recent_snapshot = true;
size_t fetch_snapshot_max_attempts = 3;
ccf::ds::TimeString fetch_snapshot_retry_interval = {"1000ms"};
ccf::ds::SizeString fetch_snapshot_max_size = {"10GB"};

bool operator==(const Join&) const = default;
};
Expand Down Expand Up @@ -164,7 +165,8 @@ namespace host
follow_redirect,
fetch_recent_snapshot,
fetch_snapshot_max_attempts,
fetch_snapshot_retry_interval);
fetch_snapshot_retry_interval,
fetch_snapshot_max_size);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Recover);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Recover);
Expand Down
3 changes: 2 additions & 1 deletion src/host/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,8 @@ namespace ccf
config.command.service_certificate_file,
latest_local_idx,
config.command.join.fetch_snapshot_max_attempts,
config.command.join.fetch_snapshot_retry_interval.count_ms());
config.command.join.fetch_snapshot_retry_interval.count_ms(),
config.command.join.fetch_snapshot_max_size.count_bytes());

if (latest_peer_snapshot.has_value())
{
Expand Down
6 changes: 3 additions & 3 deletions src/http/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ namespace ccf::curl
CurlRequest(
UniqueCURL&& curl_handle_,
RESTVerb method_,
std::string&& url_,
const std::string& url_,
UniqueSlist&& headers_,
std::unique_ptr<RequestBody>&& request_body_,
std::unique_ptr<ccf::curl::ResponseBody>&& response_,
Expand Down Expand Up @@ -538,9 +538,9 @@ namespace ccf::curl
return response;
}

[[nodiscard]] ResponseHeaders& get_response_headers()
[[nodiscard]] const ResponseHeaders::HeaderMap& get_response_headers() const
{
return response_headers;
return response_headers.data;
}

[[nodiscard]] std::optional<uint16_t> get_response_thread() const
Expand Down
6 changes: 3 additions & 3 deletions src/node/quote_endorsements_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ namespace ccf
const auto& server = servers.front();
const auto& endpoint = server.front();
auto* response_body = request->get_response_body();
auto& response_headers = request->get_response_headers();
const auto& response_headers = request->get_response_headers();

if (curl_response == CURLE_OK && status_code == HTTP_STATUS_OK)
{
Expand Down Expand Up @@ -259,8 +259,8 @@ namespace ccf
curl_response == CURLE_OK &&
status_code == HTTP_STATUS_TOO_MANY_REQUESTS)
{
auto h = response_headers.data.find(http::headers::RETRY_AFTER);
if (h != response_headers.data.end())
auto h = response_headers.find(http::headers::RETRY_AFTER);
if (h != response_headers.end())
{
const auto& retry_after_value = h->second;
// If value is invalid, retry_after_s is unchanged
Expand Down
185 changes: 123 additions & 62 deletions src/node/rpc/node_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,59 @@ namespace ccf
NetworkState& network;
ccf::AbstractNodeOperation& node_operation;

std::optional<std::string> get_redirect_address_for_node(
const ccf::endpoints::ReadOnlyEndpointContext& ctx,
const ccf::NodeId& target_node)
{
auto nodes = ctx.tx.ro(network.nodes);

auto node_info = nodes->get(target_node);
if (!node_info.has_value())
{
LOG_FAIL_FMT("Node redirection error: Unknown node {}", target_node);
ctx.rpc_ctx->set_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::InternalError,
fmt::format(
"Cannot find node info to produce redirect response for node {}",
target_node));
return std::nullopt;
}

const auto interface_id =
ctx.rpc_ctx->get_session_context()->interface_id;
if (!interface_id.has_value())
{
LOG_FAIL_FMT("Node redirection error: Non-RPC request");
ctx.rpc_ctx->set_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::InternalError,
"Cannot redirect non-RPC request");
return std::nullopt;
}

const auto& interfaces = node_info->rpc_interfaces;
const auto interface_it = interfaces.find(interface_id.value());
if (interface_it == interfaces.end())
{
LOG_FAIL_FMT(
"Node redirection error: Target missing interface {}",
interface_id.value());
ctx.rpc_ctx->set_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::InternalError,
fmt::format(
"Cannot redirect request. Received on RPC interface {}, which is "
"not present on target node {}",
interface_id.value(),
target_node));
return std::nullopt;
}

const auto& interface = interface_it->second;
return interface.published_address;
}

static std::pair<http_status, std::string> quote_verification_error(
QuoteVerificationResult result)
{
Expand Down Expand Up @@ -1856,77 +1909,87 @@ namespace ccf

static constexpr auto snapshot_since_param_key = "since";
// Redirects to endpoint for a single specific snapshot
auto find_snapshot = [this](ccf::endpoints::CommandEndpointContext& ctx) {
auto node_configuration_subsystem =
this->context.get_subsystem<NodeConfigurationSubsystem>();
if (!node_configuration_subsystem)
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::InternalError,
"NodeConfigurationSubsystem is not available");
return;
}
auto find_snapshot =
[this](ccf::endpoints::ReadOnlyEndpointContext& ctx) {
auto node_configuration_subsystem =
this->context.get_subsystem<NodeConfigurationSubsystem>();
if (!node_configuration_subsystem)
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::InternalError,
"NodeConfigurationSubsystem is not available");
return;
}

const auto& snapshots_config =
node_configuration_subsystem->get().node_config.snapshots;
const auto& snapshots_config =
node_configuration_subsystem->get().node_config.snapshots;

size_t latest_idx = 0;
{
// Get latest_idx from query param, if present
const auto parsed_query =
http::parse_query(ctx.rpc_ctx->get_request_query());
size_t latest_idx = 0;
{
// Get latest_idx from query param, if present
const auto parsed_query =
http::parse_query(ctx.rpc_ctx->get_request_query());

std::string error_reason;
auto snapshot_since = http::get_query_value_opt<ccf::SeqNo>(
parsed_query, snapshot_since_param_key, error_reason);
std::string error_reason;
auto snapshot_since = http::get_query_value_opt<ccf::SeqNo>(
parsed_query, snapshot_since_param_key, error_reason);

if (snapshot_since.has_value())
{
if (error_reason != "")
if (snapshot_since.has_value())
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_BAD_REQUEST,
ccf::errors::InvalidQueryParameterValue,
std::move(error_reason));
return;
if (error_reason != "")
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_BAD_REQUEST,
ccf::errors::InvalidQueryParameterValue,
std::move(error_reason));
return;
}
latest_idx = snapshot_since.value();
}
latest_idx = snapshot_since.value();
}
}

const auto orig_latest = latest_idx;
auto latest_committed_snapshot =
snapshots::find_latest_committed_snapshot_in_directory(
snapshots_config.directory, latest_idx);
const auto orig_latest = latest_idx;
auto latest_committed_snapshot =
snapshots::find_latest_committed_snapshot_in_directory(
snapshots_config.directory, latest_idx);

if (!latest_committed_snapshot.has_value())
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_NOT_FOUND,
ccf::errors::ResourceNotFound,
fmt::format(
"This node has no committed snapshots since {}", orig_latest));
return;
}
if (!latest_committed_snapshot.has_value())
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_NOT_FOUND,
ccf::errors::ResourceNotFound,
fmt::format(
"This node has no committed snapshots since {}", orig_latest));
return;
}

const auto& snapshot_path = latest_committed_snapshot.value();
const auto& snapshot_path = latest_committed_snapshot.value();

LOG_DEBUG_FMT("Redirecting to snapshot: {}", snapshot_path);
const auto address =
get_redirect_address_for_node(ctx, this->context.get_node_id());
if (!address.has_value())
{
// Helper function should have populated error response, so return
// now
return;
}

auto redirect_url = fmt::format("/node/snapshot/{}", snapshot_path);
ctx.rpc_ctx->set_response_header(
ccf::http::headers::LOCATION, redirect_url);
ctx.rpc_ctx->set_response_status(HTTP_STATUS_PERMANENT_REDIRECT);
};
make_command_endpoint(
auto redirect_url = fmt::format(
"https://{}/node/snapshot/{}", address.value(), snapshot_path);
LOG_DEBUG_FMT("Redirecting to snapshot: {}", redirect_url);
ctx.rpc_ctx->set_response_header(
ccf::http::headers::LOCATION, redirect_url);
ctx.rpc_ctx->set_response_status(HTTP_STATUS_PERMANENT_REDIRECT);
};
make_read_only_endpoint(
"/snapshot", HTTP_HEAD, find_snapshot, no_auth_required)
.set_forwarding_required(endpoints::ForwardingRequired::Never)
.add_query_parameter<ccf::SeqNo>(
snapshot_since_param_key, ccf::endpoints::OptionalParameter)
.set_openapi_hidden(true)
.install();
make_command_endpoint(
make_read_only_endpoint(
"/snapshot", HTTP_GET, find_snapshot, no_auth_required)
.set_forwarding_required(endpoints::ForwardingRequired::Never)
.add_query_parameter<ccf::SeqNo>(
Expand Down Expand Up @@ -2087,14 +2150,12 @@ namespace ccf

if (range_end > total_size)
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_BAD_REQUEST,
ccf::errors::InvalidHeaderValue,
fmt::format(
"End of range {} is larger than total file size {}",
range_end,
total_size));
return;
LOG_DEBUG_FMT(
"Requested snapshot range ending at {}, but file size is "
"only {} - shrinking range end",
range_end,
total_size);
range_end = total_size;
}

if (range_end < range_start)
Expand Down Expand Up @@ -2174,7 +2235,7 @@ namespace ccf
// Partial Content responses describe the current response in
// Content-Range
ctx.rpc_ctx->set_response_header(
"content-range",
ccf::http::headers::CONTENT_RANGE,
fmt::format("bytes {}-{}/{}", range_start, range_end, total_size));
};
make_command_endpoint(
Expand Down
Loading