1+ package ca .spottedleaf .concurrentutil .completable ;
2+
3+ import ca .spottedleaf .concurrentutil .collection .MultiThreadedQueue ;
4+ import ca .spottedleaf .concurrentutil .executor .Cancellable ;
5+ import ca .spottedleaf .concurrentutil .util .ConcurrentUtil ;
6+ import org .slf4j .Logger ;
7+ import org .slf4j .LoggerFactory ;
8+ import java .util .function .BiConsumer ;
9+
10+ public final class CallbackCompletable <T > {
11+
12+ private static final Logger LOGGER = LoggerFactory .getLogger (CallbackCompletable .class );
13+
14+ private final MultiThreadedQueue <BiConsumer <T , Throwable >> waiters = new MultiThreadedQueue <>();
15+ private T result ;
16+ private Throwable throwable ;
17+ private volatile boolean completed ;
18+
19+ public boolean isCompleted () {
20+ return this .completed ;
21+ }
22+
23+ /**
24+ * Note: Can only use after calling {@link #addAsynchronousWaiter(BiConsumer)}, as this function performs zero
25+ * synchronisation
26+ */
27+ public T getResult () {
28+ return this .result ;
29+ }
30+
31+ /**
32+ * Note: Can only use after calling {@link #addAsynchronousWaiter(BiConsumer)}, as this function performs zero
33+ * synchronisation
34+ */
35+ public Throwable getThrowable () {
36+ return this .throwable ;
37+ }
38+
39+ /**
40+ * Adds a waiter that should only be completed asynchronously by the complete() calls. If complete()
41+ * has already been called, returns {@code null} and does not invoke the specified consumer.
42+ * @param consumer Consumer to be executed on completion
43+ * @throws NullPointerException If consumer is null
44+ * @return A cancellable which will control the execution of the specified consumer
45+ */
46+ public Cancellable addAsynchronousWaiter (final BiConsumer <T , Throwable > consumer ) {
47+ if (this .waiters .add (consumer )) {
48+ return new CancellableImpl (consumer );
49+ }
50+ return null ;
51+ }
52+
53+ private void completeAllWaiters (final T result , final Throwable throwable ) {
54+ this .completed = true ;
55+ BiConsumer <T , Throwable > waiter ;
56+ while ((waiter = this .waiters .pollOrBlockAdds ()) != null ) {
57+ this .completeWaiter (waiter , result , throwable );
58+ }
59+ }
60+
61+ private void completeWaiter (final BiConsumer <T , Throwable > consumer , final T result , final Throwable throwable ) {
62+ try {
63+ consumer .accept (result , throwable );
64+ } catch (final Throwable throwable2 ) {
65+ LOGGER .error ("Failed to complete callback " + ConcurrentUtil .genericToString (consumer ), throwable2 );
66+ }
67+ }
68+
69+ /**
70+ * Adds a waiter that will be completed asynchronously by the complete() calls. If complete()
71+ * has already been called, then invokes the consumer synchronously with the completed result.
72+ * @param consumer Consumer to be executed on completion
73+ * @throws NullPointerException If consumer is null
74+ * @return A cancellable which will control the execution of the specified consumer
75+ */
76+ public Cancellable addWaiter (final BiConsumer <T , Throwable > consumer ) {
77+ if (this .waiters .add (consumer )) {
78+ return new CancellableImpl (consumer );
79+ }
80+ this .completeWaiter (consumer , this .result , this .throwable );
81+ return new CancellableImpl (consumer );
82+ }
83+
84+ public void complete (final T result ) {
85+ this .result = result ;
86+ this .completeAllWaiters (result , null );
87+ }
88+
89+ public void completeWithThrowable (final Throwable throwable ) {
90+ if (throwable == null ) {
91+ throw new NullPointerException ("Throwable cannot be null" );
92+ }
93+ this .throwable = throwable ;
94+ this .completeAllWaiters (null , throwable );
95+ }
96+
97+ private final class CancellableImpl implements Cancellable {
98+
99+ private final BiConsumer <T , Throwable > waiter ;
100+
101+ private CancellableImpl (final BiConsumer <T , Throwable > waiter ) {
102+ this .waiter = waiter ;
103+ }
104+
105+ @ Override
106+ public boolean cancel () {
107+ return CallbackCompletable .this .waiters .remove (this .waiter );
108+ }
109+ }
110+ }
0 commit comments