Skip to content

Commit

Permalink
access log: added support to flush tcp access log periodically (envoy…
Browse files Browse the repository at this point in the history
…proxy#23630)

* access log: added support to flush tcp access log periodically

Signed-off-by: wbpcode <wangbaiping@corp.netease.com>
  • Loading branch information
code authored Nov 19, 2022
1 parent 9bb570f commit 04dd76b
Show file tree
Hide file tree
Showing 15 changed files with 361 additions and 8 deletions.
15 changes: 14 additions & 1 deletion api/envoy/data/accesslog/v3/accesslog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ message ConnectionProperties {
}

// Defines fields that are shared by all Envoy access logs.
// [#next-free-field: 26]
// [#next-free-field: 28]
message AccessLogCommon {
option (udpa.annotations.versioning).previous_message_type =
"envoy.data.accesslog.v2.AccessLogCommon";
Expand Down Expand Up @@ -202,6 +202,19 @@ message AccessLogCommon {

// Connection termination details may provide additional information about why the connection was terminated by Envoy for L4 reasons.
string connection_termination_details = 25;

// Optional unique id of stream (TCP connection, long-live HTTP2 stream, HTTP request) for logging and tracing.
// This could be any format string that could be used to identify one stream.
string stream_id = 26;

// If this log entry is final log entry that flushed after the stream completed or
// intermediate log entry that flushed periodically during the stream.
// There may be multiple intermediate log entries and only one final log entry for each
// long-live stream (TCP connection, long-live HTTP2 stream).
// And if it is necessary, unique ID or identifier can be added to the log entry
// :ref:`stream_id <envoy_v3_api_field_data.accesslog.v3.AccessLogCommon.stream_id>` to
// correlate all these intermediate log entries and final log entry.
bool intermediate_log_entry = 27;
}

// Flags indicating occurrences during request/response processing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// TCP Proxy :ref:`configuration overview <config_network_filters_tcp_proxy>`.
// [#extension: envoy.filters.network.tcp_proxy]

// [#next-free-field: 15]
// [#next-free-field: 16]
message TcpProxy {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.network.tcp_proxy.v2.TcpProxy";
Expand Down Expand Up @@ -199,4 +199,11 @@ message TcpProxy {
// is reached the connection will be closed. Duration must be at least 1ms.
google.protobuf.Duration max_downstream_connection_duration = 13
[(validate.rules).duration = {gte {nanos: 1000000}}];

// The interval to flush access log. The TCP proxy will flush only one access log when the connection
// is closed by default. If this field is set, the TCP proxy will flush access log periodically with
// the specified interval.
// The interval must be at least 1ms.
google.protobuf.Duration access_log_flush_interval = 15
[(validate.rules).duration = {gte {nanos: 1000000}}];
}
7 changes: 7 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ new_features:
- area: upstream
change: |
added a new field :ref:`socket_options <envoy_v3_api_field_config.core.v3.ExtraSourceAddress.socket_options>` to the ExtraSourceAddress, allowing specifying discrete socket options for each source address.
- area: access_log
change: |
added a new field :ref:`intermediate_log_entry <envoy_v3_api_field_data.accesslog.v3.AccessLogCommon.intermediate_log_entry>` to detect if the gRPC log entry is an intermediate log entry or not and added
support to flush TCP log entries periodly according to the configured :ref:`inteval <envoy_v3_api_field_extensions.filters.network.tcp_proxy.v3.TcpProxy.access_log_flush_interval>`.
- area: access_log
change: |
added support for :ref:`%STREAM_ID% <config_access_log_format_stream_id>` for stream unique identifier.
- area: thrift
change: |
added payload to metadata filter which matches a given payload field's value would be extracted and attached to the request as dynamic metadata.
Expand Down
9 changes: 9 additions & 0 deletions docs/root/configuration/observability/access_log/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,15 @@ The following command operators are supported:
is unique with high likelihood within an execution, but can duplicate across
multiple instances or between restarts.

.. _config_access_log_format_stream_id:

%STREAM_ID%
An identifier for the stream (HTTP request, long-live HTTP2 stream, TCP connection, etc.). It can be used to
cross-reference TCP access logs across multiple log sinks, or to cross-reference timer-based reports for the same connection.
Different with %CONNECTION_ID%, the identifier should be unique across multiple instances or between restarts.
And it's value should be same with %REQ(X-REQUEST-ID)% for HTTP request.
This should be used to replace %CONNECTION_ID% and %REQ(X-REQUEST-ID)% in most cases.

%GRPC_STATUS(X)%
`gRPC status code <https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto>`_ formatted according to the optional parameter ``X``, which can be ``CAMEL_STRING``, ``SNAKE_STRING`` and ``NUMBER``.
For example, if the grpc status is ``INVALID_ARGUMENT`` (represented by number 3), the formatter will return ``InvalidArgument`` for ``CAMEL_STRING``, ``INVALID_ARGUMENT`` for ``SNAKE_STRING`` and ``3`` for ``NUMBER``.
Expand Down
17 changes: 17 additions & 0 deletions source/common/formatter/substitution_formatter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,23 @@ const StreamInfoFormatter::FieldExtractorLookupTbl& StreamInfoFormatter::getKnow
}
return result;
});
}}},
{"STREAM_ID",
{CommandSyntaxChecker::COMMAND_ONLY,
[](const std::string&, const absl::optional<size_t>&) {
return std::make_unique<StreamInfoStringFieldExtractor>(
[](const StreamInfo::StreamInfo& stream_info)
-> absl::optional<std::string> {
auto provider = stream_info.getStreamIdProvider();
if (!provider.has_value()) {
return {};
}
auto id = provider->toStringView();
if (!id.has_value()) {
return {};
}
return absl::make_optional<std::string>(id.value());
});
}}}});
}

Expand Down
1 change: 1 addition & 0 deletions source/common/tcp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ envoy_cc_library(
"//source/common/network:utility_lib",
"//source/common/protobuf:utility_lib",
"//source/common/router:metadatamatchcriteria_lib",
"//source/common/stream_info:stream_id_provider_lib",
"//source/common/stream_info:stream_info_lib",
"//source/common/upstream:load_balancer_lib",
"//source/extensions/upstreams/tcp/generic:config",
Expand Down
43 changes: 43 additions & 0 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "source/common/common/fmt.h"
#include "source/common/common/macros.h"
#include "source/common/common/utility.h"
#include "source/common/config/metadata.h"
#include "source/common/config/utility.h"
#include "source/common/config/well_known_names.h"
#include "source/common/network/application_protocol.h"
Expand All @@ -31,6 +32,7 @@
#include "source/common/network/upstream_server_name.h"
#include "source/common/network/upstream_socket_options_filter_state.h"
#include "source/common/router/metadatamatchcriteria_impl.h"
#include "source/common/stream_info/stream_id_provider_impl.h"

namespace Envoy {
namespace TcpProxy {
Expand Down Expand Up @@ -88,6 +90,12 @@ Config::SharedConfig::SharedConfig(
max_downstream_connection_duration_ = std::chrono::milliseconds(connection_duration);
}

if (config.has_access_log_flush_interval()) {
const uint64_t flush_interval =
DurationUtil::durationToMilliseconds(config.access_log_flush_interval());
access_log_flush_interval_ = std::chrono::milliseconds(flush_interval);
}

if (config.has_on_demand() && config.on_demand().has_odcds_config()) {
on_demand_config_ =
std::make_unique<OnDemandConfig>(config.on_demand(), context, *stats_scope_);
Expand Down Expand Up @@ -178,6 +186,10 @@ Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager
}

Filter::~Filter() {
// Disable access log flush timer if it is enabled.
disableAccessLogFlushTimer();

// Flush the final end stream access log entry.
for (const auto& access_log : config_->accessLogs()) {
access_log->log(nullptr, nullptr, nullptr, getStreamInfo());
}
Expand Down Expand Up @@ -623,6 +635,16 @@ Network::FilterStatus Filter::onNewConnection() {
connection_duration_timer_->enableTimer(config_->maxDownstreamConnectionDuration().value());
}

if (config_->accessLogFlushInterval().has_value()) {
access_log_flush_timer_ = read_callbacks_->connection().dispatcher().createTimer(
[this]() -> void { onAccessLogFlushInterval(); });
resetAccessLogFlushTimer();
}

// Set UUID for the connection. This is used for logging and tracing.
getStreamInfo().setStreamIdProvider(
std::make_shared<StreamInfo::StreamIdProviderImpl>(config_->randomGenerator().uuid()));

ASSERT(upstream_ == nullptr);
route_ = pickRoute();
return establishUpstreamConnection();
Expand Down Expand Up @@ -754,6 +776,27 @@ void Filter::onMaxDownstreamConnectionDuration() {
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
}

void Filter::onAccessLogFlushInterval() {
for (const auto& access_log : config_->accessLogs()) {
access_log->log(nullptr, nullptr, nullptr, getStreamInfo());
}
resetAccessLogFlushTimer();
}

void Filter::resetAccessLogFlushTimer() {
if (access_log_flush_timer_ != nullptr) {
ASSERT(config_->accessLogFlushInterval().has_value());
access_log_flush_timer_->enableTimer(config_->accessLogFlushInterval().value());
}
}

void Filter::disableAccessLogFlushTimer() {
if (access_log_flush_timer_ != nullptr) {
access_log_flush_timer_->disableTimer();
access_log_flush_timer_.reset();
}
}

void Filter::resetIdleTimer() {
if (idle_timer_ != nullptr) {
ASSERT(config_->idleTimeout());
Expand Down
21 changes: 17 additions & 4 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
Expand Down Expand Up @@ -196,18 +197,21 @@ class Config {
const absl::optional<std::chrono::milliseconds>& maxDownstreamConnectionDuration() const {
return max_downstream_connection_duration_;
}
const absl::optional<std::chrono::milliseconds>& accessLogFlushInterval() const {
return access_log_flush_interval_;
}
TunnelingConfigHelperOptConstRef tunnelingConfigHelper() {
if (tunneling_config_helper_) {
return TunnelingConfigHelperOptConstRef(*tunneling_config_helper_);
return {*tunneling_config_helper_};
} else {
return TunnelingConfigHelperOptConstRef();
return {};
}
}
OnDemandConfigOptConstRef onDemandConfig() {
if (on_demand_config_) {
return OnDemandConfigOptConstRef(*on_demand_config_);
return {*on_demand_config_};
} else {
return OnDemandConfigOptConstRef();
return {};
}
}

Expand All @@ -221,6 +225,7 @@ class Config {
const TcpProxyStats stats_;
absl::optional<std::chrono::milliseconds> idle_timeout_;
absl::optional<std::chrono::milliseconds> max_downstream_connection_duration_;
absl::optional<std::chrono::milliseconds> access_log_flush_interval_;
std::unique_ptr<TunnelingConfigHelper> tunneling_config_helper_;
std::unique_ptr<OnDemandConfig> on_demand_config_;
};
Expand Down Expand Up @@ -250,6 +255,9 @@ class Config {
const absl::optional<std::chrono::milliseconds>& maxDownstreamConnectionDuration() const {
return shared_config_->maxDownstreamConnectionDuration();
}
const absl::optional<std::chrono::milliseconds>& accessLogFlushInterval() const {
return shared_config_->accessLogFlushInterval();
}
// Return nullptr if there is no tunneling config.
TunnelingConfigHelperOptConstRef tunnelingConfigHelper() {
return shared_config_->tunnelingConfigHelper();
Expand All @@ -271,6 +279,7 @@ class Config {
}
// This function must not be called if on demand is disabled.
const OnDemandStats& onDemandStats() const { return shared_config_->onDemandConfig()->stats(); }
Random::RandomGenerator& randomGenerator() { return random_generator_; }

private:
struct SimpleRouteImpl : public Route {
Expand Down Expand Up @@ -471,6 +480,9 @@ class Filter : public Network::ReadFilter,
void resetIdleTimer();
void disableIdleTimer();
void onMaxDownstreamConnectionDuration();
void onAccessLogFlushInterval();
void resetAccessLogFlushTimer();
void disableAccessLogFlushTimer();

const ConfigSharedPtr config_;
Upstream::ClusterManager& cluster_manager_;
Expand All @@ -479,6 +491,7 @@ class Filter : public Network::ReadFilter,
DownstreamCallbacks downstream_callbacks_;
Event::TimerPtr idle_timer_;
Event::TimerPtr connection_duration_timer_;
Event::TimerPtr access_log_flush_timer_;

// A pointer to the on demand cluster lookup when lookup is in flight.
Upstream::ClusterDiscoveryCallbackHandlePtr cluster_discovery_handle_;
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/access_loggers/grpc/grpc_access_log_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,16 @@ void Utility::extractCommonAccessLogProperties(
const auto tag_applier = Tracing::CustomTagUtility::createCustomTag(custom_tag);
tag_applier->applyLog(common_access_log, ctx);
}

// If the stream is not complete, then this log entry is intermediate log entry.
if (!stream_info.requestComplete().has_value()) {
common_access_log.set_intermediate_log_entry(true);
}

// Set stream unique id from the stream info.
if (auto provider = stream_info.getStreamIdProvider(); provider.has_value()) {
common_access_log.set_stream_id(std::string(provider->toStringView().value_or("")));
}
}

} // namespace GrpcCommon
Expand Down
1 change: 1 addition & 0 deletions test/common/formatter/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ envoy_cc_test(
"//source/common/json:json_loader_lib",
"//source/common/network:address_lib",
"//source/common/router:string_accessor_lib",
"//source/common/stream_info:stream_id_provider_lib",
"//test/mocks/api:api_mocks",
"//test/mocks/http:http_mocks",
"//test/mocks/ssl:ssl_mocks",
Expand Down
16 changes: 16 additions & 0 deletions test/common/formatter/substitution_formatter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "source/common/network/address_impl.h"
#include "source/common/protobuf/utility.h"
#include "source/common/router/string_accessor_impl.h"
#include "source/common/stream_info/stream_id_provider_impl.h"

#include "test/common/formatter/command_extension.h"
#include "test/mocks/api/mocks.h"
Expand Down Expand Up @@ -824,6 +825,21 @@ TEST(SubstitutionFormatterTest, streamInfoFormatter) {
ProtoEq(ValueUtil::numberValue(id)));
}

{
StreamInfoFormatter upstream_format("STREAM_ID");

StreamInfo::StreamIdProviderImpl id_provider("ffffffff-0012-0110-00ff-0c00400600ff");
EXPECT_CALL(stream_info, getStreamIdProvider())
.WillRepeatedly(Return(makeOptRef<const StreamInfo::StreamIdProvider>(id_provider)));

EXPECT_EQ("ffffffff-0012-0110-00ff-0c00400600ff",
upstream_format.format(request_headers, response_headers, response_trailers,
stream_info, body));
EXPECT_THAT(upstream_format.formatValue(request_headers, response_headers, response_trailers,
stream_info, body),
ProtoEq(ValueUtil::stringValue("ffffffff-0012-0110-00ff-0c00400600ff")));
}

{
StreamInfoFormatter upstream_format("REQUESTED_SERVER_NAME");
std::string requested_server_name = "stub_server";
Expand Down
27 changes: 25 additions & 2 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ TEST_F(TcpProxyTest, StreamInfoDynamicMetadata) {
(*metadata.mutable_filter_metadata())[Envoy::Config::MetadataFilters::get().ENVOY_LB];
(*map.mutable_fields())["test"] = val;
EXPECT_CALL(filter_callbacks_.connection_.stream_info_, dynamicMetadata())
.WillOnce(ReturnRef(metadata));
.WillRepeatedly(ReturnRef(metadata));

filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);
Expand Down Expand Up @@ -680,7 +680,7 @@ TEST_F(TcpProxyTest, StreamInfoDynamicMetadataAndConfigMerged) {
(*map.mutable_fields())["k1"] = v1;
(*map.mutable_fields())["k2"] = v2;
EXPECT_CALL(filter_callbacks_.connection_.stream_info_, dynamicMetadata())
.WillOnce(ReturnRef(metadata));
.WillRepeatedly(ReturnRef(metadata));

filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);
Expand Down Expand Up @@ -956,6 +956,29 @@ TEST_F(TcpProxyTest, AccessLogDownstreamAddress) {
EXPECT_EQ(access_log_data_, "1.1.1.1 1.1.1.2:20000");
}

// Test that intermediate log entry by field %DURATION%.
TEST_F(TcpProxyTest, IntermediateLogEntry) {
auto config = accessLogConfig("%DURATION%");
config.mutable_access_log_flush_interval()->set_seconds(1);
config.mutable_idle_timeout()->set_seconds(0);

auto* flush_timer = new NiceMock<Event::MockTimer>(&filter_callbacks_.connection_.dispatcher_);
EXPECT_CALL(*flush_timer, enableTimer(std::chrono::milliseconds(1000), _));

setup(1, config);
raiseEventUpstreamConnected(0);

// The timer will be enabled cyclically.
EXPECT_CALL(*flush_timer, enableTimer(std::chrono::milliseconds(1000), _));
flush_timer->invokeCallback();

// No valid duration until the connection is closed.
EXPECT_EQ(access_log_data_.value(), fmt::format("-"));

filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
filter_.reset();
}

TEST_F(TcpProxyTest, AccessLogUpstreamSSLConnection) {
setup(1);

Expand Down
Loading

0 comments on commit 04dd76b

Please sign in to comment.