Skip to content

Commit

Permalink
fix(NODE-4159,NODE-4512): remove servers with incorrect setName from …
Browse files Browse the repository at this point in the history
…topology and fix unix socket parsing (#3348)
  • Loading branch information
nbbeeken authored Aug 10, 2022
1 parent 38403d0 commit 00dcf2d
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 179 deletions.
9 changes: 3 additions & 6 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
MongoNetworkError,
MongoNetworkTimeoutError,
MongoServerClosedError,
MongoServerError,
MongoUnexpectedServerResponseError,
needsRetryableWriteLabel
} from '../error';
Expand Down Expand Up @@ -385,7 +386,7 @@ function calculateRoundTripTime(oldRtt: number, duration: number): number {
return alpha * duration + (1 - alpha) * oldRtt;
}

function markServerUnknown(server: Server, error?: MongoError) {
function markServerUnknown(server: Server, error?: MongoServerError) {
// Load balancer servers can never be marked unknown.
if (server.loadBalanced) {
return;
Expand All @@ -397,11 +398,7 @@ function markServerUnknown(server: Server, error?: MongoError) {

server.emit(
Server.DESCRIPTION_RECEIVED,
new ServerDescription(server.description.hostAddress, undefined, {
error,
topologyVersion:
error && error.topologyVersion ? error.topologyVersion : server.description.topologyVersion
})
new ServerDescription(server.description.hostAddress, undefined, { error })
);
}

Expand Down
104 changes: 37 additions & 67 deletions src/sdam/server_description.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Document, Long, ObjectId } from '../bson';
import type { MongoError } from '../error';
import { MongoRuntimeError, MongoServerError } from '../error';
import { arrayStrictEqual, compareObjectId, errorStrictEqual, HostAddress, now } from '../utils';
import type { ClusterTime } from './common';
import { ServerType } from './common';
Expand Down Expand Up @@ -31,14 +31,11 @@ export type TagSet = { [key: string]: string };
/** @internal */
export interface ServerDescriptionOptions {
/** An Error used for better reporting debugging */
error?: MongoError;
error?: MongoServerError;

/** The round trip time to ping this server (in ms) */
roundTripTime?: number;

/** The topologyVersion */
topologyVersion?: TopologyVersion;

/** If the client is in load balancing mode. */
loadBalanced?: boolean;
}
Expand All @@ -50,28 +47,25 @@ export interface ServerDescriptionOptions {
* @public
*/
export class ServerDescription {
private _hostAddress: HostAddress;
address: string;
type: ServerType;
hosts: string[];
passives: string[];
arbiters: string[];
tags: TagSet;

error?: MongoError;
topologyVersion?: TopologyVersion;
error: MongoServerError | null;
topologyVersion: TopologyVersion | null;
minWireVersion: number;
maxWireVersion: number;
roundTripTime: number;
lastUpdateTime: number;
lastWriteDate: number;

me?: string;
primary?: string;
setName?: string;
setVersion?: number;
electionId?: ObjectId;
logicalSessionTimeoutMinutes?: number;
me: string | null;
primary: string | null;
setName: string | null;
setVersion: number | null;
electionId: ObjectId | null;
logicalSessionTimeoutMinutes: number | null;

// NOTE: does this belong here? It seems we should gossip the cluster time at the CMAP level
$clusterTime?: ClusterTime;
Expand All @@ -83,14 +77,19 @@ export class ServerDescription {
* @param address - The address of the server
* @param hello - An optional hello response for this server
*/
constructor(address: HostAddress | string, hello?: Document, options?: ServerDescriptionOptions) {
if (typeof address === 'string') {
this._hostAddress = new HostAddress(address);
this.address = this._hostAddress.toString();
} else {
this._hostAddress = address;
this.address = this._hostAddress.toString();
constructor(
address: HostAddress | string,
hello?: Document,
options: ServerDescriptionOptions = {}
) {
if (address == null || address === '') {
throw new MongoRuntimeError('ServerDescription must be provided with a non-empty address');
}

this.address =
typeof address === 'string'
? HostAddress.fromString(address).toString(false) // Use HostAddress to normalize
: address.toString(false);
this.type = parseServerType(hello, options);
this.hosts = hello?.hosts?.map((host: string) => host.toLowerCase()) ?? [];
this.passives = hello?.passives?.map((host: string) => host.toLowerCase()) ?? [];
Expand All @@ -101,50 +100,20 @@ export class ServerDescription {
this.roundTripTime = options?.roundTripTime ?? -1;
this.lastUpdateTime = now();
this.lastWriteDate = hello?.lastWrite?.lastWriteDate ?? 0;

if (options?.topologyVersion) {
this.topologyVersion = options.topologyVersion;
} else if (hello?.topologyVersion) {
// TODO(NODE-2674): Preserve int64 sent from MongoDB
this.topologyVersion = hello.topologyVersion;
}

if (options?.error) {
this.error = options.error;
}

if (hello?.primary) {
this.primary = hello.primary;
}

if (hello?.me) {
this.me = hello.me.toLowerCase();
}

if (hello?.setName) {
this.setName = hello.setName;
}

if (hello?.setVersion) {
this.setVersion = hello.setVersion;
}

if (hello?.electionId) {
this.electionId = hello.electionId;
}

if (hello?.logicalSessionTimeoutMinutes) {
this.logicalSessionTimeoutMinutes = hello.logicalSessionTimeoutMinutes;
}

if (hello?.$clusterTime) {
this.$clusterTime = hello.$clusterTime;
}
this.error = options.error ?? null;
// TODO(NODE-2674): Preserve int64 sent from MongoDB
this.topologyVersion = this.error?.topologyVersion ?? hello?.topologyVersion ?? null;
this.setName = hello?.setName ?? null;
this.setVersion = hello?.setVersion ?? null;
this.electionId = hello?.electionId ?? null;
this.logicalSessionTimeoutMinutes = hello?.logicalSessionTimeoutMinutes ?? null;
this.primary = hello?.primary ?? null;
this.me = hello?.me?.toLowerCase() ?? null;
this.$clusterTime = hello?.$clusterTime ?? null;
}

get hostAddress(): HostAddress {
if (this._hostAddress) return this._hostAddress;
else return new HostAddress(this.address);
return HostAddress.fromString(this.address);
}

get allHosts(): string[] {
Expand Down Expand Up @@ -181,7 +150,8 @@ export class ServerDescription {
* in the {@link https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#serverdescription|SDAM spec}
*/
equals(other?: ServerDescription | null): boolean {
// TODO(NODE-4510): Check ServerDescription equality logic for nullish topologyVersion meaning "greater than"
// Despite using the comparator that would determine a nullish topologyVersion as greater than
// for equality we should only always perform direct equality comparison
const topologyVersionsEqual =
this.topologyVersion === other?.topologyVersion ||
compareTopologyVersion(this.topologyVersion, other?.topologyVersion) === 0;
Expand Down Expand Up @@ -271,8 +241,8 @@ function tagsStrictEqual(tags: TagSet, tags2: TagSet): boolean {
* ```
*/
export function compareTopologyVersion(
currentTv?: TopologyVersion,
newTv?: TopologyVersion
currentTv?: TopologyVersion | null,
newTv?: TopologyVersion | null
): 0 | -1 | 1 {
if (currentTv == null || newTv == null) {
return -1;
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return this.description.commonWireVersion;
}

get logicalSessionTimeoutMinutes(): number | undefined {
get logicalSessionTimeoutMinutes(): number | null {
return this.description.logicalSessionTimeoutMinutes;
}

Expand Down
87 changes: 42 additions & 45 deletions src/sdam/topology_description.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ObjectId } from '../bson';
import * as WIRE_CONSTANTS from '../cmap/wire_protocol/constants';
import { MongoError, MongoRuntimeError } from '../error';
import { MongoRuntimeError, MongoServerError } from '../error';
import { compareObjectId, shuffle } from '../utils';
import { ServerType, TopologyType } from './common';
import { ServerDescription } from './server_description';
Expand Down Expand Up @@ -32,29 +32,29 @@ export interface TopologyDescriptionOptions {
*/
export class TopologyDescription {
type: TopologyType;
setName?: string;
maxSetVersion?: number;
maxElectionId?: ObjectId;
setName: string | null;
maxSetVersion: number | null;
maxElectionId: ObjectId | null;
servers: Map<string, ServerDescription>;
stale: boolean;
compatible: boolean;
compatibilityError?: string;
logicalSessionTimeoutMinutes?: number;
logicalSessionTimeoutMinutes: number | null;
heartbeatFrequencyMS: number;
localThresholdMS: number;
commonWireVersion?: number;
commonWireVersion: number;

/**
* Create a TopologyDescription
*/
constructor(
topologyType: TopologyType,
serverDescriptions?: Map<string, ServerDescription>,
setName?: string,
maxSetVersion?: number,
maxElectionId?: ObjectId,
commonWireVersion?: number,
options?: TopologyDescriptionOptions
serverDescriptions: Map<string, ServerDescription> | null = null,
setName: string | null = null,
maxSetVersion: number | null = null,
maxElectionId: ObjectId | null = null,
commonWireVersion: number | null = null,
options: TopologyDescriptionOptions | null = null
) {
options = options ?? {};

Expand All @@ -64,22 +64,10 @@ export class TopologyDescription {
this.compatible = true;
this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 0;
this.localThresholdMS = options.localThresholdMS ?? 15;

if (setName) {
this.setName = setName;
}

if (maxSetVersion) {
this.maxSetVersion = maxSetVersion;
}

if (maxElectionId) {
this.maxElectionId = maxElectionId;
}

if (commonWireVersion) {
this.commonWireVersion = commonWireVersion;
}
this.setName = setName ?? null;
this.maxElectionId = maxElectionId ?? null;
this.maxSetVersion = maxSetVersion ?? null;
this.commonWireVersion = commonWireVersion ?? 0;

// determine server compatibility
for (const serverDescription of this.servers.values()) {
Expand Down Expand Up @@ -108,12 +96,12 @@ export class TopologyDescription {
// value among ServerDescriptions of all data-bearing server types. If any have a null
// logicalSessionTimeoutMinutes, then TopologyDescription.logicalSessionTimeoutMinutes MUST be
// set to null.
this.logicalSessionTimeoutMinutes = undefined;
this.logicalSessionTimeoutMinutes = null;
for (const [, server] of this.servers) {
if (server.isReadable) {
if (server.logicalSessionTimeoutMinutes == null) {
// If any of the servers have a null logicalSessionsTimeout, then the whole topology does
this.logicalSessionTimeoutMinutes = undefined;
this.logicalSessionTimeoutMinutes = null;
break;
}

Expand Down Expand Up @@ -200,11 +188,6 @@ export class TopologyDescription {
// potentially mutated values
let { type: topologyType, setName, maxSetVersion, maxElectionId, commonWireVersion } = this;

if (serverDescription.setName && setName && serverDescription.setName !== setName) {
// TODO(NODE-4159): servers with an incorrect setName should be removed not marked Unknown
serverDescription = new ServerDescription(address, undefined);
}

const serverType = serverDescription.type;
const serverDescriptions = new Map(this.servers);

Expand All @@ -217,6 +200,19 @@ export class TopologyDescription {
}
}

if (
typeof serverDescription.setName === 'string' &&
typeof setName === 'string' &&
serverDescription.setName !== setName
) {
if (topologyType === TopologyType.Single) {
// "Single" Topology with setName mismatch is direct connection usage, mark unknown do not remove
serverDescription = new ServerDescription(address);
} else {
serverDescriptions.delete(address);
}
}

// update the actual server description
serverDescriptions.set(address, serverDescription);

Expand Down Expand Up @@ -311,15 +307,16 @@ export class TopologyDescription {
);
}

get error(): MongoError | undefined {
get error(): MongoServerError | null {
const descriptionsWithError = Array.from(this.servers.values()).filter(
(sd: ServerDescription) => sd.error
);

if (descriptionsWithError.length > 0) {
return descriptionsWithError[0].error;
}
return;

return null;
}

/**
Expand Down Expand Up @@ -366,10 +363,10 @@ function topologyTypeForServerType(serverType: ServerType): TopologyType {
function updateRsFromPrimary(
serverDescriptions: Map<string, ServerDescription>,
serverDescription: ServerDescription,
setName?: string,
maxSetVersion?: number,
maxElectionId?: ObjectId
): [TopologyType, string?, number?, ObjectId?] {
setName: string | null = null,
maxSetVersion: number | null = null,
maxElectionId: ObjectId | null = null
): [TopologyType, string | null, number | null, ObjectId | null] {
setName = setName || serverDescription.setName;
if (setName !== serverDescription.setName) {
serverDescriptions.delete(serverDescription.address);
Expand Down Expand Up @@ -436,7 +433,7 @@ function updateRsFromPrimary(
function updateRsWithPrimaryFromMember(
serverDescriptions: Map<string, ServerDescription>,
serverDescription: ServerDescription,
setName?: string
setName: string | null = null
): TopologyType {
if (setName == null) {
// TODO(NODE-3483): should be an appropriate runtime error
Expand All @@ -456,10 +453,10 @@ function updateRsWithPrimaryFromMember(
function updateRsNoPrimaryFromMember(
serverDescriptions: Map<string, ServerDescription>,
serverDescription: ServerDescription,
setName?: string
): [TopologyType, string?] {
setName: string | null = null
): [TopologyType, string | null] {
const topologyType = TopologyType.ReplicaSetNoPrimary;
setName = setName || serverDescription.setName;
setName = setName ?? serverDescription.setName;
if (setName !== serverDescription.setName) {
serverDescriptions.delete(serverDescription.address);
return [topologyType, setName];
Expand Down
Loading

0 comments on commit 00dcf2d

Please sign in to comment.