Skip to content

Commit 82bc1b8

Browse files
authored
Get rid of all synchronized blocks and methods in production code (#1178)
JAVA-5105
1 parent 2fa9f49 commit 82bc1b8

File tree

9 files changed

+210
-116
lines changed

9 files changed

+210
-116
lines changed

bson/src/main/org/bson/codecs/pojo/LazyPropertyModelCodec.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import java.util.ArrayList;
2929
import java.util.List;
30+
import java.util.concurrent.locks.Lock;
31+
import java.util.concurrent.locks.ReentrantLock;
3032

3133
import static java.lang.String.format;
3234
import static org.bson.codecs.pojo.PojoSpecializationHelper.specializeTypeData;
@@ -35,7 +37,7 @@ class LazyPropertyModelCodec<T> implements Codec<T> {
3537
private final PropertyModel<T> propertyModel;
3638
private final CodecRegistry registry;
3739
private final PropertyCodecRegistry propertyCodecRegistry;
38-
40+
private final Lock codecLock = new ReentrantLock();
3941
private volatile Codec<T> codec;
4042

4143
LazyPropertyModelCodec(final PropertyModel<T> propertyModel, final CodecRegistry registry,
@@ -61,11 +63,17 @@ public Class<T> getEncoderClass() {
6163
}
6264

6365
private Codec<T> getPropertyModelCodec() {
66+
Codec<T> codec = this.codec;
6467
if (codec == null) {
65-
synchronized (this) {
68+
codecLock.lock();
69+
try {
70+
codec = this.codec;
6671
if (codec == null) {
6772
codec = createCodec();
73+
this.codec = codec;
6874
}
75+
} finally {
76+
codecLock.unlock();
6977
}
7078
}
7179
return codec;

driver-core/src/main/com/mongodb/connection/netty/NettyStream.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
import java.util.concurrent.CountDownLatch;
6565
import java.util.concurrent.Future;
6666
import java.util.concurrent.ScheduledFuture;
67+
import java.util.concurrent.locks.Lock;
68+
import java.util.concurrent.locks.ReentrantLock;
6769

6870
import static com.mongodb.assertions.Assertions.assertNotNull;
6971
import static com.mongodb.assertions.Assertions.isTrueArgument;
@@ -120,8 +122,8 @@ final class NettyStream implements Stream {
120122
private volatile Channel channel;
121123

122124
private final LinkedList<io.netty.buffer.ByteBuf> pendingInboundBuffers = new LinkedList<>();
123-
/* The fields pendingReader, pendingException are always written/read inside synchronized blocks
124-
* that use the same NettyStream object, so they can be plain.*/
125+
private final Lock lock = new ReentrantLock();
126+
// access to the fields `pendingReader`, `pendingException` is guarded by `lock`
125127
private PendingReader pendingReader;
126128
private Throwable pendingException;
127129
/* The fields readTimeoutTask, readTimeoutMillis are each written only in the ChannelInitializer.initChannel method
@@ -282,7 +284,8 @@ public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
282284
private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler, final long readTimeoutMillis) {
283285
ByteBuf buffer = null;
284286
Throwable exceptionResult = null;
285-
synchronized (this) {
287+
lock.lock();
288+
try {
286289
exceptionResult = pendingException;
287290
if (exceptionResult == null) {
288291
if (!hasBytesAvailable(numBytes)) {
@@ -316,6 +319,8 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
316319
cancel(pendingReader.timeout);
317320
this.pendingReader = null;
318321
}
322+
} finally {
323+
lock.unlock();
319324
}
320325
if (exceptionResult != null) {
321326
handler.failed(exceptionResult);
@@ -338,13 +343,16 @@ private boolean hasBytesAvailable(final int numBytes) {
338343

339344
private void handleReadResponse(@Nullable final io.netty.buffer.ByteBuf buffer, @Nullable final Throwable t) {
340345
PendingReader localPendingReader = null;
341-
synchronized (this) {
346+
lock.lock();
347+
try {
342348
if (buffer != null) {
343349
pendingInboundBuffers.add(buffer.retain());
344350
} else {
345351
pendingException = t;
346352
}
347353
localPendingReader = pendingReader;
354+
} finally {
355+
lock.unlock();
348356
}
349357

350358
if (localPendingReader != null) {
@@ -359,16 +367,21 @@ public ServerAddress getAddress() {
359367
}
360368

361369
@Override
362-
public synchronized void close() {
363-
isClosed = true;
364-
if (channel != null) {
365-
channel.close();
366-
channel = null;
367-
}
368-
for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
369-
io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
370-
iterator.remove();
371-
nextByteBuf.release();
370+
public void close() {
371+
lock.lock();
372+
try {
373+
isClosed = true;
374+
if (channel != null) {
375+
channel.close();
376+
channel = null;
377+
}
378+
for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
379+
io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
380+
iterator.remove();
381+
nextByteBuf.release();
382+
}
383+
} finally {
384+
lock.unlock();
372385
}
373386
}
374387

@@ -504,7 +517,8 @@ private class OpenChannelFutureListener implements ChannelFutureListener {
504517

505518
@Override
506519
public void operationComplete(final ChannelFuture future) {
507-
synchronized (NettyStream.this) {
520+
lock.lock();
521+
try {
508522
if (future.isSuccess()) {
509523
if (isClosed) {
510524
channelFuture.channel().close();
@@ -522,6 +536,8 @@ public void operationComplete(final ChannelFuture future) {
522536
initializeChannel(handler, socketAddressQueue);
523537
}
524538
}
539+
} finally {
540+
lock.unlock();
525541
}
526542
}
527543
}

driver-core/src/main/com/mongodb/internal/authentication/AzureCredentialHelper.java

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.HashMap;
2727
import java.util.Map;
2828
import java.util.Optional;
29+
import java.util.concurrent.locks.Lock;
30+
import java.util.concurrent.locks.ReentrantLock;
2931

3032
import static com.mongodb.internal.authentication.HttpHelper.getHttpContents;
3133

@@ -37,42 +39,52 @@
3739
public final class AzureCredentialHelper {
3840
private static final String ACCESS_TOKEN_FIELD = "access_token";
3941
private static final String EXPIRES_IN_FIELD = "expires_in";
42+
private static final Lock CACHED_ACCESS_TOKEN_LOCK = new ReentrantLock();
43+
private static volatile ExpirableValue<String> cachedAccessToken = ExpirableValue.expired();
4044

41-
private static ExpirableValue<String> cachedAccessToken = ExpirableValue.expired();
42-
43-
public static synchronized BsonDocument obtainFromEnvironment() {
45+
public static BsonDocument obtainFromEnvironment() {
4446
String accessToken;
4547
Optional<String> cachedValue = cachedAccessToken.getValue();
4648
if (cachedValue.isPresent()) {
4749
accessToken = cachedValue.get();
4850
} else {
49-
String endpoint = "http://" + "169.254.169.254:80"
50-
+ "/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://vault.azure.net";
51+
CACHED_ACCESS_TOKEN_LOCK.lock();
52+
try {
53+
cachedValue = cachedAccessToken.getValue();
54+
if (cachedValue.isPresent()) {
55+
accessToken = cachedValue.get();
56+
} else {
57+
String endpoint = "http://" + "169.254.169.254:80"
58+
+ "/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://vault.azure.net";
5159

52-
Map<String, String> headers = new HashMap<>();
53-
headers.put("Metadata", "true");
54-
headers.put("Accept", "application/json");
60+
Map<String, String> headers = new HashMap<>();
61+
headers.put("Metadata", "true");
62+
headers.put("Accept", "application/json");
5563

56-
long startNanoTime = System.nanoTime();
57-
BsonDocument responseDocument;
58-
try {
59-
responseDocument = BsonDocument.parse(getHttpContents("GET", endpoint, headers));
60-
} catch (JsonParseException e) {
61-
throw new MongoClientException("Exception parsing JSON from Azure IMDS metadata response.", e);
62-
}
64+
long startNanoTime = System.nanoTime();
65+
BsonDocument responseDocument;
66+
try {
67+
responseDocument = BsonDocument.parse(getHttpContents("GET", endpoint, headers));
68+
} catch (JsonParseException e) {
69+
throw new MongoClientException("Exception parsing JSON from Azure IMDS metadata response.", e);
70+
}
6371

64-
if (!responseDocument.isString(ACCESS_TOKEN_FIELD)) {
65-
throw new MongoClientException(String.format(
66-
"The %s field from Azure IMDS metadata response is missing or is not a string", ACCESS_TOKEN_FIELD));
67-
}
68-
if (!responseDocument.isString(EXPIRES_IN_FIELD)) {
69-
throw new MongoClientException(String.format(
70-
"The %s field from Azure IMDS metadata response is missing or is not a string", EXPIRES_IN_FIELD));
72+
if (!responseDocument.isString(ACCESS_TOKEN_FIELD)) {
73+
throw new MongoClientException(String.format(
74+
"The %s field from Azure IMDS metadata response is missing or is not a string", ACCESS_TOKEN_FIELD));
75+
}
76+
if (!responseDocument.isString(EXPIRES_IN_FIELD)) {
77+
throw new MongoClientException(String.format(
78+
"The %s field from Azure IMDS metadata response is missing or is not a string", EXPIRES_IN_FIELD));
79+
}
80+
accessToken = responseDocument.getString(ACCESS_TOKEN_FIELD).getValue();
81+
int expiresInSeconds = Integer.parseInt(responseDocument.getString(EXPIRES_IN_FIELD).getValue());
82+
cachedAccessToken = ExpirableValue.expirable(accessToken, Duration.ofSeconds(expiresInSeconds).minus(Duration.ofMinutes(1)),
83+
startNanoTime);
84+
}
85+
} finally {
86+
CACHED_ACCESS_TOKEN_LOCK.unlock();
7187
}
72-
accessToken = responseDocument.getString(ACCESS_TOKEN_FIELD).getValue();
73-
int expiresInSeconds = Integer.parseInt(responseDocument.getString(EXPIRES_IN_FIELD).getValue());
74-
cachedAccessToken = ExpirableValue.expirable(accessToken, Duration.ofSeconds(expiresInSeconds).minus(Duration.ofMinutes(1)),
75-
startNanoTime);
7688
}
7789
return new BsonDocument("accessToken", new BsonString(accessToken));
7890
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import java.util.concurrent.CountDownLatch;
3838
import java.util.concurrent.atomic.AtomicReference;
3939

40-
import static com.mongodb.assertions.Assertions.isTrue;
40+
import static com.mongodb.assertions.Assertions.assertTrue;
4141
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4242

4343
/**
@@ -47,14 +47,16 @@ public abstract class AsynchronousChannelStream implements Stream {
4747
private final ServerAddress serverAddress;
4848
private final SocketSettings settings;
4949
private final PowerOfTwoBufferPool bufferProvider;
50-
private volatile ExtendedAsynchronousByteChannel channel;
50+
// we use `AtomicReference` to guarantee that we do not call `ExtendedAsynchronousByteChannel.close` concurrently with itself
51+
private final AtomicReference<ExtendedAsynchronousByteChannel> channel;
5152
private volatile boolean isClosed;
5253

5354
public AsynchronousChannelStream(final ServerAddress serverAddress, final SocketSettings settings,
5455
final PowerOfTwoBufferPool bufferProvider) {
5556
this.serverAddress = serverAddress;
5657
this.settings = settings;
5758
this.bufferProvider = bufferProvider;
59+
channel = new AtomicReference<>();
5860
}
5961

6062
public ServerAddress getServerAddress() {
@@ -69,16 +71,18 @@ public PowerOfTwoBufferPool getBufferProvider() {
6971
return bufferProvider;
7072
}
7173

72-
public synchronized ExtendedAsynchronousByteChannel getChannel() {
73-
return channel;
74+
public ExtendedAsynchronousByteChannel getChannel() {
75+
return channel.get();
7476
}
7577

76-
protected synchronized void setChannel(final ExtendedAsynchronousByteChannel channel) {
77-
isTrue("current channel is null", this.channel == null);
78+
protected void setChannel(final ExtendedAsynchronousByteChannel channel) {
7879
if (isClosed) {
7980
closeChannel(channel);
8081
} else {
81-
this.channel = channel;
82+
assertTrue(this.channel.compareAndSet(null, channel));
83+
if (isClosed) {
84+
closeChannel(this.channel.getAndSet(null));
85+
}
8286
}
8387
}
8488

@@ -116,7 +120,7 @@ private void readAsync(final int numBytes, final int additionalTimeout, final As
116120
timeout += additionalTimeout;
117121
}
118122

119-
channel.read(buffer.asNIO(), timeout, MILLISECONDS, null, new BasicCompletionHandler(buffer, handler));
123+
getChannel().read(buffer.asNIO(), timeout, MILLISECONDS, null, new BasicCompletionHandler(buffer, handler));
120124
}
121125

122126
@Override
@@ -158,16 +162,12 @@ public ServerAddress getAddress() {
158162
}
159163

160164
@Override
161-
public synchronized void close() {
165+
public void close() {
162166
isClosed = true;
163-
try {
164-
closeChannel(channel);
165-
} finally {
166-
channel = null;
167-
}
167+
closeChannel(this.channel.getAndSet(null));
168168
}
169169

170-
private void closeChannel(final ExtendedAsynchronousByteChannel channel) {
170+
private void closeChannel(@Nullable final ExtendedAsynchronousByteChannel channel) {
171171
try {
172172
if (channel != null) {
173173
channel.close();
@@ -208,7 +208,7 @@ public void failed(final Throwable t) {
208208

209209
private class AsyncWritableByteChannelAdapter {
210210
void write(final ByteBuffer src, final AsyncCompletionHandler<Void> handler) {
211-
channel.write(src, null, new AsyncWritableByteChannelAdapter.WriteCompletionHandler(handler));
211+
getChannel().write(src, null, new AsyncWritableByteChannelAdapter.WriteCompletionHandler(handler));
212212
}
213213

214214
private class WriteCompletionHandler extends BaseCompletionHandler<Void, Integer, Object> {
@@ -250,7 +250,7 @@ public void completed(final Integer result, final Void attachment) {
250250
localByteBuf.flip();
251251
localHandler.completed(localByteBuf);
252252
} else {
253-
channel.read(localByteBuf.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null,
253+
getChannel().read(localByteBuf.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null,
254254
new BasicCompletionHandler(localByteBuf, localHandler));
255255
}
256256
}

driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,9 +245,12 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
245245
} catch (Throwable t) {
246246
averageRoundTripTime.reset();
247247
InternalConnection localConnection;
248-
synchronized (this) {
248+
lock.lock();
249+
try {
249250
localConnection = connection;
250251
connection = null;
252+
} finally {
253+
lock.unlock();
251254
}
252255
if (localConnection != null) {
253256
localConnection.close();
@@ -311,11 +314,14 @@ private long waitForSignalOrTimeout() throws InterruptedException {
311314

312315
public void cancelCurrentCheck() {
313316
InternalConnection localConnection = null;
314-
synchronized (this) {
317+
lock.lock();
318+
try {
315319
if (connection != null && !currentCheckCancelled) {
316320
localConnection = connection;
317321
currentCheckCancelled = true;
318322
}
323+
} finally {
324+
lock.unlock();
319325
}
320326
if (localConnection != null) {
321327
localConnection.close();

0 commit comments

Comments
 (0)