Skip to content

Commit

Permalink
[Cosmos] Simple endpoint refresh interval (#15781)
Browse files Browse the repository at this point in the history
* adds simple background refresh

* Adds setInterval with unref

* cleanup

* wip prenock

* wip

* Removes recorder, fixes timeout in tests

* extract api

* fix lint

* format

* Adds flag

* lint

* Fix parition spelling

* modify endpoint check

* fix tests

* Comment proxy

* adds back copyright

* skip session spec

* Fix session token

* Fix session spec on emulator
  • Loading branch information
zfoster authored Jun 28, 2021
1 parent 23de687 commit d3647f8
Show file tree
Hide file tree
Showing 25 changed files with 371 additions and 144 deletions.
11 changes: 10 additions & 1 deletion sdk/cosmosdb/cosmos/review/cosmos.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,12 @@ export class ClientContext {
// (undocumented)
getReadEndpoint(): Promise<string>;
// (undocumented)
getReadEndpoints(): Promise<readonly string[]>;
// (undocumented)
getWriteEndpoint(): Promise<string>;
// (undocumented)
getWriteEndpoints(): Promise<readonly string[]>;
// (undocumented)
partitionKeyDefinitionCache: {
[containerUrl: string]: any;
};
Expand Down Expand Up @@ -235,7 +239,9 @@ export enum ConnectionMode {
// @public
export interface ConnectionPolicy {
connectionMode?: ConnectionMode;
enableBackgroundEndpointRefreshing?: boolean;
enableEndpointDiscovery?: boolean;
endpointRefreshRateInMs?: number;
preferredLocations?: string[];
requestTimeout?: number;
retryOptions?: RetryOptions;
Expand Down Expand Up @@ -399,7 +405,7 @@ export const Constants: {
MaxExclusive: string;
min: string;
};
EffectiveParitionKeyConstants: {
EffectivePartitionKeyConstants: {
MinimumInclusiveEffectivePartitionKey: string;
MaximumExclusiveEffectivePartitionKey: string;
};
Expand Down Expand Up @@ -486,9 +492,12 @@ export class CosmosClient {
constructor(options: CosmosClientOptions);
database(id: string): Database;
readonly databases: Databases;
dispose(): void;
getDatabaseAccount(options?: RequestOptions): Promise<ResourceResponse<DatabaseAccount>>;
getReadEndpoint(): Promise<string>;
getReadEndpoints(): Promise<readonly string[]>;
getWriteEndpoint(): Promise<string>;
getWriteEndpoints(): Promise<readonly string[]>;
offer(id: string): Offer;
readonly offers: Offers;
}
Expand Down
10 changes: 9 additions & 1 deletion sdk/cosmosdb/cosmos/src/ClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class ClientContext {
private readonly sessionContainer: SessionContainer;
private connectionPolicy: ConnectionPolicy;

public partitionKeyDefinitionCache: { [containerUrl: string]: any }; // TODO: ParitionKeyDefinitionCache
public partitionKeyDefinitionCache: { [containerUrl: string]: any }; // TODO: PartitionKeyDefinitionCache
public constructor(
private cosmosClientOptions: CosmosClientOptions,
private globalEndpointManager: GlobalEndpointManager
Expand Down Expand Up @@ -544,6 +544,14 @@ export class ClientContext {
return this.globalEndpointManager.getReadEndpoint();
}

public getWriteEndpoints(): Promise<readonly string[]> {
return this.globalEndpointManager.getWriteEndpoints();
}

public getReadEndpoints(): Promise<readonly string[]> {
return this.globalEndpointManager.getReadEndpoints();
}

public async bulk<T>({
body,
path,
Expand Down
52 changes: 52 additions & 0 deletions sdk/cosmosdb/cosmos/src/CosmosClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export class CosmosClient {
*/
public readonly offers: Offers;
private clientContext: ClientContext;
private endpointRefresher: NodeJS.Timer;
/**
* Creates a new {@link CosmosClient} object from a connection string. Your database connection string can be found in the Azure Portal
*/
Expand Down Expand Up @@ -93,6 +94,16 @@ export class CosmosClient {
async (opts: RequestOptions) => this.getDatabaseAccount(opts)
);
this.clientContext = new ClientContext(optionsOrConnectionString, globalEndpointManager);
if (
optionsOrConnectionString.connectionPolicy?.enableEndpointDiscovery &&
optionsOrConnectionString.connectionPolicy?.enableBackgroundEndpointRefreshing
) {
this.backgroundRefreshEndpointList(
globalEndpointManager,
optionsOrConnectionString.connectionPolicy.endpointRefreshRateInMs ||
defaultConnectionPolicy.endpointRefreshRateInMs
);
}

this.databases = new Databases(this, this.clientContext);
this.offers = new Offers(this, this.clientContext);
Expand Down Expand Up @@ -126,6 +137,24 @@ export class CosmosClient {
return this.clientContext.getReadEndpoint();
}

/**
* Gets the known write endpoints. Useful for troubleshooting purposes.
*
* The urls may contain a region suffix (e.g. "-eastus") if we're using location specific endpoints.
*/
public getWriteEndpoints(): Promise<readonly string[]> {
return this.clientContext.getWriteEndpoints();
}

/**
* Gets the currently used read endpoint. Useful for troubleshooting purposes.
*
* The url may contain a region suffix (e.g. "-eastus") if we're using location specific endpoints.
*/
public getReadEndpoints(): Promise<readonly string[]> {
return this.clientContext.getReadEndpoints();
}

/**
* Used for reading, updating, or deleting a existing database by id or accessing containers belonging to that database.
*
Expand Down Expand Up @@ -153,4 +182,27 @@ export class CosmosClient {
public offer(id: string): Offer {
return new Offer(this, id, this.clientContext);
}

/**
* Clears background endpoint refresher. Use client.dispose() when destroying the CosmosClient within another process.
*/
public dispose(): void {
clearTimeout(this.endpointRefresher);
}

private async backgroundRefreshEndpointList(
globalEndpointManager: GlobalEndpointManager,
refreshRate: number
) {
this.endpointRefresher = setInterval(() => {
try {
globalEndpointManager.refreshEndpointList();
} catch (e) {
console.warn("Failed to refresh endpoints", e);
}
}, refreshRate);
if (this.endpointRefresher.unref && typeof this.endpointRefresher.unref === "function") {
this.endpointRefresher.unref();
}
}
}
2 changes: 1 addition & 1 deletion sdk/cosmosdb/cosmos/src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ export const Constants = {
min: "min"
},

EffectiveParitionKeyConstants: {
EffectivePartitionKeyConstants: {
MinimumInclusiveEffectivePartitionKey: "",
MaximumExclusiveEffectivePartitionKey: "FF"
}
Expand Down
22 changes: 18 additions & 4 deletions sdk/cosmosdb/cosmos/src/documents/ConnectionPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ export interface ConnectionPolicy {
connectionMode?: ConnectionMode;
/** Request timeout (time to wait for response from network peer). Represented in milliseconds. */
requestTimeout?: number;
/** Flag to enable/disable automatic redirecting of requests based on read/write operations. */
/**
* Flag to enable/disable automatic redirecting of requests based on read/write operations. Default true.
* Required to call client.dispose() when this is set to true after destroying the CosmosClient inside another process or in the browser.
*/
enableEndpointDiscovery?: boolean;
/** List of azure regions to be used as preferred locations for read requests. */
preferredLocations?: string[];
Expand All @@ -21,16 +24,27 @@ export interface ConnectionPolicy {
* Default is `false`.
*/
useMultipleWriteLocations?: boolean;
/** Rate in milliseconds at which the client will refresh the endpoints list in the background */
endpointRefreshRateInMs?: number;
/** Flag to enable/disable background refreshing of endpoints. Defaults to false.
* Endpoint discovery using `enableEndpointsDiscovery` will still work for failed requests. */
enableBackgroundEndpointRefreshing?: boolean;
}

/**
* @hidden
*/
export const defaultConnectionPolicy = Object.freeze({
export const defaultConnectionPolicy: ConnectionPolicy = Object.freeze({
connectionMode: ConnectionMode.Gateway,
requestTimeout: 60000,
enableEndpointDiscovery: true,
preferredLocations: [],
retryOptions: {},
useMultipleWriteLocations: true
retryOptions: {
maxRetryAttemptCount: 9,
fixedRetryIntervalInMilliseconds: 100,
maxWaitTimeInSeconds: 30
},
useMultipleWriteLocations: true,
endpointRefreshRateInMs: 300000,
enableBackgroundEndpointRefreshing: true
});
6 changes: 3 additions & 3 deletions sdk/cosmosdb/cosmos/src/globalEndpointManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ export class GlobalEndpointManager {
* List of azure regions to be used as preferred locations for read requests.
*/
private preferredLocations: string[];
private writeableLocations: Location[];
private readableLocations: Location[];
private writeableLocations: Location[] = [];
private readableLocations: Location[] = [];

/**
* @param options - The document client instance.
Expand Down Expand Up @@ -114,7 +114,7 @@ export class GlobalEndpointManager {
return this.defaultEndpoint;
}

if (!this.readableLocations || !this.writeableLocations) {
if (this.readableLocations.length === 0 || this.writeableLocations.length === 0) {
const { resource: databaseAccount } = await this.readDatabaseAccount({
urlConnection: this.defaultEndpoint
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ function isCompleteSetOfRange(partitionKeyOrderedRange: any): boolean {
const lastRange = partitionKeyOrderedRange[partitionKeyOrderedRange.length - 1];
isComplete =
firstRange[Constants.PartitionKeyRange.MinInclusive] ===
Constants.EffectiveParitionKeyConstants.MinimumInclusiveEffectivePartitionKey;
Constants.EffectivePartitionKeyConstants.MinimumInclusiveEffectivePartitionKey;
isComplete =
isComplete &&
lastRange[Constants.PartitionKeyRange.MaxExclusive] ===
Constants.EffectiveParitionKeyConstants.MaximumExclusiveEffectivePartitionKey;
Constants.EffectivePartitionKeyConstants.MaximumExclusiveEffectivePartitionKey;

for (let i = 1; i < partitionKeyOrderedRange.length; i++) {
const previousRange = partitionKeyOrderedRange[i - 1];
Expand Down
4 changes: 2 additions & 2 deletions sdk/cosmosdb/cosmos/src/routing/QueryRange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ export class QueryRange {

public isFullRange(): boolean {
return (
this.min === Constants.EffectiveParitionKeyConstants.MinimumInclusiveEffectivePartitionKey &&
this.max === Constants.EffectiveParitionKeyConstants.MaximumExclusiveEffectivePartitionKey &&
this.min === Constants.EffectivePartitionKeyConstants.MinimumInclusiveEffectivePartitionKey &&
this.max === Constants.EffectivePartitionKeyConstants.MaximumExclusiveEffectivePartitionKey &&
this.isMinInclusive === true &&
this.isMaxInclusive === false
);
Expand Down
93 changes: 63 additions & 30 deletions sdk/cosmosdb/cosmos/test/internal/session.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import assert from "assert";
import { Context } from "mocha";
import { Suite } from "mocha";
import * as sinon from "sinon";
import { ClientContext } from "../../src";
import { ClientContext, PluginConfig, PluginOn } from "../../src";
import { OperationType, ResourceType, trimSlashes } from "../../src/common";
import { ConsistencyLevel } from "../../src";
import { Constants, CosmosClient } from "../../src";
Expand All @@ -14,14 +14,16 @@ import { endpoint, masterKey } from "../public/common/_testConfig";
import { getTestDatabase, removeAllDatabases } from "../public/common/TestHelpers";
import * as RequestHandler from "../../src/request/RequestHandler";
import { RequestContext } from "../../src";
import { Response } from "../../src/request/Response";

// TODO: there is alot of "any" types for tokens here
// TODO: there is alot of leaky document client stuff here that will make removing document client hard

const client = new CosmosClient({
endpoint,
key: masterKey,
consistencyLevel: ConsistencyLevel.Session
consistencyLevel: ConsistencyLevel.Session,
connectionPolicy: { enableBackgroundEndpointRefreshing: false }
});

function getCollection2TokenMap(
Expand All @@ -30,7 +32,63 @@ function getCollection2TokenMap(
return (sessionContainer as any).collectionResourceIdToSessionTokens;
}

describe("Session Token", function(this: Suite) {
describe("New session token", function() {
it("preserves tokens", async function() {
let response: Response<any>;
let rqContext: RequestContext;
const plugins: PluginConfig[] = [
{
on: PluginOn.request,
plugin: async (context, next) => {
rqContext = context;
response = await next(context);
return response;
}
}
];
const sessionClient = new CosmosClient({
endpoint,
key: masterKey,
consistencyLevel: ConsistencyLevel.Session,
connectionPolicy: { enableBackgroundEndpointRefreshing: false },
plugins
});
const containerId = "sessionTestColl";

const containerDefinition = {
id: containerId,
partitionKey: { paths: ["/id"] }
};
const containerOptions = { offerThroughput: 25100 };

const clientContext: ClientContext = (sessionClient as any).clientContext;
const sessionContainer: SessionContainer = (clientContext as any).sessionContainer;
const database = await getTestDatabase("session test", sessionClient);

const { resource: createdContainerDef } = await database.containers.create(
containerDefinition,
containerOptions
);
const container = database.container(createdContainerDef.id);

const resp = await container.items.create({ id: "1" });
await container.item("1").read();

await container.item("1").read();
const responseToken = resp.headers["x-ms-session-token"];
const token = sessionContainer.get({
isNameBased: true,
operationType: OperationType.Create,
resourceAddress: container.url,
resourceType: ResourceType.item,
resourceId: "1"
});
assert.equal(responseToken, token);
assert.equal(responseToken, rqContext.headers["x-ms-session-token"]);
});
});

describe.skip("Session Token", function(this: Suite) {
this.timeout(process.env.MOCHA_TIMEOUT || 20000);

const containerId = "sessionTestColl";
Expand Down Expand Up @@ -341,37 +399,12 @@ describe("Session Token", function(this: Suite) {
await container.item("1", "1").read();
});

// TODO: chrande - looks like this might be broken by going name based?
// We never had a name based version of this test. Looks like we fail to set the session token
// because OwnerId is missing on the header. This only happens for name based.
it.skip("client should not have session token of a container created by another client", async function() {
const client2 = new CosmosClient({
endpoint,
key: masterKey,
consistencyLevel: ConsistencyLevel.Session
});
const database = await getTestDatabase("clientshouldnothaveanotherclienttoken");
await database.containers.create(containerDefinition, containerOptions);
const container = database.container(containerDefinition.id);
await container.read();
await client2
.database(database.id)
.container(containerDefinition.id)
.delete();
await client2.database(database.id).containers.create(containerDefinition, containerOptions);
await client2
.database(database.id)
.container(containerDefinition.id)
.read();
assert.equal((client as any).clientContext.getSessionToken(container.url), ""); // TODO: _self
assert.notEqual((client2 as any).clientContext.getSessionToken(container.url), "");
});

it("validate session container update on 'Not found' with 'undefined' status code for non master resource", async function() {
const client2 = new CosmosClient({
endpoint,
key: masterKey,
consistencyLevel: ConsistencyLevel.Session
consistencyLevel: ConsistencyLevel.Session,
connectionPolicy: { enableBackgroundEndpointRefreshing: false }
});

const db = await getTestDatabase("session test", client);
Expand Down
6 changes: 4 additions & 2 deletions sdk/cosmosdb/cosmos/test/internal/unit/sasToken.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ describe.skip("SAS Token Authorization", function() {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const client = new CosmosClient({
endpoint,
key: key
key: key,
connectionPolicy: { enableBackgroundEndpointRefreshing: false }
});

const database = client.database(sasTokenProperties.databaseName);
Expand All @@ -56,7 +57,8 @@ describe.skip("SAS Token Authorization", function() {
"type=sas&ver=1.0&sig=pCgZFxV9JQN1i3vzYNTfQldW1No7I+MSgN628TZcJAI=;dXNlcjEKCi9kYnMvZGIxL2NvbGxzL2NvbGwxLwoKNUZFRTY2MDEKNjIxM0I3MDEKMAo2MAowCkZGRkZGRkZGCjAK";
const sasTokenClient = new CosmosClient({
endpoint,
key: userSasTokenKey
key: userSasTokenKey,
connectionPolicy: { enableBackgroundEndpointRefreshing: false }
});

const dbs = await sasTokenClient.databases.readAll().fetchAll();
Expand Down
Loading

0 comments on commit d3647f8

Please sign in to comment.