Skip to content

Commit f1aedb6

Browse files
committed
ref(core): Improve promise buffer
1 parent c084bd6 commit f1aedb6

File tree

2 files changed

+111
-72
lines changed

2 files changed

+111
-72
lines changed

packages/core/src/utils/promisebuffer.ts

Lines changed: 42 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { rejectedSyncPromise, resolvedSyncPromise, SyncPromise } from './syncpromise';
1+
import { rejectedSyncPromise, resolvedSyncPromise } from './syncpromise';
22

33
export interface PromiseBuffer<T> {
44
// exposes the internal array so tests can assert on the state of it.
55
// XXX: this really should not be public api.
6-
$: Array<PromiseLike<T>>;
6+
$: PromiseLike<T>[];
77
add(taskProducer: () => PromiseLike<T>): PromiseLike<T>;
88
drain(timeout?: number): PromiseLike<boolean>;
99
}
@@ -14,11 +14,11 @@ export const SENTRY_BUFFER_FULL_ERROR = Symbol.for('SentryBufferFullError');
1414
* Creates an new PromiseBuffer object with the specified limit
1515
* @param limit max number of promises that can be stored in the buffer
1616
*/
17-
export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
18-
const buffer: Array<PromiseLike<T>> = [];
17+
export function makePromiseBuffer<T>(limit: number = 100): PromiseBuffer<T> {
18+
const buffer: Set<PromiseLike<T>> = new Set();
1919

2020
function isReady(): boolean {
21-
return limit === undefined || buffer.length < limit;
21+
return buffer.size < limit;
2222
}
2323

2424
/**
@@ -27,8 +27,8 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
2727
* @param task Can be any PromiseLike<T>
2828
* @returns Removed promise.
2929
*/
30-
function remove(task: PromiseLike<T>): PromiseLike<T | void> {
31-
return buffer.splice(buffer.indexOf(task), 1)[0] || Promise.resolve(undefined);
30+
function remove(task: PromiseLike<T>): void {
31+
buffer.delete(task);
3232
}
3333

3434
/**
@@ -48,22 +48,26 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
4848

4949
// start the task and add its promise to the queue
5050
const task = taskProducer();
51-
if (buffer.indexOf(task) === -1) {
52-
buffer.push(task);
53-
}
54-
void task
55-
.then(() => remove(task))
56-
// Use `then(null, rejectionHandler)` rather than `catch(rejectionHandler)` so that we can use `PromiseLike`
57-
// rather than `Promise`. `PromiseLike` doesn't have a `.catch` method, making its polyfill smaller. (ES5 didn't
58-
// have promises, so TS has to polyfill when down-compiling.)
59-
.then(null, () =>
60-
remove(task).then(null, () => {
61-
// We have to add another catch here because `remove()` starts a new promise chain.
62-
}),
63-
);
51+
buffer.add(task);
52+
void task.then(
53+
() => remove(task),
54+
() => remove(task),
55+
);
6456
return task;
6557
}
6658

59+
function drainNextSyncPromise(): PromiseLike<boolean> {
60+
const item = buffer.values().next().value;
61+
62+
if (!item) {
63+
return resolvedSyncPromise(true);
64+
}
65+
66+
return resolvedSyncPromise(item).then(() => {
67+
return drainNextSyncPromise();
68+
});
69+
}
70+
6771
/**
6872
* Wait for all promises in the queue to resolve or for timeout to expire, whichever comes first.
6973
*
@@ -74,34 +78,27 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
7478
* `false` otherwise
7579
*/
7680
function drain(timeout?: number): PromiseLike<boolean> {
77-
return new SyncPromise<boolean>((resolve, reject) => {
78-
let counter = buffer.length;
79-
80-
if (!counter) {
81-
return resolve(true);
82-
}
83-
84-
// wait for `timeout` ms and then resolve to `false` (if not cancelled first)
85-
const capturedSetTimeout = setTimeout(() => {
86-
if (timeout && timeout > 0) {
87-
resolve(false);
88-
}
89-
}, timeout);
90-
91-
// if all promises resolve in time, cancel the timer and resolve to `true`
92-
buffer.forEach(item => {
93-
void resolvedSyncPromise(item).then(() => {
94-
if (!--counter) {
95-
clearTimeout(capturedSetTimeout);
96-
resolve(true);
97-
}
98-
}, reject);
99-
});
100-
});
81+
if (!buffer.size) {
82+
return resolvedSyncPromise(true);
83+
}
84+
85+
const drainPromise = drainNextSyncPromise();
86+
87+
if (!timeout) {
88+
return drainPromise;
89+
}
90+
91+
const promises = [drainPromise, new Promise<boolean>(resolve => setTimeout(() => resolve(false), timeout))];
92+
93+
// Promise.race will resolve to the first promise that resolves or rejects
94+
// So if the drainPromise resolves, the timeout promise will be ignored
95+
return Promise.race(promises);
10196
}
10297

10398
return {
104-
$: buffer,
99+
get $(): PromiseLike<T>[] {
100+
return Array.from(buffer);
101+
},
105102
add,
106103
drain,
107104
};

packages/core/test/lib/utils/promisebuffer.test.ts

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,33 @@
11
import { describe, expect, test, vi } from 'vitest';
22
import { makePromiseBuffer } from '../../../src/utils/promisebuffer';
3-
import { SyncPromise } from '../../../src/utils/syncpromise';
3+
import { rejectedSyncPromise, resolvedSyncPromise } from '../../../src/utils/syncpromise';
44

55
describe('PromiseBuffer', () => {
66
describe('add()', () => {
7-
test('no limit', () => {
8-
const buffer = makePromiseBuffer();
9-
const p = vi.fn(() => new SyncPromise(resolve => setTimeout(resolve)));
10-
void buffer.add(p);
11-
expect(buffer.$.length).toEqual(1);
7+
test('sync promises', () => {
8+
const buffer = makePromiseBuffer(1);
9+
let task1;
10+
const producer1 = vi.fn(() => {
11+
task1 = resolvedSyncPromise();
12+
return task1;
13+
});
14+
const producer2 = vi.fn(() => resolvedSyncPromise());
15+
expect(buffer.add(producer1)).toEqual(task1);
16+
void expect(buffer.add(producer2)).rejects.toThrowError();
17+
// This is immediately executed and removed again from the buffer
18+
expect(buffer.$.length).toEqual(0);
19+
expect(producer1).toHaveBeenCalled();
20+
expect(producer2).toHaveBeenCalled();
1221
});
1322

14-
test('with limit', () => {
23+
test('async promises', () => {
1524
const buffer = makePromiseBuffer(1);
1625
let task1;
1726
const producer1 = vi.fn(() => {
18-
task1 = new SyncPromise(resolve => setTimeout(resolve));
27+
task1 = new Promise(resolve => setTimeout(resolve, 1));
1928
return task1;
2029
});
21-
const producer2 = vi.fn(() => new SyncPromise(resolve => setTimeout(resolve)));
30+
const producer2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
2231
expect(buffer.add(producer1)).toEqual(task1);
2332
void expect(buffer.add(producer2)).rejects.toThrowError();
2433
expect(buffer.$.length).toEqual(1);
@@ -28,25 +37,60 @@ describe('PromiseBuffer', () => {
2837
});
2938

3039
describe('drain()', () => {
31-
test('without timeout', async () => {
40+
test('drains all promises without timeout', async () => {
3241
const buffer = makePromiseBuffer();
33-
for (let i = 0; i < 5; i++) {
34-
void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve)));
35-
}
42+
43+
const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
44+
const p2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
45+
const p3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
46+
const p4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
47+
const p5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
48+
49+
[p1, p2, p3, p4, p5].forEach(p => {
50+
void buffer.add(p);
51+
});
52+
3653
expect(buffer.$.length).toEqual(5);
3754
const result = await buffer.drain();
3855
expect(result).toEqual(true);
3956
expect(buffer.$.length).toEqual(0);
57+
58+
expect(p1).toHaveBeenCalled();
59+
expect(p2).toHaveBeenCalled();
60+
expect(p3).toHaveBeenCalled();
61+
expect(p4).toHaveBeenCalled();
62+
expect(p5).toHaveBeenCalled();
4063
});
4164

42-
test('with timeout', async () => {
65+
test('drains all promises with timeout xxx', async () => {
4366
const buffer = makePromiseBuffer();
44-
for (let i = 0; i < 5; i++) {
45-
void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve, 100)));
46-
}
67+
68+
const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 2)));
69+
const p2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 4)));
70+
const p3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 6)));
71+
const p4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 8)));
72+
const p5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 10)));
73+
74+
[p1, p2, p3, p4, p5].forEach(p => {
75+
void buffer.add(p);
76+
});
77+
78+
expect(p1).toHaveBeenCalled();
79+
expect(p2).toHaveBeenCalled();
80+
expect(p3).toHaveBeenCalled();
81+
expect(p4).toHaveBeenCalled();
82+
expect(p5).toHaveBeenCalled();
83+
4784
expect(buffer.$.length).toEqual(5);
48-
const result = await buffer.drain(50);
85+
const result = await buffer.drain(8);
4986
expect(result).toEqual(false);
87+
// p5 is still in the buffer
88+
expect(buffer.$.length).toEqual(1);
89+
90+
// Now drain final item
91+
const result2 = await buffer.drain();
92+
expect(result2).toEqual(true);
93+
expect(buffer.$.length).toEqual(0);
5094
});
5195

5296
test('on empty buffer', async () => {
@@ -60,7 +104,7 @@ describe('PromiseBuffer', () => {
60104

61105
test('resolved promises should not show up in buffer length', async () => {
62106
const buffer = makePromiseBuffer();
63-
const producer = () => new SyncPromise(resolve => setTimeout(resolve));
107+
const producer = () => new Promise(resolve => setTimeout(resolve, 1));
64108
const task = buffer.add(producer);
65109
expect(buffer.$.length).toEqual(1);
66110
await task;
@@ -69,20 +113,18 @@ describe('PromiseBuffer', () => {
69113

70114
test('rejected promises should not show up in buffer length', async () => {
71115
const buffer = makePromiseBuffer();
72-
const producer = () => new SyncPromise((_, reject) => setTimeout(reject));
116+
const error = new Error('whoops');
117+
const producer = () => new Promise((_, reject) => setTimeout(() => reject(error), 1));
73118
const task = buffer.add(producer);
74119
expect(buffer.$.length).toEqual(1);
75-
try {
76-
await task;
77-
} catch {
78-
// no-empty
79-
}
120+
121+
await expect(task).rejects.toThrow(error);
80122
expect(buffer.$.length).toEqual(0);
81123
});
82124

83125
test('resolved task should give an access to the return value', async () => {
84126
const buffer = makePromiseBuffer<string>();
85-
const producer = () => new SyncPromise<string>(resolve => setTimeout(() => resolve('test')));
127+
const producer = () => resolvedSyncPromise('test');
86128
const task = buffer.add(producer);
87129
const result = await task;
88130
expect(result).toEqual('test');
@@ -91,7 +133,7 @@ describe('PromiseBuffer', () => {
91133
test('rejected task should give an access to the return value', async () => {
92134
expect.assertions(1);
93135
const buffer = makePromiseBuffer<string>();
94-
const producer = () => new SyncPromise<string>((_, reject) => setTimeout(() => reject(new Error('whoops'))));
136+
const producer = () => rejectedSyncPromise(new Error('whoops'));
95137
const task = buffer.add(producer);
96138
try {
97139
await task;

0 commit comments

Comments
 (0)