Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuring the monitoring protocol to use; use the polling protocol in a FaaS environment by default #1313

Merged
merged 15 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,30 @@
*/
public final class ServerHeartbeatStartedEvent {
private final ConnectionId connectionId;
private final boolean awaited;

/**
* Construct an instance.
*
* @param connectionId the non-null connnectionId
* @param awaited {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
* @since 5.1
*/
public ServerHeartbeatStartedEvent(final ConnectionId connectionId) {
public ServerHeartbeatStartedEvent(final ConnectionId connectionId, final boolean awaited) {
this.connectionId = notNull("connectionId", connectionId);
this.awaited = awaited;
}

/**
* Construct an instance.
*
* @param connectionId the non-null connnectionId
* @deprecated Prefer {@link #ServerHeartbeatStartedEvent(ConnectionId, boolean)}.
* If this constructor is used then {@link #isAwaited()} is {@code false}.
*/
@Deprecated
public ServerHeartbeatStartedEvent(final ConnectionId connectionId) {
this(connectionId, false);
}

/**
Expand All @@ -46,12 +62,23 @@ public ConnectionId getConnectionId() {
return connectionId;
}

/**
* Gets whether the heartbeat is for an awaitable `hello` / legacy hello.
*
* @return {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
jyemin marked this conversation as resolved.
Show resolved Hide resolved
* @since 5.1
*/
public boolean isAwaited() {
return awaited;
}

@Override
public String toString() {
return "ServerHeartbeatStartedEvent{"
+ "connectionId=" + connectionId
+ ", server=" + connectionId.getServerId().getAddress()
+ ", clusterId=" + connectionId.getServerId().getClusterId()
+ ", awaited=" + awaited
+ "} " + super.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ public void run() {
logStateChange(previousServerDescription, currentServerDescription);
sdamProvider.get().update(currentServerDescription);

if (((connection == null || shouldStreamResponses(currentServerDescription))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this check "connection == null" not needed anymore?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought I had proven to myself that the check was redundant by transforming to

                    boolean shouldStream = currentServerDescription.getTopologyVersion() != null;
                    if (((connection == null || shouldStream)
                            && shouldStream && currentServerDescription.getType() != UNKNOWN)
                            || (connection != null && connection.hasMoreToCome())
                            || (currentServerDescription.getException() instanceof MongoSocketException
                            && previousServerDescription.getType() != UNKNOWN)) {
                        continue;
                    }

and then allowing Intellij to simplify the expression, but when I tried it again IntelliJ didn't offer the simplification.

This is a truly horrible conditional, and has been the source of at least one bug that I remember.

Copy link
Member Author

@stIncMale stIncMale Feb 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this check "connection == null" not needed anymore?

It had no effect, so I removed it. The following demonstrates why the connection == null expression had no effect.

Here is the original conditional, but reformatted to simplify perception:

(
    (connection == null || shouldStreamResponses(currentServerDescription))
    && currentServerDescription.getTopologyVersion() != null
    && currentServerDescription.getType() != UNKNOWN
)
||
(connection != null && connection.hasMoreToCome())
||
(
    currentServerDescription.getException() instanceof MongoSocketException
    && previousServerDescription.getType() != UNKNOWN
)

It consists of three Boolean expressions ORed (||) together. Only the first one of those is modified in the PR.

  1. Let's alias the connection == null expression as x.
  2. Note that the expressions shouldStreamResponses(currentServerDescription) and currentServerDescription.getTopologyVersion() != null are equivalent (well, were equivalent at the time of the commit that simplified the expression). Let's alias them as s.

Now the first ORed Boolean expression can be written as

(
    (x || s)
    && s
    && currentServerDescription.getType() != UNKNOWN
)

If s is false, this expression evaluates to false. If s is true, this expression evaluates to the same value as currentServerDescription.getType() != UNKNOWN. As you can see, x affects the outcome is neither case, and the expression can be simplified to

(
    s
    && currentServerDescription.getType() != UNKNOWN
)

Copy link
Member

@vbabanin vbabanin Feb 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification, @stIncMale. Now i see, It is indeed redundant check.

i am curious about the original purpose of this check, given it wasn't functional. My assumption was that it might have aimed to bypass waitForNext() on lookup failure as connection is nullified in lookup method. However, the presence of (currentServerDescription.getException() instanceof MongoSocketException && previousServerDescription.getType() != UNKNOWN)seems to cover scenario aligning with the spec, which mandates immediate retry on network errors when the server was previously in a known state.

#. If this was a network error and the server was in a known state before the error, the client MUST NOT sleep and MUST begin the next check immediately.

Feel free to resolve the conversation if @jyemin has no further comments.

&& currentServerDescription.getTopologyVersion() != null && currentServerDescription.getType() != UNKNOWN)
if ((shouldStreamResponses(currentServerDescription) && currentServerDescription.getType() != UNKNOWN)
|| (connection != null && connection.hasMoreToCome())
|| (currentServerDescription.getException() instanceof MongoSocketException
&& previousServerDescription.getType() != UNKNOWN)) {
Expand Down Expand Up @@ -199,7 +198,8 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Checking status of %s", serverId.getAddress()));
}
serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(connection.getDescription().getConnectionId()));
serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(
connection.getDescription().getConnectionId(), shouldStreamResponses(currentServerDescription)));

long start = System.nanoTime();
try {
Expand Down Expand Up @@ -227,13 +227,13 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
long elapsedTimeNanos = System.nanoTime() - start;
serverMonitorListener.serverHeartbeatSucceeded(
new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult,
elapsedTimeNanos, currentServerDescription.getTopologyVersion() != null));
elapsedTimeNanos, shouldStreamResponses(currentServerDescription)));

return createServerDescription(serverId.getAddress(), helloResult, averageRoundTripTime.getAverage());
} catch (Exception e) {
serverMonitorListener.serverHeartbeatFailed(
new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start,
currentServerDescription.getTopologyVersion() != null, e));
shouldStreamResponses(currentServerDescription), e));
throw e;
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testEventsPublished() throws InterruptedException {
listener.clusterDescriptionChanged(clusterDescriptionChangedEvent);
assertEquals(clusterDescriptionChangedEvent, targetListener.take());

ServerHeartbeatStartedEvent serverHeartbeatStartedEvent = new ServerHeartbeatStartedEvent(connectionId);
ServerHeartbeatStartedEvent serverHeartbeatStartedEvent = new ServerHeartbeatStartedEvent(connectionId, false);
listener.serverHearbeatStarted(serverHeartbeatStartedEvent);
assertEquals(serverHeartbeatStartedEvent, targetListener.take());

Expand Down