Skip to content

Commit d726f41

Browse files
pongadgarrettjonesgoogle
authored andcommitted
logging: make flush wait for writes (#1815)
This PR still isn't completely correct, since it does not force any RPC to immediately be issued. However, flush should now correctly wait for RPCs representing prior calls to publish to complete and any failures to be reported to ErrorManager before returning.
1 parent 9012428 commit d726f41

File tree

2 files changed

+97
-17
lines changed

2 files changed

+97
-17
lines changed

google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818

1919
import static com.google.common.base.MoreObjects.firstNonNull;
2020

21+
import com.google.api.gax.core.ApiFuture;
22+
import com.google.api.gax.core.ApiFutureCallback;
23+
import com.google.api.gax.core.ApiFutures;
2124
import com.google.cloud.MonitoredResource;
2225
import com.google.cloud.logging.Logging.WriteOption;
23-
import com.google.api.gax.core.ApiFutures;
24-
import com.google.api.gax.core.ApiFutureCallback;
2526
import com.google.common.collect.ImmutableList;
2627
import com.google.common.collect.ImmutableMap;
28+
import com.google.common.util.concurrent.Uninterruptibles;
2729
import java.util.ArrayList;
2830
import java.util.Collections;
29-
import java.util.LinkedList;
31+
import java.util.IdentityHashMap;
3032
import java.util.List;
33+
import java.util.Set;
3134
import java.util.logging.ErrorManager;
3235
import java.util.logging.Filter;
3336
import java.util.logging.Formatter;
@@ -120,6 +123,10 @@ public class LoggingHandler extends Handler {
120123
// https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1740 .
121124
private final Level baseLevel;
122125

126+
private final Object writeLock = new Object();
127+
private final Set<ApiFuture<Void>> pendingWrites =
128+
Collections.newSetFromMap(new IdentityHashMap<ApiFuture<Void>, Boolean>());
129+
123130
/**
124131
* Creates an handler that publishes messages to Stackdriver Logging.
125132
*/
@@ -376,6 +383,9 @@ public void publish(LogRecord record) {
376383
if (entry != null) {
377384
write(entry, writeOptions);
378385
}
386+
if (record.getLevel().intValue() >= flushLevel.intValue()) {
387+
flush();
388+
}
379389
} finally {
380390
inPublishCall.remove();
381391
}
@@ -457,28 +467,60 @@ void write(LogEntry entry, WriteOption... options) {
457467
reportError(null, ex, ErrorManager.FLUSH_FAILURE);
458468
}
459469
break;
470+
460471
case ASYNC:
461472
default:
462-
ApiFutures.addCallback(getLogging().writeAsync(entryList, options), new ApiFutureCallback<Void>() {
463-
@Override
464-
public void onSuccess(Void v) {}
465-
466-
@Override
467-
public void onFailure(Throwable t) {
468-
if (t instanceof Exception) {
469-
reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE);
470-
} else {
471-
reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE);
472-
}
473-
}
474-
});
473+
final ApiFuture<Void> writeFuture = getLogging().writeAsync(entryList, options);
474+
synchronized(writeLock) {
475+
pendingWrites.add(writeFuture);
476+
}
477+
ApiFutures.addCallback(
478+
writeFuture,
479+
new ApiFutureCallback<Void>() {
480+
private void removeFromPending() {
481+
synchronized(writeLock) {
482+
pendingWrites.remove(writeFuture);
483+
}
484+
}
485+
486+
@Override
487+
public void onSuccess(Void v) {
488+
removeFromPending();
489+
}
490+
491+
@Override
492+
public void onFailure(Throwable t) {
493+
try {
494+
if (t instanceof Exception) {
495+
reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE);
496+
} else {
497+
reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE);
498+
}
499+
} finally {
500+
removeFromPending();
501+
}
502+
}
503+
});
475504
break;
476505
}
477506
}
478507

479508
@Override
480509
public void flush() {
481-
// BUG(1795): flush is broken, need support from batching implementation.
510+
// BUG(1795): We should force batcher to issue RPC call for buffered messages,
511+
// so the code below doesn't wait uselessly.
512+
513+
ArrayList<ApiFuture<Void>> writesToFlush = new ArrayList<>();
514+
synchronized(writeLock) {
515+
writesToFlush.addAll(pendingWrites);
516+
}
517+
for (ApiFuture<Void> write : writesToFlush) {
518+
try {
519+
Uninterruptibles.getUninterruptibly(write);
520+
} catch (Exception e) {
521+
// Ignore exceptions, they are propagated to the error manager.
522+
}
523+
}
482524
}
483525

484526
/**

google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818

1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.junit.Assert.assertFalse;
2123

2224
import com.google.api.gax.core.ApiFutures;
25+
import com.google.api.gax.core.SettableApiFuture;
2326
import com.google.cloud.MonitoredResource;
2427
import com.google.cloud.logging.LogEntry.Builder;
2528
import com.google.cloud.logging.Logging.WriteOption;
@@ -380,6 +383,41 @@ public void testFlushLevel() {
380383
handler.publish(newLogRecord(Level.WARNING, MESSAGE));
381384
}
382385

386+
@Test
387+
public void testFlush() throws InterruptedException {
388+
final SettableApiFuture<Void> mockRpc = SettableApiFuture.create();
389+
390+
EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();
391+
EasyMock.expect(options.getService()).andReturn(logging);
392+
logging.writeAsync(ImmutableList.of(INFO_ENTRY), DEFAULT_OPTIONS);
393+
EasyMock.expectLastCall().andReturn(mockRpc);
394+
EasyMock.replay(options, logging);
395+
final LoggingHandler handler = new LoggingHandler(LOG_NAME, options);
396+
handler.setFormatter(new TestFormatter());
397+
398+
// no messages, nothing to flush.
399+
handler.flush();
400+
401+
// send a message
402+
handler.publish(newLogRecord(Level.INFO, MESSAGE));
403+
Thread flushWaiter = new Thread(new Runnable() {
404+
@Override
405+
public void run() {
406+
handler.flush();
407+
}
408+
});
409+
flushWaiter.start();
410+
411+
// flushWaiter should be waiting for mockRpc to complete.
412+
flushWaiter.join(1000);
413+
assertTrue(flushWaiter.isAlive());
414+
415+
// With the RPC completed, flush should return, and the thread should terminate.
416+
mockRpc.set(null);
417+
flushWaiter.join(1000);
418+
assertFalse(flushWaiter.isAlive());
419+
}
420+
383421
@Test
384422
public void testSyncWrite() {
385423
EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();

0 commit comments

Comments
 (0)