Skip to content

Commit 48992f4

Browse files
committed
Get rid of all synchronized blocks and methods in production code
JAVA-5105
1 parent 40e2d1f commit 48992f4

File tree

9 files changed

+201
-119
lines changed

9 files changed

+201
-119
lines changed

bson/src/main/org/bson/codecs/pojo/LazyPropertyModelCodec.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import java.util.ArrayList;
2929
import java.util.List;
30+
import java.util.concurrent.locks.Lock;
31+
import java.util.concurrent.locks.ReentrantLock;
3032

3133
import static java.lang.String.format;
3234
import static org.bson.codecs.pojo.PojoSpecializationHelper.specializeTypeData;
@@ -35,7 +37,7 @@ class LazyPropertyModelCodec<T> implements Codec<T> {
3537
private final PropertyModel<T> propertyModel;
3638
private final CodecRegistry registry;
3739
private final PropertyCodecRegistry propertyCodecRegistry;
38-
40+
private final Lock codecLock = new ReentrantLock();
3941
private volatile Codec<T> codec;
4042

4143
LazyPropertyModelCodec(final PropertyModel<T> propertyModel, final CodecRegistry registry,
@@ -61,11 +63,17 @@ public Class<T> getEncoderClass() {
6163
}
6264

6365
private Codec<T> getPropertyModelCodec() {
66+
Codec<T> codec = this.codec;
6467
if (codec == null) {
65-
synchronized (this) {
68+
codecLock.lock();
69+
try {
70+
codec = this.codec;
6671
if (codec == null) {
6772
codec = createCodec();
73+
this.codec = codec;
6874
}
75+
} finally {
76+
codecLock.unlock();
6977
}
7078
}
7179
return codec;

driver-core/src/main/com/mongodb/connection/netty/NettyStream.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
import java.util.concurrent.CountDownLatch;
6565
import java.util.concurrent.Future;
6666
import java.util.concurrent.ScheduledFuture;
67+
import java.util.concurrent.locks.Lock;
68+
import java.util.concurrent.locks.ReentrantLock;
6769

6870
import static com.mongodb.assertions.Assertions.assertNotNull;
6971
import static com.mongodb.assertions.Assertions.isTrueArgument;
@@ -120,8 +122,8 @@ final class NettyStream implements Stream {
120122
private volatile Channel channel;
121123

122124
private final LinkedList<io.netty.buffer.ByteBuf> pendingInboundBuffers = new LinkedList<>();
123-
/* The fields pendingReader, pendingException are always written/read inside synchronized blocks
124-
* that use the same NettyStream object, so they can be plain.*/
125+
private final Lock lock = new ReentrantLock();
126+
// access to the fields `pendingReader`, `pendingException` is guarded by `lock`
125127
private PendingReader pendingReader;
126128
private Throwable pendingException;
127129
/* The fields readTimeoutTask, readTimeoutMillis are each written only in the ChannelInitializer.initChannel method
@@ -282,7 +284,8 @@ public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
282284
private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler, final long readTimeoutMillis) {
283285
ByteBuf buffer = null;
284286
Throwable exceptionResult = null;
285-
synchronized (this) {
287+
lock.lock();
288+
try {
286289
exceptionResult = pendingException;
287290
if (exceptionResult == null) {
288291
if (!hasBytesAvailable(numBytes)) {
@@ -316,6 +319,8 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
316319
cancel(pendingReader.timeout);
317320
this.pendingReader = null;
318321
}
322+
} finally {
323+
lock.unlock();
319324
}
320325
if (exceptionResult != null) {
321326
handler.failed(exceptionResult);
@@ -338,13 +343,16 @@ private boolean hasBytesAvailable(final int numBytes) {
338343

339344
private void handleReadResponse(@Nullable final io.netty.buffer.ByteBuf buffer, @Nullable final Throwable t) {
340345
PendingReader localPendingReader = null;
341-
synchronized (this) {
346+
lock.lock();
347+
try {
342348
if (buffer != null) {
343349
pendingInboundBuffers.add(buffer.retain());
344350
} else {
345351
pendingException = t;
346352
}
347353
localPendingReader = pendingReader;
354+
} finally {
355+
lock.unlock();
348356
}
349357

350358
if (localPendingReader != null) {
@@ -359,16 +367,21 @@ public ServerAddress getAddress() {
359367
}
360368

361369
@Override
362-
public synchronized void close() {
363-
isClosed = true;
364-
if (channel != null) {
365-
channel.close();
366-
channel = null;
367-
}
368-
for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
369-
io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
370-
iterator.remove();
371-
nextByteBuf.release();
370+
public void close() {
371+
lock.lock();
372+
try {
373+
isClosed = true;
374+
if (channel != null) {
375+
channel.close();
376+
channel = null;
377+
}
378+
for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
379+
io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
380+
iterator.remove();
381+
nextByteBuf.release();
382+
}
383+
} finally {
384+
lock.unlock();
372385
}
373386
}
374387

@@ -504,7 +517,8 @@ private class OpenChannelFutureListener implements ChannelFutureListener {
504517

505518
@Override
506519
public void operationComplete(final ChannelFuture future) {
507-
synchronized (NettyStream.this) {
520+
lock.lock();
521+
try {
508522
if (future.isSuccess()) {
509523
if (isClosed) {
510524
channelFuture.channel().close();
@@ -522,6 +536,8 @@ public void operationComplete(final ChannelFuture future) {
522536
initializeChannel(handler, socketAddressQueue);
523537
}
524538
}
539+
} finally {
540+
lock.unlock();
525541
}
526542
}
527543
}

driver-core/src/main/com/mongodb/internal/authentication/AzureCredentialHelper.java

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.HashMap;
2727
import java.util.Map;
2828
import java.util.Optional;
29+
import java.util.concurrent.locks.Lock;
30+
import java.util.concurrent.locks.ReentrantLock;
2931

3032
import static com.mongodb.internal.authentication.HttpHelper.getHttpContents;
3133

@@ -37,42 +39,52 @@
3739
public final class AzureCredentialHelper {
3840
private static final String ACCESS_TOKEN_FIELD = "access_token";
3941
private static final String EXPIRES_IN_FIELD = "expires_in";
42+
private static final Lock CACHED_ACCESS_TOKEN_LOCK = new ReentrantLock();
43+
private static volatile ExpirableValue<String> cachedAccessToken = ExpirableValue.expired();
4044

41-
private static ExpirableValue<String> cachedAccessToken = ExpirableValue.expired();
42-
43-
public static synchronized BsonDocument obtainFromEnvironment() {
45+
public static BsonDocument obtainFromEnvironment() {
4446
String accessToken;
4547
Optional<String> cachedValue = cachedAccessToken.getValue();
4648
if (cachedValue.isPresent()) {
4749
accessToken = cachedValue.get();
4850
} else {
49-
String endpoint = "http://" + "169.254.169.254:80"
50-
+ "/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://vault.azure.net";
51+
CACHED_ACCESS_TOKEN_LOCK.lock();
52+
try {
53+
cachedValue = cachedAccessToken.getValue();
54+
if (cachedValue.isPresent()) {
55+
accessToken = cachedValue.get();
56+
} else {
57+
String endpoint = "http://" + "169.254.169.254:80"
58+
+ "/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://vault.azure.net";
5159

52-
Map<String, String> headers = new HashMap<>();
53-
headers.put("Metadata", "true");
54-
headers.put("Accept", "application/json");
60+
Map<String, String> headers = new HashMap<>();
61+
headers.put("Metadata", "true");
62+
headers.put("Accept", "application/json");
5563

56-
long startNanoTime = System.nanoTime();
57-
BsonDocument responseDocument;
58-
try {
59-
responseDocument = BsonDocument.parse(getHttpContents("GET", endpoint, headers));
60-
} catch (JsonParseException e) {
61-
throw new MongoClientException("Exception parsing JSON from Azure IMDS metadata response.", e);
62-
}
64+
long startNanoTime = System.nanoTime();
65+
BsonDocument responseDocument;
66+
try {
67+
responseDocument = BsonDocument.parse(getHttpContents("GET", endpoint, headers));
68+
} catch (JsonParseException e) {
69+
throw new MongoClientException("Exception parsing JSON from Azure IMDS metadata response.", e);
70+
}
6371

64-
if (!responseDocument.isString(ACCESS_TOKEN_FIELD)) {
65-
throw new MongoClientException(String.format(
66-
"The %s field from Azure IMDS metadata response is missing or is not a string", ACCESS_TOKEN_FIELD));
67-
}
68-
if (!responseDocument.isString(EXPIRES_IN_FIELD)) {
69-
throw new MongoClientException(String.format(
70-
"The %s field from Azure IMDS metadata response is missing or is not a string", EXPIRES_IN_FIELD));
72+
if (!responseDocument.isString(ACCESS_TOKEN_FIELD)) {
73+
throw new MongoClientException(String.format(
74+
"The %s field from Azure IMDS metadata response is missing or is not a string", ACCESS_TOKEN_FIELD));
75+
}
76+
if (!responseDocument.isString(EXPIRES_IN_FIELD)) {
77+
throw new MongoClientException(String.format(
78+
"The %s field from Azure IMDS metadata response is missing or is not a string", EXPIRES_IN_FIELD));
79+
}
80+
accessToken = responseDocument.getString(ACCESS_TOKEN_FIELD).getValue();
81+
int expiresInSeconds = Integer.parseInt(responseDocument.getString(EXPIRES_IN_FIELD).getValue());
82+
cachedAccessToken = ExpirableValue.expirable(accessToken, Duration.ofSeconds(expiresInSeconds).minus(Duration.ofMinutes(1)),
83+
startNanoTime);
84+
}
85+
} finally {
86+
CACHED_ACCESS_TOKEN_LOCK.unlock();
7187
}
72-
accessToken = responseDocument.getString(ACCESS_TOKEN_FIELD).getValue();
73-
int expiresInSeconds = Integer.parseInt(responseDocument.getString(EXPIRES_IN_FIELD).getValue());
74-
cachedAccessToken = ExpirableValue.expirable(accessToken, Duration.ofSeconds(expiresInSeconds).minus(Duration.ofMinutes(1)),
75-
startNanoTime);
7688
}
7789
return new BsonDocument("accessToken", new BsonString(accessToken));
7890
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,20 @@ public PowerOfTwoBufferPool getBufferProvider() {
6969
return bufferProvider;
7070
}
7171

72-
public synchronized ExtendedAsynchronousByteChannel getChannel() {
72+
public ExtendedAsynchronousByteChannel getChannel() {
7373
return channel;
7474
}
7575

76-
protected synchronized void setChannel(final ExtendedAsynchronousByteChannel channel) {
76+
protected void setChannel(final ExtendedAsynchronousByteChannel channel) {
7777
isTrue("current channel is null", this.channel == null);
7878
if (isClosed) {
7979
closeChannel(channel);
8080
} else {
8181
this.channel = channel;
82+
if (isClosed) {
83+
this.channel = null;
84+
closeChannel(channel);
85+
}
8286
}
8387
}
8488

@@ -158,16 +162,14 @@ public ServerAddress getAddress() {
158162
}
159163

160164
@Override
161-
public synchronized void close() {
165+
public void close() {
162166
isClosed = true;
163-
try {
164-
closeChannel(channel);
165-
} finally {
166-
channel = null;
167-
}
167+
ExtendedAsynchronousByteChannel channel = this.channel;
168+
this.channel = null;
169+
closeChannel(channel);
168170
}
169171

170-
private void closeChannel(final ExtendedAsynchronousByteChannel channel) {
172+
private void closeChannel(@Nullable final ExtendedAsynchronousByteChannel channel) {
171173
try {
172174
if (channel != null) {
173175
channel.close();

driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,8 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
244244
}
245245
} catch (Throwable t) {
246246
averageRoundTripTime.reset();
247-
InternalConnection localConnection;
248-
synchronized (this) {
249-
localConnection = connection;
250-
connection = null;
251-
}
247+
InternalConnection localConnection = connection;
248+
connection = null;
252249
if (localConnection != null) {
253250
localConnection.close();
254251
}
@@ -310,14 +307,9 @@ private long waitForSignalOrTimeout() throws InterruptedException {
310307
}
311308

312309
public void cancelCurrentCheck() {
313-
InternalConnection localConnection = null;
314-
synchronized (this) {
315-
if (connection != null && !currentCheckCancelled) {
316-
localConnection = connection;
317-
currentCheckCancelled = true;
318-
}
319-
}
320-
if (localConnection != null) {
310+
InternalConnection localConnection = connection;
311+
if (localConnection != null && !currentCheckCancelled) {
312+
currentCheckCancelled = true;
321313
localConnection.close();
322314
}
323315
}

driver-core/src/main/com/mongodb/internal/connection/ExponentiallyWeightedMovingAverage.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,37 @@
1616

1717
package com.mongodb.internal.connection;
1818

19+
import java.util.concurrent.atomic.AtomicLong;
20+
1921
import static com.mongodb.assertions.Assertions.isTrueArgument;
2022

2123
class ExponentiallyWeightedMovingAverage {
24+
private static final long EMPTY = -1;
25+
2226
private final double alpha;
23-
private long average = -1;
27+
private final AtomicLong average;
2428

2529
ExponentiallyWeightedMovingAverage(final double alpha) {
2630
isTrueArgument("alpha >= 0.0 and <= 1.0", alpha >= 0.0 && alpha <= 1.0);
2731
this.alpha = alpha;
32+
average = new AtomicLong(EMPTY);
2833
}
2934

30-
synchronized void reset() {
31-
average = -1;
35+
void reset() {
36+
average.set(EMPTY);
3237
}
3338

34-
synchronized long addSample(final long sample) {
35-
if (average == -1) {
36-
average = sample;
37-
} else {
38-
average = (long) (alpha * sample + (1 - alpha) * average);
39-
}
40-
41-
return average;
39+
long addSample(final long sample) {
40+
return average.accumulateAndGet(sample, (average, givenSample) -> {
41+
if (average == EMPTY) {
42+
return givenSample;
43+
}
44+
return (long) (alpha * givenSample + (1 - alpha) * average);
45+
});
4246
}
4347

44-
synchronized long getAverage() {
45-
return average == -1 ? 0 : average;
48+
long getAverage() {
49+
long average = this.average.get();
50+
return average == EMPTY ? 0 : average;
4651
}
4752
}

0 commit comments

Comments
 (0)