Skip to content

Commit 1771565

Browse files
authored
fix(gcp_pubsub source): Fix handling of auth token (#12757)
* Regenerate the GCP auth token only *after* one interval * Break the receive loop immediately after a receive error * Fix handling of credentials generation in gcp_pubsub source The gcp_pubsub source would take the credentials on the first time through the streaming pull loop, in order to start the token regenerate task, leaving them empty if it needed to restart the loop. This drops the `take` and moves the token regenerator into a stream that can be polled in the main request loop, exiting the loop and restarting the stream when the token is regenerated.
1 parent 20a5fec commit 1771565

File tree

3 files changed

+63
-40
lines changed

3 files changed

+63
-40
lines changed

src/gcp.rs

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::{Arc, RwLock};
22
use std::time::Duration;
33

4-
use futures::StreamExt;
4+
use futures::{Stream, StreamExt};
55
pub use goauth::scopes::Scope;
66
use goauth::{
77
auth::{JwtClaims, Token, TokenErr},
@@ -13,6 +13,7 @@ use once_cell::sync::Lazy;
1313
use serde::{Deserialize, Serialize};
1414
use smpl_jwt::Jwt;
1515
use snafu::{ResultExt, Snafu};
16+
use tokio::time::Instant;
1617
use tokio_stream::wrappers::IntervalStream;
1718

1819
use crate::{config::ProxyConfig, http::HttpClient, http::HttpError};
@@ -70,10 +71,13 @@ impl GcpAuthConfig {
7071
}
7172

7273
#[derive(Clone, Debug)]
73-
pub struct GcpCredentials {
74+
pub struct GcpCredentials(Arc<Inner>);
75+
76+
#[derive(Debug)]
77+
struct Inner {
7478
creds: Option<Credentials>,
7579
scope: Scope,
76-
token: Arc<RwLock<Token>>,
80+
token: RwLock<Token>,
7781
}
7882

7983
impl GcpCredentials {
@@ -83,24 +87,24 @@ impl GcpCredentials {
8387
let token = goauth::get_token(&jwt, &creds)
8488
.await
8589
.context(GetTokenSnafu)?;
86-
Ok(Self {
90+
Ok(Self(Arc::new(Inner {
8791
creds: Some(creds),
8892
scope,
89-
token: Arc::new(RwLock::new(token)),
90-
})
93+
token: RwLock::new(token),
94+
})))
9195
}
9296

9397
async fn new_implicit(scope: Scope) -> crate::Result<Self> {
9498
let token = get_token_implicit().await?;
95-
Ok(Self {
99+
Ok(Self(Arc::new(Inner {
96100
creds: None,
97101
scope,
98-
token: Arc::new(RwLock::new(token)),
99-
})
102+
token: RwLock::new(token),
103+
})))
100104
}
101105

102106
pub fn make_token(&self) -> String {
103-
let token = self.token.read().unwrap();
107+
let token = self.0.token.read().unwrap();
104108
format!("{} {}", token.token_type(), token.access_token())
105109
}
106110

@@ -111,35 +115,39 @@ impl GcpCredentials {
111115
}
112116

113117
async fn regenerate_token(&self) -> crate::Result<()> {
114-
let token = match &self.creds {
118+
let token = match &self.0.creds {
115119
Some(creds) => {
116-
let jwt = make_jwt(creds, &self.scope).unwrap(); // Errors caught above
120+
let jwt = make_jwt(creds, &self.0.scope).unwrap(); // Errors caught above
117121
goauth::get_token(&jwt, creds).await?
118122
}
119123
None => get_token_implicit().await?,
120124
};
121-
*self.token.write().unwrap() = token;
125+
*self.0.token.write().unwrap() = token;
122126
Ok(())
123127
}
124128

125129
pub fn spawn_regenerate_token(&self) {
126130
let this = self.clone();
131+
tokio::spawn(async move { this.token_regenerator().for_each(|_| async {}).await });
132+
}
127133

128-
let period = this.token.read().unwrap().expires_in() as u64 / 2;
129-
let interval = IntervalStream::new(tokio::time::interval(Duration::from_secs(period)));
130-
let task = interval.for_each(move |_| {
131-
let this = this.clone();
132-
async move {
133-
debug!("Renewing GCP authentication token.");
134-
if let Err(error) = this.regenerate_token().await {
135-
error!(
136-
message = "Failed to update GCP authentication token.",
137-
%error
138-
);
134+
pub fn token_regenerator(&self) -> impl Stream<Item = ()> + 'static {
135+
let period = Duration::from_secs(self.0.token.read().unwrap().expires_in() as u64 / 2);
136+
let this = self.clone();
137+
IntervalStream::new(tokio::time::interval_at(Instant::now() + period, period)).then(
138+
move |_| {
139+
let this = this.clone();
140+
async move {
141+
debug!("Renewing GCP authentication token.");
142+
if let Err(error) = this.regenerate_token().await {
143+
error!(
144+
message = "Failed to update GCP authentication token.",
145+
%error
146+
);
147+
}
139148
}
140-
}
141-
});
142-
tokio::spawn(task);
149+
},
150+
)
143151
}
144152
}
145153

src/sources/gcp_pubsub.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{sync::Arc, time::Duration};
1+
use std::{pin::Pin, sync::Arc, time::Duration};
22

33
use chrono::{DateTime, NaiveDateTime, Utc};
44
use codecs::decoding::{DeserializerConfig, FramingConfig};
@@ -27,7 +27,8 @@ use crate::{
2727
},
2828
serde::{bool_or_struct, default_decoding, default_framing_message_based},
2929
shutdown::ShutdownSignal,
30-
sources::util::{self, finalizer::UnorderedFinalizer},
30+
sources::util,
31+
sources::util::finalizer::{EmptyStream, UnorderedFinalizer},
3132
tls::{TlsConfig, TlsSettings},
3233
SourceSender,
3334
};
@@ -215,15 +216,24 @@ impl PubsubSource {
215216
.context(EndpointTlsSnafu)?;
216217
}
217218

218-
while self.run_once(&endpoint).await {
219-
info!(timeout_secs = 1, "Retrying after timeout");
219+
let mut token_generator = match &self.credentials {
220+
Some(credentials) => credentials.clone().token_regenerator().boxed(),
221+
None => EmptyStream::default().boxed(),
222+
};
223+
224+
while self.run_once(&endpoint, &mut token_generator).await {
225+
info!(timeout_secs = 1, "Retrying after timeout.");
220226
tokio::time::sleep(self.retry_delay).await;
221227
}
222228

223229
Ok(())
224230
}
225231

226-
async fn run_once(&mut self, endpoint: &Endpoint) -> bool {
232+
async fn run_once(
233+
&mut self,
234+
endpoint: &Endpoint,
235+
token_generator: &mut Pin<Box<dyn Stream<Item = ()> + Send>>,
236+
) -> bool {
227237
let connection = match endpoint.connect().await {
228238
Ok(connection) => connection,
229239
Err(error) => {
@@ -252,6 +262,7 @@ impl PubsubSource {
252262
// Handle shutdown during startup, the streaming pull doesn't
253263
// start if there is no data in the subscription.
254264
let request_stream = self.request_stream();
265+
debug!("Starting streaming pull.");
255266
let stream = tokio::select! {
256267
_ = &mut self.shutdown => return false,
257268
result = client.streaming_pull(request_stream) => match result {
@@ -264,24 +275,27 @@ impl PubsubSource {
264275
};
265276
let mut stream = stream.into_inner();
266277

267-
if let Some(credentials) = self.credentials.take() {
268-
credentials.spawn_regenerate_token();
269-
}
270-
271278
let (finalizer, mut ack_stream) =
272279
Finalizer::maybe_new(self.acknowledgements, self.shutdown.clone());
273280

274281
loop {
275282
tokio::select! {
276283
_ = &mut self.shutdown => return false,
284+
_ = &mut token_generator.next() => {
285+
debug!("New authentication token generated, restarting stream.");
286+
return true;
287+
},
277288
receipts = ack_stream.next() => if let Some((status, receipts)) = receipts {
278289
if status == BatchStatus::Delivered {
279290
self.ack_ids.lock().await.extend(receipts);
280291
}
281292
},
282293
response = stream.next() => match response {
283294
Some(Ok(response)) => self.handle_response(response, &finalizer).await,
284-
Some(Err(error)) => emit!(GcpPubsubReceiveError { error }),
295+
Some(Err(error)) => {
296+
emit!(GcpPubsubReceiveError { error });
297+
break;
298+
}
285299
None => break,
286300
},
287301
}

src/sources/util/finalizer.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ where
8282
let (finalizer, stream) = Self::new(shutdown);
8383
(Some(finalizer), stream.boxed())
8484
} else {
85-
(None, EmptyStream(Default::default()).boxed())
85+
(None, EmptyStream::default().boxed())
8686
}
8787
}
8888

@@ -194,8 +194,9 @@ impl<T> Future for FinalizerFuture<T> {
194194
}
195195
}
196196

197-
#[derive(Clone, Copy)]
198-
struct EmptyStream<T>(PhantomData<T>);
197+
#[derive(Clone, Copy, Derivative)]
198+
#[derivative(Default(bound = ""))]
199+
pub struct EmptyStream<T>(PhantomData<T>);
199200

200201
impl<T> Stream for EmptyStream<T> {
201202
type Item = T;

0 commit comments

Comments
 (0)