@@ -41,8 +41,11 @@ final class DeferredGroup<T> {
41
41
*/
42
42
private final Deferred <ArrayList <T >> parent = new Deferred <ArrayList <T >>();
43
43
44
- /** How many results do we expect? */
45
- private final int nresults ;
44
+ /**
45
+ * How many results do we expect?.
46
+ * Need to acquires this' monitor before changing.
47
+ */
48
+ private int nresults ;
46
49
47
50
/**
48
51
* All the results for each Deferred we're grouping.
@@ -54,8 +57,17 @@ final class DeferredGroup<T> {
54
57
/**
55
58
* Constructor.
56
59
* @param deferreds All the {@link Deferred}s we want to group.
60
+ * @param ordered If true, the results will be presented in the same order
61
+ * as the {@link Deferred}s are in the {@code deferreds} argument.
62
+ * If false, results will be presented in the order in which they arrive.
63
+ * In other words, assuming that {@code deferreds} is a list of three
64
+ * {@link Deferred} objects {@code [A, B, C]}, then if {@code ordered} is
65
+ * true, {@code results} will be {@code [result A, result B, result C]}
66
+ * whereas if {@code ordered} is false then the order in {@code results}
67
+ * is determined by the order in which callbacks fire on A, B, and C.
57
68
*/
58
- public DeferredGroup (final Collection <Deferred <T >> deferreds ) {
69
+ public DeferredGroup (final Collection <Deferred <T >> deferreds ,
70
+ final boolean ordered ) {
59
71
nresults = deferreds .size ();
60
72
results = new ArrayList <Object >(nresults );
61
73
@@ -64,6 +76,7 @@ public DeferredGroup(final Collection<Deferred<T>> deferreds) {
64
76
return ;
65
77
}
66
78
79
+ // Callback used to collect results in the order in which they appear.
67
80
final class Notify <T > implements Callback <T , T > {
68
81
public T call (final T arg ) {
69
82
recordCompletion (arg );
@@ -74,10 +87,37 @@ public String toString() {
74
87
}
75
88
};
76
89
77
- final Notify <T > notify = new Notify <T >();
90
+ // Callback that preserves the original orders of the Deferreds.
91
+ final class NotifyOrdered <T > implements Callback <T , T > {
92
+ private final int index ;
93
+ NotifyOrdered (int index ) {
94
+ this .index = index ;
95
+ }
96
+ public T call (final T arg ) {
97
+ recordCompletion (arg , index );
98
+ return arg ;
99
+ }
100
+ public String toString () {
101
+ return "notify #" + index + " DeferredGroup@"
102
+ + DeferredGroup .super .hashCode ();
103
+ }
104
+ };
78
105
79
- for (final Deferred <T > d : deferreds ) {
80
- d .addBoth (notify );
106
+ if (ordered ) {
107
+ int i = 0 ;
108
+ for (final Deferred <T > d : deferreds ) {
109
+ results .add (null ); // ensures results.set(i, result) is valid.
110
+ // Note: it's important to add the callback after the line above,
111
+ // as the callback can fire at any time once it's been added, and
112
+ // if it fires before results.set(i, result) is valid, we'll get
113
+ // an IndexOutOfBoundsException.
114
+ d .addBoth (new NotifyOrdered <T >(i ++));
115
+ }
116
+ } else {
117
+ final Notify <T > notify = new Notify <T >();
118
+ for (final Deferred <T > d : deferreds ) {
119
+ d .addBoth (notify );
120
+ }
81
121
}
82
122
}
83
123
@@ -93,30 +133,50 @@ public Deferred<ArrayList<T>> getDeferred() {
93
133
* @param result The result of the deferred.
94
134
*/
95
135
private void recordCompletion (final Object result ) {
96
- int size ;
136
+ int left ;
97
137
synchronized (this ) {
98
138
results .add (result );
99
- size = results .size ();
139
+ left = --nresults ;
140
+ }
141
+ if (left == 0 ) {
142
+ done ();
100
143
}
101
- if (size == nresults ) {
102
- // From this point on, we no longer need to synchronize in order to
103
- // access `results' since we know we're done, so no other thread is
104
- // going to call this method on this instance again.
105
- for (final Object r : results ) {
106
- if (r instanceof Exception ) {
107
- parent .callback (new DeferredGroupException (results , (Exception ) r ));
108
- return ;
109
- }
144
+ }
145
+
146
+ /**
147
+ * Called back when one of the {@link Deferred} in the group completes.
148
+ * @param result The result of the deferred.
149
+ * @param index The index of the result.
150
+ */
151
+ private void recordCompletion (final Object result , final int index ) {
152
+ int left ;
153
+ synchronized (this ) {
154
+ results .set (index , result );
155
+ left = --nresults ;
156
+ }
157
+ if (left == 0 ) {
158
+ done ();
159
+ }
160
+ }
161
+
162
+ /** Called once we have obtained all the results of this group. */
163
+ private void done () {
164
+ // From this point on, we no longer need to synchronize in order to
165
+ // access `results' since we know we're done, so no other thread is
166
+ // going to call recordCompletion() again.
167
+ for (final Object r : results ) {
168
+ if (r instanceof Exception ) {
169
+ parent .callback (new DeferredGroupException (results , (Exception ) r ));
170
+ return ;
110
171
}
111
- parent .callback (results );
112
172
}
173
+ parent .callback (results );
113
174
}
114
175
115
176
public String toString () {
116
177
return "DeferredGroup"
117
178
+ "(parent=" + parent
118
- + ", # results=" + results .size () + " / " + nresults
119
- + ')' ;
179
+ + ", # results=" + results .size () + " / " + nresults + " left)" ;
120
180
}
121
181
122
182
}
0 commit comments