Skip to content

Commit df039bd

Browse files
markdroth authored and paulosjca committed
[TokenFetcherCredentials] fix backoff behavior (grpc#38004)
As per discussion in grpc/proposal#438 (comment).

Closes grpc#38004

COPYBARA_INTEGRATE_REVIEW=grpc#38004 from markdroth:token_fetcher_creds_backoff_fix 8de2fec
PiperOrigin-RevId: 690678927
1 parent 384ad45 commit df039bd

File tree

3 files changed

+84
-77
lines changed

3 files changed

+84
-77
lines changed

src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc

+29-22
Original file line number | Diff line number | Diff line change
@@ -58,8 +58,8 @@ void TokenFetcherCredentials::Token::AddTokenToClientInitialMetadata(
5858
//
5959

6060
TokenFetcherCredentials::FetchState::BackoffTimer::BackoffTimer(
61-
RefCountedPtr<FetchState> fetch_state)
62-
: fetch_state_(std::move(fetch_state)) {
61+
RefCountedPtr<FetchState> fetch_state, absl::Status status)
62+
: fetch_state_(std::move(fetch_state)), status_(status) {
6363
const Duration delay = fetch_state_->backoff_.NextAttemptDelay();
6464
GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
6565
<< "[TokenFetcherCredentials " << fetch_state_->creds_.get()
@@ -100,24 +100,13 @@ void TokenFetcherCredentials::FetchState::BackoffTimer::OnTimer() {
100100
<< "[TokenFetcherCredentials " << fetch_state_->creds_.get()
101101
<< "]: fetch_state=" << fetch_state_.get() << " backoff_timer=" << this
102102
<< ": backoff timer fired";
103-
if (fetch_state_->queued_calls_.empty()) {
104-
// If there are no pending calls when the timer fires, then orphan
105-
// the FetchState object. Note that this drops the backoff state,
106-
// but that's probably okay, because if we didn't have any pending
107-
// calls during the backoff period, we probably won't see any
108-
// immediately now either.
109-
GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
110-
<< "[TokenFetcherCredentials " << fetch_state_->creds_.get()
111-
<< "]: fetch_state=" << fetch_state_.get() << " backoff_timer=" << this
112-
<< ": no pending calls, clearing state";
113-
fetch_state_->creds_->fetch_state_.reset();
114-
} else {
115-
// If there are pending calls, then start a new fetch attempt.
116-
GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
117-
<< "[TokenFetcherCredentials " << fetch_state_->creds_.get()
118-
<< "]: fetch_state=" << fetch_state_.get() << " backoff_timer=" << this
119-
<< ": starting new fetch attempt";
120-
fetch_state_->StartFetchAttempt();
103+
auto* self_ptr =
104+
absl::get_if<OrphanablePtr<BackoffTimer>>(&fetch_state_->state_);
105+
// This condition should always be true, but check to be defensive.
106+
if (self_ptr != nullptr && self_ptr->get() == this) {
107+
// Reset pointer in fetch_state_, so that subsequent RPCs know that
108+
// we're no longer in backoff and they can trigger a new fetch.
109+
self_ptr->reset();
121110
}
122111
}
123112

@@ -145,6 +134,14 @@ void TokenFetcherCredentials::FetchState::Orphan() {
145134
Unref();
146135
}
147136

137+
absl::Status TokenFetcherCredentials::FetchState::status() const {
138+
auto* backoff_ptr = absl::get_if<OrphanablePtr<BackoffTimer>>(&state_);
139+
if (backoff_ptr == nullptr || *backoff_ptr == nullptr) {
140+
return absl::OkStatus();
141+
}
142+
return (*backoff_ptr)->status();
143+
}
144+
148145
void TokenFetcherCredentials::FetchState::StartFetchAttempt() {
149146
GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
150147
<< "[TokenFetcherCredentials " << creds_.get()
@@ -182,7 +179,8 @@ void TokenFetcherCredentials::FetchState::TokenFetchComplete(
182179
<< "]: fetch_state=" << this
183180
<< ": token fetch failed: " << token.status();
184181
// If failed, start backoff timer.
185-
state_ = OrphanablePtr<BackoffTimer>(new BackoffTimer(Ref()));
182+
state_ =
183+
OrphanablePtr<BackoffTimer>(new BackoffTimer(Ref(), token.status()));
186184
}
187185
ResumeQueuedCalls(std::move(token));
188186
}
@@ -204,14 +202,18 @@ void TokenFetcherCredentials::FetchState::ResumeQueuedCalls(
204202
RefCountedPtr<TokenFetcherCredentials::QueuedCall>
205203
TokenFetcherCredentials::FetchState::QueueCall(
206204
ClientMetadataHandle initial_metadata) {
207-
// Add call to pending list.
208205
auto queued_call = MakeRefCounted<QueuedCall>();
209206
queued_call->waker = GetContext<Activity>()->MakeNonOwningWaker();
210207
queued_call->pollent = GetContext<grpc_polling_entity>();
211208
grpc_polling_entity_add_to_pollset_set(
212209
queued_call->pollent, grpc_polling_entity_pollset_set(&creds_->pollent_));
213210
queued_call->md = std::move(initial_metadata);
214211
queued_calls_.insert(queued_call);
212+
// If backoff has expired since the last attempt, trigger a new one.
213+
auto* backoff_ptr = absl::get_if<OrphanablePtr<BackoffTimer>>(&state_);
214+
if (backoff_ptr != nullptr && backoff_ptr->get() == nullptr) {
215+
StartFetchAttempt();
216+
}
215217
return queued_call;
216218
}
217219

@@ -267,6 +269,11 @@ TokenFetcherCredentials::GetRequestMetadata(
267269
token_->AddTokenToClientInitialMetadata(*initial_metadata);
268270
return Immediate(std::move(initial_metadata));
269271
}
272+
// If we're in backoff, fail the call.
273+
if (fetch_state_ != nullptr) {
274+
absl::Status status = fetch_state_->status();
275+
if (!status.ok()) return Immediate(std::move(status));
276+
}
270277
// If we don't have a cached token, this call will need to be queued.
271278
GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
272279
<< "[TokenFetcherCredentials " << this

src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h

+7-1
Original file line number | Diff line number | Diff line change
@@ -111,24 +111,30 @@ class TokenFetcherCredentials : public grpc_call_credentials {
111111
// annotations.
112112
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
113113

114+
// Returns non-OK when we're in backoff.
115+
absl::Status status() const;
116+
114117
RefCountedPtr<QueuedCall> QueueCall(ClientMetadataHandle initial_metadata)
115118
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&TokenFetcherCredentials::mu_);
116119

117120
private:
118121
class BackoffTimer : public InternallyRefCounted<BackoffTimer> {
119122
public:
120-
explicit BackoffTimer(RefCountedPtr<FetchState> fetch_state)
123+
BackoffTimer(RefCountedPtr<FetchState> fetch_state, absl::Status status)
121124
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&TokenFetcherCredentials::mu_);
122125

123126
// Disabling thread safety annotations, since Orphan() is called
124127
// by OrphanablePtr<>, which does not have the right lock
125128
// annotations.
126129
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
127130

131+
absl::Status status() const { return status_; }
132+
128133
private:
129134
void OnTimer();
130135

131136
RefCountedPtr<FetchState> fetch_state_;
137+
const absl::Status status_;
132138
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
133139
timer_handle_ ABSL_GUARDED_BY(&TokenFetcherCredentials::mu_);
134140
};

test/core/security/credentials_test.cc

+48-54
Original file line number | Diff line number | Diff line change
@@ -2569,31 +2569,35 @@ TEST_F(TokenFetcherCredentialsTest, FetchFails) {
25692569
run_after_duration = duration;
25702570
});
25712571
ExecCtx exec_ctx;
2572-
creds_->AddResult(kExpectedError);
25732572
// First request will trigger a fetch, which will fail.
2573+
LOG(INFO) << "Sending first RPC.";
2574+
creds_->AddResult(kExpectedError);
25742575
auto state = RequestMetadataState::NewInstance(kExpectedError, "",
25752576
/*expect_delay=*/true);
25762577
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
25772578
kTestPath);
25782579
EXPECT_EQ(creds_->num_fetches(), 1);
25792580
while (!run_after_duration.has_value()) event_engine_->Tick();
25802581
// Make sure backoff was set for the right period.
2581-
// This is 1 second (initial backoff) minus 1ms for the tick needed above.
25822582
EXPECT_EQ(run_after_duration, std::chrono::seconds(1));
25832583
run_after_duration.reset();
2584-
// Start a new call now, which will be queued and then eventually
2585-
// resumed when the next fetch happens.
2584+
// Start a new call now, which will fail because we're in backoff.
2585+
LOG(INFO) << "Sending second RPC.";
25862586
state = RequestMetadataState::NewInstance(
2587-
absl::OkStatus(), "authorization: foo", /*expect_delay=*/true);
2587+
kExpectedError, "authorization: foo", /*expect_delay=*/false);
25882588
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
25892589
kTestPath);
2590-
// Tick until the next fetch starts.
2591-
creds_->AddResult(MakeToken("foo"));
2590+
EXPECT_EQ(creds_->num_fetches(), 1);
2591+
// Tick until backoff expires.
2592+
LOG(INFO) << "Waiting for backoff.";
25922593
event_engine_->TickUntilIdle();
2593-
EXPECT_EQ(creds_->num_fetches(), 2);
2594-
// A call started now should use the new cached data.
2594+
EXPECT_EQ(creds_->num_fetches(), 1);
2595+
// Starting another call should trigger a new fetch, which will
2596+
// succeed this time.
2597+
LOG(INFO) << "Sending third RPC.";
2598+
creds_->AddResult(MakeToken("foo"));
25952599
state = RequestMetadataState::NewInstance(
2596-
absl::OkStatus(), "authorization: foo", /*expect_delay=*/false);
2600+
absl::OkStatus(), "authorization: foo", /*expect_delay=*/true);
25972601
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
25982602
kTestPath);
25992603
EXPECT_EQ(creds_->num_fetches(), 2);
@@ -2607,8 +2611,9 @@ TEST_F(TokenFetcherCredentialsTest, Backoff) {
26072611
run_after_duration = duration;
26082612
});
26092613
ExecCtx exec_ctx;
2610-
creds_->AddResult(kExpectedError);
26112614
// First request will trigger a fetch, which will fail.
2615+
LOG(INFO) << "Sending first RPC.";
2616+
creds_->AddResult(kExpectedError);
26122617
auto state = RequestMetadataState::NewInstance(kExpectedError, "",
26132618
/*expect_delay=*/true);
26142619
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
@@ -2618,64 +2623,53 @@ TEST_F(TokenFetcherCredentialsTest, Backoff) {
26182623
// Make sure backoff was set for the right period.
26192624
EXPECT_EQ(run_after_duration, std::chrono::seconds(1));
26202625
run_after_duration.reset();
2621-
// Start a new call now, which will be queued and then eventually
2622-
// resumed when the next fetch happens.
2626+
// Start a new call now, which will fail because we're in backoff.
2627+
LOG(INFO) << "Sending second RPC.";
26232628
state = RequestMetadataState::NewInstance(kExpectedError, "",
2624-
/*expect_delay=*/true);
2629+
/*expect_delay=*/false);
26252630
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
26262631
kTestPath);
2627-
// Tick until the next fetch fails and the backoff timer starts again.
2632+
EXPECT_EQ(creds_->num_fetches(), 1);
2633+
// Tick until backoff expires.
2634+
LOG(INFO) << "Waiting for backoff.";
2635+
event_engine_->TickUntilIdle();
2636+
EXPECT_EQ(creds_->num_fetches(), 1);
2637+
// Starting another call should trigger a new fetch, which will again fail.
2638+
LOG(INFO) << "Sending third RPC.";
26282639
creds_->AddResult(kExpectedError);
2629-
while (!run_after_duration.has_value()) event_engine_->Tick();
2630-
EXPECT_EQ(creds_->num_fetches(), 2);
2631-
// The backoff time should be longer now. We account for jitter here.
2632-
EXPECT_EQ(run_after_duration, std::chrono::milliseconds(1600))
2633-
<< "actual: " << run_after_duration->count();
2634-
run_after_duration.reset();
2635-
// Start another new call to trigger another new fetch once the
2636-
// backoff expires.
26372640
state = RequestMetadataState::NewInstance(kExpectedError, "",
26382641
/*expect_delay=*/true);
26392642
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
26402643
kTestPath);
2641-
// Tick until the next fetch starts.
2642-
creds_->AddResult(kExpectedError);
2644+
EXPECT_EQ(creds_->num_fetches(), 2);
26432645
while (!run_after_duration.has_value()) event_engine_->Tick();
2644-
EXPECT_EQ(creds_->num_fetches(), 3);
2645-
// Check backoff time again.
2646-
EXPECT_EQ(run_after_duration, std::chrono::milliseconds(2560))
2646+
// The backoff time should be longer now.
2647+
EXPECT_EQ(run_after_duration, std::chrono::milliseconds(1600))
26472648
<< "actual: " << run_after_duration->count();
2648-
}
2649-
2650-
TEST_F(TokenFetcherCredentialsTest, FetchNotStartedAfterBackoffWithoutRpc) {
2651-
const absl::Status kExpectedError = absl::UnavailableError("bummer, dude");
2652-
absl::optional<FuzzingEventEngine::Duration> run_after_duration;
2653-
event_engine_->SetRunAfterDurationCallback(
2654-
[&](FuzzingEventEngine::Duration duration) {
2655-
run_after_duration = duration;
2656-
});
2657-
ExecCtx exec_ctx;
2658-
creds_->AddResult(kExpectedError);
2659-
// First request will trigger a fetch, which will fail.
2660-
auto state = RequestMetadataState::NewInstance(kExpectedError, "",
2661-
/*expect_delay=*/true);
2649+
run_after_duration.reset();
2650+
// Start a new call now, which will fail because we're in backoff.
2651+
LOG(INFO) << "Sending fourth RPC.";
2652+
state = RequestMetadataState::NewInstance(kExpectedError, "",
2653+
/*expect_delay=*/false);
26622654
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
26632655
kTestPath);
2664-
EXPECT_EQ(creds_->num_fetches(), 1);
2665-
while (!run_after_duration.has_value()) event_engine_->Tick();
2666-
// Make sure backoff was set for the right period.
2667-
EXPECT_EQ(run_after_duration, std::chrono::seconds(1));
2668-
run_after_duration.reset();
2669-
// Tick until the backoff expires. No new fetch should be started.
2656+
EXPECT_EQ(creds_->num_fetches(), 2);
2657+
// Tick until backoff expires.
2658+
LOG(INFO) << "Waiting for backoff.";
26702659
event_engine_->TickUntilIdle();
2671-
EXPECT_EQ(creds_->num_fetches(), 1);
2672-
// Now start a new request, which will trigger a new fetch.
2673-
creds_->AddResult(MakeToken("foo"));
2674-
state = RequestMetadataState::NewInstance(
2675-
absl::OkStatus(), "authorization: foo", /*expect_delay=*/true);
2660+
EXPECT_EQ(creds_->num_fetches(), 2);
2661+
// Starting another call should trigger a new fetch, which will again fail.
2662+
LOG(INFO) << "Sending fifth RPC.";
2663+
creds_->AddResult(kExpectedError);
2664+
state = RequestMetadataState::NewInstance(kExpectedError, "",
2665+
/*expect_delay=*/true);
26762666
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
26772667
kTestPath);
2678-
EXPECT_EQ(creds_->num_fetches(), 2);
2668+
EXPECT_EQ(creds_->num_fetches(), 3);
2669+
while (!run_after_duration.has_value()) event_engine_->Tick();
2670+
// The backoff time should be longer now.
2671+
EXPECT_EQ(run_after_duration, std::chrono::milliseconds(2560))
2672+
<< "actual: " << run_after_duration->count();
26792673
}
26802674

26812675
TEST_F(TokenFetcherCredentialsTest, ShutdownWhileBackoffTimerPending) {

0 commit comments

Comments
 (0)