Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: BatchExporter should export continuously when batch size is reached #3958

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

### :bug: (Bug Fix)

* fix(sdk-trace-base): BatchSpanProcessor flushes when `maxExportBatchSize` is reached [#3958](https://github.com/open-telemetry/opentelemetry-js/pull/3958) @nordfjord
* fix(sdk-metrics): allow instrument names to contain '/' [#4155](https://github.com/open-telemetry/opentelemetry-js/pull/4155)
* fix(sdk-metrics): do not report empty scopes and metrics [#4135](https://github.com/open-telemetry/opentelemetry-js/pull/4135) @pichlermarc
* Instruments that were created, but did not have measurements will not be exported anymore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
private readonly _scheduledDelayMillis: number;
private readonly _exportTimeoutMillis: number;

private _isExporting = false;
private _finishedSpans: ReadableSpan[] = [];
private _timer: NodeJS.Timeout | undefined;
private _shutdownOnce: BindOnceFuture<void>;
Expand Down Expand Up @@ -216,19 +217,28 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
}

private _maybeStartTimer() {
if (this._timer !== undefined) return;
this._timer = setTimeout(() => {
if (this._isExporting) return;
dyladan marked this conversation as resolved.
Show resolved Hide resolved
const flush = () => {
this._isExporting = true;
this._flushOneBatch()
.then(() => {
this._isExporting = false;
if (this._finishedSpans.length > 0) {
this._clearTimer();
this._maybeStartTimer();
}
})
.catch(e => {
this._isExporting = false;
globalErrorHandler(e);
});
}, this._scheduledDelayMillis);
};
// we only wait if the queue doesn't have enough elements yet
if (this._finishedSpans.length >= this._maxExportBatchSize) {
return flush();
}
if (this._timer !== undefined) return;
this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to have
It would be "nice" to clear (set to undefined) the this._timer just to avoid calling clearTimeout() with an expired timer. This also would allow (in a future PR) to provide some slightly better performance rather than always calling _clearTimeout()`

unrefTimer(this._timer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import { diag, ROOT_CONTEXT } from '@opentelemetry/api';
import {
ExportResult,
ExportResultCode,
loggingErrorHandler,
setGlobalErrorHandler,
Expand All @@ -27,7 +28,9 @@ import {
BasicTracerProvider,
BufferConfig,
InMemorySpanExporter,
ReadableSpan,
Span,
SpanExporter,
} from '../../../src';
import { context } from '@opentelemetry/api';
import { TestRecordOnlySampler } from './TestRecordOnlySampler';
Expand Down Expand Up @@ -175,43 +178,35 @@ describe('BatchSpanProcessorBase', () => {
assert.strictEqual(spy.args.length, 0);
});

it('should export the sampled spans with buffer size reached', done => {
const clock = sinon.useFakeTimers();
it('should export the sampled spans with buffer size reached', async () => {
dyladan marked this conversation as resolved.
Show resolved Hide resolved
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
const span = createSampledSpan(`${name}_${i}`);
const span = createSampledSpan(name);
for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) {
dyladan marked this conversation as resolved.
Show resolved Hide resolved
processor.onStart(span, ROOT_CONTEXT);
assert.strictEqual(exporter.getFinishedSpans().length, 0);

processor.onEnd(span);
assert.strictEqual(exporter.getFinishedSpans().length, 0);
}
const span = createSampledSpan(`${name}_6`);
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

setTimeout(async () => {
assert.strictEqual(exporter.getFinishedSpans().length, 5);
await processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
done();
}, defaultBufferConfig.scheduledDelayMillis + 1000);
clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000);
clock.restore();
assert.strictEqual(exporter.getFinishedSpans().length, 5);
await processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
});

it('should force flush when timeout exceeded', done => {
const clock = sinon.useFakeTimers();
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
const span = createSampledSpan(`${name}_${i}`);
const span = createSampledSpan(name);
for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);
assert.strictEqual(exporter.getFinishedSpans().length, 0);
}

setTimeout(() => {
assert.strictEqual(exporter.getFinishedSpans().length, 5);
assert.strictEqual(exporter.getFinishedSpans().length, 4);
done();
}, defaultBufferConfig.scheduledDelayMillis + 1000);
dyladan marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -222,14 +217,14 @@ describe('BatchSpanProcessorBase', () => {

it('should force flush on demand', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
const span = createSampledSpan(`${name}_${i}`);
const span = createSampledSpan(name);
for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);
}
assert.strictEqual(exporter.getFinishedSpans().length, 0);
processor.forceFlush();
assert.strictEqual(exporter.getFinishedSpans().length, 5);
assert.strictEqual(exporter.getFinishedSpans().length, 4);
dyladan marked this conversation as resolved.
Show resolved Hide resolved
});

it('should not export empty span lists', done => {
Expand Down Expand Up @@ -466,17 +461,10 @@ describe('BatchSpanProcessorBase', () => {
const debugStub = sinon.spy(diag, 'debug');
const warnStub = sinon.spy(diag, 'warn');
const span = createSampledSpan('test');
for (let i = 0, j = 6; i < j; i++) {
for (let i = 0, j = 12; i < j; i++) {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);
}
assert.equal(processor['_finishedSpans'].length, 6);
assert.equal(processor['_droppedSpansCount'], 0);
sinon.assert.notCalled(debugStub);

processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

assert.equal(processor['_finishedSpans'].length, 6);
assert.equal(processor['_droppedSpansCount'], 1);
sinon.assert.calledOnce(debugStub);
Expand Down Expand Up @@ -517,4 +505,42 @@ describe('BatchSpanProcessorBase', () => {
});
});
});

describe('Concurrency', ()=> {
it('should only send a single batch at a time', async () => {
dyladan marked this conversation as resolved.
Show resolved Hide resolved
let callbacks: ((result: ExportResult) => void)[] = []
let spans: ReadableSpan[] = []
const exporter: SpanExporter = {
export: async (exportedSpans: ReadableSpan[], resultCallback: (result: ExportResult) => void) => {
callbacks.push(resultCallback)
spans.push(...exportedSpans)
},
shutdown: async () => {},
}
const processor = new BatchSpanProcessor(exporter, {
maxExportBatchSize: 5,
maxQueueSize: 6,
});
const totalSpans = 50;
for (let i = 0; i < totalSpans; i++) {
const span = createSampledSpan(`${name}_${i}`);
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);
}
assert.equal(callbacks.length, 1)
assert.equal(spans.length, 5)
callbacks[0]({ code: ExportResultCode.SUCCESS })
await new Promise(resolve => process.nextTick(resolve))
dyladan marked this conversation as resolved.
Show resolved Hide resolved
// After the first batch completes we will have dropped a number
// of spans and the next batch will be smaller
assert.equal(callbacks.length, 2)
assert.equal(spans.length, 10)
callbacks[1]({ code: ExportResultCode.SUCCESS })

// We expect that all the other spans have been dropped
await new Promise(resolve => process.nextTick(resolve))
dyladan marked this conversation as resolved.
Show resolved Hide resolved
assert.equal(callbacks.length, 2)
assert.equal(spans.length, 10)
})
})
});
Loading