Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions driver-core/src/main/com/mongodb/MongoException.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@ public class MongoException extends RuntimeException {
*/
public static final String UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL = "UnknownTransactionCommitResult";

/**
* An error label indicating that the server is overloaded.
*
* @see #hasErrorLabel(String)
* @since 5.7
*/
public static final String SYSTEM_OVERLOADED_ERROR_LABEL = "SystemOverloadedError";

/**
* An error label indicating that the operation is safely retryable.
*
* @see #hasErrorLabel(String)
* @since 5.7
*/
public static final String RETRYABLE_ERROR_LABEL = "RetryableError";

private static final long serialVersionUID = -4415279469780082174L;

private final int code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.mongodb.internal.connection;

import com.mongodb.MongoException;
import com.mongodb.annotations.ThreadSafe;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ServerDescription;
Expand Down Expand Up @@ -137,9 +138,27 @@ private void handleException(final SdamIssue sdamIssue, final boolean beforeHand
serverMonitor.connect();
} else if (sdamIssue.relatedToNetworkNotTimeout()
|| (beforeHandshake && (sdamIssue.relatedToNetworkTimeout() || sdamIssue.relatedToAuth()))) {
updateDescription(sdamIssue.serverDescription());
connectionPool.invalidate(sdamIssue.exception().orElse(null));
serverMonitor.cancelCurrentCheck();
// Backpressure spec: Don't clear the pool or mark the server Unknown for connection establishment failures
// (network errors or timeouts that occur before the handshake completes). Authentication errors raised
// while establishing the connection should still clear the pool, as they are not related to overload.
// TLS configuration errors (certificate validation, protocol mismatches) should also clear the pool,
// as they indicate configuration issues, not server overload.
if (beforeHandshake && !sdamIssue.relatedToAuth() && !sdamIssue.relatedToTlsConfigurationError()) {
// Don't update server description to Unknown
// Don't invalidate the connection pool
// Apply error labels for backpressure
sdamIssue.exception().ifPresent(exception -> {
if (exception instanceof MongoException) {
MongoException mongoException = (MongoException) exception;
mongoException.addLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL);
mongoException.addLabel(MongoException.RETRYABLE_ERROR_LABEL);
}
});
} else {
updateDescription(sdamIssue.serverDescription());
connectionPool.invalidate(sdamIssue.exception().orElse(null));
serverMonitor.cancelCurrentCheck();
}
} else if (sdamIssue.relatedToWriteConcern() || sdamIssue.relatedToStalePrimary()) {
updateDescription(sdamIssue.serverDescription());
serverMonitor.connect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
import com.mongodb.connection.TopologyVersion;
import com.mongodb.lang.Nullable;

import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLProtocolException;
import java.security.cert.CertPathBuilderException;
import java.security.cert.CertPathValidatorException;
import java.security.cert.CertificateException;
import java.util.Locale;
import java.util.Optional;

import static com.mongodb.assertions.Assertions.assertNotNull;
Expand Down Expand Up @@ -162,6 +168,49 @@ boolean relatedToWriteConcern() {
return exception instanceof MongoWriteConcernWithResponseException;
}

/**
 * Checks whether the exception is caused by a TLS configuration error rather than by a network problem
 * or server overload. Such errors include certificate validation failures, peer verification failures
 * and protocol mismatches.
 *
 * <p>Only a {@link MongoSocketException} is considered. Its cause chain is walked (the exception itself
 * is not inspected) looking for either an exception type that always denotes a configuration problem, or an
 * {@link SSLHandshakeException} whose message contains one of a fixed set of keywords. The keyword
 * match is a heuristic, because an {@link SSLHandshakeException} may be raised for both network and
 * configuration reasons.</p>
 *
 * @return {@code true} if this is a TLS configuration error (not network-related), {@code false} otherwise
 */
boolean relatedToTlsConfigurationError() {
    if (!(exception instanceof MongoSocketException)) {
        return false;
    }
    Throwable cause = exception.getCause();
    while (cause != null) {
        // Check for various certificate validation and TLS configuration errors
        if (cause instanceof CertificateException
                || cause instanceof CertPathBuilderException
                || cause instanceof CertPathValidatorException
                || cause instanceof SSLPeerUnverifiedException
                || cause instanceof SSLProtocolException) {
            return true;
        }

        // SSLHandshakeException can be either network or config, so we check the message
        if (cause instanceof SSLHandshakeException) {
            String message = cause.getMessage();
            if (message != null) {
                // Locale.ROOT keeps the keyword match independent of the JVM default locale;
                // e.g. under a Turkish locale "I" lower-cases to a dotless "ı" and "certificate" would not match.
                String lowerMessage = message.toLowerCase(Locale.ROOT);
                // These indicate configuration issues, not network issues
                if (lowerMessage.contains("certificate")
                        || lowerMessage.contains("verify")
                        || lowerMessage.contains("trust")
                        || lowerMessage.contains("hostname")
                        || lowerMessage.contains("protocol")
                        || lowerMessage.contains("cipher")
                        || lowerMessage.contains("handshake_failure")) {
                    return true;
                }
            }
        }
        cause = cause.getCause();
    }
    return false;
}

private static boolean stale(@Nullable final Throwable t, final ServerDescription currentServerDescription) {
return TopologyVersionHelper.topologyVersion(t)
.map(candidateTopologyVersion -> TopologyVersionHelper.newerOrEqual(
Expand Down
2 changes: 1 addition & 1 deletion driver-core/src/test/resources/specifications
Submodule specifications updated 81 files
+10 −5 .pre-commit-config.yaml
+7 −2 source/connection-monitoring-and-pooling/connection-monitoring-and-pooling.md
+2 −0 source/index-management/index-management.md
+6 −6 source/index-management/tests/README.md
+1 −0 source/mongodb-handshake/tests/unified/metadata-not-propagated.json
+1 −0 source/mongodb-handshake/tests/unified/metadata-not-propagated.yml
+7 −4 source/retryable-reads/retryable-reads.md
+8 −5 source/retryable-writes/retryable-writes.md
+13 −3 source/server-discovery-and-monitoring/server-discovery-and-monitoring-tests.md
+142 −0 source/server-discovery-and-monitoring/tests/unified/backpressure-network-error-fail-replicaset.json
+80 −0 source/server-discovery-and-monitoring/tests/unified/backpressure-network-error-fail-replicaset.yml
+8 −6 source/server-discovery-and-monitoring/tests/unified/backpressure-network-error-fail-single.json
+5 −5 source/server-discovery-and-monitoring/tests/unified/backpressure-network-error-fail-single.yml
+145 −0 source/server-discovery-and-monitoring/tests/unified/backpressure-network-timeout-fail-replicaset.json
+83 −0 source/server-discovery-and-monitoring/tests/unified/backpressure-network-timeout-fail-replicaset.yml
+7 −5 source/server-discovery-and-monitoring/tests/unified/backpressure-network-timeout-fail-single.json
+4 −4 source/server-discovery-and-monitoring/tests/unified/backpressure-network-timeout-fail-single.yml
+21 −21 ...d-monitoring/tests/unified/backpressure-server-description-unchanged-on-min-pool-size-population-error.json
+14 −14 ...nd-monitoring/tests/unified/backpressure-server-description-unchanged-on-min-pool-size-population-error.yml
+3 −2 source/server-discovery-and-monitoring/tests/unified/minPoolSize-error.json
+3 −2 source/server-discovery-and-monitoring/tests/unified/minPoolSize-error.yml
+45 −69 source/server-discovery-and-monitoring/tests/unified/pool-clear-min-pool-size-error.json
+36 −39 source/server-discovery-and-monitoring/tests/unified/pool-clear-min-pool-size-error.yml
+8 −4 source/server-selection/server-selection-tests.md
+25 −8 source/server-selection/server-selection.md
+62 −0 source/server-selection/tests/server_selection/ReplicaSetNoPrimary/read/DeprioritizedNearest.json
+26 −0 source/server-selection/tests/server_selection/ReplicaSetNoPrimary/read/DeprioritizedNearest.yml
+39 −0 source/server-selection/tests/server_selection/ReplicaSetNoPrimary/read/DeprioritizedPrimary.json
+22 −0 source/server-selection/tests/server_selection/ReplicaSetNoPrimary/read/DeprioritizedPrimary.yml
+62 −0 source/server-selection/tests/server_selection/ReplicaSetNoPrimary/read/DeprioritizedPrimaryPreferred.json
+26 −0 source/server-selection/tests/server_selection/ReplicaSetNoPrimary/read/DeprioritizedPrimaryPreferred.yml
+62 −0 source/server-selection/tests/server_selection/ReplicaSetNoPrimary/read/DeprioritizedSecondary.json
+26 −0 source/server-selection/tests/server_selection/ReplicaSetNoPrimary/read/DeprioritizedSecondary.yml
+62 −0 source/server-selection/tests/server_selection/ReplicaSetNoPrimary/read/DeprioritizedSecondaryPreferred.json
+26 −0 source/server-selection/tests/server_selection/ReplicaSetNoPrimary/read/DeprioritizedSecondaryPreferred.yml
+84 −0 ...ce/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedAllPrimaryPreferred.json
+34 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedAllPrimaryPreferred.yml
+100 −0 .../server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedAllSecondaryPreferred.json
+36 −0 ...e/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedAllSecondaryPreferred.yml
+78 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedNearest.json
+33 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedNearest.yml
+65 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedPrimary.json
+30 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedPrimary.yml
+84 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedPrimaryPreferred.json
+34 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedPrimaryPreferred.yml
+86 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedSecondary.json
+34 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedSecondary.yml
+78 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedSecondaryPreferred.json
+33 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/read/DeprioritizedSecondaryPreferred.yml
+70 −0 ...ce/server-selection/tests/server_selection/ReplicaSetWithPrimary/write/DeprioritizedSecondaryPreferred.json
+30 −0 source/server-selection/tests/server_selection/ReplicaSetWithPrimary/write/DeprioritizedSecondaryPreferred.yml
+47 −0 source/server-selection/tests/server_selection/Sharded/read/DeprioritizedNearest.json
+22 −0 source/server-selection/tests/server_selection/Sharded/read/DeprioritizedNearest.yml
+42 −0 source/server-selection/tests/server_selection/Sharded/read/DeprioritizedPrimary.json
+20 −0 source/server-selection/tests/server_selection/Sharded/read/DeprioritizedPrimary.yml
+47 −0 source/server-selection/tests/server_selection/Sharded/read/DeprioritizedPrimaryPreferred.json
+22 −0 source/server-selection/tests/server_selection/Sharded/read/DeprioritizedPrimaryPreferred.yml
+47 −0 source/server-selection/tests/server_selection/Sharded/read/DeprioritizedSecondary.json
+22 −0 source/server-selection/tests/server_selection/Sharded/read/DeprioritizedSecondary.yml
+47 −0 source/server-selection/tests/server_selection/Sharded/read/DeprioritizedSecondaryPreferred.json
+22 −0 source/server-selection/tests/server_selection/Sharded/read/DeprioritizedSecondaryPreferred.yml
+47 −0 source/server-selection/tests/server_selection/Sharded/write/DeprioritizedNearest.json
+22 −0 source/server-selection/tests/server_selection/Sharded/write/DeprioritizedNearest.yml
+42 −0 source/server-selection/tests/server_selection/Sharded/write/DeprioritizedPrimary.json
+20 −0 source/server-selection/tests/server_selection/Sharded/write/DeprioritizedPrimary.yml
+47 −0 source/server-selection/tests/server_selection/Sharded/write/DeprioritizedPrimaryPreferred.json
+22 −0 source/server-selection/tests/server_selection/Sharded/write/DeprioritizedPrimaryPreferred.yml
+47 −0 source/server-selection/tests/server_selection/Sharded/write/DeprioritizedSecondary.json
+22 −0 source/server-selection/tests/server_selection/Sharded/write/DeprioritizedSecondary.yml
+47 −0 source/server-selection/tests/server_selection/Sharded/write/DeprioritizedSecondaryPreferred.json
+22 −0 source/server-selection/tests/server_selection/Sharded/write/DeprioritizedSecondaryPreferred.yml
+54 −0 source/server-selection/tests/server_selection/Single/read/Deprioritized.json
+20 −0 source/server-selection/tests/server_selection/Single/read/Deprioritized.yml
+54 −0 source/server-selection/tests/server_selection/Single/write/Deprioritized.json
+20 −0 source/server-selection/tests/server_selection/Single/write/Deprioritized.yml
+9 −1 source/sessions/snapshot-sessions.md
+15 −0 source/sessions/tests/README.md
+2 −1 source/socks5-support/socks5.md
+5 −4 source/transactions-convenient-api/tests/README.md
+51 −32 source/transactions-convenient-api/transactions-convenient-api.md
+2 −7 source/unified-test-format/unified-test-format.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,12 @@ class DefaultServerSpecification extends Specification {
]
}

def 'failed open should invalidate the server'() {
def 'network error should not invalidate the pool'() {
given:
def connectionPool = Mock(ConnectionPool)
connectionPool.get(_) >> { throw exceptionToThrow }
connectionPool.get(_) >> {
throw exceptionToThrow
}
def serverMonitor = Mock(ServerMonitor)
def server = defaultServer(connectionPool, serverMonitor)

Expand All @@ -247,8 +249,8 @@ class DefaultServerSpecification extends Specification {
then:
def e = thrown(MongoException)
e.is(exceptionToThrow)
1 * connectionPool.invalidate(exceptionToThrow)
1 * serverMonitor.cancelCurrentCheck()
0 * connectionPool.invalidate(_)
0 * serverMonitor.cancelCurrentCheck()

where:
exceptionToThrow << [
Expand Down Expand Up @@ -281,7 +283,7 @@ class DefaultServerSpecification extends Specification {
]
}

def 'failed open should invalidate the server asynchronously'() {
def 'failed open should not invalidate the pool asynchronously'() {
given:
def connectionPool = Mock(ConnectionPool)
connectionPool.getAsync(_, _) >> { it.last().onResult(null, exceptionToThrow) }
Expand All @@ -301,8 +303,8 @@ class DefaultServerSpecification extends Specification {
then:
!receivedConnection
receivedThrowable.is(exceptionToThrow)
1 * connectionPool.invalidate(exceptionToThrow)
1 * serverMonitor.cancelCurrentCheck()
0 * connectionPool.invalidate(exceptionToThrow)
0 * serverMonitor.cancelCurrentCheck()


where:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.mongodb.ClusterFixture;
import com.mongodb.MongoClientSettings;
import com.mongodb.event.ConnectionCheckOutFailedEvent;
import com.mongodb.event.ConnectionPoolClearedEvent;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.event.ConnectionPoolReadyEvent;
Expand Down Expand Up @@ -47,7 +48,10 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import static com.mongodb.ClusterFixture.configureFailPoint;
import static com.mongodb.ClusterFixture.disableFailPoint;
Expand Down Expand Up @@ -268,6 +272,79 @@ public void shouldEmitHeartbeatStartedBeforeSocketIsConnected() {
// As it requires mocking and package access to `com.mongodb.internal.connection`
}

/**
 * Verifies that connection establishment failures caused by the server's ingress connection rate limiter
 * surface as {@link ConnectionCheckOutFailedEvent}s without clearing the connection pool.
 *
 * <p>The test enables the server-side rate limiter through {@code setParameter}. Because that setting is
 * global to the server, it is always reset in a {@code finally} block so that a failing assertion does not
 * leave the limiter enabled for subsequent tests.</p>
 *
 * See
 * <a href="https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-tests.md#connection-pool-backpressure">Connection Pool Backpressure</a>.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the operations or during teardown
 */
@Test
public void testConnectionPoolBackpressure() throws InterruptedException {
    assumeTrue(serverVersionAtLeast(7, 0));

    AtomicInteger connectionCheckOutFailedEventCount = new AtomicInteger(0);
    AtomicInteger poolClearedEventCount = new AtomicInteger(0);

    ConnectionPoolListener connectionPoolListener = new ConnectionPoolListener() {
        @Override
        public void connectionCheckOutFailed(final ConnectionCheckOutFailedEvent event) {
            connectionCheckOutFailedEventCount.incrementAndGet();
        }

        @Override
        public void connectionPoolCleared(final ConnectionPoolClearedEvent event) {
            poolClearedEventCount.incrementAndGet();
        }
    };

    MongoClientSettings clientSettings = getMongoClientSettingsBuilder()
            .applyToConnectionPoolSettings(builder -> builder
                    .maxConnecting(100)
                    .addConnectionPoolListener(connectionPoolListener))
            .build();

    try (MongoClient adminClient = MongoClients.create(getMongoClientSettingsBuilder().build());
         MongoClient client = MongoClients.create(clientSettings)) {

        MongoDatabase adminDatabase = adminClient.getDatabase("admin");
        MongoDatabase database = client.getDatabase(getDefaultDatabaseName());
        MongoCollection<Document> collection = database.getCollection("testCollection");

        // Worker threads are created lazily, so nothing needs cleaning up if enabling the limiter fails.
        ExecutorService executor = Executors.newFixedThreadPool(100);

        // Configure rate limiter using admin commands
        adminDatabase.runCommand(new Document("setParameter", 1)
                .append("ingressConnectionEstablishmentRateLimiterEnabled", true));
        try {
            adminDatabase.runCommand(new Document("setParameter", 1)
                    .append("ingressConnectionEstablishmentRatePerSec", 20));
            adminDatabase.runCommand(new Document("setParameter", 1)
                    .append("ingressConnectionEstablishmentBurstCapacitySecs", 1));
            adminDatabase.runCommand(new Document("setParameter", 1)
                    .append("ingressConnectionEstablishmentMaxQueueDepth", 1));

            // The collection must contain a document so that the $where sleep actually executes
            collection.insertOne(Document.parse("{}"));

            // Run 100 parallel find operations with 2-seconds sleep; failures of individual operations are
            // expected and intentionally ignored (they stay captured in the discarded Future)
            for (int i = 0; i < 100; i++) {
                executor.submit(() -> collection.find(new Document("$where", "function() { sleep(2000); return true; }")).first());
            }

            // Wait for all operations to complete
            executor.shutdown();
            boolean terminated = executor.awaitTermination(20, SECONDS);
            assertTrue("Executor did not terminate within timeout", terminated);

            // Assert at least 10 ConnectionCheckOutFailedEvents occurred
            assertTrue("Expected at least 10 ConnectionCheckOutFailedEvents, but got " + connectionCheckOutFailedEventCount.get(),
                    connectionCheckOutFailedEventCount.get() >= 10);

            // Assert 0 PoolClearedEvents occurred
            assertEquals("Expected 0 PoolClearedEvents", 0, poolClearedEventCount.get());
        } finally {
            // Teardown runs even when the test fails: stop any remaining workers, sleep 1 second and
            // reset the rate limiter. The reset is in its own finally so an interrupted sleep cannot skip it.
            try {
                executor.shutdownNow();
                Thread.sleep(1000);
            } finally {
                adminDatabase.runCommand(new Document("setParameter", 1)
                        .append("ingressConnectionEstablishmentRateLimiterEnabled", false));
            }
        }
    }
}

private static void assertPoll(final BlockingQueue<?> queue, @Nullable final Class<?> allowed, final Set<Class<?>> required)
throws InterruptedException {
assertPoll(queue, allowed, required, Timeout.expiresIn(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS, ZERO_DURATION_MEANS_EXPIRED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,16 @@ private static boolean serverDescriptionChangedEventMatches(final BsonDocument e
switch (newType) {
case "Unknown":
return event.getNewDescription().getType() == ServerType.UNKNOWN;
case "LoadBalancer": {
case "LoadBalancer":
return event.getNewDescription().getType() == ServerType.LOAD_BALANCER;
}
case "Mongos":
return event.getNewDescription().getType() == ServerType.SHARD_ROUTER;
case "Standalone":
return event.getNewDescription().getType() == ServerType.STANDALONE;
case "RSPrimary":
return event.getNewDescription().getType() == ServerType.REPLICA_SET_PRIMARY;
case "RSSecondary":
return event.getNewDescription().getType() == ServerType.REPLICA_SET_SECONDARY;
default:
throw new UnsupportedOperationException();
}
Expand Down