Skip to content

Don't gossip cluster time from monitoring connections #1276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.internal.IgnorableRequestContext;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
Expand All @@ -46,20 +45,14 @@ public final class CommandHelper {

static BsonDocument executeCommand(final String database, final BsonDocument command, final ClusterConnectionMode clusterConnectionMode,
@Nullable final ServerApi serverApi, final InternalConnection internalConnection) {
return sendAndReceive(database, command, null, clusterConnectionMode, serverApi, internalConnection);
}

public static BsonDocument executeCommand(final String database, final BsonDocument command, final ClusterClock clusterClock,
final ClusterConnectionMode clusterConnectionMode, @Nullable final ServerApi serverApi,
final InternalConnection internalConnection) {
return sendAndReceive(database, command, clusterClock, clusterConnectionMode, serverApi, internalConnection);
return sendAndReceive(database, command, clusterConnectionMode, serverApi, internalConnection);
}

static BsonDocument executeCommandWithoutCheckingForFailure(final String database, final BsonDocument command,
final ClusterConnectionMode clusterConnectionMode, @Nullable final ServerApi serverApi,
final InternalConnection internalConnection) {
try {
return sendAndReceive(database, command, null, clusterConnectionMode, serverApi, internalConnection);
return sendAndReceive(database, command, clusterConnectionMode, serverApi, internalConnection);
} catch (MongoServerException e) {
return new BsonDocument();
}
Expand Down Expand Up @@ -94,14 +87,11 @@ static boolean isCommandOk(final BsonDocument response) {
}

private static BsonDocument sendAndReceive(final String database, final BsonDocument command,
@Nullable final ClusterClock clusterClock,
final ClusterConnectionMode clusterConnectionMode, @Nullable final ServerApi serverApi,
final InternalConnection internalConnection) {
SessionContext sessionContext = clusterClock == null ? NoOpSessionContext.INSTANCE
: new ClusterClockAdvancingSessionContext(NoOpSessionContext.INSTANCE, clusterClock);
return assertNotNull(internalConnection.sendAndReceive(getCommandMessage(database, command, internalConnection,
clusterConnectionMode, serverApi), new BsonDocumentCodec(), sessionContext, IgnorableRequestContext.INSTANCE,
new OperationContext()));
clusterConnectionMode, serverApi), new BsonDocumentCodec(), NoOpSessionContext.INSTANCE,
IgnorableRequestContext.INSTANCE, new OperationContext()));
}

private static CommandMessage getCommandMessage(final String database, final BsonDocument command,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public ClusterableServer create(final Cluster cluster, final ServerAddress serve
ServerId serverId = new ServerId(cluster.getClusterId(), serverAddress);
ClusterConnectionMode clusterMode = cluster.getSettings().getMode();
SameObjectProvider<SdamServerDescriptionManager> sdamProvider = SameObjectProvider.uninitialized();
ServerMonitor serverMonitor = new DefaultServerMonitor(serverId, serverSettings, cluster.getClock(),
ServerMonitor serverMonitor = new DefaultServerMonitor(serverId, serverSettings,
// no credentials, compressor list, or command listener for the server monitor factory
new InternalStreamConnectionFactory(clusterMode, true, heartbeatStreamFactory, null, applicationName,
mongoDriverInformation, emptyList(), loggerSettings, null, serverApi),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class DefaultServerMonitor implements ServerMonitor {

private final ServerId serverId;
private final ServerMonitorListener serverMonitorListener;
private final ClusterClock clusterClock;
private final Provider<SdamServerDescriptionManager> sdamProvider;
private final InternalConnectionFactory internalConnectionFactory;
private final ClusterConnectionMode clusterConnectionMode;
Expand All @@ -88,15 +87,13 @@ class DefaultServerMonitor implements ServerMonitor {
private volatile boolean isClosed;

DefaultServerMonitor(final ServerId serverId, final ServerSettings serverSettings,
final ClusterClock clusterClock,
final InternalConnectionFactory internalConnectionFactory,
final InternalConnectionFactory internalConnectionFactory,
final ClusterConnectionMode clusterConnectionMode,
@Nullable final ServerApi serverApi,
final Provider<SdamServerDescriptionManager> sdamProvider) {
this.serverSettings = notNull("serverSettings", serverSettings);
this.serverId = notNull("serverId", serverId);
this.serverMonitorListener = singleServerMonitorListener(serverSettings);
this.clusterClock = notNull("clusterClock", clusterClock);
this.internalConnectionFactory = notNull("internalConnectionFactory", internalConnectionFactory);
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
this.serverApi = serverApi;
Expand Down Expand Up @@ -206,7 +203,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren

long start = System.nanoTime();
try {
SessionContext sessionContext = new ClusterClockAdvancingSessionContext(NoOpSessionContext.INSTANCE, clusterClock);
SessionContext sessionContext = NoOpSessionContext.INSTANCE;
if (!connection.hasMoreToCome()) {
BsonDocument helloDocument = new BsonDocument(getHandshakeCommandName(currentServerDescription), new BsonInt32(1))
.append("helloOk", BsonBoolean.TRUE);
Expand Down Expand Up @@ -432,7 +429,7 @@ private void pingServer(final InternalConnection connection) {
long start = System.nanoTime();
executeCommand("admin",
new BsonDocument(getHandshakeCommandName(connection.getInitialServerDescription()), new BsonInt32(1)),
clusterClock, clusterConnectionMode, serverApi, connection);
clusterConnectionMode, serverApi, connection);
long elapsedTimeNanos = System.nanoTime() - start;
averageRoundTripTime.addSample(elapsedTimeNanos);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@ package com.mongodb.internal.connection

import com.mongodb.LoggerSettings
import com.mongodb.MongoCommandException
import com.mongodb.ServerAddress
import com.mongodb.connection.ClusterConnectionMode
import com.mongodb.connection.ClusterId
import com.mongodb.connection.ConnectionDescription
import com.mongodb.connection.ConnectionId
import com.mongodb.connection.ServerId
import com.mongodb.connection.ServerType
import com.mongodb.connection.SocketSettings
import com.mongodb.internal.connection.netty.NettyStreamFactory
import org.bson.BsonDocument
import org.bson.BsonInt32
import org.bson.BsonTimestamp
import spock.lang.Specification

import java.util.concurrent.CountDownLatch
Expand All @@ -40,7 +35,6 @@ import static com.mongodb.ClusterFixture.getCredentialWithCache
import static com.mongodb.ClusterFixture.getPrimary
import static com.mongodb.ClusterFixture.getServerApi
import static com.mongodb.ClusterFixture.getSslSettings
import static com.mongodb.internal.connection.CommandHelper.executeCommand
import static com.mongodb.internal.connection.CommandHelper.executeCommandAsync

class CommandHelperSpecification extends Specification {
Expand All @@ -58,24 +52,6 @@ class CommandHelperSpecification extends Specification {
connection?.close()
}

def 'should gossip cluster time'() {
given:
def connection = Mock(InternalStreamConnection) {
getDescription() >> new ConnectionDescription(new ConnectionId(new ServerId(new ClusterId(), new ServerAddress())),
6, ServerType.REPLICA_SET_PRIMARY, 1000, 1000, 1000, [])
}
def clusterClock = new ClusterClock()
clusterClock.advance(new BsonDocument('clusterTime', new BsonTimestamp(42L)))

when:
executeCommand('admin', new BsonDocument(LEGACY_HELLO, new BsonInt32(1)), clusterClock, getClusterConnectionMode(),
getServerApi(), connection)

then:
1 * connection.sendAndReceive(_, _, _ as ClusterClockAdvancingSessionContext, _, _) >> new BsonDocument()
}


def 'should execute command asynchronously'() {
when:
BsonDocument receivedDocument = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ class ServerMonitorSpecification extends OperationFunctionalSpecification {
}
}
serverMonitor = new DefaultServerMonitor(new ServerId(new ClusterId(), address), ServerSettings.builder().build(),
new ClusterClock(),
new InternalStreamConnectionFactory(SINGLE, new SocketStreamFactory(new DefaultInetAddressResolver(),
SocketSettings.builder().connectTimeout(500, TimeUnit.MILLISECONDS).build(), getSslSettings()),
getCredentialWithCache(), null, null, [], LoggerSettings.builder().build(), null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class DefaultServerMonitorSpecification extends Specification {
}
}
monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()), ServerSettings.builder().build(),
new ClusterClock(), internalConnectionFactory, ClusterConnectionMode.SINGLE, null, SameObjectProvider.initialized(sdam))
internalConnectionFactory, ClusterConnectionMode.SINGLE, null, SameObjectProvider.initialized(sdam))
monitor.start()

when:
Expand Down Expand Up @@ -167,7 +167,7 @@ class DefaultServerMonitorSpecification extends Specification {
}
monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()),
ServerSettings.builder().heartbeatFrequency(1, TimeUnit.SECONDS).addServerMonitorListener(serverMonitorListener).build(),
new ClusterClock(), internalConnectionFactory, ClusterConnectionMode.SINGLE, null, mockSdamProvider())
internalConnectionFactory, ClusterConnectionMode.SINGLE, null, mockSdamProvider())

when:
monitor.start()
Expand Down Expand Up @@ -246,7 +246,7 @@ class DefaultServerMonitorSpecification extends Specification {
}
monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()),
ServerSettings.builder().heartbeatFrequency(1, TimeUnit.SECONDS).addServerMonitorListener(serverMonitorListener).build(),
new ClusterClock(), internalConnectionFactory, ClusterConnectionMode.SINGLE, null, mockSdamProvider())
internalConnectionFactory, ClusterConnectionMode.SINGLE, null, mockSdamProvider())

when:
monitor.start()
Expand Down