Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuring the monitoring protocol to use; use the polling protocol in a FaaS environment by default #1313

Merged
merged 15 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ class DefaultServerMonitor implements ServerMonitor {
private final ServerApi serverApi;
private final boolean faas;
private final ServerSettings serverSettings;
private final ServerMonitorRunnable monitor;
private final Thread monitorThread;
private final RoundTripTimeRunnable roundTripTimeMonitor;
private final ServerMonitor monitor;
/**
* Must be guarded by {@link #lock}.
*/
@Nullable
private RoundTripTimeMonitor roundTripTimeMonitor;
private final ExponentiallyWeightedMovingAverage averageRoundTripTime = new ExponentiallyWeightedMovingAverage(0.2);
private final Thread roundTripTimeMonitorThread;
Comment on lines -82 to -84
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes required both roundTripTimeMonitor & roundTripTimeMonitorThread to become nullable. I simplified that by replacing these two fields with a single field. However, this change led to inconsistent code, with a single roundTripTimeMonitor field alongside two coupled fields, monitor & monitorThread. To avoid that inconsistency, I also replaced the latter two with a single field.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, the code looks cleaner now 👍

private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile boolean isClosed;
Expand All @@ -102,20 +104,26 @@ class DefaultServerMonitor implements ServerMonitor {
this.serverApi = serverApi;
this.faas = faas;
this.sdamProvider = sdamProvider;
monitor = new ServerMonitorRunnable();
monitorThread = new Thread(monitor, "cluster-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress());
monitorThread.setDaemon(true);
roundTripTimeMonitor = new RoundTripTimeRunnable();
roundTripTimeMonitorThread = new Thread(roundTripTimeMonitor,
"cluster-rtt-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress());
roundTripTimeMonitorThread.setDaemon(true);
monitor = new ServerMonitor();
roundTripTimeMonitor = null;
isClosed = false;
}

@Override
public void start() {
monitorThread.start();
roundTripTimeMonitorThread.start();
monitor.start();
}

private void ensureRoundTripTimeMonitorStarted() {
lock.lock();
vbabanin marked this conversation as resolved.
Show resolved Hide resolved
try {
if (roundTripTimeMonitor == null) {
roundTripTimeMonitor = new RoundTripTimeMonitor();
roundTripTimeMonitor.start();
}
} finally {
lock.unlock();
}
}

@Override
Expand All @@ -124,24 +132,35 @@ public void connect() {
}

@Override
@SuppressWarnings("try")
public void close() {
isClosed = true;
monitor.close();
monitorThread.interrupt();
roundTripTimeMonitor.close();
roundTripTimeMonitorThread.interrupt();
withLock(lock, () -> {
//noinspection EmptyTryBlock
try (ServerMonitor ignoredAutoClosed = monitor;
RoundTripTimeMonitor ignoredAutoClose2 = roundTripTimeMonitor) {
// we are automatically closing resources here
}
});
}

@Override
public void cancelCurrentCheck() {
monitor.cancelCurrentCheck();
}

class ServerMonitorRunnable implements Runnable {
class ServerMonitor extends Thread implements AutoCloseable {
private volatile InternalConnection connection = null;
private volatile boolean currentCheckCancelled;

void close() {
ServerMonitor() {
super("cluster-" + serverId.getClusterId() + "-" + serverId.getAddress());
setDaemon(true);
}

@Override
public void close() {
interrupt();
InternalConnection connection = this.connection;
if (connection != null) {
connection.close();
Expand All @@ -155,6 +174,10 @@ public void run() {
while (!isClosed) {
ServerDescription previousServerDescription = currentServerDescription;
currentServerDescription = lookupServerDescription(currentServerDescription);
boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription);
if (shouldStreamResponses) {
ensureRoundTripTimeMonitorStarted();
}
jyemin marked this conversation as resolved.
Show resolved Hide resolved

if (isClosed) {
continue;
Expand All @@ -169,7 +192,7 @@ public void run() {
logStateChange(previousServerDescription, currentServerDescription);
sdamProvider.get().update(currentServerDescription);

if ((shouldStreamResponses(currentServerDescription) && currentServerDescription.getType() != UNKNOWN)
if ((shouldStreamResponses && currentServerDescription.getType() != UNKNOWN)
|| (connection != null && connection.hasMoreToCome())
|| (currentServerDescription.getException() instanceof MongoSocketException
&& previousServerDescription.getType() != UNKNOWN)) {
Expand Down Expand Up @@ -202,16 +225,17 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Checking status of %s", serverId.getAddress()));
}
boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription);
serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(
connection.getDescription().getConnectionId(), shouldStreamResponses(currentServerDescription)));
connection.getDescription().getConnectionId(), shouldStreamResponses));

long start = System.nanoTime();
try {
SessionContext sessionContext = NoOpSessionContext.INSTANCE;
if (!connection.hasMoreToCome()) {
BsonDocument helloDocument = new BsonDocument(getHandshakeCommandName(currentServerDescription), new BsonInt32(1))
.append("helloOk", BsonBoolean.TRUE);
if (shouldStreamResponses(currentServerDescription)) {
if (shouldStreamResponses) {
helloDocument.append("topologyVersion", assertNotNull(currentServerDescription.getTopologyVersion()).asDocument());
helloDocument.append("maxAwaitTimeMS", new BsonInt64(serverSettings.getHeartbeatFrequency(MILLISECONDS)));
}
Expand All @@ -221,23 +245,26 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
}

BsonDocument helloResult;
if (shouldStreamResponses(currentServerDescription)) {
if (shouldStreamResponses) {
helloResult = connection.receive(new BsonDocumentCodec(), sessionContext,
Math.toIntExact(serverSettings.getHeartbeatFrequency(MILLISECONDS)));
} else {
helloResult = connection.receive(new BsonDocumentCodec(), sessionContext);
}

long elapsedTimeNanos = System.nanoTime() - start;
if (!shouldStreamResponses) {
averageRoundTripTime.addSample(elapsedTimeNanos);
}
serverMonitorListener.serverHeartbeatSucceeded(
new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult,
elapsedTimeNanos, shouldStreamResponses(currentServerDescription)));
elapsedTimeNanos, shouldStreamResponses));

return createServerDescription(serverId.getAddress(), helloResult, averageRoundTripTime.getAverage());
} catch (Exception e) {
serverMonitorListener.serverHeartbeatFailed(
new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start,
shouldStreamResponses(currentServerDescription), e));
shouldStreamResponses, e));
throw e;
}
} catch (Throwable t) {
Expand Down Expand Up @@ -399,10 +426,17 @@ static boolean shouldLogStageChange(final ServerDescription previous, final Serv
}


private class RoundTripTimeRunnable implements Runnable {
private class RoundTripTimeMonitor extends Thread implements AutoCloseable {
private volatile InternalConnection connection = null;

void close() {
RoundTripTimeMonitor() {
super("cluster-rtt-" + serverId.getClusterId() + "-" + serverId.getAddress());
setDaemon(true);
}

@Override
public void close() {
interrupt();
InternalConnection connection = this.connection;
if (connection != null) {
connection.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class DefaultServerMonitorSpecification extends Specification {

when:
monitor.close()
monitor.monitorThread.join()
monitor.monitor.join()

then:
!stateChanged
Expand Down