Skip to content

Commit

Permalink
http: tracking active session under L7 timers (#7782)
Browse files Browse the repository at this point in the history
Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk authored and mattklein123 committed Aug 23, 2019
1 parent e67923f commit e958cf9
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 10 deletions.
2 changes: 1 addition & 1 deletion source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connect
std::chrono::milliseconds request_timeout_ms_ = connection_manager_.config_.requestTimeout();
request_timer_ = connection_manager.read_callbacks_->connection().dispatcher().createTimer(
[this]() -> void { onRequestTimeout(); });
request_timer_->enableTimer(request_timeout_ms_);
request_timer_->enableTimer(request_timeout_ms_, this);
}

stream_info_.setRequestedServerName(
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1539,6 +1539,8 @@ void Filter::UpstreamRequest::onPoolFailure(Http::ConnectionPool::PoolFailureRea

void Filter::UpstreamRequest::onPoolReady(Http::StreamEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host) {
// This may be called under an existing ScopeTrackerScopeState but it will unwind correctly.
ScopeTrackerScopeState scope(&parent_.callbacks_->scope(), parent_.callbacks_->dispatcher());
ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks_);

host->outlierDetector().putResult(Upstream::Outlier::Result::LOCAL_ORIGIN_CONNECT_SUCCESS);
Expand Down
12 changes: 6 additions & 6 deletions source/extensions/filters/http/fault/fault_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Http::FilterHeadersStatus FaultFilter::decodeHeaders(Http::HeaderMap& headers, b
delay_timer_ =
decoder_callbacks_->dispatcher().createTimer([this]() -> void { postDelayInjection(); });
ENVOY_LOG(debug, "fault: delaying request {}ms", duration.value().count());
delay_timer_->enableTimer(duration.value());
delay_timer_->enableTimer(duration.value(), &decoder_callbacks_->scope());
recordDelaysInjectedStats();
decoder_callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DelayInjected);
return Http::FilterHeadersStatus::StopIteration;
Expand Down Expand Up @@ -192,7 +192,7 @@ void FaultFilter::maybeSetupResponseRateLimit(const Http::HeaderMap& request_hea
encoder_callbacks_->injectEncodedDataToFilterChain(data, end_stream);
},
[this] { encoder_callbacks_->continueEncoding(); }, config_->timeSource(),
decoder_callbacks_->dispatcher());
decoder_callbacks_->dispatcher(), decoder_callbacks_->scope());
}

bool FaultFilter::faultOverflow() {
Expand Down Expand Up @@ -423,10 +423,10 @@ StreamRateLimiter::StreamRateLimiter(uint64_t max_kbps, uint64_t max_buffered_da
std::function<void()> resume_data_cb,
std::function<void(Buffer::Instance&, bool)> write_data_cb,
std::function<void()> continue_cb, TimeSource& time_source,
Event::Dispatcher& dispatcher)
Event::Dispatcher& dispatcher, const ScopeTrackedObject& scope)
: // bytes_per_time_slice is KiB converted to bytes divided by the number of ticks per second.
bytes_per_time_slice_((max_kbps * 1024) / SecondDivisor), write_data_cb_(write_data_cb),
continue_cb_(continue_cb),
continue_cb_(continue_cb), scope_(scope),
// The token bucket is configured with a max token count of the number of ticks per second,
// and refills at the same rate, so that we have a per second limit which refills gradually in
// ~63ms intervals.
Expand Down Expand Up @@ -472,7 +472,7 @@ void StreamRateLimiter::onTokenTimer() {
const std::chrono::milliseconds ms = token_bucket_.nextTokenAvailable();
if (ms.count() > 0) {
ENVOY_LOG(trace, "limiter: scheduling wakeup for {}ms", ms.count());
token_timer_->enableTimer(ms);
token_timer_->enableTimer(ms, &scope_);
}
}

Expand All @@ -498,7 +498,7 @@ void StreamRateLimiter::writeData(Buffer::Instance& incoming_buffer, bool end_st
// The filter API does not currently support that and it will not be a trivial change to add.
// Instead we cheat here by scheduling the token timer to run immediately after the stack is
// unwound, at which point we can directly called encode/decodeData.
token_timer_->enableTimer(std::chrono::milliseconds(0));
token_timer_->enableTimer(std::chrono::milliseconds(0), &scope_);
}
}

Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/http/fault/fault_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,13 @@ class StreamRateLimiter : Logger::Loggable<Logger::Id::filter> {
* trailers that have been paused during body flush.
* @param time_source the time source to run the token bucket with.
* @param dispatcher the stream's dispatcher to use for creating timers.
* @param scope the stream's scope
*/
StreamRateLimiter(uint64_t max_kbps, uint64_t max_buffered_data,
std::function<void()> pause_data_cb, std::function<void()> resume_data_cb,
std::function<void(Buffer::Instance&, bool)> write_data_cb,
std::function<void()> continue_cb, TimeSource& time_source,
Event::Dispatcher& dispatcher);
Event::Dispatcher& dispatcher, const ScopeTrackedObject& scope);

/**
* Called by the stream to write data. All data writes happen asynchronously, the stream should
Expand Down Expand Up @@ -180,6 +181,7 @@ class StreamRateLimiter : Logger::Loggable<Logger::Id::filter> {
const uint64_t bytes_per_time_slice_;
const std::function<void(Buffer::Instance&, bool)> write_data_cb_;
const std::function<void()> continue_cb_;
const ScopeTrackedObject& scope_;
TokenBucketImpl token_bucket_;
Event::TimerPtr token_timer_;
bool saw_data_{};
Expand Down
6 changes: 4 additions & 2 deletions source/extensions/filters/http/squash/squash_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ Http::FilterHeadersStatus SquashFilter::decodeHeaders(Http::HeaderMap& headers,

attachment_timeout_timer_ =
decoder_callbacks_->dispatcher().createTimer([this]() -> void { doneSquashing(); });
attachment_timeout_timer_->enableTimer(config_->attachmentTimeout());
attachment_timeout_timer_->enableTimer(config_->attachmentTimeout(),
&decoder_callbacks_->scope());
// Check if the timer expired inline.
if (!is_squashing_) {
return Http::FilterHeadersStatus::Continue;
Expand Down Expand Up @@ -261,7 +262,8 @@ void SquashFilter::scheduleRetry() {
attachment_poll_period_timer_ =
decoder_callbacks_->dispatcher().createTimer([this]() -> void { pollForAttachment(); });
}
attachment_poll_period_timer_->enableTimer(config_->attachmentPollPeriod());
attachment_poll_period_timer_->enableTimer(config_->attachmentPollPeriod(),
&decoder_callbacks_->scope());
}

void SquashFilter::pollForAttachment() {
Expand Down
1 change: 1 addition & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1865,6 +1865,7 @@ TEST_F(HttpConnectionManagerImplTest, RequestTimeoutCallbackDisarmsAndReturns408
EXPECT_CALL(response_encoder_, encodeData(_, true)).WillOnce(AddBufferToString(&response_body));

conn_manager_->newStream(response_encoder_);
EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, setTrackedObject(_)).Times(2);
request_timer->invokeCallback();
}));

Expand Down
1 change: 1 addition & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class RouterTestBase : public testing::Test {
[&](Http::StreamDecoder& decoder,
Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* {
response_decoder_ = &decoder;
EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(testing::AtLeast(2));
callbacks.onPoolReady(original_encoder_, cm_.conn_pool_.host_);
return nullptr;
}));
Expand Down
3 changes: 3 additions & 0 deletions test/extensions/filters/http/fault/fault_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "gtest/gtest.h"

using testing::_;
using testing::AnyNumber;
using testing::DoAll;
using testing::Invoke;
using testing::Matcher;
Expand Down Expand Up @@ -136,6 +137,7 @@ class FaultFilterTest : public testing::Test {
filter_ = std::make_unique<FaultFilter>(config_);
filter_->setDecoderFilterCallbacks(decoder_filter_callbacks_);
filter_->setEncoderFilterCallbacks(encoder_filter_callbacks_);
EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber());
}

void SetUpTest(const std::string json) { SetUpTest(convertJsonStrToProtoConfig(json)); }
Expand Down Expand Up @@ -465,6 +467,7 @@ TEST_F(FaultFilterTest, DelayForDownstreamCluster) {
EXPECT_CALL(decoder_filter_callbacks_, continueDecoding());
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndWatermark, filter_->decodeData(data_, false));

EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2);
timer_->invokeCallback();

EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_headers_));
Expand Down
4 changes: 4 additions & 0 deletions test/extensions/filters/http/squash/squash_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ TEST_F(SquashFilterTest, Timeout) {
EXPECT_CALL(request_, cancel());
EXPECT_CALL(filter_callbacks_, continueDecoding());

EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2);
attachmentTimeout_timer_->invokeCallback();

EXPECT_EQ(Envoy::Http::FilterDataStatus::Continue, filter_->decodeData(buffer, false));
Expand Down Expand Up @@ -359,6 +360,7 @@ TEST_F(SquashFilterTest, CheckRetryPollingAttachment) {

// Expect the second get attachment request
expectAsyncClientSend();
EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2);
retry_timer->invokeCallback();
EXPECT_CALL(filter_callbacks_, continueDecoding());
completeGetStatusRequest("attached");
Expand All @@ -378,6 +380,7 @@ TEST_F(SquashFilterTest, CheckRetryPollingAttachmentOnFailure) {
// Expect the second get attachment request
expectAsyncClientSend();

EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2);
retry_timer->invokeCallback();

EXPECT_CALL(filter_callbacks_, continueDecoding());
Expand Down Expand Up @@ -435,6 +438,7 @@ TEST_F(SquashFilterTest, TimerExpiresInline) {
attachmentTimeout_timer_->scope_ = scope;
attachmentTimeout_timer_->enabled_ = true;
// timer expires inline
EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2);
attachmentTimeout_timer_->invokeCallback();
}));

Expand Down

0 comments on commit e958cf9

Please sign in to comment.