18
18
import java .io .ByteArrayOutputStream ;
19
19
import java .io .IOException ;
20
20
import java .io .OutputStream ;
21
+ import java .util .ArrayList ;
21
22
import java .util .Arrays ;
23
+ import java .util .Collections ;
22
24
import java .util .HashSet ;
25
+ import java .util .List ;
23
26
import java .util .Set ;
24
27
25
28
/**
26
29
* OutputStream Multiplexer
27
30
* <p>
28
31
* Multiplexes data on an output stream to subscribing other output streams.
32
+ * This implementation synchronizes on {@code flush} allowing threads expecting
33
+ * data on subscribing output streams to wait.
29
34
*
30
35
* @author technosf
31
36
* @see Inspired by Brogdan Matasaru,
@@ -45,7 +50,8 @@ public class MultiplexOutputStream
45
50
/**
46
51
* The set of subscribing streams
47
52
*/
48
- private Set <OutputStream > subscribers = new HashSet <>();
53
+ private Set <OutputStream > subscribers =
54
+ Collections .synchronizedSet (new HashSet <OutputStream >());
49
55
50
56
/**
51
57
* Automatically flush output streams on writes ending with EOL char 10
@@ -97,7 +103,7 @@ public MultiplexOutputStream(OutputStream... os)
97
103
98
104
/**
99
105
* Sets autoFlush property, causing output streams to be flushed when writes
100
- * end with character 10.
106
+ * end with character 10 (LF) .
101
107
*
102
108
* @param autoFlush
103
109
* true to autoflush
@@ -177,7 +183,13 @@ public boolean hasOutputStream(OutputStream os)
177
183
*/
178
184
public boolean addOutputStreams (OutputStream ... os )
179
185
{
180
- return subscribers .addAll (Arrays .asList (os ));
186
+ synchronized (publisher )
187
+ /*
188
+ * Lock on publisher, so that IO operations are consistent
189
+ */
190
+ {
191
+ return subscribers .addAll (Arrays .asList (os ));
192
+ }
181
193
}
182
194
183
195
@@ -190,7 +202,13 @@ public boolean addOutputStreams(OutputStream... os)
190
202
*/
191
203
public boolean removeOutputStreams (OutputStream ... os )
192
204
{
193
- return subscribers .removeAll (Arrays .asList (os ));
205
+ synchronized (publisher )
206
+ /*
207
+ * Lock on publisher, so that IO operations are consistent
208
+ */
209
+ {
210
+ return subscribers .removeAll (Arrays .asList (os ));
211
+ }
194
212
}
195
213
196
214
@@ -244,9 +262,11 @@ public void close() throws IOException
244
262
245
263
246
264
/**
247
- * {@inheritDoc}
265
+ * {code flush} takes accumulated writes and pushes them onto subscribing
266
+ * {@code OutputStream}s.
248
267
* <p>
249
- * Notify's after <em>flush</em>
268
+ * <em>flush</em> is synchronous and calls {@code notifyAll} once all
269
+ * {@code OutputStream}s have been flushed.
250
270
*
251
271
* @see java.io.OutputStream#flush()
252
272
*/
@@ -258,6 +278,8 @@ public synchronized void flush() throws IOException
258
278
* Lock on publisher, so that IO operations are consistent
259
279
*/
260
280
{
281
+ List <OutputStream > deadStreams = new ArrayList <>(); // Container streams found dead
282
+
261
283
/*
262
284
* Copy the publisher contents to be flushed and clear the publisher
263
285
*/
@@ -286,16 +308,24 @@ public synchronized void flush() throws IOException
286
308
}
287
309
catch (IOException e )
288
310
/*
289
- * Assume os is closed, remove
311
+ * Assume os is closed, add to dead list
290
312
*/
291
313
{
292
- subscribers . remove (os );
314
+ deadStreams . add (os );
293
315
}
294
316
}
317
+
318
+ if (!deadStreams .isEmpty ())
319
+ /*
320
+ * Remove dead streams
321
+ */
322
+ {
323
+ subscribers .remove (deadStreams );
324
+ }
325
+
295
326
writeFlag = false ;
296
327
}
297
-
298
- notifyAll ();
328
+ notifyAll (); // Notify waiting threads that a flush has occured
299
329
}
300
330
301
331
0 commit comments