Skip to content

Commit 92c90c6

Browse files
authored
Refactor transaction server pinning (#1211)
* Determine whether TransactionContext is required based on ConnectionSource's ServerDescription instead of the ClusterDescription * Remove Cluster#getDescription and update tests that relied on it JAVA-5186
1 parent 6a20fb6 commit 92c90c6

File tree

15 files changed

+144
-323
lines changed

15 files changed

+144
-323
lines changed

driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -165,56 +165,6 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati
165165
}
166166
}
167167

168-
@Override
169-
public ClusterDescription getDescription() {
170-
isTrue("open", !isClosed());
171-
172-
try {
173-
CountDownLatch currentPhase = phase.get();
174-
ClusterDescription curDescription = description;
175-
176-
boolean selectionFailureLogged = false;
177-
178-
long startTimeNanos = System.nanoTime();
179-
long curTimeNanos = startTimeNanos;
180-
long maxWaitTimeNanos = getMaxWaitTimeNanos();
181-
182-
while (curDescription.getType() == ClusterType.UNKNOWN) {
183-
184-
if (curTimeNanos - startTimeNanos > maxWaitTimeNanos) {
185-
throw new MongoTimeoutException(format("Timed out after %d ms while waiting to connect. Client view of cluster state "
186-
+ "is %s",
187-
settings.getServerSelectionTimeout(MILLISECONDS),
188-
curDescription.getShortDescription()));
189-
}
190-
191-
if (!selectionFailureLogged) {
192-
if (LOGGER.isInfoEnabled()) {
193-
if (settings.getServerSelectionTimeout(MILLISECONDS) < 0) {
194-
LOGGER.info("Cluster description not yet available. Waiting indefinitely.");
195-
} else {
196-
LOGGER.info(format("Cluster description not yet available. Waiting for %d ms before timing out",
197-
settings.getServerSelectionTimeout(MILLISECONDS)));
198-
}
199-
}
200-
selectionFailureLogged = true;
201-
}
202-
203-
connect();
204-
205-
currentPhase.await(Math.min(maxWaitTimeNanos - (curTimeNanos - startTimeNanos), getMinWaitTimeNanos()), NANOSECONDS);
206-
207-
curTimeNanos = System.nanoTime();
208-
209-
currentPhase = phase.get();
210-
curDescription = description;
211-
}
212-
return curDescription;
213-
} catch (InterruptedException e) {
214-
throw interruptAndCreateMongoInterruptedException("Interrupted while waiting to connect", e);
215-
}
216-
}
217-
218168
public ClusterId getClusterId() {
219169
return clusterId;
220170
}

driver-core/src/main/com/mongodb/internal/connection/Cluster.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,6 @@ public interface Cluster extends Closeable {
4040

4141
ClusterSettings getSettings();
4242

43-
/**
44-
* Get the description of this cluster. This method will not return normally until the cluster type is known.
45-
*
46-
* @return a ClusterDescription representing the current state of the cluster
47-
* @throws com.mongodb.MongoTimeoutException if the timeout has been reached before the cluster type is known
48-
* @throws com.mongodb.MongoInterruptedException if interrupted when getting the cluster description
49-
*/
50-
ClusterDescription getDescription();
51-
5243

5344
ClusterId getClusterId();
5445

driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,6 @@ public ClusterSettings getSettings() {
174174
return settings;
175175
}
176176

177-
@Override
178-
public ClusterDescription getDescription() {
179-
isTrue("open", !isClosed());
180-
waitForSrv();
181-
return description;
182-
}
183-
184177
@Override
185178
public ClusterId getClusterId() {
186179
return clusterId;

driver-core/src/test/functional/com/mongodb/ClusterFixture.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.mongodb.async.FutureResultCallback;
2020
import com.mongodb.connection.AsynchronousSocketChannelStreamFactory;
2121
import com.mongodb.connection.ClusterConnectionMode;
22+
import com.mongodb.connection.ClusterDescription;
2223
import com.mongodb.connection.ClusterSettings;
2324
import com.mongodb.connection.ClusterType;
2425
import com.mongodb.connection.ConnectionPoolSettings;
@@ -85,6 +86,7 @@
8586
import static com.mongodb.connection.ClusterType.REPLICA_SET;
8687
import static com.mongodb.connection.ClusterType.SHARDED;
8788
import static com.mongodb.connection.ClusterType.STANDALONE;
89+
import static com.mongodb.connection.ClusterType.UNKNOWN;
8890
import static com.mongodb.internal.connection.ClusterDescriptionHelper.getPrimaries;
8991
import static com.mongodb.internal.connection.ClusterDescriptionHelper.getSecondaries;
9092
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
@@ -140,7 +142,20 @@ public static String getDefaultDatabaseName() {
140142
}
141143

142144
public static boolean clusterIsType(final ClusterType clusterType) {
143-
return getCluster().getDescription().getType() == clusterType;
145+
return getClusterDescription(getCluster()).getType() == clusterType;
146+
}
147+
148+
public static ClusterDescription getClusterDescription(final Cluster cluster) {
149+
try {
150+
ClusterDescription clusterDescription = cluster.getCurrentDescription();
151+
while (clusterDescription.getType() == UNKNOWN) {
152+
Thread.sleep(10);
153+
clusterDescription = cluster.getCurrentDescription();
154+
}
155+
return clusterDescription;
156+
} catch (InterruptedException e) {
157+
throw interruptAndCreateMongoInterruptedException("Interrupted", e);
158+
}
144159
}
145160

146161
public static ServerVersion getServerVersion() {
@@ -449,27 +464,27 @@ public static SslSettings getSslSettings(final ConnectionString connectionString
449464
}
450465

451466
public static ServerAddress getPrimary() {
452-
List<ServerDescription> serverDescriptions = getPrimaries(getCluster().getDescription());
467+
List<ServerDescription> serverDescriptions = getPrimaries(getClusterDescription(getCluster()));
453468
while (serverDescriptions.isEmpty()) {
454469
try {
455470
sleep(100);
456471
} catch (InterruptedException e) {
457472
throw new RuntimeException(e);
458473
}
459-
serverDescriptions = getPrimaries(getCluster().getDescription());
474+
serverDescriptions = getPrimaries(getClusterDescription(getCluster()));
460475
}
461476
return serverDescriptions.get(0).getAddress();
462477
}
463478

464479
public static ServerAddress getSecondary() {
465-
List<ServerDescription> serverDescriptions = getSecondaries(getCluster().getDescription());
480+
List<ServerDescription> serverDescriptions = getSecondaries(getClusterDescription(getCluster()));
466481
while (serverDescriptions.isEmpty()) {
467482
try {
468483
sleep(100);
469484
} catch (InterruptedException e) {
470485
throw new RuntimeException(e);
471486
}
472-
serverDescriptions = getSecondaries(getCluster().getDescription());
487+
serverDescriptions = getSecondaries(getClusterDescription(getCluster()));
473488
}
474489
return serverDescriptions.get(0).getAddress();
475490
}
@@ -499,20 +514,19 @@ public static BsonDocument getServerParameters() {
499514
}
500515

501516
public static boolean isDiscoverableReplicaSet() {
502-
return getCluster().getDescription().getType() == REPLICA_SET
503-
&& getCluster().getDescription().getConnectionMode() == MULTIPLE;
517+
return clusterIsType(REPLICA_SET) && getClusterConnectionMode() == MULTIPLE;
504518
}
505519

506520
public static boolean isSharded() {
507-
return getCluster().getDescription().getType() == SHARDED;
521+
return clusterIsType(SHARDED);
508522
}
509523

510524
public static boolean isStandalone() {
511-
return getCluster().getDescription().getType() == STANDALONE;
525+
return clusterIsType(STANDALONE);
512526
}
513527

514528
public static boolean isLoadBalanced() {
515-
return getCluster().getSettings().getMode() == LOAD_BALANCED;
529+
return getClusterConnectionMode() == LOAD_BALANCED;
516530
}
517531

518532
public static boolean isAuthenticated() {

driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,23 +79,14 @@ public void tearDown() {
7979
cluster.close();
8080
}
8181

82-
@Test
83-
public void shouldGetDescription() {
84-
// given
85-
setUpCluster(getPrimary());
86-
87-
// expect
88-
assertNotNull(cluster.getDescription());
89-
}
90-
9182
@Test
9283
public void descriptionShouldIncludeSettings() {
9384
// given
9485
setUpCluster(getPrimary());
9586

9687
// expect
97-
assertNotNull(cluster.getDescription().getClusterSettings());
98-
assertNotNull(cluster.getDescription().getServerSettings());
88+
assertNotNull(cluster.getCurrentDescription().getClusterSettings());
89+
assertNotNull(cluster.getCurrentDescription().getServerSettings());
9990
}
10091

10192
@Test

driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy

Lines changed: 1 addition & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,6 @@ class BaseClusterSpecification extends Specification {
8080
cluster.getCurrentDescription() == new ClusterDescription(clusterSettings.getMode(), ClusterType.UNKNOWN, [], clusterSettings,
8181
factory.getSettings())
8282

83-
when: 'the description is accessed before initialization'
84-
cluster.getDescription()
85-
86-
then: 'a MongoTimeoutException is thrown'
87-
thrown(MongoTimeoutException)
88-
8983
when: 'a server is selected before initialization'
9084
cluster.selectServer({ def clusterDescription -> [] }, new OperationContext())
9185

@@ -193,21 +187,10 @@ class BaseClusterSpecification extends Specification {
193187
.exception(new MongoInternalException('oops'))
194188
.build())
195189

196-
cluster.getDescription()
197-
198-
then:
199-
def e = thrown(MongoTimeoutException)
200-
e.getMessage().startsWith("Timed out after ${serverSelectionTimeoutMS} ms while waiting to connect. " +
201-
'Client view of cluster state is {type=UNKNOWN')
202-
e.getMessage().contains('{address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' +
203-
'exception={com.mongodb.MongoInternalException: oops}}')
204-
e.getMessage().contains('{address=localhost:27018, type=UNKNOWN, state=CONNECTING}')
205-
206-
when:
207190
cluster.selectServer(new WritableServerSelector(), new OperationContext())
208191

209192
then:
210-
e = thrown(MongoTimeoutException)
193+
def e = thrown(MongoTimeoutException)
211194
e.getMessage().startsWith("Timed out after ${serverSelectionTimeoutMS} ms while waiting for a server " +
212195
'that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN')
213196
e.getMessage().contains('{address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' +
@@ -272,37 +255,6 @@ class BaseClusterSpecification extends Specification {
272255
cluster?.close()
273256
}
274257

275-
@Slow
276-
def 'should wait indefinitely for a cluster description until interrupted'() {
277-
given:
278-
def cluster = new MultiServerCluster(new ClusterId(),
279-
builder().mode(MULTIPLE)
280-
.hosts([firstServer, secondServer, thirdServer])
281-
.serverSelectionTimeout(-1, SECONDS)
282-
.build(),
283-
factory)
284-
285-
when:
286-
def latch = new CountDownLatch(1)
287-
def thread = new Thread({
288-
try {
289-
cluster.getDescription()
290-
} catch (MongoInterruptedException e) {
291-
latch.countDown()
292-
}
293-
})
294-
thread.start()
295-
sleep(1000)
296-
thread.interrupt()
297-
def interrupted = latch.await(ClusterFixture.TIMEOUT, SECONDS)
298-
299-
then:
300-
interrupted
301-
302-
cleanup:
303-
cluster?.close()
304-
}
305-
306258
def 'should select server asynchronously when server is already available'() {
307259
given:
308260
def cluster = new MultiServerCluster(new ClusterId(),

driver-core/src/test/unit/com/mongodb/internal/connection/DnsMultiServerClusterSpecification.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class DnsMultiServerClusterSpecification extends Specification {
9898
factory.sendNotification(secondServer, SHARD_ROUTER)
9999
def firstTestServer = factory.getServer(firstServer)
100100
def secondTestServer = factory.getServer(secondServer)
101-
def clusterDescription = cluster.getDescription()
101+
def clusterDescription = cluster.getCurrentDescription()
102102

103103
then: 'events are generated, description includes hosts, exception is cleared, and servers are open'
104104
2 * clusterListener.clusterDescriptionChanged(_)
@@ -112,7 +112,7 @@ class DnsMultiServerClusterSpecification extends Specification {
112112
initializer.initialize([secondServer, thirdServer])
113113
factory.sendNotification(secondServer, SHARD_ROUTER)
114114
def thirdTestServer = factory.getServer(thirdServer)
115-
clusterDescription = cluster.getDescription()
115+
clusterDescription = cluster.getCurrentDescription()
116116

117117
then: 'events are generated, description is updated, and the removed server is closed'
118118
1 * clusterListener.clusterDescriptionChanged(_)
@@ -125,7 +125,7 @@ class DnsMultiServerClusterSpecification extends Specification {
125125

126126
when: 'the listener is initialized with another exception'
127127
initializer.initialize(exception)
128-
clusterDescription = cluster.getDescription()
128+
clusterDescription = cluster.getCurrentDescription()
129129

130130
then: 'the exception is ignored'
131131
0 * clusterListener.clusterDescriptionChanged(_)

0 commit comments

Comments
 (0)