7070import com .google .logging .v2 .WriteLogEntriesResponse ;
7171import com .google .protobuf .Empty ;
7272import java .util .ArrayList ;
73- import java .util .Collections ;
74- import java .util .IdentityHashMap ;
7573import java .util .List ;
7674import java .util .Map ;
77- import java .util .Set ;
75+ import java .util .concurrent . ConcurrentHashMap ;
7876import java .util .concurrent .ExecutionException ;
7977import java .util .concurrent .TimeUnit ;
8078import java .util .concurrent .TimeoutException ;
@@ -83,9 +81,7 @@ class LoggingImpl extends BaseService<LoggingOptions> implements Logging {
8381
8482 private static final int FLUSH_WAIT_TIMEOUT_SECONDS = 6 ;
8583 private final LoggingRpc rpc ;
86- private final Object writeLock = new Object ();
87- private final Set <ApiFuture <Void >> pendingWrites =
88- Collections .newSetFromMap (new IdentityHashMap <ApiFuture <Void >, Boolean >());
84+ private final Map <Object , ApiFuture <Void >> pendingWrites = new ConcurrentHashMap <>();
8985
9086 private volatile Synchronicity writeSynchronicity = Synchronicity .ASYNC ;
9187 private volatile Severity flushSeverity = null ;
@@ -575,9 +571,7 @@ public void flush() {
575571 // BUG(1795): We should force batcher to issue RPC call for buffered messages,
576572 // so the code below doesn't wait uselessly.
577573 ArrayList <ApiFuture <Void >> writesToFlush = new ArrayList <>();
578- synchronized (writeLock ) {
579- writesToFlush .addAll (pendingWrites );
580- }
574+ writesToFlush .addAll (pendingWrites .values ());
581575
582576 try {
583577 ApiFutures .allAsList (writesToFlush ).get (FLUSH_WAIT_TIMEOUT_SECONDS , TimeUnit .SECONDS );
@@ -596,16 +590,13 @@ private void writeLogEntries(Iterable<LogEntry> logEntries, WriteOption... write
596590 case ASYNC :
597591 default :
598592 final ApiFuture <Void > writeFuture = writeAsync (logEntries , writeOptions );
599- synchronized (writeLock ) {
600- pendingWrites .add (writeFuture );
601- }
593+ final Object pendingKey = new Object ();
594+ pendingWrites .put (pendingKey , writeFuture );
602595 ApiFutures .addCallback (
603596 writeFuture ,
604597 new ApiFutureCallback <Void >() {
605598 private void removeFromPending () {
606- synchronized (writeLock ) {
607- pendingWrites .remove (writeFuture );
608- }
599+ pendingWrites .remove (pendingKey );
609600 }
610601
611602 @ Override
@@ -711,8 +702,6 @@ public void close() throws Exception {
711702
712703 @ VisibleForTesting
713704 int getNumPendingWrites () {
714- synchronized (writeLock ) {
715- return pendingWrites .size ();
716- }
705+ return pendingWrites .size ();
717706 }
718707}
0 commit comments