25
25
import java .util .Map ;
26
26
import java .util .Set ;
27
27
import java .util .concurrent .ConcurrentHashMap ;
28
+ import java .util .concurrent .Executor ;
28
29
import java .util .concurrent .atomic .AtomicLong ;
29
30
import java .util .stream .Collectors ;
30
31
import org .slf4j .Logger ;
@@ -38,6 +39,7 @@ public class DiscoveryServer {
38
39
39
40
private final List <DiscoveryServerCallbacks > callbacks ;
40
41
private final ConfigWatcher configWatcher ;
42
+ private final ExecutorGroup executorGroup ;
41
43
private final AtomicLong streamCount = new AtomicLong ();
42
44
43
45
public DiscoveryServer (ConfigWatcher configWatcher ) {
@@ -54,11 +56,25 @@ public DiscoveryServer(DiscoveryServerCallbacks callbacks, ConfigWatcher configW
54
56
* @param configWatcher source of configuration updates
55
57
*/
56
58
public DiscoveryServer (List <DiscoveryServerCallbacks > callbacks , ConfigWatcher configWatcher ) {
59
+ this (callbacks , configWatcher , new DefaultExecutorGroup ());
60
+ }
61
+
62
+ /**
63
+ * Creates the server.
64
+ * @param callbacks server callbacks
65
+ * @param configWatcher source of configuration updates
66
+ * @param executorGroup executor group to use for responding stream requests
67
+ */
68
+ public DiscoveryServer (List <DiscoveryServerCallbacks > callbacks ,
69
+ ConfigWatcher configWatcher ,
70
+ ExecutorGroup executorGroup ) {
57
71
Preconditions .checkNotNull (callbacks , "callbacks cannot be null" );
58
72
Preconditions .checkNotNull (configWatcher , "configWatcher cannot be null" );
73
+ Preconditions .checkNotNull (executorGroup , "executorGroup cannot be null" );
59
74
60
75
this .callbacks = callbacks ;
61
76
this .configWatcher = configWatcher ;
77
+ this .executorGroup = executorGroup ;
62
78
}
63
79
64
80
/**
@@ -149,13 +165,14 @@ private StreamObserver<DiscoveryRequest> createRequestHandler(
149
165
String defaultTypeUrl ) {
150
166
151
167
long streamId = streamCount .getAndIncrement ();
168
+ Executor executor = executorGroup .next ();
152
169
153
170
LOGGER .info ("[{}] open stream from {}" , streamId , defaultTypeUrl );
154
171
155
172
callbacks .forEach (cb -> cb .onStreamOpen (streamId , defaultTypeUrl ));
156
173
157
174
final DiscoveryRequestStreamObserver requestStreamObserver =
158
- new DiscoveryRequestStreamObserver (defaultTypeUrl , responseObserver , streamId , ads );
175
+ new DiscoveryRequestStreamObserver (defaultTypeUrl , responseObserver , streamId , ads , executor );
159
176
160
177
if (responseObserver instanceof ServerCallStreamObserver ) {
161
178
((ServerCallStreamObserver ) responseObserver ).setOnCancelHandler (requestStreamObserver ::onCancelled );
@@ -173,11 +190,15 @@ private class DiscoveryRequestStreamObserver implements StreamObserver<Discovery
173
190
private final StreamObserver <DiscoveryResponse > responseObserver ;
174
191
private final long streamId ;
175
192
private final boolean ads ;
193
+ private final Executor executor ;
176
194
177
195
private AtomicLong streamNonce ;
178
196
179
- public DiscoveryRequestStreamObserver (String defaultTypeUrl , StreamObserver <DiscoveryResponse > responseObserver ,
180
- long streamId , boolean ads ) {
197
+ public DiscoveryRequestStreamObserver (String defaultTypeUrl ,
198
+ StreamObserver <DiscoveryResponse > responseObserver ,
199
+ long streamId ,
200
+ boolean ads ,
201
+ Executor executor ) {
181
202
this .defaultTypeUrl = defaultTypeUrl ;
182
203
this .responseObserver = responseObserver ;
183
204
this .streamId = streamId ;
@@ -186,6 +207,7 @@ public DiscoveryRequestStreamObserver(String defaultTypeUrl, StreamObserver<Disc
186
207
latestResponse = new ConcurrentHashMap <>(Resources .TYPE_URLS .size ());
187
208
ackedResources = new ConcurrentHashMap <>(Resources .TYPE_URLS .size ());
188
209
streamNonce = new AtomicLong ();
210
+ this .executor = executor ;
189
211
}
190
212
191
213
@ Override
@@ -238,7 +260,7 @@ public void onNext(DiscoveryRequest request) {
238
260
ads ,
239
261
request ,
240
262
ackedResources .getOrDefault (typeUrl , Collections .emptySet ()),
241
- r -> send (r , typeUrl ));
263
+ r -> executor . execute (() -> send (r , typeUrl ) ));
242
264
});
243
265
244
266
return ;
0 commit comments