Skip to content

Commit

Permalink
Do not perform server selection to determine sessions support (mongod…
Browse files Browse the repository at this point in the history
…b#1092)

Instead, check for session support during operation execution
after the connection is checked out.
  • Loading branch information
jyemin authored Mar 9, 2023
1 parent e9a4bd8 commit dfd6e7c
Show file tree
Hide file tree
Showing 21 changed files with 281 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ConnectionDescription {
private final int maxMessageSize;
private final List<String> compressors;
private final BsonArray saslSupportedMechanisms;
private final Integer logicalSessionTimeoutMinutes;

private static final int DEFAULT_MAX_MESSAGE_SIZE = 0x2000000; // 32MB
private static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 512;
Expand Down Expand Up @@ -99,6 +100,29 @@ public ConnectionDescription(final ConnectionId connectionId, final int maxWireV
saslSupportedMechanisms);
}

/**
* Construct an instance.
*
* @param connectionId the connection id
* @param maxWireVersion the max wire version
* @param serverType the server type
* @param maxBatchCount the max batch count
* @param maxDocumentSize the max document size in bytes
* @param maxMessageSize the max message size in bytes
* @param compressors the available compressors on the connection
* @param saslSupportedMechanisms the supported SASL mechanisms
* @param logicalSessionTimeoutMinutes the logical session timeout, in minutes
* @since 4.10
*/
public ConnectionDescription(final ConnectionId connectionId, final int maxWireVersion,
final ServerType serverType, final int maxBatchCount, final int maxDocumentSize,
final int maxMessageSize, final List<String> compressors,
@Nullable final BsonArray saslSupportedMechanisms,
@Nullable final Integer logicalSessionTimeoutMinutes) {
this(null, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize, maxMessageSize, compressors,
saslSupportedMechanisms, logicalSessionTimeoutMinutes);
}

/**
* Construct an instance.
*
Expand All @@ -117,6 +141,14 @@ public ConnectionDescription(@Nullable final ObjectId serviceId, final Connectio
final ServerType serverType, final int maxBatchCount, final int maxDocumentSize,
final int maxMessageSize, final List<String> compressors,
@Nullable final BsonArray saslSupportedMechanisms) {
this(serviceId, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize, maxMessageSize, compressors,
saslSupportedMechanisms, null);
}

private ConnectionDescription(@Nullable final ObjectId serviceId, final ConnectionId connectionId, final int maxWireVersion,
final ServerType serverType, final int maxBatchCount, final int maxDocumentSize,
final int maxMessageSize, final List<String> compressors,
@Nullable final BsonArray saslSupportedMechanisms, @Nullable final Integer logicalSessionTimeoutMinutes) {
this.serviceId = serviceId;
this.connectionId = connectionId;
this.serverType = serverType;
Expand All @@ -126,6 +158,7 @@ public ConnectionDescription(@Nullable final ObjectId serviceId, final Connectio
this.maxWireVersion = maxWireVersion;
this.compressors = notNull("compressors", Collections.unmodifiableList(new ArrayList<>(compressors)));
this.saslSupportedMechanisms = saslSupportedMechanisms;
this.logicalSessionTimeoutMinutes = logicalSessionTimeoutMinutes;
}
/**
* Creates a new connection description with the set connection id
Expand All @@ -137,7 +170,7 @@ public ConnectionDescription(@Nullable final ObjectId serviceId, final Connectio
public ConnectionDescription withConnectionId(final ConnectionId connectionId) {
notNull("connectionId", connectionId);
return new ConnectionDescription(serviceId, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize,
maxMessageSize, compressors, saslSupportedMechanisms);
maxMessageSize, compressors, saslSupportedMechanisms, logicalSessionTimeoutMinutes);
}

/**
Expand All @@ -150,7 +183,7 @@ public ConnectionDescription withConnectionId(final ConnectionId connectionId) {
public ConnectionDescription withServiceId(final ObjectId serviceId) {
notNull("serviceId", serviceId);
return new ConnectionDescription(serviceId, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize,
maxMessageSize, compressors, saslSupportedMechanisms);
maxMessageSize, compressors, saslSupportedMechanisms, logicalSessionTimeoutMinutes);
}

/**
Expand Down Expand Up @@ -248,6 +281,17 @@ public BsonArray getSaslSupportedMechanisms() {
return saslSupportedMechanisms;
}

/**
* Gets the session timeout in minutes.
*
* @return the session timeout in minutes, or null if sessions are not supported by this connection
* @mongodb.server.release 3.6
* @since 4.10
*/
@Nullable
public Integer getLogicalSessionTimeoutMinutes() {
return logicalSessionTimeoutMinutes;
}
/**
* Get the default maximum message size.
*
Expand Down Expand Up @@ -302,6 +346,9 @@ public boolean equals(final Object o) {
if (!compressors.equals(that.compressors)) {
return false;
}
if (!Objects.equals(logicalSessionTimeoutMinutes, that.logicalSessionTimeoutMinutes)) {
return false;
}
return Objects.equals(saslSupportedMechanisms, that.saslSupportedMechanisms);
}

Expand All @@ -316,6 +363,7 @@ public int hashCode() {
result = 31 * result + compressors.hashCode();
result = 31 * result + (serviceId != null ? serviceId.hashCode() : 0);
result = 31 * result + (saslSupportedMechanisms != null ? saslSupportedMechanisms.hashCode() : 0);
result = 31 * result + (logicalSessionTimeoutMinutes != null ? logicalSessionTimeoutMinutes.hashCode() : 0);
return result;
}

Expand All @@ -329,6 +377,7 @@ public String toString() {
+ ", maxDocumentSize=" + maxDocumentSize
+ ", maxMessageSize=" + maxMessageSize
+ ", compressors=" + compressors
+ ", logicialSessionTimeoutMinutes=" + logicalSessionTimeoutMinutes
+ ", serviceId=" + serviceId
+ '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,14 @@ private List<BsonElement> getExtraElements(final SessionContext sessionContext)
if (sessionContext.getClusterTime() != null) {
extraElements.add(new BsonElement("$clusterTime", sessionContext.getClusterTime()));
}
if (sessionContext.hasSession() && responseExpected) {
extraElements.add(new BsonElement("lsid", sessionContext.getSessionId()));
if (sessionContext.hasSession()) {
if (!sessionContext.isImplicitSession() && !getSettings().isSessionSupported()) {
throw new MongoClientException("Attempting to use a ClientSession while connected to a server that doesn't support "
+ "sessions");
}
if (getSettings().isSessionSupported() && responseExpected) {
extraElements.add(new BsonElement("lsid", sessionContext.getSessionId()));
}
}
boolean firstMessageInTransaction = sessionContext.notifyMessageSent();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ static ConnectionDescription createConnectionDescription(final ClusterConnection
ConnectionDescription connectionDescription = new ConnectionDescription(connectionId,
getMaxWireVersion(helloResult), getServerType(helloResult), getMaxWriteBatchSize(helloResult),
getMaxBsonObjectSize(helloResult), getMaxMessageSizeBytes(helloResult), getCompressors(helloResult),
helloResult.getArray("saslSupportedMechs", null));
helloResult.getArray("saslSupportedMechs", null), getLogicalSessionTimeoutMinutes(helloResult));
if (helloResult.containsKey("connectionId")) {
ConnectionId newConnectionId =
connectionDescription.getConnectionId().withServerValue(helloResult.getNumber("connectionId").intValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public final class MessageSettings {
private final int maxBatchCount;
private final int maxWireVersion;
private final ServerType serverType;
private final boolean sessionSupported;

/**
* Gets the builder
Expand All @@ -56,6 +57,7 @@ public static final class Builder {
private int maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
private int maxWireVersion;
private ServerType serverType;
private boolean sessionSupported;

/**
* Build it.
Expand Down Expand Up @@ -108,6 +110,11 @@ public Builder serverType(final ServerType serverType) {
this.serverType = serverType;
return this;
}

public Builder sessionSupported(final boolean sessionSupported) {
this.sessionSupported = sessionSupported;
return this;
}
}

/**
Expand Down Expand Up @@ -145,11 +152,17 @@ public ServerType getServerType() {
return serverType;
}

public boolean isSessionSupported() {
return sessionSupported;
}


private MessageSettings(final Builder builder) {
this.maxDocumentSize = builder.maxDocumentSize;
this.maxMessageSize = builder.maxMessageSize;
this.maxBatchCount = builder.maxBatchCount;
this.maxWireVersion = builder.maxWireVersion;
this.serverType = builder.serverType;
this.sessionSupported = builder.sessionSupported;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ static MessageSettings getMessageSettings(final ConnectionDescription connection
.maxBatchCount(connectionDescription.getMaxBatchCount())
.maxWireVersion(connectionDescription.getMaxWireVersion())
.serverType(connectionDescription.getServerType())
.sessionSupported(connectionDescription.getLogicalSessionTimeoutMinutes() != null)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private CommandCreator getCommandCreator(final SessionContext sessionContext) {
putIfNotNull(commandDocument, "comment", getComment());
putIfNotNull(commandDocument, "let", getLet());

if (isRetryableWrite(isRetryWrites(), getWriteConcern(), serverDescription, connectionDescription, sessionContext)) {
if (isRetryableWrite(isRetryWrites(), getWriteConcern(), connectionDescription, sessionContext)) {
commandDocument.put("txnNumber", new BsonInt64(sessionContext.advanceTransactionNumber()));
}
return commandDocument;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.mongodb.bulk.BulkWriteUpsert;
import com.mongodb.bulk.WriteConcernError;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.bulk.DeleteRequest;
import com.mongodb.internal.bulk.UpdateRequest;
import com.mongodb.internal.bulk.WriteRequest;
Expand Down Expand Up @@ -96,8 +95,7 @@ public final class BulkWriteBatch {
private final BsonDocument variables;

static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
final ServerDescription serverDescription,
final ConnectionDescription connectionDescription,
final ConnectionDescription connectionDescription,
final boolean ordered, final WriteConcern writeConcern,
final Boolean bypassDocumentValidation, final boolean retryWrites,
final List<? extends WriteRequest> writeRequests,
Expand All @@ -107,7 +105,7 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
&& !writeConcern.isAcknowledged()) {
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
}
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, serverDescription, connectionDescription, sessionContext);
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext);
List<WriteRequestWithIndex> writeRequestsWithIndex = new ArrayList<>();
boolean writeRequestsAreRetryable = true;
for (int i = 0; i < writeRequests.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,7 @@ static <T, R> R executeRetryableWrite(
return withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection) -> {
int maxWireVersion = connection.getDescription().getMaxWireVersion();
try {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryWrite(source.getServerDescription(), connection.getDescription(),
binding.getSessionContext()));
retryState.breakAndThrowIfRetryAnd(() -> !canRetryWrite(connection.getDescription(), binding.getSessionContext()));
BsonDocument command = retryState.attachment(AttachmentKeys.command())
.map(previousAttemptCommand -> {
assertFalse(firstAttempt);
Expand Down Expand Up @@ -462,8 +461,8 @@ static <T, R> void executeRetryableWriteAsync(
SingleResultCallback<R> addingRetryableLabelCallback = firstAttempt
? releasingCallback
: addingRetryableLabelCallback(releasingCallback, maxWireVersion);
if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryWrite(source.getServerDescription(), connection.getDescription(),
binding.getSessionContext()), addingRetryableLabelCallback)) {
if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryWrite(connection.getDescription(), binding.getSessionContext()),
addingRetryableLabelCallback)) {
return;
}
BsonDocument command;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,13 @@ public BulkWriteResult execute(final WriteBinding binding) {
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
SessionContext sessionContext = binding.getSessionContext();
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext),
source.getServerDescription(), connectionDescription, sessionContext)) {
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)) {
handleMongoWriteConcernWithResponseException(retryState, true);
}
validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern);
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
source.getServerDescription(), connectionDescription, ordered, writeConcern,
connectionDescription, ordered, writeConcern,
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables));
}
logRetryExecute(retryState);
Expand Down Expand Up @@ -220,9 +219,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
SessionContext sessionContext = binding.getSessionContext();
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext),
source.getServerDescription(),
connectionDescription, sessionContext)
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)
&& handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback)) {
return;
}
Expand All @@ -233,7 +230,7 @@ && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallba
try {
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
source.getServerDescription(), connectionDescription, ordered, writeConcern,
connectionDescription, ordered, writeConcern,
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables));
}
} catch (Throwable t) {
Expand Down
Loading

0 comments on commit dfd6e7c

Please sign in to comment.