Skip to content

Commit

Permalink
Add more tests and emit metric on early buffer
Browse files Browse the repository at this point in the history
Signed-off-by: Akshita Agarwal <akshita.agarwal@airbnb.com>
  • Loading branch information
Akshita Agarwal committed Oct 14, 2024
1 parent 96129fc commit 8ab1441
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,10 @@ must happen before ``onNewConnection()`` is called on the ``TcpProxy`` filter to
Early reception and delayed upstream connection establishment
-------------------------------------------------------------

``TcpProxy`` filter normally disables reading on the downstream connection until the upstream connection has been established. In some situations earlier filters in the filter chain may need to read data from the downstream connection before allowing the upstream connection to be established. This can be done by setting the ``StreamInfo`` filter state object for the key ``envoy.tcp_proxy.receive_before_connect``. Setting this dynamic metadata must happen in ``initializeReadFilterCallbacks()`` callback of the network filter so that is done before ``TcpProxy`` filter is initialized.
``TcpProxy`` filter normally disables reading on the downstream connection until the upstream connection has been established. In some situations earlier filters in the filter chain may need to read data from the downstream connection before allowing the upstream connection to be established. This can be done by setting the ``StreamInfo`` filter state object for the key ``envoy.tcp_proxy.receive_before_connect``. Setting this filter state must happen in ``initializeReadFilterCallbacks()`` callback of the network filter so that is done before ``TcpProxy`` filter is initialized.

Network filters can also pass data upto the ``TcpProxy`` filter before the upstream connection has been established, as ``TcpProxy`` filter now buffers data it receives before the upstream connection has been established to be sent when the upstream connection is established. Filters can also delay the upstream connection setup by returning ``StopIteration`` from their ``onNewConnection`` and ``onData`` callbacks.

.. note::

``TcpProxy`` filter does not limit the size of the pre-connection data buffer. Filters using the
``envoy.tcp_proxy.receive_before_connect`` option must take care to not pass unlimited amount to
data to the TcpProxy before the upstream connection has been set up.
On receiving early data, TCP_PROXY will read disable the connection until the upstream connection is established. This is to prevent the connection from being closed by the peer if the upstream connection is not established in a timely manner.

.. _config_network_filters_tcp_proxy_tunneling_over_http:

Expand Down
10 changes: 5 additions & 5 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connec

if (receive_before_connect && receive_before_connect->value()) {
ENVOY_CONN_LOG(debug, "receive_before_connect is enabled", read_callbacks_->connection());
std::cout<<"Filter::initialize: receive_before_connect enabled"<<std::endl;
receive_before_connect_ = true;
} else {
// Need to disable reads so that we don't write to an upstream that might fail
Expand Down Expand Up @@ -780,10 +779,11 @@ Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) {
resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
} else if (receive_before_connect_) {
// Buffer data received before upstream connection exists
std::cout<<"data received before upstream connection exists"<<std::endl;
ENVOY_CONN_LOG(debug, "buffering {} bytes as upstream connection is not established yet", read_callbacks_->connection(), data.length());
ENVOY_CONN_LOG(debug, "buffering {} bytes as upstream connection is not established yet",
read_callbacks_->connection(), data.length());
early_data_buffer_.move(data);
read_callbacks_->connection().readDisable(true);
config_->stats().early_data_received_count_total_.inc();
if (!early_data_end_stream_) {
early_data_end_stream_ = end_stream;
}
Expand Down Expand Up @@ -924,8 +924,8 @@ void Filter::onUpstreamConnection() {
// If we have received any data before upstream connection is established, send it to
// the upstream connection.
else if (early_data_buffer_.length() > 0) {
std::cout<<"Filter::onUpstreamConnection: Flushing early data buffer"<<std::endl;
ENVOY_CONN_LOG(debug, "TCP:onUpstreamEvent() Flushing early data buffer to upstream", read_callbacks_->connection());
ENVOY_CONN_LOG(debug, "TCP:onUpstreamEvent() Flushing early data buffer to upstream",
read_callbacks_->connection());
getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(early_data_buffer_.length());
upstream_->encodeData(early_data_buffer_, early_data_end_stream_);
ASSERT(0 == early_data_buffer_.length());
Expand Down
9 changes: 5 additions & 4 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
#include "envoy/upstream/cluster_manager.h"
#include "envoy/upstream/upstream.h"

#include "source/common/common/assert.h"
#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/assert.h"
#include "source/common/common/logger.h"
#include "source/common/formatter/substitution_format_string.h"
#include "source/common/http/header_map_impl.h"
Expand All @@ -41,8 +41,6 @@
namespace Envoy {
namespace TcpProxy {

constexpr absl::string_view PerConnectionIdleTimeoutMs =
"envoy.tcp_proxy.per_connection_idle_timeout_ms";
constexpr absl::string_view ReceiveBeforeConnectKey = "envoy.tcp_proxy.receive_before_connect";

constexpr absl::string_view PerConnectionIdleTimeoutMs =
Expand All @@ -62,6 +60,7 @@ constexpr absl::string_view PerConnectionIdleTimeoutMs =
COUNTER(downstream_cx_tx_bytes_total) \
COUNTER(downstream_flow_control_paused_reading_total) \
COUNTER(downstream_flow_control_resumed_reading_total) \
COUNTER(early_data_received_count_total) \
COUNTER(idle_timeout) \
COUNTER(max_downstream_connection_duration) \
COUNTER(upstream_flush_total) \
Expand Down Expand Up @@ -670,7 +669,9 @@ class Filter : public Network::ReadFilter,
uint32_t connect_attempts_{};
bool connecting_{};
bool downstream_closed_{};
HttpStreamDecoderFilterCallbacks upstream_decoder_filter_callbacks_;
bool receive_before_connect_{false};
bool early_data_end_stream_{false};
Buffer::OwnedImpl early_data_buffer_{};
};

// This class deals with an upstream connection that needs to finish flushing, when the downstream
Expand Down
2 changes: 1 addition & 1 deletion test/common/tcp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ envoy_cc_test_library(
"//source/common/network:upstream_server_name_lib",
"//source/common/network:upstream_socket_options_filter_state_lib",
"//source/common/stats:stats_lib",
"//source/common/stream_info:bool_accessor_lib",
"//source/common/tcp_proxy",
"//source/common/upstream:upstream_includes",
"//source/common/upstream:upstream_lib",
"//source/extensions/access_loggers/file:config",
"//source/extensions/upstreams/http/generic:config",
"//source/common/stream_info:bool_accessor_lib",
"//test/common/upstream:utility_lib",
"//test/mocks/buffer:buffer_mocks",
"//test/mocks/network:network_mocks",
Expand Down
47 changes: 40 additions & 7 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ class TcpProxyTest : public TcpProxyTestBase {

if (receive_before_connect) {
filter_callbacks_.connection().streamInfo().filterState()->setData(
TcpProxy::ReceiveBeforeConnectKey, std::make_unique<StreamInfo::BoolAccessorImpl>(true),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::Connection);
TcpProxy::ReceiveBeforeConnectKey, std::make_unique<StreamInfo::BoolAccessorImpl>(true),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::Connection);
}

filter_ = std::make_unique<Filter>(config_,
Expand All @@ -156,8 +156,9 @@ class TcpProxyTest : public TcpProxyTestBase {
}

if (connections > 0) {
auto expected_status_on_new_connection = receive_before_connect ? Network::FilterStatus::Continue
: Network::FilterStatus::StopIteration;
auto expected_status_on_new_connection = receive_before_connect
? Network::FilterStatus::Continue
: Network::FilterStatus::StopIteration;
EXPECT_EQ(expected_status_on_new_connection, filter_->onNewConnection());
EXPECT_EQ(absl::optional<uint64_t>(), filter_->computeHashKey());
EXPECT_EQ(&filter_callbacks_.connection_, filter_->downstreamConnection());
Expand Down Expand Up @@ -457,6 +458,38 @@ TEST_P(TcpProxyTest, UpstreamDisconnectDownstreamFlowControl) {

raiseEventUpstreamConnected(0);

Buffer::OwnedImpl buffer("hello");
EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _));
filter_->onData(buffer, false);

Buffer::OwnedImpl response("world");
EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _));
upstream_callbacks_->onUpstreamData(response, false);

EXPECT_CALL(*upstream_connections_.at(0), readDisable(true));
filter_callbacks_.connection_.runHighWatermarkCallbacks();

EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite));
upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose);

filter_callbacks_.connection_.runLowWatermarkCallbacks();
}

TEST_P(TcpProxyTest, ReceiveBeforeConnectBuffersOnEarlyData) {
setup(1, false, true);
std::string early_data("early data");
Buffer::OwnedImpl early_data_buffer(early_data);

// Check that the early data is buffered and flushed to upstream when connection is established.
// Also check that downstream connection is read disabled.
EXPECT_CALL(*upstream_connections_.at(0), write(_, _)).Times(0);
EXPECT_CALL(filter_callbacks_.connection_, readDisable(true));
filter_->onData(early_data_buffer, false);

// Now when upstream connection is established, early buffer will be sent.
EXPECT_CALL(*upstream_connections_.at(0), write(BufferStringEqual(early_data), false));
raiseEventUpstreamConnected(0);

// Any further communications between client and server can resume normally.
Buffer::OwnedImpl buffer("hello");
EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _));
Expand All @@ -468,7 +501,7 @@ TEST_P(TcpProxyTest, UpstreamDisconnectDownstreamFlowControl) {
}

TEST_P(TcpProxyTest, ReceiveBeforeConnectNoEarlyData) {
setup(1, false, true);
setup(1, /*set_redirect_records*/ false, /*receive_before_connect*/ true);
raiseEventUpstreamConnected(0, false);

// Any data sent after upstream connection is established is flushed directly to upstream,
Expand Down Expand Up @@ -1386,7 +1419,7 @@ TEST_P(TcpProxyTest, UpstreamSocketOptionsReturnedEmpty) {
}

TEST_P(TcpProxyTest, TcpProxySetRedirectRecordsToUpstream) {
setup(1, true);
setup(1, /*set_redirect_records*/ true, /*receive_before_connect*/ false);
EXPECT_TRUE(filter_->upstreamSocketOptions());
auto iterator = std::find_if(
filter_->upstreamSocketOptions()->begin(), filter_->upstreamSocketOptions()->end(),
Expand Down
138 changes: 138 additions & 0 deletions test/integration/tcp_proxy_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1718,4 +1718,142 @@ INSTANTIATE_TEST_SUITE_P(TcpProxyIntegrationTestParams, MysqlIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

class PauseIterationFilter : public Network::ReadFilter {
public:
explicit PauseIterationFilter() {}

Network::FilterStatus onData(Buffer::Instance&, bool) override {
if (!should_continue_) {
should_continue_ = true;
// Stop Iteration when first time data is received.
return Network::FilterStatus::StopIteration;
}
return Network::FilterStatus::Continue;
}

Network::FilterStatus onNewConnection() override {
// Stop Iteration as more data is needed before filter chain can be continued.
return Network::FilterStatus::StopIteration;
}

void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
read_callbacks_ = &callbacks;

// Pass ReceiveBeforeConnect state to TCP_PROXY so that it does not read disable
// the connection.
read_callbacks_->connection().streamInfo().filterState()->setData(
TcpProxy::ReceiveBeforeConnectKey, std::make_unique<StreamInfo::BoolAccessorImpl>(true),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::Connection);
}

bool should_continue_{false};
Network::ReadFilterCallbacks* read_callbacks_{};
};

class PauseIterationFilterFactory : public Server::Configuration::NamedNetworkFilterConfigFactory {
public:
Network::FilterFactoryCb
createFilterFactoryFromProto(const Protobuf::Message&,
Server::Configuration::FactoryContext&) override {
return [](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(std::make_shared<PauseIterationFilter>());
};
}

ProtobufTypes::MessagePtr createEmptyConfigProto() override {
return ProtobufTypes::MessagePtr{new Envoy::ProtobufWkt::Struct()};
}

std::string name() const override { CONSTRUCT_ON_FIRST_USE(std::string, "test.pause_iteration"); }
};

class TcpProxyReceiveBeforeConnectIntegrationTest : public TcpProxyIntegrationTest {
public:
TcpProxyReceiveBeforeConnectIntegrationTest() : TcpProxyIntegrationTest() {
config_helper_.addNetworkFilter(R"EOF(
name: test.pause_iteration
typed_config:
"@type": type.googleapis.com/google.protobuf.Struct
)EOF");
}

PauseIterationFilterFactory factory_;
Registry::InjectFactory<Server::Configuration::NamedNetworkFilterConfigFactory> register_factory_{
factory_};
};

INSTANTIATE_TEST_SUITE_P(TcpProxyIntegrationTestParams, TcpProxyReceiveBeforeConnectIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

TEST_P(TcpProxyReceiveBeforeConnectIntegrationTest, ReceiveBeforeConnectEarlyData) {
initialize();
FakeRawConnectionPtr fake_upstream_connection;
IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));

// First time data is sent, the PauseFilter stops the iteration.
ASSERT_TRUE(tcp_client->write("hello"));
ASSERT_FALSE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));

ASSERT_TRUE(tcp_client->write("world"));
test_server_->waitForCounterEq("tcp.tcpproxy_stats.early_data_received_count_total", 1);
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
// Once the connection is established, the early data will be flushed to the upstream.
ASSERT_TRUE(fake_upstream_connection->waitForData(10));
ASSERT_TRUE(fake_upstream_connection->write("response"));
ASSERT_TRUE(fake_upstream_connection->close());
ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
tcp_client->waitForHalfClose();
tcp_client->close();
}

TEST_P(TcpProxyReceiveBeforeConnectIntegrationTest, UpstreamBufferHighWatermark) {
config_helper_.setBufferLimits(1024, 1024);
std::string data(1024 * 16, 'a');

initialize();
FakeRawConnectionPtr fake_upstream_connection;
IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));

// First time data is sent, the PauseFilter stops the iteration.
ASSERT_TRUE(tcp_client->write(data.substr(0, 1024)));
ASSERT_FALSE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));

ASSERT_TRUE(tcp_client->write(data.substr(1024)));
test_server_->waitForCounterEq("tcp.tcpproxy_stats.early_data_received_count_total", 1);
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
// Once the connection is established, the early data will be flushed to the upstream.
ASSERT_TRUE(fake_upstream_connection->waitForData(1024 * 16));
ASSERT_TRUE(fake_upstream_connection->write("response"));
ASSERT_TRUE(fake_upstream_connection->close());
ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
tcp_client->waitForHalfClose();
tcp_client->close();

uint32_t upstream_pauses =
test_server_->counter("cluster.cluster_0.upstream_flow_control_paused_reading_total")
->value();
uint32_t upstream_resumes =
test_server_->counter("cluster.cluster_0.upstream_flow_control_resumed_reading_total")
->value();

uint32_t downstream_pauses =
test_server_->counter("tcp.tcpproxy_stats.downstream_flow_control_paused_reading_total")
->value();
uint32_t downstream_resumes =
test_server_->counter("tcp.tcpproxy_stats.downstream_flow_control_resumed_reading_total")
->value();

EXPECT_EQ(upstream_pauses, upstream_resumes);
EXPECT_EQ(upstream_resumes, 0);

// Since we are receiving early data, downstream connection will already be read
// disabled so downstream pause metric is not emitted when upstream buffer hits high
// watermark. When the upstream buffer watermark goes down, downstream will be read
// enabled and downstream resume metric will be emitted.
EXPECT_EQ(downstream_pauses, 0);
EXPECT_EQ(downstream_resumes, 1);
}

} // namespace Envoy

0 comments on commit 8ab1441

Please sign in to comment.