Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp_proxy: support proxy access logging #23242

Merged
merged 16 commits into from
Nov 1, 2022
9 changes: 6 additions & 3 deletions api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#extension: envoy.filters.udp_listener.udp_proxy]

// Configuration for the UDP proxy filter.
// [#next-free-field: 10]
// [#next-free-field: 11]
message UdpProxyConfig {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.udp.udp_proxy.v2alpha.UdpProxyConfig";
Expand Down Expand Up @@ -105,6 +105,9 @@ message UdpProxyConfig {
// to upstream host selected on first chunk receival for that "session" (identified by source IP/port and local IP/port).
bool use_per_packet_load_balancing = 7;

// Configuration for access logs emitted by the UDP proxy. Note that certain UDP specific data is emitted as :ref:`Dynamic Metadata <config_access_log_format_dynamic_metadata>`.
repeated config.accesslog.v3.AccessLog access_log = 8;
// Configuration for session access logs emitted by the UDP proxy. Note that certain UDP specific data is emitted as :ref:`Dynamic Metadata <config_access_log_format_dynamic_metadata>`.
repeated config.accesslog.v3.AccessLog session_access_log = 8;
giantcroc marked this conversation as resolved.
Show resolved Hide resolved

// Configuration for proxy access logs emitted by the UDP proxy. Note that certain UDP specific data is emitted as :ref:`Dynamic Metadata <config_access_log_format_dynamic_metadata>`.
repeated config.accesslog.v3.AccessLog proxy_access_log = 10;
giantcroc marked this conversation as resolved.
Show resolved Hide resolved
}
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ new_features:
- area: grpc_json_transcoder
change: |
added support for parsing enum value case insensitively enabled by the config :ref:`case_insensitive_enum_parsing <envoy_v3_api_field_extensions.filters.http.grpc_json_transcoder.v3.GrpcJsonTranscoder.case_insensitive_enum_parsing>`.
- area: udp_proxy
change: |
added support for :ref:`proxy_access_log <envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.proxy_access_log>`.
- area: grpc_json_transcoder
change: |
added support for newline-delimited streams in :ref:`stream_newline_delimited <envoy_v3_api_field_extensions.filters.http.grpc_json_transcoder.v3.GrpcJsonTranscoder.PrintOptions.stream_newline_delimited>`.
Expand Down
44 changes: 33 additions & 11 deletions docs/root/configuration/observability/access_log/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -650,28 +650,50 @@ The following command operators are supported:

UDP
For :ref:`UDP Proxy <config_udp_listener_filters_udp_proxy>`,
NAMESPACE should be always set to "udp.proxy", optional KEYs are as follows:
when NAMESPACE is set to "udp.proxy.session", optional KEYs are as follows:

* ``cluster_name``: Name of the cluster.
* ``bytes_sent``: Total number of downstream bytes sent to the upstream in the session.
* ``bytes_received``: Total number of downstream bytes received from the upstream in the session.
* ``errors_sent``: Number of errors that have occurred when sending datagrams to the upstream in the session.
* ``errors_received``: Number of errors that have occurred when receiving datagrams from the upstream in UDP proxy.
Since the receiving errors are counted in at the listener level (vs. the session), this counter is global to all sessions and may not be directly attributable to the session being logged.
* ``datagrams_sent``: Number of datagrams sent to the upstream successfully in the session.
* ``datagrams_received``: Number of datagrams received from the upstream successfully in the session.

Recommended access log format for UDP proxy:
Recommended session access log format for UDP proxy:

.. code-block:: none

[%START_TIME%] %DYNAMIC_METADATA(udp.proxy.session:cluster_name)%
%DYNAMIC_METADATA(udp.proxy.session:bytes_sent)%
%DYNAMIC_METADATA(udp.proxy.session:bytes_received)%
%DYNAMIC_METADATA(udp.proxy.session:errors_sent)%
%DYNAMIC_METADATA(udp.proxy.session:datagrams_sent)%
%DYNAMIC_METADATA(udp.proxy.session:datagrams_received)%\n

when NAMESPACE is set to "udp.proxy.proxy", optional KEYs are as follows:

* ``bytes_sent``: Total number of downstream bytes sent to the upstream in UDP proxy.
* ``bytes_received``: Total number of downstream bytes received from the upstream in UDP proxy.
* ``errors_sent``: Number of errors that have occurred when sending datagrams to the upstream in UDP proxy.
* ``errors_received``: Number of errors that have occurred when receiving datagrams from the upstream in UDP proxy.
* ``datagrams_sent``: Number of datagrams sent to the upstream successfully in UDP proxy.
* ``datagrams_received``: Number of datagrams received from the upstream successfully in UDP proxy.
* ``no_route``: Number of times that no upstream cluster found in UDP proxy.
* ``session_total``: Total number of sessions in UDP proxy.
* ``idle_timeout``: Number of times that sessions idle timeout occurred in UDP proxy.

Recommended proxy access log format for UDP proxy:

.. code-block:: none

[%START_TIME%] %DYNAMIC_METADATA(udp.proxy:cluster_name)%
%DYNAMIC_METADATA(udp.proxy:bytes_sent)%
%DYNAMIC_METADATA(udp.proxy:bytes_received)%
%DYNAMIC_METADATA(udp.proxy:errors_sent)%
%DYNAMIC_METADATA(udp.proxy:errors_received)%
%DYNAMIC_METADATA(udp.proxy:datagrams_sent)%
%DYNAMIC_METADATA(udp.proxy:datagrams_received)%\n
[%START_TIME%]
%DYNAMIC_METADATA(udp.proxy.proxy:bytes_sent)%
%DYNAMIC_METADATA(udp.proxy.proxy:bytes_received)%
%DYNAMIC_METADATA(udp.proxy.proxy:errors_sent)%
%DYNAMIC_METADATA(udp.proxy.proxy:errors_received)%
%DYNAMIC_METADATA(udp.proxy.proxy:datagrams_sent)%
%DYNAMIC_METADATA(udp.proxy.proxy:datagrams_received)%
%DYNAMIC_METADATA(udp.proxy.proxy:session_total)%\n

THRIFT
For :ref:`Thrift Proxy <config_network_filters_thrift_proxy>`,
Expand Down
55 changes: 45 additions & 10 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ UdpProxyFilter::UdpProxyFilter(Network::UdpReadFilterCallbacks& callbacks,
onClusterAddOrUpdate(*cluster);
}
}

if (!config_->proxyAccessLogs().empty()) {
udp_proxy_stats_.emplace(StreamInfo::StreamInfoImpl(config_->timeSource(), nullptr));
}
}

UdpProxyFilter::~UdpProxyFilter() {
if (!config_->proxyAccessLogs().empty()) {
fillProxyStreamInfo();
for (const auto& access_log : config_->proxyAccessLogs()) {
access_log->log(nullptr, nullptr, nullptr, udp_proxy_stats_.value());
}
}
}

void UdpProxyFilter::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) {
Expand Down Expand Up @@ -234,8 +247,8 @@ UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster,
// NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port
// is bound until the first packet is sent to the upstream host.
socket_(cluster.filter_.createSocket(host)) {
if (!cluster_.filter_.config_->accessLogs().empty()) {
udp_sess_stats_.emplace(
if (!cluster_.filter_.config_->sessionAccessLogs().empty()) {
udp_session_stats_.emplace(
StreamInfo::StreamInfoImpl(cluster_.filter_.config_->timeSource(), nullptr));
}

Expand Down Expand Up @@ -281,29 +294,51 @@ UdpProxyFilter::ActiveSession::~ActiveSession() {
.connections()
.dec();

if (!cluster_.filter_.config_->accessLogs().empty()) {
fillStreamInfo();
for (const auto& access_log : cluster_.filter_.config_->accessLogs()) {
access_log->log(nullptr, nullptr, nullptr, udp_sess_stats_.value());
if (!cluster_.filter_.config_->sessionAccessLogs().empty()) {
fillSessionStreamInfo();
for (const auto& access_log : cluster_.filter_.config_->sessionAccessLogs()) {
access_log->log(nullptr, nullptr, nullptr, udp_session_stats_.value());
}
}
}

void UdpProxyFilter::ActiveSession::fillStreamInfo() {
void UdpProxyFilter::ActiveSession::fillSessionStreamInfo() {
ProtobufWkt::Struct stats_obj;
auto& fields_map = *stats_obj.mutable_fields();
fields_map["cluster_name"] = ValueUtil::stringValue(cluster_.cluster_.info()->name());
fields_map["bytes_sent"] = ValueUtil::numberValue(session_stats_.downstream_sess_tx_bytes_);
fields_map["bytes_received"] = ValueUtil::numberValue(session_stats_.downstream_sess_rx_bytes_);
fields_map["errors_sent"] = ValueUtil::numberValue(session_stats_.downstream_sess_tx_errors_);
fields_map["errors_received"] =
ValueUtil::numberValue(cluster_.filter_.config_->stats().downstream_sess_rx_errors_.value());
fields_map["datagrams_sent"] =
ValueUtil::numberValue(session_stats_.downstream_sess_tx_datagrams_);
fields_map["datagrams_received"] =
ValueUtil::numberValue(session_stats_.downstream_sess_rx_datagrams_);

udp_sess_stats_.value().setDynamicMetadata("udp.proxy", stats_obj);
udp_session_stats_.value().setDynamicMetadata("udp.proxy.session", stats_obj);
}

void UdpProxyFilter::fillProxyStreamInfo() {
ProtobufWkt::Struct stats_obj;
auto& fields_map = *stats_obj.mutable_fields();
fields_map["bytes_sent"] =
ValueUtil::numberValue(config_->stats().downstream_sess_tx_bytes_.value());
fields_map["bytes_received"] =
ValueUtil::numberValue(config_->stats().downstream_sess_rx_bytes_.value());
fields_map["errors_sent"] =
ValueUtil::numberValue(config_->stats().downstream_sess_tx_errors_.value());
fields_map["errors_received"] =
ValueUtil::numberValue(config_->stats().downstream_sess_rx_errors_.value());
fields_map["datagrams_sent"] =
ValueUtil::numberValue(config_->stats().downstream_sess_tx_datagrams_.value());
fields_map["datagrams_received"] =
ValueUtil::numberValue(config_->stats().downstream_sess_rx_datagrams_.value());
fields_map["no_route"] =
ValueUtil::numberValue(config_->stats().downstream_sess_no_route_.value());
fields_map["session_total"] =
ValueUtil::numberValue(config_->stats().downstream_sess_total_.value());
giantcroc marked this conversation as resolved.
Show resolved Hide resolved
fields_map["idle_timeout"] = ValueUtil::numberValue(config_->stats().idle_timeout_.value());

udp_proxy_stats_.value().setDynamicMetadata("udp.proxy.proxy", stats_obj);
}

void UdpProxyFilter::ActiveSession::onIdleTimer() {
Expand Down
31 changes: 24 additions & 7 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,15 @@ class UdpProxyFilterConfig {
"is not running with the CAP_NET_ADMIN capability.");
}

access_logs_.reserve(config.access_log_size());
for (const envoy::config::accesslog::v3::AccessLog& log_config : config.access_log()) {
access_logs_.emplace_back(AccessLog::AccessLogFactory::fromProto(log_config, context));
session_access_logs_.reserve(config.session_access_log_size());
for (const envoy::config::accesslog::v3::AccessLog& log_config : config.session_access_log()) {
session_access_logs_.emplace_back(
AccessLog::AccessLogFactory::fromProto(log_config, context));
}

proxy_access_logs_.reserve(config.proxy_access_log_size());
for (const envoy::config::accesslog::v3::AccessLog& log_config : config.proxy_access_log()) {
proxy_access_logs_.emplace_back(AccessLog::AccessLogFactory::fromProto(log_config, context));
}

if (!config.hash_policies().empty()) {
Expand All @@ -116,7 +122,12 @@ class UdpProxyFilterConfig {
const Network::ResolvedUdpSocketConfig& upstreamSocketConfig() const {
return upstream_socket_config_;
}
const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() const { return access_logs_; }
const std::vector<AccessLog::InstanceSharedPtr>& sessionAccessLogs() const {
return session_access_logs_;
}
const std::vector<AccessLog::InstanceSharedPtr>& proxyAccessLogs() const {
return proxy_access_logs_;
}

private:
static UdpProxyDownstreamStats generateStats(const std::string& stat_prefix,
Expand All @@ -135,7 +146,8 @@ class UdpProxyFilterConfig {
std::unique_ptr<const HashPolicyImpl> hash_policy_;
mutable UdpProxyDownstreamStats stats_;
const Network::ResolvedUdpSocketConfig upstream_socket_config_;
std::vector<AccessLog::InstanceSharedPtr> access_logs_;
std::vector<AccessLog::InstanceSharedPtr> session_access_logs_;
std::vector<AccessLog::InstanceSharedPtr> proxy_access_logs_;
Random::RandomGenerator& random_;
};

Expand Down Expand Up @@ -165,6 +177,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
public:
UdpProxyFilter(Network::UdpReadFilterCallbacks& callbacks,
const UdpProxyFilterConfigSharedPtr& config);
~UdpProxyFilter() override;

// Network::UdpListenerReadFilter
Network::FilterStatus onData(Network::UdpRecvData& data) override;
Expand Down Expand Up @@ -193,7 +206,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
private:
void onIdleTimer();
void onReadReady();
void fillStreamInfo();
void fillSessionStreamInfo();

// Network::UdpPacketProcessor
void processPacket(Network::Address::InstanceConstSharedPtr local_address,
Expand Down Expand Up @@ -242,7 +255,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
bool skip_connect_{};

UdpProxySessionStats session_stats_{};
absl::optional<StreamInfo::StreamInfoImpl> udp_sess_stats_;
absl::optional<StreamInfo::StreamInfoImpl> udp_session_stats_;
};

using ActiveSessionPtr = std::unique_ptr<ActiveSession>;
Expand Down Expand Up @@ -380,6 +393,8 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
nullptr, Network::SocketCreationOptions{});
}

void fillProxyStreamInfo();

// Upstream::ClusterUpdateCallbacks
void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) final;
void onClusterRemoval(const std::string& cluster_name) override;
Expand All @@ -388,6 +403,8 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
const Upstream::ClusterUpdateCallbacksHandlePtr cluster_update_callbacks_;
// Map for looking up cluster info with its name.
absl::flat_hash_map<std::string, ClusterInfoPtr> cluster_infos_;

absl::optional<StreamInfo::StreamInfoImpl> udp_proxy_stats_;
};

} // namespace UdpProxy
Expand Down
Loading