Skip to content

Use Lock.lockInterruptibly only where it may actually be needed #1220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions driver-core/src/main/com/mongodb/KerberosSubjectProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.concurrent.locks.ReentrantLock;

import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.Locks.checkedWithLock;
import static com.mongodb.internal.Locks.checkedWithInterruptibleLock;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
Expand Down Expand Up @@ -91,7 +91,7 @@ private KerberosSubjectProvider(final String loginContextName, @Nullable final S
*/
@NonNull
public Subject getSubject() throws LoginException {
return checkedWithLock(lock, () -> {
return checkedWithInterruptibleLock(lock, () -> {
if (subject == null || needNewSubject(subject)) {
subject = createNewSubject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.connection.SslHelper.enableHostNameVerification;
import static com.mongodb.internal.connection.SslHelper.enableSni;
Expand Down Expand Up @@ -288,7 +287,7 @@ public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler, final long readTimeoutMillis) {
ByteBuf buffer = null;
Throwable exceptionResult = null;
lockInterruptibly(lock);
lock.lock();
try {
exceptionResult = pendingException;
if (exceptionResult == null) {
Expand Down
66 changes: 59 additions & 7 deletions driver-core/src/main/com/mongodb/internal/Locks.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,27 @@ public static <V> V withLock(final Lock lock, final Supplier<V> supplier) {
}

public static <V, E extends Exception> V checkedWithLock(final Lock lock, final CheckedSupplier<V, E> supplier) throws E {
lock.lock();
try {
return supplier.get();
} finally {
lock.unlock();
}
}

public static void withInterruptibleLock(final Lock lock, final Runnable action) throws MongoInterruptedException {
withInterruptibleLock(lock, () -> {
action.run();
return null;
});
}

public static <V> V withInterruptibleLock(final Lock lock, final Supplier<V> supplier) throws MongoInterruptedException {
return checkedWithInterruptibleLock(lock, supplier::get);
}

public static <V, E extends Exception> V checkedWithInterruptibleLock(final Lock lock, final CheckedSupplier<V, E> supplier)
throws MongoInterruptedException, E {
lockInterruptibly(lock);
try {
return supplier.get();
Expand All @@ -56,20 +77,51 @@ public static void lockInterruptibly(final Lock lock) throws MongoInterruptedExc
}
}

/**
* See {@link #lockInterruptiblyUnfair(ReentrantLock)} before using this method.
*/
public static void withUnfairLock(final ReentrantLock lock, final Runnable action) {
withUnfairLock(lock, () -> {
action.run();
return null;
});
}

/**
* See {@link #lockInterruptiblyUnfair(ReentrantLock)} before using this method.
*/
public static <V> V withUnfairLock(final ReentrantLock lock, final Supplier<V> supplier) {
lockUnfair(lock);
try {
return supplier.get();
} finally {
lock.unlock();
}
}

private static void lockUnfair(
// The type must be `ReentrantLock`, not `Lock`,
// because only `ReentrantLock.tryLock` is documented to have the barging (unfair) behavior.
final ReentrantLock lock) {
if (!lock.tryLock()) {
lock.lock();
}
}

/**
* This method allows a thread to attempt acquiring the {@code lock} unfairly despite the {@code lock}
* being {@linkplain ReentrantLock#ReentrantLock(boolean) fair}. In most cases you should create an unfair lock,
* instead of using this method.
*/
public static void lockInterruptiblyUnfair(
// The type must be `ReentrantLock`, not `Lock`,
// because only `ReentrantLock.tryLock` is documented to have the barging behavior.
// because only `ReentrantLock.tryLock` is documented to have the barging (unfair) behavior.
final ReentrantLock lock) throws MongoInterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw interruptAndCreateMongoInterruptedException(null, null);
}
// `ReentrantLock.tryLock` is unfair
if (!lock.tryLock()) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw interruptAndCreateMongoInterruptedException(null, e);
}
lockInterruptibly(lock);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.mongodb.event.ClusterDescriptionChangedEvent;
import com.mongodb.event.ClusterListener;
import com.mongodb.event.ClusterOpeningEvent;
import com.mongodb.internal.Locks;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.diagnostics.logging.Logger;
Expand All @@ -56,6 +55,7 @@
import static com.mongodb.connection.ServerDescription.MAX_DRIVER_WIRE_VERSION;
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_SERVER_VERSION;
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_WIRE_VERSION;
import static com.mongodb.internal.Locks.withInterruptibleLock;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents;
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
Expand Down Expand Up @@ -223,7 +223,7 @@ public ClusterDescription getCurrentDescription() {

@Override
public void withLock(final Runnable action) {
Locks.withLock(lock, action);
withInterruptibleLock(lock, action);
}

private void updatePhase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import java.util.concurrent.locks.ReentrantLock;

import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.Locks.withInterruptibleLock;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
Expand All @@ -33,19 +33,19 @@ public class ClusterClock {
private BsonDocument clusterTime;

public BsonDocument getCurrent() {
return withLock(lock, () -> clusterTime);
return withInterruptibleLock(lock, () -> clusterTime);
}

public BsonTimestamp getClusterTime() {
return withLock(lock, () -> clusterTime != null ? clusterTime.getTimestamp(CLUSTER_TIME_KEY) : null);
return withInterruptibleLock(lock, () -> clusterTime != null ? clusterTime.getTimestamp(CLUSTER_TIME_KEY) : null);
}

public void advance(@Nullable final BsonDocument other) {
withLock(lock, () -> this.clusterTime = greaterOf(other));
withInterruptibleLock(lock, () -> this.clusterTime = greaterOf(other));
}

public BsonDocument greaterOf(@Nullable final BsonDocument other) {
return withLock(lock, () -> {
return withInterruptibleLock(lock, () -> {
if (other == null) {
return clusterTime;
} else if (clusterTime == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.lockInterruptiblyUnfair;
import static com.mongodb.internal.Locks.withUnfairLock;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;

Expand Down Expand Up @@ -371,8 +372,7 @@ int permits() {
}

boolean acquirePermitImmediateUnfair() {
lockInterruptiblyUnfair(lock);
try {
return withUnfairLock(lock, () -> {
throwIfClosedOrPaused();
if (permits > 0) {
//noinspection NonAtomicOperationOnVolatileField
Expand All @@ -381,9 +381,7 @@ boolean acquirePermitImmediateUnfair() {
} else {
return false;
}
} finally {
lock.unlock();
}
});
}

/**
Expand Down Expand Up @@ -428,39 +426,30 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
}

void releasePermit() {
lockInterruptiblyUnfair(lock);
try {
withUnfairLock(lock, () -> {
assertTrue(permits < maxPermits);
//noinspection NonAtomicOperationOnVolatileField
permits++;
permitAvailableOrClosedOrPausedCondition.signal();
} finally {
lock.unlock();
}
});
}

void pause(final Supplier<MongoException> causeSupplier) {
lockInterruptiblyUnfair(lock);
try {
withUnfairLock(lock, () -> {
if (!paused) {
this.paused = true;
permitAvailableOrClosedOrPausedCondition.signalAll();
}
this.causeSupplier = assertNotNull(causeSupplier);
} finally {
lock.unlock();
}
});
}

void ready() {
if (paused) {
lockInterruptiblyUnfair(lock);
try {
withUnfairLock(lock, () -> {
this.paused = false;
this.causeSupplier = null;
} finally {
lock.unlock();
}
});
}
}

Expand All @@ -469,16 +458,14 @@ void ready() {
*/
boolean close() {
if (!closed) {
lockInterruptiblyUnfair(lock);
try {
return withUnfairLock(lock, () -> {
if (!closed) {
closed = true;
permitAvailableOrClosedOrPausedCondition.signalAll();
return true;
}
} finally {
lock.unlock();
}
return false;
});
}
return false;
}
Expand Down Expand Up @@ -515,5 +502,4 @@ boolean closed() {
static String sizeToString(final int size) {
return size == INFINITE_SIZE ? "infinite" : Integer.toString(size);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.event.ConnectionClosedEvent.Reason.ERROR;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.Locks.withUnfairLock;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.lockInterruptiblyUnfair;
import static com.mongodb.internal.connection.ConcurrentPool.sizeToString;
import static com.mongodb.internal.event.EventListenerHelper.getConnectionPoolListener;
import static com.mongodb.internal.logging.LogMessage.Component.CONNECTION;
Expand Down Expand Up @@ -1103,14 +1103,11 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
}

private void releasePermit() {
lockInterruptiblyUnfair(lock);
try {
withUnfairLock(lock, () -> {
assertTrue(permits < maxPermits);
permits++;
permitAvailableOrHandedOverOrClosedOrPausedCondition.signal();
} finally {
lock.unlock();
}
});
}

private void expressDesireToGetAvailableConnection() {
Expand Down Expand Up @@ -1141,29 +1138,24 @@ private void giveUpOnTryingToGetAvailableConnection() {
* from threads that are waiting for a permit to open a connection.
*/
void tryHandOverOrRelease(final UsageTrackingInternalConnection openConnection) {
lockInterruptiblyUnfair(lock);
try {
boolean handedOver = withUnfairLock(lock, () -> {
for (//iterate from first (head) to last (tail)
MutableReference<PooledConnection> desiredConnectionSlot : desiredConnectionSlots) {
if (desiredConnectionSlot.reference == null) {
desiredConnectionSlot.reference = new PooledConnection(openConnection);
permitAvailableOrHandedOverOrClosedOrPausedCondition.signal();
return;
return true;
}
}
} finally {
lock.unlock();
return false;
});
if (!handedOver) {
pool.release(openConnection);
}
pool.release(openConnection);
}

void signalClosedOrPaused() {
lockInterruptiblyUnfair(lock);
try {
permitAvailableOrHandedOverOrClosedOrPausedCondition.signalAll();
} finally {
lock.unlock();
}
withUnfairLock(lock, permitAvailableOrHandedOverOrClosedOrPausedCondition::signalAll);
}

/**
Expand Down Expand Up @@ -1327,16 +1319,16 @@ private static class AsyncWorkManager implements AutoCloseable {
}

void enqueue(final Task task) {
lockInterruptibly(lock);
try {
boolean closed = withLock(lock, () -> {
if (initUnlessClosed()) {
tasks.add(task);
return;
return false;
}
} finally {
lock.unlock();
return true;
});
if (closed) {
task.failAsClosed();
}
task.failAsClosed();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.connection.ServerType.UNKNOWN;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.checkedWithLock;
import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.connection.CommandHelper.HELLO;
import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO;
Expand Down Expand Up @@ -294,12 +294,7 @@ private void waitForNext() throws InterruptedException {
}

private long waitForSignalOrTimeout() throws InterruptedException {
lockInterruptibly(lock);
try {
return condition.awaitNanos(serverSettings.getHeartbeatFrequency(NANOSECONDS));
} finally {
lock.unlock();
}
return checkedWithLock(lock, () -> condition.awaitNanos(serverSettings.getHeartbeatFrequency(NANOSECONDS)));
}

public void cancelCurrentCheck() {
Expand Down
Loading