Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static com.mongodb.connection.ServerType.REPLICA_SET_GHOST;
import static com.mongodb.connection.ServerType.SHARD_ROUTER;
import static com.mongodb.connection.ServerType.STANDALONE;
import static com.mongodb.internal.operation.ServerVersionHelper.SIX_DOT_ZERO_WIRE_VERSION;
import static java.lang.String.format;

public abstract class AbstractMultiServerCluster extends BaseCluster {
Expand Down Expand Up @@ -224,9 +225,7 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
}

if (newDescription.getType() == REPLICA_SET_GHOST) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Server %s does not appear to be a member of an initiated replica set.", newDescription.getAddress()));
}
LOGGER.info(format("Server %s does not appear to be a member of an initiated replica set.", newDescription.getAddress()));
return true;
}

Expand All @@ -247,46 +246,43 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
if (newDescription.getCanonicalAddress() != null
&& !newDescription.getAddress().equals(new ServerAddress(newDescription.getCanonicalAddress()))
&& !newDescription.isPrimary()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Canonical address %s does not match server address. Removing %s from client view of cluster",
newDescription.getCanonicalAddress(), newDescription.getAddress()));
}
LOGGER.info(format("Canonical address %s does not match server address. Removing %s from client view of cluster",
newDescription.getCanonicalAddress(), newDescription.getAddress()));
removeServer(newDescription.getAddress());
return true;
}

if (newDescription.isPrimary()) {
ObjectId electionId = newDescription.getElectionId();
Integer setVersion = newDescription.getSetVersion();
if (setVersion != null && electionId != null) {
if (isStalePrimary(newDescription)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
+ "is less than one already seen of (%d, %s)",
newDescription.getAddress(),
setVersion, electionId,
maxSetVersion, maxElectionId));
}
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();
if (newDescription.getMaxWireVersion() >= SIX_DOT_ZERO_WIRE_VERSION) {
if (nullSafeCompareTo(electionId, maxElectionId) > 0
|| (nullSafeCompareTo(electionId, maxElectionId) == 0 && nullSafeCompareTo(setVersion, maxSetVersion) >= 0)) {
LOGGER.info(format("Setting max election id to %s and max set version to %d from replica set primary %s",
newDescription.getElectionId(), newDescription.getSetVersion(), newDescription.getAddress()));
maxElectionId = newDescription.getElectionId();
maxSetVersion = newDescription.getSetVersion();
} else {
invalidatePotentialPrimary(newDescription);
return false;
}

if (!electionId.equals(maxElectionId)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Setting max election id to %s from replica set primary %s", electionId,
newDescription.getAddress()));
} else {
if (setVersion != null && electionId != null) {
if (nullSafeCompareTo(maxSetVersion, setVersion) <= 0
&& (nullSafeCompareTo(maxSetVersion, setVersion) != 0 || nullSafeCompareTo(maxElectionId, electionId) <= 0)) {
LOGGER.info(format("Setting max election id to %s from replica set primary %s",
newDescription.getElectionId(), newDescription.getAddress()));
maxElectionId = newDescription.getElectionId();
} else {
invalidatePotentialPrimary(newDescription);
return false;
}
maxElectionId = electionId;
}
}

if (setVersion != null
&& (maxSetVersion == null || setVersion.compareTo(maxSetVersion) > 0)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Setting max set version to %d from replica set primary %s", setVersion,
newDescription.getAddress()));
if (nullSafeCompareTo(setVersion, maxSetVersion) > 0) {
LOGGER.info(format("Setting max set version to %d from replica set primary %s",
newDescription.getSetVersion(), newDescription.getAddress()));
maxSetVersion = setVersion;
}
maxSetVersion = setVersion;
}

if (isNotAlreadyPrimary(newDescription.getAddress())) {
Expand All @@ -297,14 +293,26 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
return true;
}

private boolean isStalePrimary(final ServerDescription newDescription) {
if (maxSetVersion == null || maxElectionId == null) {
return false;
}
private void invalidatePotentialPrimary(final ServerDescription newDescription) {
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
+ "is less than one already seen of (%d, %s)",
newDescription.getAddress(), newDescription.getSetVersion(), newDescription.getElectionId(),
maxSetVersion, maxElectionId));
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();
}

Integer setVersion = newDescription.getSetVersion();
return (setVersion == null || maxSetVersion.compareTo(setVersion) > 0
|| (maxSetVersion.equals(setVersion) && maxElectionId.compareTo(newDescription.getElectionId()) > 0));
/**
* Implements the same contract as {@link Comparable#compareTo(Object)}, except that a null value is always considers less-than any
* other value (except null, which it considers as equal-to).
*/
private static <T extends Comparable<T>> int nullSafeCompareTo(final T first, final T second) {
if (first == null) {
return second == null ? 0 : -1;
}
if (second == null) {
return 1;
}
return first.compareTo(second);
}

private boolean isNotAlreadyPrimary(final ServerAddress address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class ServerVersionHelper {
public static final int FOUR_DOT_TWO_WIRE_VERSION = 8;
public static final int FOUR_DOT_FOUR_WIRE_VERSION = 9;
public static final int FIVE_DOT_ZERO_WIRE_VERSION = 12;
public static final int SIX_DOT_ZERO_WIRE_VERSION = 17;

public static boolean serverIsAtLeastVersionFourDotZero(final ConnectionDescription description) {
return description.getMaxWireVersion() >= FOUR_DOT_ZERO_WIRE_VERSION;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
{
"description" : "ElectionId is considered higher precedence than setVersion",
"uri" : "mongodb://a/?replicaSet=rs",
"phases" : [
{
"responses" : [
[
"a:27017",
{
"ok" : 1,
"helloOk" : true,
"isWritablePrimary" : true,
"hosts" : [
"a:27017",
"b:27017"
],
"setName" : "rs",
"setVersion" : 1,
"electionId" : {
"$oid" : "000000000000000000000001"
},
"minWireVersion" : 0,
"maxWireVersion" : 17
}
],
[
"b:27017",
{
"ok" : 1,
"helloOk" : true,
"isWritablePrimary" : true,
"hosts" : [
"a:27017",
"b:27017"
],
"setName" : "rs",
"setVersion" : 2,
"electionId" : {
"$oid" : "000000000000000000000001"
},
"minWireVersion" : 0,
"maxWireVersion" : 17
}
],
[
"a:27017",
{
"ok" : 1,
"helloOk" : true,
"isWritablePrimary" : true,
"hosts" : [
"a:27017",
"b:27017"
],
"setName" : "rs",
"setVersion" : 1,
"electionId" : {
"$oid" : "000000000000000000000002"
},
"minWireVersion" : 0,
"maxWireVersion" : 17
}
]
],
"outcome" : {
"servers" : {
"a:27017" : {
"type" : "RSPrimary",
"setName" : "rs",
"setVersion" : 1,
"electionId" : {
"$oid" : "000000000000000000000002"
}
},
"b:27017" : {
"type" : "Unknown",
"setName" : null,
"setVersion" : null,
"electionId" : null
}
},
"topologyType" : "ReplicaSetWithPrimary",
"logicalSessionTimeoutMinutes" : null,
"setName" : "rs",
"maxSetVersion" : 1,
"maxElectionId" : {
"$oid" : "000000000000000000000002"
}
}
}
]
}
Loading