Skip to content

Get rid of all synchronized blocks and methods in production code #1178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions bson/src/main/org/bson/codecs/pojo/LazyPropertyModelCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.String.format;
import static org.bson.codecs.pojo.PojoSpecializationHelper.specializeTypeData;
Expand All @@ -35,7 +37,7 @@ class LazyPropertyModelCodec<T> implements Codec<T> {
private final PropertyModel<T> propertyModel;
private final CodecRegistry registry;
private final PropertyCodecRegistry propertyCodecRegistry;

private final Lock codecLock = new ReentrantLock();
private volatile Codec<T> codec;

LazyPropertyModelCodec(final PropertyModel<T> propertyModel, final CodecRegistry registry,
Expand All @@ -61,11 +63,17 @@ public Class<T> getEncoderClass() {
}

private Codec<T> getPropertyModelCodec() {
Codec<T> codec = this.codec;
if (codec == null) {
synchronized (this) {
codecLock.lock();
try {
codec = this.codec;
if (codec == null) {
codec = createCodec();
this.codec = codec;
}
} finally {
codecLock.unlock();
}
}
return codec;
Expand Down
46 changes: 31 additions & 15 deletions driver-core/src/main/com/mongodb/connection/netty/NettyStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrueArgument;
Expand Down Expand Up @@ -120,8 +122,8 @@ final class NettyStream implements Stream {
private volatile Channel channel;

private final LinkedList<io.netty.buffer.ByteBuf> pendingInboundBuffers = new LinkedList<>();
/* The fields pendingReader, pendingException are always written/read inside synchronized blocks
* that use the same NettyStream object, so they can be plain.*/
private final Lock lock = new ReentrantLock();
// access to the fields `pendingReader`, `pendingException` is guarded by `lock`
private PendingReader pendingReader;
private Throwable pendingException;
/* The fields readTimeoutTask, readTimeoutMillis are each written only in the ChannelInitializer.initChannel method
Expand Down Expand Up @@ -282,7 +284,8 @@ public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler, final long readTimeoutMillis) {
ByteBuf buffer = null;
Throwable exceptionResult = null;
synchronized (this) {
lock.lock();
try {
exceptionResult = pendingException;
if (exceptionResult == null) {
if (!hasBytesAvailable(numBytes)) {
Expand Down Expand Up @@ -316,6 +319,8 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
cancel(pendingReader.timeout);
this.pendingReader = null;
}
} finally {
lock.unlock();
}
if (exceptionResult != null) {
handler.failed(exceptionResult);
Expand All @@ -338,13 +343,16 @@ private boolean hasBytesAvailable(final int numBytes) {

private void handleReadResponse(@Nullable final io.netty.buffer.ByteBuf buffer, @Nullable final Throwable t) {
PendingReader localPendingReader = null;
synchronized (this) {
lock.lock();
try {
if (buffer != null) {
pendingInboundBuffers.add(buffer.retain());
} else {
pendingException = t;
}
localPendingReader = pendingReader;
} finally {
lock.unlock();
}

if (localPendingReader != null) {
Expand All @@ -359,16 +367,21 @@ public ServerAddress getAddress() {
}

@Override
public synchronized void close() {
isClosed = true;
if (channel != null) {
channel.close();
channel = null;
}
for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
iterator.remove();
nextByteBuf.release();
public void close() {
lock.lock();
try {
isClosed = true;
if (channel != null) {
channel.close();
channel = null;
}
for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
iterator.remove();
nextByteBuf.release();
}
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -504,7 +517,8 @@ private class OpenChannelFutureListener implements ChannelFutureListener {

@Override
public void operationComplete(final ChannelFuture future) {
synchronized (NettyStream.this) {
lock.lock();
try {
if (future.isSuccess()) {
if (isClosed) {
channelFuture.channel().close();
Expand All @@ -522,6 +536,8 @@ public void operationComplete(final ChannelFuture future) {
initializeChannel(handler, socketAddressQueue);
}
}
} finally {
lock.unlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.mongodb.internal.authentication.HttpHelper.getHttpContents;

Expand All @@ -37,42 +39,52 @@
public final class AzureCredentialHelper {
private static final String ACCESS_TOKEN_FIELD = "access_token";
private static final String EXPIRES_IN_FIELD = "expires_in";
private static final Lock CACHED_ACCESS_TOKEN_LOCK = new ReentrantLock();
private static volatile ExpirableValue<String> cachedAccessToken = ExpirableValue.expired();

private static ExpirableValue<String> cachedAccessToken = ExpirableValue.expired();

public static synchronized BsonDocument obtainFromEnvironment() {
public static BsonDocument obtainFromEnvironment() {
String accessToken;
Optional<String> cachedValue = cachedAccessToken.getValue();
if (cachedValue.isPresent()) {
accessToken = cachedValue.get();
} else {
String endpoint = "http://" + "169.254.169.254:80"
+ "/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://vault.azure.net";
CACHED_ACCESS_TOKEN_LOCK.lock();
try {
cachedValue = cachedAccessToken.getValue();
if (cachedValue.isPresent()) {
accessToken = cachedValue.get();
} else {
String endpoint = "http://" + "169.254.169.254:80"
+ "/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://vault.azure.net";

Map<String, String> headers = new HashMap<>();
headers.put("Metadata", "true");
headers.put("Accept", "application/json");
Map<String, String> headers = new HashMap<>();
headers.put("Metadata", "true");
headers.put("Accept", "application/json");

long startNanoTime = System.nanoTime();
BsonDocument responseDocument;
try {
responseDocument = BsonDocument.parse(getHttpContents("GET", endpoint, headers));
} catch (JsonParseException e) {
throw new MongoClientException("Exception parsing JSON from Azure IMDS metadata response.", e);
}
long startNanoTime = System.nanoTime();
BsonDocument responseDocument;
try {
responseDocument = BsonDocument.parse(getHttpContents("GET", endpoint, headers));
} catch (JsonParseException e) {
throw new MongoClientException("Exception parsing JSON from Azure IMDS metadata response.", e);
}

if (!responseDocument.isString(ACCESS_TOKEN_FIELD)) {
throw new MongoClientException(String.format(
"The %s field from Azure IMDS metadata response is missing or is not a string", ACCESS_TOKEN_FIELD));
}
if (!responseDocument.isString(EXPIRES_IN_FIELD)) {
throw new MongoClientException(String.format(
"The %s field from Azure IMDS metadata response is missing or is not a string", EXPIRES_IN_FIELD));
if (!responseDocument.isString(ACCESS_TOKEN_FIELD)) {
throw new MongoClientException(String.format(
"The %s field from Azure IMDS metadata response is missing or is not a string", ACCESS_TOKEN_FIELD));
}
if (!responseDocument.isString(EXPIRES_IN_FIELD)) {
throw new MongoClientException(String.format(
"The %s field from Azure IMDS metadata response is missing or is not a string", EXPIRES_IN_FIELD));
}
accessToken = responseDocument.getString(ACCESS_TOKEN_FIELD).getValue();
int expiresInSeconds = Integer.parseInt(responseDocument.getString(EXPIRES_IN_FIELD).getValue());
cachedAccessToken = ExpirableValue.expirable(accessToken, Duration.ofSeconds(expiresInSeconds).minus(Duration.ofMinutes(1)),
startNanoTime);
}
} finally {
CACHED_ACCESS_TOKEN_LOCK.unlock();
}
accessToken = responseDocument.getString(ACCESS_TOKEN_FIELD).getValue();
int expiresInSeconds = Integer.parseInt(responseDocument.getString(EXPIRES_IN_FIELD).getValue());
cachedAccessToken = ExpirableValue.expirable(accessToken, Duration.ofSeconds(expiresInSeconds).minus(Duration.ofMinutes(1)),
startNanoTime);
}
return new BsonDocument("accessToken", new BsonString(accessToken));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.assertTrue;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
Expand All @@ -47,14 +47,16 @@ public abstract class AsynchronousChannelStream implements Stream {
private final ServerAddress serverAddress;
private final SocketSettings settings;
private final PowerOfTwoBufferPool bufferProvider;
private volatile ExtendedAsynchronousByteChannel channel;
// we use `AtomicReference` to guarantee that we do not call `ExtendedAsynchronousByteChannel.close` concurrently with itself
private final AtomicReference<ExtendedAsynchronousByteChannel> channel;
private volatile boolean isClosed;

public AsynchronousChannelStream(final ServerAddress serverAddress, final SocketSettings settings,
final PowerOfTwoBufferPool bufferProvider) {
this.serverAddress = serverAddress;
this.settings = settings;
this.bufferProvider = bufferProvider;
channel = new AtomicReference<>();
}

public ServerAddress getServerAddress() {
Expand All @@ -69,16 +71,18 @@ public PowerOfTwoBufferPool getBufferProvider() {
return bufferProvider;
}

public synchronized ExtendedAsynchronousByteChannel getChannel() {
return channel;
public ExtendedAsynchronousByteChannel getChannel() {
return channel.get();
}

protected synchronized void setChannel(final ExtendedAsynchronousByteChannel channel) {
isTrue("current channel is null", this.channel == null);
protected void setChannel(final ExtendedAsynchronousByteChannel channel) {
if (isClosed) {
closeChannel(channel);
} else {
this.channel = channel;
assertTrue(this.channel.compareAndSet(null, channel));
if (isClosed) {
closeChannel(this.channel.getAndSet(null));
}
}
}

Expand Down Expand Up @@ -116,7 +120,7 @@ private void readAsync(final int numBytes, final int additionalTimeout, final As
timeout += additionalTimeout;
}

channel.read(buffer.asNIO(), timeout, MILLISECONDS, null, new BasicCompletionHandler(buffer, handler));
getChannel().read(buffer.asNIO(), timeout, MILLISECONDS, null, new BasicCompletionHandler(buffer, handler));
}

@Override
Expand Down Expand Up @@ -158,16 +162,12 @@ public ServerAddress getAddress() {
}

@Override
public synchronized void close() {
public void close() {
isClosed = true;
try {
closeChannel(channel);
} finally {
channel = null;
}
closeChannel(this.channel.getAndSet(null));
}

private void closeChannel(final ExtendedAsynchronousByteChannel channel) {
private void closeChannel(@Nullable final ExtendedAsynchronousByteChannel channel) {
try {
if (channel != null) {
channel.close();
Expand Down Expand Up @@ -208,7 +208,7 @@ public void failed(final Throwable t) {

private class AsyncWritableByteChannelAdapter {
void write(final ByteBuffer src, final AsyncCompletionHandler<Void> handler) {
channel.write(src, null, new AsyncWritableByteChannelAdapter.WriteCompletionHandler(handler));
getChannel().write(src, null, new AsyncWritableByteChannelAdapter.WriteCompletionHandler(handler));
}

private class WriteCompletionHandler extends BaseCompletionHandler<Void, Integer, Object> {
Expand Down Expand Up @@ -250,7 +250,7 @@ public void completed(final Integer result, final Void attachment) {
localByteBuf.flip();
localHandler.completed(localByteBuf);
} else {
channel.read(localByteBuf.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null,
getChannel().read(localByteBuf.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null,
new BasicCompletionHandler(localByteBuf, localHandler));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,12 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
} catch (Throwable t) {
averageRoundTripTime.reset();
InternalConnection localConnection;
synchronized (this) {
lock.lock();
try {
localConnection = connection;
connection = null;
} finally {
lock.unlock();
}
if (localConnection != null) {
localConnection.close();
Expand Down Expand Up @@ -311,11 +314,14 @@ private long waitForSignalOrTimeout() throws InterruptedException {

public void cancelCurrentCheck() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again this looks like it was purposely defensive - any ideas why?

Could cancelCurrentCheck and other usages of connection be interweaved - hence requiring the synchronization?

Copy link
Member Author

@stIncMale stIncMale Aug 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again this looks like it was purposely defensive - any ideas why?

I can't reconstruct the ideas of why synchronized was added because, as far as I can see, it achieves nothing.

Could cancelCurrentCheck and other usages of connection be interweaved - hence requiring the synchronization?

The only thing that matters for this PR is whether synchronized, as it is in the code, gives us guarantees that we don't have without it. As far as I can see, it does not. If there is any code that must not be run concurrently with the code inside the synchronized block, and that code is not itself guarded by the synchronized block using the same this, then it is irrelevant whether this synchronized block exists or not.

Thus, we only need to consider whether it is fine for the code in the block to run concurrently with itself and with the code in the other synchronized block. I convinced myself that it is, and asked reviewers to do the analysis on their own, and bring up specific issues if they see them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree 👍

Copy link
Collaborator

@jyemin jyemin Aug 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the the intention is to prevent concurrent calls to InternalConnection#close on the same instance. I think the existing code does that. Does the new code?

But if that's the case it still seems unnecessary, as that method should (and does at least for InternalStreamConnection) protect itself from concurrent calls.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that method should (and does at least for InternalStreamConnection) protect itself from concurrent calls.

I don't think it should (even if it does). I'll look into this.

Copy link
Member Author

@stIncMale stIncMale Aug 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, ServerMonitor.cancelCurrentCheck is not called concurrently with itself, i.e., regardless of whether it's synchronized or not, it does not run concurrently with itself. This, in turn, means that ServerMonitor.cancelCurrentCheck alone can't result in ServerMonitorRunnable.connection.close being called concurrently with itself.

However, if ServerMonitor.cancelCurrentCheck is called concurrently with ServerMonitorRunnable.run, which is totally possible, then not having the two synchronized blocks that were removed, allows concurrently running ServerMonitorRunnable.run and ServerMonitor.cancelCurrentCheck to call ServerMonitorRunnable.connection.close concurrently.

But, there are other reasons in this class why ServerMonitorRunnable.connection.close may be called concurrently with itself: ServerMonitor.close may run concurrently with ServerMonitorRunnable.run, and it also may result in ServerMonitorRunnable.connection.close being called concurrently (the same reasoning applies to RoundTripTimeRunnable.connection). However, nothing was done in DefaultServerMonitor to prevent this, which may suggest that there was not intent to prevent ServerMonitorRunnable.connection.close from being called concurrently.

Overall, it is extremely difficult at times to reason about concurrency in the driver, because in many cases the thread-safety and related requirements/guarantees are not documented, especially for internal code. One of the outstanding culprits is close method on any interface/class. It is virtually never possible to say correctly or with confidence whether concurrent calls are allowed or not. The code we are discussing right now is a good example: one would expect that InternalConnection must never be used concurrently, yet, as I pointed out above, it seems like it may be, regardless of whether synchronized blocks are there or not.

TL;RD Thank you for pointing out the problem, the change may indeed introduce issues on top of those that DefaultServerMonitor seems to have anyway. I will replace synchronized critical sections with critical sections implemented using DefaultServerMonitor.lock.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, it is extremely difficult at times to reason about concurrency in the driver, because in many cases the thread-safety and related requirements/guarantees are not documented, especially for internal code.

Agreed. We can and should take opportunities to improve this aspect of the driver.

One of the outstanding culprits is close method on any interface/class.

Perhaps interestingly: this seems a general problem with implementations of AutoCloseable/Closeable. Neither say anything about concurrency in the Javadoc, yet implementations like Socket#close protect against concurrent execution but it's not mentioned in the Javadoc.

InternalConnection localConnection = null;
synchronized (this) {
lock.lock();
try {
if (connection != null && !currentCheckCancelled) {
localConnection = connection;
currentCheckCancelled = true;
}
} finally {
lock.unlock();
}
if (localConnection != null) {
localConnection.close();
Expand Down
Loading