4141import com .google .common .collect .ImmutableList ;
4242import com .google .common .collect .ImmutableMap ;
4343import com .google .common .collect .Iterables ;
44+ import com .google .common .util .concurrent .AbstractFuture ;
4445import com .google .common .util .concurrent .FutureCallback ;
4546import com .google .common .util .concurrent .Futures ;
4647import com .google .common .util .concurrent .ListenableFuture ;
6061import java .util .concurrent .Callable ;
6162import java .util .concurrent .ExecutionException ;
6263import java .util .concurrent .Executor ;
63- import java .util .concurrent .Future ;
6464import javax .annotation .Nullable ;
6565
6666/** Base type for Credentials using OAuth2. */
@@ -77,7 +77,7 @@ public class OAuth2Credentials extends Credentials {
7777 // byte[] is serializable, so the lock variable can be final
7878 @ VisibleForTesting final Object lock = new byte [0 ];
7979 private volatile OAuthValue value = null ;
80- @ VisibleForTesting transient ListenableFutureTask < OAuthValue > refreshTask ;
80+ @ VisibleForTesting transient RefreshTask refreshTask ;
8181
8282 // Change listeners are not serialized
8383 private transient List <CredentialsChangedListener > changeListeners ;
@@ -258,16 +258,7 @@ public OAuthValue call() throws Exception {
258258 }
259259 });
260260
261- task .addListener (
262- new Runnable () {
263- @ Override
264- public void run () {
265- finishRefreshAsync (task );
266- }
267- },
268- MoreExecutors .directExecutor ());
269-
270- refreshTask = task ;
261+ refreshTask = new RefreshTask (task , new RefreshTaskListener (task ));
271262
272263 return new AsyncRefreshResult (refreshTask , true );
273264 }
@@ -290,7 +281,7 @@ private void finishRefreshAsync(ListenableFuture<OAuthValue> finishedTask) {
290281 } catch (Exception e ) {
291282 // noop
292283 } finally {
293- if (this .refreshTask == finishedTask ) {
284+ if (this .refreshTask != null && this . refreshTask . getTask () == finishedTask ) {
294285 this .refreshTask = null ;
295286 }
296287 }
@@ -307,7 +298,7 @@ private void finishRefreshAsync(ListenableFuture<OAuthValue> finishedTask) {
307298 * thread of whatever executor the async call used. This doesn't affect correctness and is
308299 * extremely unlikely.
309300 */
310- private static <T > T unwrapDirectFuture (Future <T > future ) throws IOException {
301+ private static <T > T unwrapDirectFuture (ListenableFuture <T > future ) throws IOException {
311302 try {
312303 return future .get ();
313304 } catch (InterruptedException e ) {
@@ -567,10 +558,10 @@ public void onFailure(Throwable throwable) {
567558 * task is newly created, it is the caller's responsibility to execute it.
568559 */
569560 static class AsyncRefreshResult {
570- private final ListenableFutureTask < OAuthValue > task ;
561+ private final RefreshTask task ;
571562 private final boolean isNew ;
572563
573- AsyncRefreshResult (ListenableFutureTask < OAuthValue > task , boolean isNew ) {
564+ AsyncRefreshResult (RefreshTask task , boolean isNew ) {
574565 this .task = task ;
575566 this .isNew = isNew ;
576567 }
@@ -582,6 +573,57 @@ void executeIfNew(Executor executor) {
582573 }
583574 }
584575
576+ @ VisibleForTesting
577+ class RefreshTaskListener implements Runnable {
578+ private ListenableFutureTask <OAuthValue > task ;
579+
580+ RefreshTaskListener (ListenableFutureTask <OAuthValue > task ) {
581+ this .task = task ;
582+ }
583+
584+ @ Override
585+ public void run () {
586+ finishRefreshAsync (task );
587+ }
588+ }
589+
590+ class RefreshTask extends AbstractFuture <OAuthValue > implements Runnable {
591+ private final ListenableFutureTask <OAuthValue > task ;
592+ private final RefreshTaskListener listener ;
593+
594+ RefreshTask (ListenableFutureTask <OAuthValue > task , RefreshTaskListener listener ) {
595+ this .task = task ;
596+ this .listener = listener ;
597+
598+ // Update Credential state first
599+ task .addListener (listener , MoreExecutors .directExecutor ());
600+
601+ // Then notify the world
602+ Futures .addCallback (
603+ task ,
604+ new FutureCallback <OAuthValue >() {
605+ @ Override
606+ public void onSuccess (OAuthValue result ) {
607+ RefreshTask .this .set (result );
608+ }
609+
610+ @ Override
611+ public void onFailure (Throwable t ) {
612+ RefreshTask .this .setException (t );
613+ }
614+ },
615+ MoreExecutors .directExecutor ());
616+ }
617+
618+ public ListenableFutureTask <OAuthValue > getTask () {
619+ return this .task ;
620+ }
621+
622+ public void run () {
623+ task .run ();
624+ }
625+ }
626+
585627 public static class Builder {
586628
587629 private AccessToken accessToken ;
0 commit comments