Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws: add metadata fetcher utility to use http async client #29880

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions source/extensions/common/aws/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ envoy_cc_library(
external_deps = ["abseil_optional"],
)

envoy_cc_library(
name = "metadata_fetcher_lib",
srcs = ["metadata_fetcher.cc"],
hdrs = ["metadata_fetcher.h"],
deps = [
":utility_lib",
"//envoy/upstream:cluster_manager_interface",
"//source/common/http:utility_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "credentials_provider_impl_lib",
srcs = ["credentials_provider_impl.cc"],
Expand All @@ -63,10 +75,14 @@ envoy_cc_library(
external_deps = ["curl"],
deps = [
"//envoy/http:message_interface",
"//envoy/upstream:cluster_manager_interface",
"//source/common/common:empty_string",
"//source/common/common:matchers_lib",
"//source/common/common:utility_lib",
"//source/common/http:headers_lib",
"//source/common/http:utility_lib",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/upstreams/http/v3:pkg_cc_proto",
],
)

Expand Down
179 changes: 179 additions & 0 deletions source/extensions/common/aws/metadata_fetcher.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#include "source/extensions/common/aws/metadata_fetcher.h"

#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/core/v3/http_uri.pb.h"

#include "source/common/common/enum_to_int.h"
#include "source/common/http/headers.h"
#include "source/common/http/utility.h"
#include "source/common/protobuf/utility.h"

namespace Envoy {
namespace Extensions {
namespace Common {
namespace Aws {

namespace {

class MetadataFetcherImpl : public MetadataFetcher,
public Logger::Loggable<Logger::Id::aws>,
public Http::AsyncClient::Callbacks {

public:
MetadataFetcherImpl(Upstream::ClusterManager& cm, absl::string_view cluster_name)
: cm_(cm), cluster_name_(std::string(cluster_name)) {}

~MetadataFetcherImpl() override { cancel(); }

void cancel() override {
if (request_ && !complete_) {
request_->cancel();
ENVOY_LOG(debug, "fetch AWS Metadata [cluster = {}]: cancelled", cluster_name_);
}
reset();
}

absl::string_view failureToString(MetadataFetcher::MetadataReceiver::Failure reason) override {
switch (reason) {
case MetadataFetcher::MetadataReceiver::Failure::Network:
return "Network";
case MetadataFetcher::MetadataReceiver::Failure::InvalidMetadata:
return "InvalidMetadata";
case MetadataFetcher::MetadataReceiver::Failure::MissingConfig:
return "MissingConfig";
default:
return "";
}
}

void fetch(Http::RequestMessage& message, Tracing::Span& parent_span,
ravenblackx marked this conversation as resolved.
Show resolved Hide resolved
MetadataFetcher::MetadataReceiver& receiver) override {
ASSERT(!request_);
ASSERT(!receiver_);
complete_ = false;
receiver_ = makeOptRef(receiver);
const auto thread_local_cluster = cm_.getThreadLocalCluster(cluster_name_);
if (thread_local_cluster == nullptr) {
ENVOY_LOG(error, "{} AWS Metadata failed: [cluster = {}] not found", __func__, cluster_name_);
complete_ = true;
receiver_->onMetadataError(MetadataFetcher::MetadataReceiver::Failure::MissingConfig);
reset();
return;
}

constexpr uint64_t MAX_RETRIES = 3;
constexpr uint64_t RETRY_DELAY = 1000;
constexpr uint64_t TIMEOUT = 5 * 1000;

const auto host_attributes = Http::Utility::parseAuthority(message.headers().getHostValue());
const auto host = host_attributes.host_;
const auto path = message.headers().getPathValue();
const auto scheme = message.headers().getSchemeValue();
const auto method = message.headers().getMethodValue();
ENVOY_LOG(debug, "fetch AWS Metadata at [uri = {}]: start from cluster {}",
fmt::format("{}://{}{}", scheme, host, path), cluster_name_);

Http::RequestHeaderMapPtr headersPtr =
Envoy::Http::createHeaderMap<Envoy::Http::RequestHeaderMapImpl>(
{{Envoy::Http::Headers::get().Method, std::string(method)},
{Envoy::Http::Headers::get().Host, std::string(host)},
{Envoy::Http::Headers::get().Scheme, std::string(scheme)},
{Envoy::Http::Headers::get().Path, std::string(path)}});

// Copy the remaining headers.
message.headers().iterate(
[&headersPtr](const Http::HeaderEntry& entry) -> Http::HeaderMap::Iterate {
// Skip pseudo-headers
if (!entry.key().getStringView().empty() && entry.key().getStringView()[0] == ':') {
return Http::HeaderMap::Iterate::Continue;
}
headersPtr->addCopy(Http::LowerCaseString(entry.key().getStringView()),
entry.value().getStringView());
return Http::HeaderMap::Iterate::Continue;
});

auto messagePtr = std::make_unique<Envoy::Http::RequestMessageImpl>(std::move(headersPtr));

auto options = Http::AsyncClient::RequestOptions()
.setTimeout(std::chrono::milliseconds(TIMEOUT))
.setParentSpan(parent_span)
.setSendXff(false)
.setChildSpanName("AWS Metadata Fetch");

envoy::config::route::v3::RetryPolicy route_retry_policy;
route_retry_policy.mutable_num_retries()->set_value(MAX_RETRIES);
route_retry_policy.mutable_per_try_timeout()->CopyFrom(
Protobuf::util::TimeUtil::MillisecondsToDuration(TIMEOUT));
route_retry_policy.mutable_per_try_idle_timeout()->CopyFrom(
Protobuf::util::TimeUtil::MillisecondsToDuration(RETRY_DELAY));
route_retry_policy.set_retry_on("5xx,gateway-error,connect-failure,reset,refused-stream");

options.setRetryPolicy(route_retry_policy);
options.setBufferBodyForRetry(true);
request_ = makeOptRefFromPtr(
thread_local_cluster->httpAsyncClient().send(std::move(messagePtr), *this, options));
}

// HTTP async receive method on success.
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& response) override {
complete_ = true;
const uint64_t status_code = Http::Utility::getResponseStatus(response->headers());
if (status_code == enumToInt(Http::Code::OK)) {
ENVOY_LOG(debug, "{}: fetch AWS Metadata [cluster = {}]: success", __func__, cluster_name_);
if (response->body().length() != 0) {
const auto body = response->bodyAsString();
receiver_->onMetadataSuccess(std::move(body));
} else {
ENVOY_LOG(debug, "{}: fetch AWS Metadata [cluster = {}]: body is empty", __func__,
cluster_name_);
receiver_->onMetadataError(MetadataFetcher::MetadataReceiver::Failure::InvalidMetadata);
}
} else {
if (response->body().length() != 0) {
ENVOY_LOG(debug, "{}: fetch AWS Metadata [cluster = {}]: response status code {}, body: {}",
__func__, cluster_name_, status_code, response->bodyAsString());
} else {
ENVOY_LOG(debug,
"{}: fetch AWS Metadata [cluster = {}]: response status code {}, body is empty",
__func__, cluster_name_, status_code);
}
receiver_->onMetadataError(MetadataFetcher::MetadataReceiver::Failure::Network);
}
reset();
}

// HTTP async receive method on failure.
void onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) override {
ENVOY_LOG(debug, "{}: fetch AWS Metadata [cluster = {}]: network error {}", __func__,
cluster_name_, enumToInt(reason));
complete_ = true;
receiver_->onMetadataError(MetadataFetcher::MetadataReceiver::Failure::Network);
reset();
}

// TODO(suniltheta): Add metadata fetch status into the span like it is done on ext_authz filter.
void onBeforeFinalizeUpstreamSpan(Tracing::Span&, const Http::ResponseHeaderMap*) override {}

private:
bool complete_{};
suniltheta marked this conversation as resolved.
Show resolved Hide resolved
Upstream::ClusterManager& cm_;
const std::string cluster_name_;
OptRef<MetadataFetcher::MetadataReceiver> receiver_;
OptRef<Http::AsyncClient::Request> request_;

void reset() {
request_.reset();
receiver_.reset();
}
};
} // namespace

MetadataFetcherPtr MetadataFetcher::create(Upstream::ClusterManager& cm,
absl::string_view cluster_name) {
return std::make_unique<MetadataFetcherImpl>(cm, cluster_name);
}
} // namespace Aws
} // namespace Common
} // namespace Extensions
} // namespace Envoy
97 changes: 97 additions & 0 deletions source/extensions/common/aws/metadata_fetcher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#pragma once

#include <memory>
#include <string>

#include "envoy/common/pure.h"
#include "envoy/http/message.h"
#include "envoy/upstream/cluster_manager.h"

#include "source/common/http/message_impl.h"
#include "source/extensions/common/aws/utility.h"

#include "absl/strings/string_view.h"
#include "absl/types/optional.h"

namespace Envoy {
namespace Extensions {
namespace Common {
namespace Aws {

class MetadataFetcher;
using MetadataFetcherPtr = std::unique_ptr<MetadataFetcher>;

/**
* MetadataFetcher interface can be used to retrieve AWS Metadata from various providers.
* An instance of this interface is designed to retrieve one AWS Metadata at a time.
* The implementation of AWS Metadata Fetcher is similar to JwksFetcher.
*/

class MetadataFetcher {
public:
class MetadataReceiver {
public:
enum class Failure {
/* A network error occurred causing AWS Metadata retrieval failure. */
Network,
/* A failure occurred when trying to parse the retrieved AWS Metadata data. */
InvalidMetadata,
/* A missing config causing AWS Metadata retrieval failure. */
MissingConfig,
};

virtual ~MetadataReceiver() = default;

/**
* @brief Successful retrieval callback of returned AWS Metadata.
* @param body Fetched AWS Metadata.
*/
virtual void onMetadataSuccess(const std::string&& body) PURE;

/**
* @brief Retrieval error callback.
* @param reason the failure reason.
*/
virtual void onMetadataError(Failure reason) PURE;
};

virtual ~MetadataFetcher() = default;

/**
* @brief Cancel any in-flight request.
*/
virtual void cancel() PURE;

/**
* @brief Retrieve a AWS Metadata from a remote HTTP host.
* At most one outstanding request may be in-flight.
* i.e. from the invocation of `fetch()` until either
* a callback or `cancel()` is invoked, no additional
* `fetch()` may be issued. The URI to fetch is to pre
* determined based on the credentials provider source.
*
* @param receiver the receiver of the fetched AWS Metadata or error
*/
virtual void fetch(Http::RequestMessage& message, Tracing::Span& parent_span,
MetadataReceiver& receiver) PURE;

/**
* @brief Return MetadataReceiver Failure enum as a string.
*
* @return absl::string_view
*/
virtual absl::string_view failureToString(MetadataReceiver::Failure) PURE;

/**
* @brief Factory method for creating a Metadata Fetcher.
*
* @param cm the cluster manager to use during AWS Metadata retrieval
* @param provider the AWS Metadata provider
* @return a MetadataFetcher instance
*/
static MetadataFetcherPtr create(Upstream::ClusterManager& cm, absl::string_view cluster_name);
};
} // namespace Aws
} // namespace Common
} // namespace Extensions
} // namespace Envoy
56 changes: 56 additions & 0 deletions source/extensions/common/aws/utility.cc
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
#include "source/extensions/common/aws/utility.h"

#include "envoy/upstream/cluster_manager.h"

#include "source/common/common/empty_string.h"
#include "source/common/common/fmt.h"
#include "source/common/common/utility.h"
#include "source/common/protobuf/message_validator_impl.h"
#include "source/common/protobuf/utility.h"

#include "absl/strings/match.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "curl/curl.h"
#include "fmt/printf.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -294,6 +299,57 @@ absl::optional<std::string> Utility::fetchMetadata(Http::RequestMessage& message
return buffer.empty() ? absl::nullopt : absl::optional<std::string>(buffer);
}

bool Utility::addInternalClusterStatic(
Upstream::ClusterManager& cm, absl::string_view cluster_name,
const envoy::config::cluster::v3::Cluster::DiscoveryType cluster_type, absl::string_view uri) {
// Check if local cluster exists with that name.
if (cm.getThreadLocalCluster(cluster_name) == nullptr) {
// Make sure we run this on main thread.
TRY_ASSERT_MAIN_THREAD {
envoy::config::cluster::v3::Cluster cluster;
absl::string_view host_port;
absl::string_view path;
Http::Utility::extractHostPathFromUri(uri, host_port, path);
const auto host_attributes = Http::Utility::parseAuthority(host_port);
const auto host = host_attributes.host_;
const auto port = host_attributes.port_ ? host_attributes.port_.value() : 80;

cluster.set_name(cluster_name);
cluster.set_type(cluster_type);
cluster.mutable_connect_timeout()->set_seconds(5);
cluster.mutable_load_assignment()->set_cluster_name(cluster_name);
auto* endpoint = cluster.mutable_load_assignment()
->add_endpoints()
->add_lb_endpoints()
->mutable_endpoint();
auto* addr = endpoint->mutable_address();
addr->mutable_socket_address()->set_address(host);
addr->mutable_socket_address()->set_port_value(port);
cluster.set_lb_policy(envoy::config::cluster::v3::Cluster::ROUND_ROBIN);
envoy::extensions::upstreams::http::v3::HttpProtocolOptions protocol_options;
auto* http_protocol_options =
protocol_options.mutable_explicit_http_config()->mutable_http_protocol_options();
http_protocol_options->set_accept_http_10(true);
(*cluster.mutable_typed_extension_protocol_options())
["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"]
.PackFrom(protocol_options);

// TODO(suniltheta): use random number generator here for cluster version.
cm.addOrUpdateCluster(cluster, "12345");
ENVOY_LOG_MISC(info,
"Added a {} internal cluster [name: {}, address:{}:{}] to fetch aws "
"credentials",
cluster_type, cluster_name, host, port);
}
END_TRY
CATCH(const EnvoyException& e, {
ENVOY_LOG_MISC(error, "Failed to add internal cluster {}: {}", cluster_name, e.what());
return false;
});
}
return true;
}

} // namespace Aws
} // namespace Common
} // namespace Extensions
Expand Down
Loading
Loading