Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(otlp-exporter-base)!: ensure we do not retry after the timeout has elapsed #4889

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
import { ExportResponse } from './export-response';

export interface IExporterTransport {
send(data: Uint8Array): Promise<ExportResponse>;
send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse>;
shutdown(): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { ISerializer } from '@opentelemetry/otlp-transformer';
import { IExporterTransport } from '../../exporter-transport';
import { createHttpExporterTransport } from './http-exporter-transport';
import { OTLPExporterError } from '../../types';
import { createRetryingTransport } from '../../retryable-transport';
import { createRetryingTransport } from '../../retrying-transport';

/**
* Collector Metric Exporter abstract base class
Expand Down Expand Up @@ -76,7 +76,6 @@ export abstract class OTLPExporterNodeBase<
signalSpecificHeaders
),
url: this.url,
timeoutMillis: this.timeoutMillis,
}),
});
}
Expand All @@ -100,16 +99,18 @@ export abstract class OTLPExporterNodeBase<
return;
}

const promise = this._transport.send(data).then(response => {
if (response.status === 'success') {
onSuccess();
return;
}
if (response.status === 'failure' && response.error) {
onError(response.error);
}
onError(new OTLPExporterError('Export failed with unknown error'));
}, onError);
const promise = this._transport
.send(data, this.timeoutMillis)
.then(response => {
if (response.status === 'success') {
onSuccess();
return;
}
if (response.status === 'failure' && response.error) {
onError(response.error);
}
onError(new OTLPExporterError('Export failed with unknown error'));
pichlermarc marked this conversation as resolved.
Show resolved Hide resolved
}, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HttpExporterTransport implements IExporterTransport {

constructor(private _parameters: HttpRequestParameters) {}

async send(data: Uint8Array): Promise<ExportResponse> {
async send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
if (this._send == null) {
// Lazy require to ensure that http/https is not required before instrumentations can wrap it.
const {
Expand All @@ -50,9 +50,15 @@ class HttpExporterTransport implements IExporterTransport {
return new Promise<ExportResponse>(resolve => {
// this will always be defined
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._send?.(this._parameters, this._agent!, data, result => {
resolve(result);
});
this._send?.(
this._parameters,
this._agent!,
data,
result => {
resolve(result);
},
timeoutMillis
);
});
}
shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ export type sendWithHttp = (
params: HttpRequestParameters,
agent: http.Agent | https.Agent,
data: Uint8Array,
onDone: (response: ExportResponse) => void
onDone: (response: ExportResponse) => void,
timeoutMillis: number
) => void;

export interface HttpRequestParameters {
timeoutMillis: number;
url: string;
headers: Record<string, string>;
compression: 'gzip' | 'none';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ export function sendWithHttp(
params: HttpRequestParameters,
agent: http.Agent | https.Agent,
data: Uint8Array,
onDone: (response: ExportResponse) => void
onDone: (response: ExportResponse) => void,
timeoutMillis: number
): void {
const parsedUrl = new URL(params.url);
const nodeVersion = Number(process.versions.node.split('.')[0]);
Expand Down Expand Up @@ -86,7 +87,7 @@ export function sendWithHttp(
});
});

req.setTimeout(params.timeoutMillis, () => {
req.setTimeout(timeoutMillis, () => {
req.destroy();
onDone({
status: 'failure',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,42 @@ function getJitter() {
class RetryingTransport implements IExporterTransport {
constructor(private _transport: IExporterTransport) {}

private retry(data: Uint8Array, inMillis: number): Promise<ExportResponse> {
private retry(
data: Uint8Array,
timeoutMillis: number,
inMillis: number
): Promise<ExportResponse> {
return new Promise((resolve, reject) => {
setTimeout(() => {
this._transport.send(data).then(resolve, reject);
this._transport.send(data, timeoutMillis).then(resolve, reject);
}, inMillis);
});
}

async send(data: Uint8Array): Promise<ExportResponse> {
let result = await this._transport.send(data);
async send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
const deadline = Date.now() + timeoutMillis;
let result = await this._transport.send(data, timeoutMillis);
let attempts = MAX_ATTEMPTS;
let nextBackoff = INITIAL_BACKOFF;

while (result.status === 'retryable' && attempts > 0) {
attempts--;
const backoff = Math.min(nextBackoff, MAX_BACKOFF) + getJitter();

// use maximum of computed backoff and 0 to avoid negative timeouts
const backoff = Math.max(
Math.min(nextBackoff, MAX_BACKOFF) + getJitter(),
0
);
nextBackoff = nextBackoff * BACKOFF_MULTIPLIER;
result = await this.retry(data, result.retryInMillis ?? backoff);
const retryInMillis = result.retryInMillis ?? backoff;

// return when expected retry time is after the export deadline.
const remainingTimeoutMillis = deadline - Date.now();
if (retryInMillis > remainingTimeoutMillis) {
return result;
}

result = await this.retry(data, remainingTimeoutMillis, retryInMillis);
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import * as sinon from 'sinon';
import * as assert from 'assert';
import { IExporterTransport } from '../../src';
import { createRetryingTransport } from '../../src/retryable-transport';
import { createRetryingTransport } from '../../src/retrying-transport';
import { ExportResponse } from '../../src';
import { assertRejects } from '../testHelper';

const timeoutMillis = 1000000;

describe('RetryingTransport', function () {
describe('send', function () {
it('does not retry when underlying transport succeeds', async function () {
Expand All @@ -39,10 +41,14 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
const actualResponse = await transport.send(mockData);
const actualResponse = await transport.send(mockData, timeoutMillis);

// assert
sinon.assert.calledOnceWithExactly(transportStubs.send, mockData);
sinon.assert.calledOnceWithExactly(
transportStubs.send,
mockData,
timeoutMillis
);
assert.deepEqual(actualResponse, expectedResponse);
});

Expand All @@ -63,10 +69,14 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
const actualResponse = await transport.send(mockData);
const actualResponse = await transport.send(mockData, timeoutMillis);

// assert
sinon.assert.calledOnceWithExactly(transportStubs.send, mockData);
sinon.assert.calledOnceWithExactly(
transportStubs.send,
mockData,
timeoutMillis
);
assert.deepEqual(actualResponse, expectedResponse);
});

Expand All @@ -84,10 +94,14 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
await assertRejects(() => transport.send(mockData));
await assertRejects(() => transport.send(mockData, timeoutMillis));

// assert
sinon.assert.calledOnceWithExactly(transportStubs.send, mockData);
sinon.assert.calledOnceWithExactly(
transportStubs.send,
mockData,
timeoutMillis
);
});

it('does retry when the underlying transport returns retryable', async function () {
Expand All @@ -113,11 +127,19 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
const actualResponse = await transport.send(mockData);
const actualResponse = await transport.send(mockData, timeoutMillis);

// assert
sinon.assert.calledTwice(transportStubs.send);
sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData);
sinon.assert.alwaysCalledWithMatch(
transportStubs.send,
mockData,
sinon.match.number.and(
sinon.match(value => {
return value <= timeoutMillis;
})
)
);
assert.deepEqual(actualResponse, successResponse);
});

Expand All @@ -143,11 +165,19 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
await assertRejects(() => transport.send(mockData));
await assertRejects(() => transport.send(mockData, timeoutMillis));

// assert
sinon.assert.calledTwice(transportStubs.send);
sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData);
sinon.assert.alwaysCalledWithMatch(
transportStubs.send,
mockData,
sinon.match.number.and(
sinon.match(value => {
return value <= timeoutMillis;
})
)
);
});

it('does retry 5 times, then resolves as retryable', async function () {
Expand All @@ -169,11 +199,48 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
const result = await transport.send(mockData);
const result = await transport.send(mockData, timeoutMillis);

// assert
sinon.assert.callCount(transportStubs.send, 6); // 1 initial try and 5 retries
sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData);
sinon.assert.alwaysCalledWithMatch(
transportStubs.send,
mockData,
sinon.match.number.and(
sinon.match(value => {
return value <= timeoutMillis;
})
)
);
assert.strictEqual(result, retryResponse);
});

it('does not retry when retryInMillis takes place after timeoutMillis', async function () {
// arrange
const retryResponse: ExportResponse = {
status: 'retryable',
retryInMillis: timeoutMillis + 100,
};

const mockData = Uint8Array.from([1, 2, 3]);

const transportStubs = {
send: sinon.stub().resolves(retryResponse),
shutdown: sinon.stub(),
};
const mockTransport = <IExporterTransport>transportStubs;
const transport = createRetryingTransport({ transport: mockTransport });

// act
const result = await transport.send(mockData, timeoutMillis);

// assert
// initial try, no retries.
sinon.assert.calledOnceWithExactly(
transportStubs.send,
mockData,
timeoutMillis
);
assert.strictEqual(result, retryResponse);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ export abstract class OTLPGRPCExporterNodeBase<
grpcName: grpcName,
grpcPath: grpcPath,
metadata: metadataProvider,
timeoutMillis: this.timeoutMillis,
});
}

Expand Down Expand Up @@ -126,16 +125,18 @@ export abstract class OTLPGRPCExporterNodeBase<
return;
}

const promise = this._transport.send(data).then(response => {
if (response.status === 'success') {
onSuccess();
return;
}
if (response.status === 'failure' && response.error) {
onError(response.error);
}
onError(new OTLPExporterError('Export failed with unknown error'));
}, onError);
const promise = this._transport
.send(data, this.timeoutMillis)
.then(response => {
if (response.status === 'success') {
onSuccess();
return;
}
if (response.status === 'failure' && response.error) {
onError(response.error);
}
onError(new OTLPExporterError('Export failed with unknown error'));
pichlermarc marked this conversation as resolved.
Show resolved Hide resolved
}, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ export interface GrpcExporterTransportParameters {
*/
metadata: () => Metadata;
compression: 'gzip' | 'none';
timeoutMillis: number;
}

export class GrpcExporterTransport implements IExporterTransport {
Expand All @@ -101,7 +100,7 @@ export class GrpcExporterTransport implements IExporterTransport {
this._client?.close();
}

send(data: Uint8Array): Promise<ExportResponse> {
send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
// We need to make a for gRPC
const buffer = Buffer.from(data);

Expand Down Expand Up @@ -145,9 +144,7 @@ export class GrpcExporterTransport implements IExporterTransport {
}

return new Promise<ExportResponse>(resolve => {
// this will always be defined
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const deadline = Date.now() + this._parameters.timeoutMillis;
const deadline = Date.now() + timeoutMillis;

// this should never happen
if (this._metadata == null) {
Expand Down
Loading