Skip to content

Commit

Permalink
feat(NODE-3392): enable snapshot reads on secondaries (#2897)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariakp authored Jul 12, 2021
1 parent 5a8842a commit 523e05c
Show file tree
Hide file tree
Showing 19 changed files with 2,878 additions and 50 deletions.
2 changes: 2 additions & 0 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ export function executeOperation<
session = topology.startSession({ owner, explicit: false });
} else if (session.hasEnded) {
return cb(new MongoDriverError('Use of expired sessions is not permitted'));
} else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
return cb(new MongoDriverError('Snapshot reads require MongoDB 5.0 or later'));
}
} else if (session) {
// If the user passed an explicit session and we are still, after server selection,
Expand Down
6 changes: 5 additions & 1 deletion src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return this.s.description;
}

capabilities(): ServerCapabilities {
get capabilities(): ServerCapabilities {
return new ServerCapabilities(this.lastIsMaster());
}

Expand Down Expand Up @@ -1064,6 +1064,10 @@ export class ServerCapabilities {
return this.maxWireVersion >= 3;
}

get supportsSnapshotReads(): boolean {
return this.maxWireVersion >= 13;
}

get commandsTakeWriteConcern(): boolean {
return this.maxWireVersion >= 5;
}
Expand Down
79 changes: 56 additions & 23 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import type { AbstractCursor } from './cursor/abstract_cursor';
import type { CommandOptions } from './cmap/connection';
import type { WriteConcern } from './write_concern';
import { TypedEventEmitter } from './mongo_types';
import { ReadConcernLevel } from './read_concern';

const minWireVersionForShardedTransactions = 8;

Expand All @@ -51,6 +52,8 @@ function assertAlive(session: ClientSession, callback?: Callback): boolean {
export interface ClientSessionOptions {
/** Whether causal consistency should be enabled on this session */
causalConsistency?: boolean;
/** Whether all read operations should be read from the same snapshot for this session (NOTE: not compatible with `causalConsistency=true`) */
snapshot?: boolean;
/** The default TransactionOptions to use for transactions started on this session. */
defaultTransactionOptions?: TransactionOptions;

Expand All @@ -72,14 +75,18 @@ export type ClientSessionEvents = {

/** @internal */
const kServerSession = Symbol('serverSession');
/** @internal */
const kSnapshotTime = Symbol('snapshotTime');
/** @internal */
const kSnapshotEnabled = Symbol('snapshotEnabled');

/**
* A class representing a client session on the server
*
* NOTE: not meant to be instantiated directly.
* @public
*/
class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
/** @internal */
topology: Topology;
/** @internal */
Expand All @@ -96,6 +103,10 @@ class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
transaction: Transaction;
/** @internal */
[kServerSession]?: ServerSession;
/** @internal */
[kSnapshotTime]?: Timestamp;
/** @internal */
[kSnapshotEnabled] = false;

/**
* Create a client session.
Expand Down Expand Up @@ -123,15 +134,23 @@ class ClientSession extends TypedEventEmitter<ClientSessionEvents> {

options = options ?? {};

if (options.snapshot === true) {
this[kSnapshotEnabled] = true;
if (options.causalConsistency === true) {
throw new MongoDriverError(
'Properties "causalConsistency" and "snapshot" are mutually exclusive'
);
}
}

this.topology = topology;
this.sessionPool = sessionPool;
this.hasEnded = false;
this.clientOptions = clientOptions;
this[kServerSession] = undefined;

this.supports = {
causalConsistency:
typeof options.causalConsistency === 'boolean' ? options.causalConsistency : true
causalConsistency: options.snapshot !== true && options.causalConsistency !== false
};

this.clusterTime = options.initialClusterTime;
Expand All @@ -157,6 +176,11 @@ class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
return this[kServerSession]!;
}

/** Whether or not this session is configured for snapshot reads */
get snapshotEnabled(): boolean {
return this[kSnapshotEnabled];
}

/**
* Ends this session on the server
*
Expand Down Expand Up @@ -257,6 +281,10 @@ class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
* @param options - Options for the transaction
*/
startTransaction(options?: TransactionOptions): void {
if (this[kSnapshotEnabled]) {
throw new MongoDriverError('Transactions are not allowed with snapshot sessions');
}

assertAlive(this);
if (this.inTransaction()) {
throw new MongoDriverError('Transaction already in progress');
Expand Down Expand Up @@ -623,7 +651,7 @@ export type ServerSessionId = { id: Binary };
* WARNING: not meant to be instantiated directly. For internal use only.
* @public
*/
class ServerSession {
export class ServerSession {
id: ServerSessionId;
lastUse: number;
txnNumber: number;
Expand Down Expand Up @@ -658,7 +686,7 @@ class ServerSession {
* For internal use only
* @internal
*/
class ServerSessionPool {
export class ServerSessionPool {
topology: Topology;
sessions: ServerSession[];

Expand Down Expand Up @@ -746,7 +774,7 @@ class ServerSessionPool {

// TODO: this should be codified in command construction
// @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern
function commandSupportsReadConcern(command: Document, options?: Document): boolean {
export function commandSupportsReadConcern(command: Document, options?: Document): boolean {
if (command.aggregate || command.count || command.distinct || command.find || command.geoNear) {
return true;
}
Expand All @@ -770,7 +798,7 @@ function commandSupportsReadConcern(command: Document, options?: Document): bool
* @param command - the command to decorate
* @param options - Optional settings passed to calling operation
*/
function applySession(
export function applySession(
session: ClientSession,
command: Document,
options?: CommandOptions
Expand Down Expand Up @@ -801,28 +829,35 @@ function applySession(
// first apply non-transaction-specific sessions data
const inTransaction = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = options?.willRetryWrite || false;
const shouldApplyReadConcern = commandSupportsReadConcern(command, options);

if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
command.txnNumber = Long.fromNumber(serverSession.txnNumber);
}

// now attempt to apply transaction-specific sessions data
if (!inTransaction) {
if (session.transaction.state !== TxnState.NO_TRANSACTION) {
session.transaction.transition(TxnState.NO_TRANSACTION);
}

// TODO: the following should only be applied to read operation per spec.
// for causal consistency
if (session.supports.causalConsistency && session.operationTime && shouldApplyReadConcern) {
if (
session.supports.causalConsistency &&
session.operationTime &&
commandSupportsReadConcern(command, options)
) {
command.readConcern = command.readConcern || {};
Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
} else if (session[kSnapshotEnabled]) {
command.readConcern = command.readConcern || { level: ReadConcernLevel.snapshot };
if (session[kSnapshotTime] !== undefined) {
Object.assign(command.readConcern, { atClusterTime: session[kSnapshotTime] });
}
}

return;
}

// now attempt to apply transaction-specific sessions data

// `autocommit` must always be false to differentiate from retryable writes
command.autocommit = false;

Expand All @@ -843,7 +878,7 @@ function applySession(
}
}

function updateSessionFromResponse(session: ClientSession, document: Document): void {
export function updateSessionFromResponse(session: ClientSession, document: Document): void {
if (document.$clusterTime) {
resolveClusterTime(session, document.$clusterTime);
}
Expand All @@ -855,14 +890,12 @@ function updateSessionFromResponse(session: ClientSession, document: Document):
if (document.recoveryToken && session && session.inTransaction()) {
session.transaction._recoveryToken = document.recoveryToken;
}
}

export {
ClientSession,
ServerSession,
ServerSessionPool,
TxnState,
applySession,
updateSessionFromResponse,
commandSupportsReadConcern
};
if (
document.cursor?.atClusterTime &&
session?.[kSnapshotEnabled] &&
session[kSnapshotTime] === undefined
) {
session[kSnapshotTime] = document.cursor.atClusterTime;
}
}
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ export function decorateWithCollation(
target: MongoClient | Db | Collection,
options: AnyOptions
): void {
const capabilities = getTopology(target).capabilities();
const capabilities = getTopology(target).capabilities;
if (options.collation && typeof options.collation === 'object') {
if (capabilities && capabilities.commandsTakeCollation) {
command.collation = options.collation;
Expand Down
51 changes: 44 additions & 7 deletions test/functional/sessions.test.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
'use strict';

const path = require('path');
const expect = require('chai').expect;
const setupDatabase = require('./shared').setupDatabase;
const withMonitoredClient = require('./shared').withMonitoredClient;
const TestRunnerContext = require('./spec-runner').TestRunnerContext;
const generateTopologyTests = require('./spec-runner').generateTopologyTests;
const loadSpecTests = require('../spec').loadSpecTests;
const { setupDatabase, withMonitoredClient } = require('./shared');
const { TestRunnerContext, generateTopologyTests } = require('./spec-runner');
const { loadSpecTests } = require('../spec');
const { runUnifiedTest } = require('./unified-spec-runner/runner');

const ignoredCommands = ['ismaster'];
const test = {
Expand Down Expand Up @@ -148,7 +148,7 @@ describe('Sessions - functional', function () {
}
});

describe('spec tests', function () {
describe('legacy spec tests', function () {
class SessionSpecTestContext extends TestRunnerContext {
assertSessionNotDirty(options) {
const session = options.session;
Expand Down Expand Up @@ -176,7 +176,7 @@ describe('Sessions - functional', function () {
}

const testContext = new SessionSpecTestContext();
const testSuites = loadSpecTests('sessions');
const testSuites = loadSpecTests(path.join('sessions', 'legacy'));

after(() => testContext.teardown());
before(function () {
Expand All @@ -196,6 +196,43 @@ describe('Sessions - functional', function () {
generateTopologyTests(testSuites, testContext, testFilter);
});

describe('unified spec tests', function () {
for (const sessionTests of loadSpecTests(path.join('sessions', 'unified'))) {
expect(sessionTests).to.be.an('object');
context(String(sessionTests.description), function () {
// TODO: NODE-3393 fix test runner to apply session to all operations
const skipTestMap = {
'snapshot-sessions': [
'countDocuments operation with snapshot',
'Distinct operation with snapshot',
'Mixed operation with snapshot'
],
'snapshot-sessions-not-supported-client-error': [
'Client error on distinct with snapshot'
],
'snapshot-sessions-not-supported-server-error': [
'Server returns an error on distinct with snapshot'
],
'snapshot-sessions-unsupported-ops': [
'Server returns an error on listCollections with snapshot',
'Server returns an error on listDatabases with snapshot',
'Server returns an error on listIndexes with snapshot',
'Server returns an error on runCommand with snapshot'
]
};
const testsToSkip = skipTestMap[sessionTests.description] || [];
for (const test of sessionTests.tests) {
it(String(test.description), {
metadata: { sessions: { skipLeakTests: true } },
test: async function () {
await runUnifiedTest(this, sessionTests, test, testsToSkip);
}
});
}
});
}
});

context('unacknowledged writes', () => {
it('should not include session for unacknowledged writes', {
metadata: { requires: { topology: 'single', mongodb: '>=3.6.0' } },
Expand Down
4 changes: 4 additions & 0 deletions test/functional/unified-spec-runner/entities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
options.causalConsistency = entity.session.sessionOptions?.causalConsistency;
}

if (entity.session.sessionOptions?.snapshot) {
options.snapshot = entity.session.sessionOptions.snapshot;
}

if (entity.session.sessionOptions?.defaultTransactionOptions) {
options.defaultTransactionOptions = Object.create(null);
const defaultOptions = entity.session.sessionOptions.defaultTransactionOptions;
Expand Down
6 changes: 3 additions & 3 deletions test/spec/read-write-concern/README.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
=======================
Connection String Tests
=======================
============================
Read and Write Concern Tests
============================

The YAML and JSON files in this directory tree are platform-independent tests
that drivers can use to prove their conformance to the Read and Write Concern
Expand Down
26 changes: 23 additions & 3 deletions test/spec/sessions/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ Driver Session Tests
Introduction
============

The YAML and JSON files in this directory are platform-independent tests that
drivers can use to prove their conformance to the Driver Sessions Spec. They are
The YAML and JSON files in the ``legacy`` and ``unified`` sub-directories are platform-independent tests
that drivers can use to prove their conformance to the Driver Sessions Spec. They are
designed with the intention of sharing most test-runner code with the
Transactions spec tests.
`Transactions Spec tests <../../transactions/tests/README.rst#test-format>`_.. Tests in the
``unified`` directory are written using the `Unified Test Format <../../unified-test-format/unified-test-format.rst>`_.

Several prose tests, which are not easily expressed in YAML, are also presented
in the Driver Sessions Spec. Those tests will need to be manually implemented
Expand Down Expand Up @@ -78,7 +79,26 @@ the given session is *not* marked dirty::
arguments:
session: session0

Snapshot session tests
======================
Snapshot sessions tests require server of version 5.0 or higher and
replica set or a sharded cluster deployment.
Default snapshot history window on the server is 5 minutes. Running the test in debug mode, or in any other slow configuration
may lead to `SnapshotTooOld` errors. Drivers can work around this issue by increasing the server's `minSnapshotHistoryWindowInSeconds` parameter, for example:

.. code:: python
client.admin.command('setParameter', 1, minSnapshotHistoryWindowInSeconds=60)
Prose tests
```````````
- Setting both ``snapshot`` and ``causalConsistency`` is not allowed

* ``client.startSession(snapshot = true, causalConsistency = true)``
* Assert that an error was raised by driver

Changelog
=========

:2019-05-15: Initial version.
:2021-06-15: Added snapshot-session tests. Introduced legacy and unified folders.
File renamed without changes.
Loading

0 comments on commit 523e05c

Please sign in to comment.