Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cosmos] Simple endpoint refresh interval #15781

Merged
merged 21 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,12 @@ export class ClientContext {
// (undocumented)
getReadEndpoint(): Promise<string>;
// (undocumented)
getReadEndpoints(): Promise<readonly string[]>;
// (undocumented)
getWriteEndpoint(): Promise<string>;
// (undocumented)
getWriteEndpoints(): Promise<readonly string[]>;
// (undocumented)
partitionKeyDefinitionCache: {
[containerUrl: string]: any;
};
Expand Down Expand Up @@ -236,6 +240,7 @@ export enum ConnectionMode {
export interface ConnectionPolicy {
connectionMode?: ConnectionMode;
enableEndpointDiscovery?: boolean;
endpointRefreshRateInMs?: number;
preferredLocations?: string[];
requestTimeout?: number;
retryOptions?: RetryOptions;
Expand Down Expand Up @@ -486,9 +491,12 @@ export class CosmosClient {
constructor(options: CosmosClientOptions);
database(id: string): Database;
readonly databases: Databases;
dispose(): void;
getDatabaseAccount(options?: RequestOptions): Promise<ResourceResponse<DatabaseAccount>>;
getReadEndpoint(): Promise<string>;
getReadEndpoints(): Promise<readonly string[]>;
getWriteEndpoint(): Promise<string>;
getWriteEndpoints(): Promise<readonly string[]>;
offer(id: string): Offer;
readonly offers: Offers;
}
Expand Down
8 changes: 8 additions & 0 deletions sdk/cosmosdb/cosmos/src/ClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,14 @@ export class ClientContext {
return this.globalEndpointManager.getReadEndpoint();
}

public getWriteEndpoints(): Promise<readonly string[]> {
return this.globalEndpointManager.getWriteEndpoints();
}

public getReadEndpoints(): Promise<readonly string[]> {
return this.globalEndpointManager.getReadEndpoints();
}

public async bulk<T>({
body,
path,
Expand Down
49 changes: 49 additions & 0 deletions sdk/cosmosdb/cosmos/src/CosmosClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export class CosmosClient {
*/
public readonly offers: Offers;
private clientContext: ClientContext;
private endpointRefresher: NodeJS.Timer;
/**
* Creates a new {@link CosmosClient} object from a connection string. Your database connection string can be found in the Azure Portal
*/
Expand Down Expand Up @@ -93,6 +94,12 @@ export class CosmosClient {
async (opts: RequestOptions) => this.getDatabaseAccount(opts)
);
this.clientContext = new ClientContext(optionsOrConnectionString, globalEndpointManager);
if (optionsOrConnectionString.connectionPolicy?.enableEndpointDiscovery) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should make sure the ref docs for enableEndpointDiscovery talk about the requirement to call dispose

this.backgroundRefreshEndpointList(
globalEndpointManager,
optionsOrConnectionString.connectionPolicy.endpointRefreshRateInMs
);
}

this.databases = new Databases(this, this.clientContext);
this.offers = new Offers(this, this.clientContext);
Expand Down Expand Up @@ -126,6 +133,24 @@ export class CosmosClient {
return this.clientContext.getReadEndpoint();
}

/**
* Gets the known write endpoints. Useful for troubleshooting purposes.
*
* The urls may contain a region suffix (e.g. "-eastus") if we're using location specific endpoints.
*/
public getWriteEndpoints(): Promise<readonly string[]> {
return this.clientContext.getWriteEndpoints();
}

/**
* Gets the currently used read endpoint. Useful for troubleshooting purposes.
*
* The url may contain a region suffix (e.g. "-eastus") if we're using location specific endpoints.
*/
public getReadEndpoints(): Promise<readonly string[]> {
return this.clientContext.getReadEndpoints();
}

/**
* Used for reading, updating, or deleting a existing database by id or accessing containers belonging to that database.
*
Expand Down Expand Up @@ -153,4 +178,28 @@ export class CosmosClient {
public offer(id: string): Offer {
return new Offer(this, id, this.clientContext);
}

/**
* Clears background endpoint refresher. Use client.dispose() when destroying the CosmosClient within another process.
*/
public dispose(): void {
clearTimeout(this.endpointRefresher);
}

private async backgroundRefreshEndpointList(
globalEndpointManager: GlobalEndpointManager,
refreshRate: number
) {
this.endpointRefresher = setInterval(() => {
try {
globalEndpointManager.refreshEndpointList();
} catch (e) {
console.warn("Failed to refresh endpoints", e);
}
}, refreshRate);
const isBrowser = new Function("try {return this===window;}catch(e){ return false;}");
if (!isBrowser) {
this.endpointRefresher.unref();
zfoster marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
18 changes: 14 additions & 4 deletions sdk/cosmosdb/cosmos/src/documents/ConnectionPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ export interface ConnectionPolicy {
connectionMode?: ConnectionMode;
/** Request timeout (time to wait for response from network peer). Represented in milliseconds. */
requestTimeout?: number;
/** Flag to enable/disable automatic redirecting of requests based on read/write operations. */
/**
* Flag to enable/disable automatic redirecting of requests based on read/write operations.
* Required to call client.dispose() when this is set to true after destroying the CosmosClient inside another process or in the browser.
zfoster marked this conversation as resolved.
Show resolved Hide resolved
*/
enableEndpointDiscovery?: boolean;
/** List of azure regions to be used as preferred locations for read requests. */
preferredLocations?: string[];
Expand All @@ -21,16 +24,23 @@ export interface ConnectionPolicy {
* Default is `false`.
*/
useMultipleWriteLocations?: boolean;
/** Rate in milliseconds at which the client will refresh the endpoints list in the background */
endpointRefreshRateInMs?: number;
}

/**
* @hidden
*/
export const defaultConnectionPolicy = Object.freeze({
export const defaultConnectionPolicy: ConnectionPolicy = Object.freeze({
connectionMode: ConnectionMode.Gateway,
requestTimeout: 60000,
enableEndpointDiscovery: true,
preferredLocations: [],
retryOptions: {},
useMultipleWriteLocations: true
retryOptions: {
maxRetryAttemptCount: 9,
fixedRetryIntervalInMilliseconds: 100,
maxWaitTimeInSeconds: 30
},
useMultipleWriteLocations: true,
endpointRefreshRateInMs: 300000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a breaking change? Just want to verify

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a default on the defaultConnectionPolicy and is optional so it should be fine. Let me verify though

});
6 changes: 3 additions & 3 deletions sdk/cosmosdb/cosmos/src/globalEndpointManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ export class GlobalEndpointManager {
* List of azure regions to be used as preferred locations for read requests.
*/
private preferredLocations: string[];
private writeableLocations: Location[];
private readableLocations: Location[];
private writeableLocations: Location[] = [];
private readableLocations: Location[] = [];

/**
* @param options - The document client instance.
Expand Down Expand Up @@ -175,7 +175,7 @@ export class GlobalEndpointManager {
this.writeableLocations.push(location);
}
}
for (const location of databaseAccount.writableLocations) {
for (const location of databaseAccount.readableLocations) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@southpolesteve do you think this was a bug? this causes test failures

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure. Did this method work before this change? Let's discuss tomorrow

const existingLocation = this.readableLocations.find((loc) => loc.name === location.name);
if (!existingLocation) {
this.readableLocations.push(location);
Expand Down
31 changes: 31 additions & 0 deletions sdk/cosmosdb/cosmos/test/public/functional/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from "../common/TestHelpers";
import AbortController from "node-abort-controller";
import { UsernamePasswordCredential } from "@azure/identity";
import { defaultConnectionPolicy } from "../../../src/documents";

describe("NodeJS CRUD Tests", function(this: Suite) {
this.timeout(process.env.MOCHA_TIMEOUT || 20000);
Expand Down Expand Up @@ -126,4 +127,34 @@ describe("NodeJS CRUD Tests", function(this: Suite) {
}
});
});
describe("Background refresher", async function() {
// not async to leverage done() callback inside setTimeout
it("should fetch new endpoints", function(done) {
// set refresh rate to 700ms
const client = new CosmosClient({
endpoint,
key: masterKey,
connectionPolicy: { ...defaultConnectionPolicy, endpointRefreshRateInMs: 700 }
});

// then timeout 1.2s so that we first fetch no endpoints, then after it refreshes we see them
client
.getReadEndpoints()
.then((firstEndpoints) => {
assert.equal(firstEndpoints.length, 0);
setTimeout(() => {
client
.getReadEndpoints()
.then((endpoints) => {
assert.notEqual(firstEndpoints, endpoints);
done();
return;
})
.catch(console.warn);
}, 1200);
return;
})
.catch(console.warn);
});
});
});