Skip to content

Commit

Permalink
udp_proxy: support proxy access logging (envoyproxy#23242)
Browse files Browse the repository at this point in the history
Some stats like `no_route` and `idle_timeout` can't be printed by session access log,
so we need proxy-level access logging to log global stats.
Additional Description:
Risk Level: Low
Fixes envoyproxy#23241

Signed-off-by: giantcroc <changran.wang@intel.com>
  • Loading branch information
GiantCroc authored Nov 1, 2022
1 parent 3ea63d7 commit 33fce6b
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 68 deletions.
7 changes: 5 additions & 2 deletions api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#extension: envoy.filters.udp_listener.udp_proxy]

// Configuration for the UDP proxy filter.
// [#next-free-field: 10]
// [#next-free-field: 11]
message UdpProxyConfig {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.udp.udp_proxy.v2alpha.UdpProxyConfig";
Expand Down Expand Up @@ -105,6 +105,9 @@ message UdpProxyConfig {
// to upstream host selected on first chunk receival for that "session" (identified by source IP/port and local IP/port).
bool use_per_packet_load_balancing = 7;

// Configuration for access logs emitted by the UDP proxy. Note that certain UDP specific data is emitted as :ref:`Dynamic Metadata <config_access_log_format_dynamic_metadata>`.
// Configuration for session access logs emitted by the UDP proxy. Note that certain UDP specific data is emitted as :ref:`Dynamic Metadata <config_access_log_format_dynamic_metadata>`.
repeated config.accesslog.v3.AccessLog access_log = 8;

// Configuration for proxy access logs emitted by the UDP proxy. Note that certain UDP specific data is emitted as :ref:`Dynamic Metadata <config_access_log_format_dynamic_metadata>`.
repeated config.accesslog.v3.AccessLog proxy_access_log = 10;
}
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,8 @@ new_features:
- area: http
change: |
allowing the dynamic forward proxy cluster to :ref:`allow_coalesced_connections <envoy_v3_api_field_extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig.allow_coalesced_connections>` for HTTP/2 and HTTP/3 connections.
- area: udp_proxy
change: |
added support for :ref:`proxy_access_log <envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.proxy_access_log>`.
deprecated:
44 changes: 33 additions & 11 deletions docs/root/configuration/observability/access_log/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -650,28 +650,50 @@ The following command operators are supported:

UDP
For :ref:`UDP Proxy <config_udp_listener_filters_udp_proxy>`,
NAMESPACE should be always set to "udp.proxy", optional KEYs are as follows:
when NAMESPACE is set to "udp.proxy.session", optional KEYs are as follows:

* ``cluster_name``: Name of the cluster.
* ``bytes_sent``: Total number of downstream bytes sent to the upstream in the session.
* ``bytes_received``: Total number of downstream bytes received from the upstream in the session.
* ``errors_sent``: Number of errors that have occurred when sending datagrams to the upstream in the session.
* ``errors_received``: Number of errors that have occurred when receiving datagrams from the upstream in UDP proxy.
Since the receiving errors are counted in at the listener level (vs. the session), this counter is global to all sessions and may not be directly attributable to the session being logged.
* ``datagrams_sent``: Number of datagrams sent to the upstream successfully in the session.
* ``datagrams_received``: Number of datagrams received from the upstream successfully in the session.

Recommended access log format for UDP proxy:
Recommended session access log format for UDP proxy:

.. code-block:: none
[%START_TIME%] %DYNAMIC_METADATA(udp.proxy.session:cluster_name)%
%DYNAMIC_METADATA(udp.proxy.session:bytes_sent)%
%DYNAMIC_METADATA(udp.proxy.session:bytes_received)%
%DYNAMIC_METADATA(udp.proxy.session:errors_sent)%
%DYNAMIC_METADATA(udp.proxy.session:datagrams_sent)%
%DYNAMIC_METADATA(udp.proxy.session:datagrams_received)%\n
when NAMESPACE is set to "udp.proxy.proxy", optional KEYs are as follows:

* ``bytes_sent``: Total number of downstream bytes sent to the upstream in UDP proxy.
* ``bytes_received``: Total number of downstream bytes received from the upstream in UDP proxy.
* ``errors_sent``: Number of errors that have occurred when sending datagrams to the upstream in UDP proxy.
* ``errors_received``: Number of errors that have occurred when receiving datagrams from the upstream in UDP proxy.
* ``datagrams_sent``: Number of datagrams sent to the upstream successfully in UDP proxy.
* ``datagrams_received``: Number of datagrams received from the upstream successfully in UDP proxy.
* ``no_route``: Number of times that no upstream cluster found in UDP proxy.
* ``session_total``: Total number of sessions in UDP proxy.
* ``idle_timeout``: Number of times that sessions idle timeout occurred in UDP proxy.

Recommended proxy access log format for UDP proxy:

.. code-block:: none
[%START_TIME%] %DYNAMIC_METADATA(udp.proxy:cluster_name)%
%DYNAMIC_METADATA(udp.proxy:bytes_sent)%
%DYNAMIC_METADATA(udp.proxy:bytes_received)%
%DYNAMIC_METADATA(udp.proxy:errors_sent)%
%DYNAMIC_METADATA(udp.proxy:errors_received)%
%DYNAMIC_METADATA(udp.proxy:datagrams_sent)%
%DYNAMIC_METADATA(udp.proxy:datagrams_received)%\n
[%START_TIME%]
%DYNAMIC_METADATA(udp.proxy.proxy:bytes_sent)%
%DYNAMIC_METADATA(udp.proxy.proxy:bytes_received)%
%DYNAMIC_METADATA(udp.proxy.proxy:errors_sent)%
%DYNAMIC_METADATA(udp.proxy.proxy:errors_received)%
%DYNAMIC_METADATA(udp.proxy.proxy:datagrams_sent)%
%DYNAMIC_METADATA(udp.proxy.proxy:datagrams_received)%
%DYNAMIC_METADATA(udp.proxy.proxy:session_total)%\n
THRIFT
For :ref:`Thrift Proxy <config_network_filters_thrift_proxy>`,
Expand Down
55 changes: 45 additions & 10 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ UdpProxyFilter::UdpProxyFilter(Network::UdpReadFilterCallbacks& callbacks,
onClusterAddOrUpdate(*cluster);
}
}

if (!config_->proxyAccessLogs().empty()) {
udp_proxy_stats_.emplace(StreamInfo::StreamInfoImpl(config_->timeSource(), nullptr));
}
}

UdpProxyFilter::~UdpProxyFilter() {
if (!config_->proxyAccessLogs().empty()) {
fillProxyStreamInfo();
for (const auto& access_log : config_->proxyAccessLogs()) {
access_log->log(nullptr, nullptr, nullptr, udp_proxy_stats_.value());
}
}
}

void UdpProxyFilter::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) {
Expand Down Expand Up @@ -234,8 +247,8 @@ UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster,
// NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port
// is bound until the first packet is sent to the upstream host.
socket_(cluster.filter_.createSocket(host)) {
if (!cluster_.filter_.config_->accessLogs().empty()) {
udp_sess_stats_.emplace(
if (!cluster_.filter_.config_->sessionAccessLogs().empty()) {
udp_session_stats_.emplace(
StreamInfo::StreamInfoImpl(cluster_.filter_.config_->timeSource(), nullptr));
}

Expand Down Expand Up @@ -281,29 +294,51 @@ UdpProxyFilter::ActiveSession::~ActiveSession() {
.connections()
.dec();

if (!cluster_.filter_.config_->accessLogs().empty()) {
fillStreamInfo();
for (const auto& access_log : cluster_.filter_.config_->accessLogs()) {
access_log->log(nullptr, nullptr, nullptr, udp_sess_stats_.value());
if (!cluster_.filter_.config_->sessionAccessLogs().empty()) {
fillSessionStreamInfo();
for (const auto& access_log : cluster_.filter_.config_->sessionAccessLogs()) {
access_log->log(nullptr, nullptr, nullptr, udp_session_stats_.value());
}
}
}

void UdpProxyFilter::ActiveSession::fillStreamInfo() {
void UdpProxyFilter::ActiveSession::fillSessionStreamInfo() {
ProtobufWkt::Struct stats_obj;
auto& fields_map = *stats_obj.mutable_fields();
fields_map["cluster_name"] = ValueUtil::stringValue(cluster_.cluster_.info()->name());
fields_map["bytes_sent"] = ValueUtil::numberValue(session_stats_.downstream_sess_tx_bytes_);
fields_map["bytes_received"] = ValueUtil::numberValue(session_stats_.downstream_sess_rx_bytes_);
fields_map["errors_sent"] = ValueUtil::numberValue(session_stats_.downstream_sess_tx_errors_);
fields_map["errors_received"] =
ValueUtil::numberValue(cluster_.filter_.config_->stats().downstream_sess_rx_errors_.value());
fields_map["datagrams_sent"] =
ValueUtil::numberValue(session_stats_.downstream_sess_tx_datagrams_);
fields_map["datagrams_received"] =
ValueUtil::numberValue(session_stats_.downstream_sess_rx_datagrams_);

udp_sess_stats_.value().setDynamicMetadata("udp.proxy", stats_obj);
udp_session_stats_.value().setDynamicMetadata("udp.proxy.session", stats_obj);
}

void UdpProxyFilter::fillProxyStreamInfo() {
ProtobufWkt::Struct stats_obj;
auto& fields_map = *stats_obj.mutable_fields();
fields_map["bytes_sent"] =
ValueUtil::numberValue(config_->stats().downstream_sess_tx_bytes_.value());
fields_map["bytes_received"] =
ValueUtil::numberValue(config_->stats().downstream_sess_rx_bytes_.value());
fields_map["errors_sent"] =
ValueUtil::numberValue(config_->stats().downstream_sess_tx_errors_.value());
fields_map["errors_received"] =
ValueUtil::numberValue(config_->stats().downstream_sess_rx_errors_.value());
fields_map["datagrams_sent"] =
ValueUtil::numberValue(config_->stats().downstream_sess_tx_datagrams_.value());
fields_map["datagrams_received"] =
ValueUtil::numberValue(config_->stats().downstream_sess_rx_datagrams_.value());
fields_map["no_route"] =
ValueUtil::numberValue(config_->stats().downstream_sess_no_route_.value());
fields_map["session_total"] =
ValueUtil::numberValue(config_->stats().downstream_sess_total_.value());
fields_map["idle_timeout"] = ValueUtil::numberValue(config_->stats().idle_timeout_.value());

udp_proxy_stats_.value().setDynamicMetadata("udp.proxy.proxy", stats_obj);
}

void UdpProxyFilter::ActiveSession::onIdleTimer() {
Expand Down
31 changes: 23 additions & 8 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"

// TODO(mattklein123): UDP session access logging.

namespace Envoy {
namespace Extensions {
namespace UdpFilters {
Expand Down Expand Up @@ -90,9 +88,15 @@ class UdpProxyFilterConfig {
"is not running with the CAP_NET_ADMIN capability.");
}

access_logs_.reserve(config.access_log_size());
session_access_logs_.reserve(config.access_log_size());
for (const envoy::config::accesslog::v3::AccessLog& log_config : config.access_log()) {
access_logs_.emplace_back(AccessLog::AccessLogFactory::fromProto(log_config, context));
session_access_logs_.emplace_back(
AccessLog::AccessLogFactory::fromProto(log_config, context));
}

proxy_access_logs_.reserve(config.proxy_access_log_size());
for (const envoy::config::accesslog::v3::AccessLog& log_config : config.proxy_access_log()) {
proxy_access_logs_.emplace_back(AccessLog::AccessLogFactory::fromProto(log_config, context));
}

if (!config.hash_policies().empty()) {
Expand All @@ -116,7 +120,12 @@ class UdpProxyFilterConfig {
const Network::ResolvedUdpSocketConfig& upstreamSocketConfig() const {
return upstream_socket_config_;
}
const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() const { return access_logs_; }
const std::vector<AccessLog::InstanceSharedPtr>& sessionAccessLogs() const {
return session_access_logs_;
}
const std::vector<AccessLog::InstanceSharedPtr>& proxyAccessLogs() const {
return proxy_access_logs_;
}

private:
static UdpProxyDownstreamStats generateStats(const std::string& stat_prefix,
Expand All @@ -135,7 +144,8 @@ class UdpProxyFilterConfig {
std::unique_ptr<const HashPolicyImpl> hash_policy_;
mutable UdpProxyDownstreamStats stats_;
const Network::ResolvedUdpSocketConfig upstream_socket_config_;
std::vector<AccessLog::InstanceSharedPtr> access_logs_;
std::vector<AccessLog::InstanceSharedPtr> session_access_logs_;
std::vector<AccessLog::InstanceSharedPtr> proxy_access_logs_;
Random::RandomGenerator& random_;
};

Expand Down Expand Up @@ -165,6 +175,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
public:
UdpProxyFilter(Network::UdpReadFilterCallbacks& callbacks,
const UdpProxyFilterConfigSharedPtr& config);
~UdpProxyFilter() override;

// Network::UdpListenerReadFilter
Network::FilterStatus onData(Network::UdpRecvData& data) override;
Expand Down Expand Up @@ -193,7 +204,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
private:
void onIdleTimer();
void onReadReady();
void fillStreamInfo();
void fillSessionStreamInfo();

// Network::UdpPacketProcessor
void processPacket(Network::Address::InstanceConstSharedPtr local_address,
Expand Down Expand Up @@ -242,7 +253,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
bool skip_connect_{};

UdpProxySessionStats session_stats_{};
absl::optional<StreamInfo::StreamInfoImpl> udp_sess_stats_;
absl::optional<StreamInfo::StreamInfoImpl> udp_session_stats_;
};

using ActiveSessionPtr = std::unique_ptr<ActiveSession>;
Expand Down Expand Up @@ -380,6 +391,8 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
nullptr, Network::SocketCreationOptions{});
}

void fillProxyStreamInfo();

// Upstream::ClusterUpdateCallbacks
void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) final;
void onClusterRemoval(const std::string& cluster_name) override;
Expand All @@ -388,6 +401,8 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
const Upstream::ClusterUpdateCallbacksHandlePtr cluster_update_callbacks_;
// Map for looking up cluster info with its name.
absl::flat_hash_map<std::string, ClusterInfoPtr> cluster_infos_;

absl::optional<StreamInfo::StreamInfoImpl> udp_proxy_stats_;
};

} // namespace UdpProxy
Expand Down
Loading

0 comments on commit 33fce6b

Please sign in to comment.