Skip to content

Commit fca0cfe

Browse files
committed
simplify BlockingProcessStreamReader; fix tests
Update #1429. This commit simplifies the main run() method. Previous implementation checks whether the underlying Reader is ready to read then either reads a line or sleeps, catching InterruptedException to watch for any thread interruption. There are subtle difficulties with this approach: - Even if the underlying Reader is ready to read, it might not have enough bytes to form a line. It might still block. - It's not necessary to sleep. If the thread is interrupted while reading. It should throw InterruptedIOException. The method now reads in a loop, waiting for either exceptions or EOF. The test class implements a mock Logger that logs to a data structure. It then verifies that the data structure holds appropriate logs. As implemented, this can cause a race, as two threads, the writer and the verifier, run concurrently. This commit fixes this by waiting for the writing thread to terminate before verifying.
1 parent 0838c41 commit fca0cfe

File tree

2 files changed

+12
-39
lines changed

2 files changed

+12
-39
lines changed

google-cloud-core/src/main/java/com/google/cloud/testing/BlockingProcessStreamReader.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,13 @@
3535
*/
3636
class BlockingProcessStreamReader extends Thread {
3737

38-
private static final int STREAM_READER_SLEEP_INTERVAL_IN_MS = 200;
3938
private static final int LOG_LENGTH_LIMIT = 50000;
4039

4140
private final BufferedReader errorReader;
4241
private final Logger logger;
4342
private StringBuilder currentLog;
4443
private Level currentLogLevel;
4544
private boolean collectionMode;
46-
private volatile boolean terminated;
4745
private final String emulatorTag;
4846
private final Pattern logLinePattern;
4947

@@ -64,34 +62,25 @@ private BlockingProcessStreamReader(String emulator, InputStream stream, String
6462
}
6563

6664
void terminate() throws IOException {
67-
terminated = true;
68-
errorReader.close();
6965
interrupt();
7066
}
7167

7268
@Override
7369
public void run() {
7470
String previousLine = "";
7571
String nextLine = "";
76-
while (!terminated) {
77-
try {
78-
if (errorReader.ready()) {
79-
previousLine = nextLine;
80-
nextLine = errorReader.readLine();
81-
if (nextLine == null) {
82-
terminated = true;
83-
} else {
84-
processLogLine(previousLine, nextLine);
85-
}
86-
} else {
87-
sleep(STREAM_READER_SLEEP_INTERVAL_IN_MS);
72+
try {
73+
for (;;) {
74+
previousLine = nextLine;
75+
nextLine = errorReader.readLine();
76+
if (nextLine == null) {
77+
break;
8878
}
89-
} catch (IOException e) {
79+
processLogLine(previousLine, nextLine);
80+
}
81+
} catch (IOException e) {
82+
if (!isInterrupted()) {
9083
e.printStackTrace(System.err);
91-
} catch (InterruptedException e) {
92-
previousLine = nextLine;
93-
nextLine = null;
94-
break;
9584
}
9685
}
9786
processLogLine(previousLine, firstNonNull(nextLine, ""));

google-cloud-core/src/test/java/com/google/cloud/testing/BlockingProcessStreamReaderTest.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,9 @@ public void testBlockUntil() throws IOException {
8787
public void testForwardLogEntry() throws IOException, InterruptedException {
8888
TestLogger logger = new TestLogger();
8989
InputStream stream = new ByteArrayInputStream(OUTPUT_WITH_LOGS.getBytes(Charsets.UTF_8));
90-
BlockingProcessStreamReader thread =
91-
BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger);
92-
while (logger.getLogs().get(Level.INFO).isEmpty()) {
93-
Thread.sleep(200);
94-
}
90+
BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger).join();
9591
assertEquals("[emulator] log line 1" + System.lineSeparator() + "[emulator] log line 2",
9692
logger.getLogs().get(Level.INFO).iterator().next());
97-
thread.terminate();
98-
while (logger.getLogs().get(Level.FINE).isEmpty()) {
99-
Thread.sleep(200);
100-
}
10193
assertEquals("[emulator] log line 3", logger.getLogs().get(Level.FINE).iterator().next());
10294
stream.close();
10395
}
@@ -106,17 +98,9 @@ public void testForwardLogEntry() throws IOException, InterruptedException {
10698
public void testForwardAlreadyTaggedLogs() throws IOException, InterruptedException {
10799
TestLogger logger = new TestLogger();
108100
InputStream stream = new ByteArrayInputStream(TAGGED_OUTPUT_WITH_LOGS.getBytes(Charsets.UTF_8));
109-
BlockingProcessStreamReader thread =
110-
BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger);
111-
while (logger.getLogs().get(Level.INFO).isEmpty()) {
112-
Thread.sleep(200);
113-
}
101+
BlockingProcessStreamReader.start("emulator", stream, BLOCK_UNTIL, logger).join();
114102
assertEquals("[emulator] log line 1" + System.lineSeparator() + "[emulator] log line 2",
115103
logger.getLogs().get(Level.INFO).iterator().next());
116-
thread.terminate();
117-
while (logger.getLogs().get(Level.FINE).isEmpty()) {
118-
Thread.sleep(200);
119-
}
120104
assertEquals("[emulator] log line 3", logger.getLogs().get(Level.FINE).iterator().next());
121105
stream.close();
122106
}

0 commit comments

Comments
 (0)