Skip to content

Commit d582ffa

Browse files
authored
fix (event processor) [OASIS-5907]: stop promise tracks in-flight dispatcher requests (#397)
Summary: Add tracking functionality to EventProcessor for every request sent by its dispatcher. This is implemented in a RequestTracker class. RequestTracker exposes a method (onRequestsComplete) to get a Promise that fulfills after the completion of requests in-flight at the time of the call. stop is updated to return the promise from RequestTracker onRequestsComplete. Test plan: - New and existing unit tests - Manually tested in browser, Node, & Lambda environments Issues: https://optimizely.atlassian.net/browse/OASIS-5907
1 parent eb47310 commit d582ffa

File tree

5 files changed

+172
-5
lines changed

5 files changed

+172
-5
lines changed

packages/event-processor/CHANGELOG.MD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
66

77
## [Unreleased]
88
Changes that have landed but are not yet released.
9+
### New Features
10+
- Promise returned from `stop` method of `EventProcessor` now tracks the state of all in-flight dispatcher requests, not just the final request that was triggered at the time `stop` was called
911

1012
## [0.3.2] - October 21, 2019
1113

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Copyright 2020, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import RequestTracker from '../src/requestTracker'
18+
19+
describe('requestTracker', () => {
20+
describe('onRequestsComplete', () => {
21+
it('returns an immediately-fulfilled promise when no requests are in flight', async () => {
22+
const tracker = new RequestTracker()
23+
await tracker.onRequestsComplete()
24+
})
25+
26+
it('returns a promise that fulfills after in-flight requests are complete', async () => {
27+
let resolveReq1: () => void
28+
const req1 = new Promise<void>(resolve => {
29+
resolveReq1 = resolve
30+
})
31+
let resolveReq2: () => void
32+
const req2 = new Promise<void>(resolve => {
33+
resolveReq2 = resolve
34+
})
35+
let resolveReq3: () => void
36+
const req3 = new Promise<void>(resolve => {
37+
resolveReq3 = resolve
38+
})
39+
40+
const tracker = new RequestTracker()
41+
tracker.trackRequest(req1)
42+
tracker.trackRequest(req2)
43+
tracker.trackRequest(req3)
44+
45+
let reqsComplete = false
46+
const reqsCompletePromise = tracker.onRequestsComplete().then(() => {
47+
reqsComplete = true
48+
})
49+
50+
resolveReq1!()
51+
await req1
52+
expect(reqsComplete).toBe(false)
53+
54+
resolveReq2!()
55+
await req2
56+
expect(reqsComplete).toBe(false)
57+
58+
resolveReq3!()
59+
await req3
60+
await reqsCompletePromise
61+
expect(reqsComplete).toBe(true)
62+
})
63+
})
64+
})

packages/event-processor/__tests__/v1EventProcessor.spec.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2019, Optimizely
2+
* Copyright 2019-2020, Optimizely
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -261,6 +261,40 @@ describe('LogTierV1EventProcessor', () => {
261261
// not have been called again.
262262
expect(dispatcher.dispatchEvent).toBeCalledTimes(0)
263263
})
264+
265+
it('should resolve the stop promise after all dispatcher requests are done', async () => {
266+
const dispatchCbs: Array<EventDispatcherCallback> = []
267+
const dispatcher = {
268+
dispatchEvent: jest.fn((event: EventV1Request, callback: EventDispatcherCallback) => {
269+
dispatchCbs.push(callback)
270+
})
271+
}
272+
273+
const processor = new LogTierV1EventProcessor({
274+
dispatcher,
275+
flushInterval: 100,
276+
maxQueueSize: 2,
277+
})
278+
processor.start()
279+
280+
for (let i = 0; i < 4; i++) {
281+
processor.process(createImpressionEvent())
282+
}
283+
expect(dispatchCbs.length).toBe(2)
284+
285+
let stopPromiseResolved = false
286+
const stopPromise = processor.stop().then(() => {
287+
stopPromiseResolved = true
288+
})
289+
expect(stopPromiseResolved).toBe(false)
290+
291+
dispatchCbs[0]({ statusCode: 204 })
292+
jest.advanceTimersByTime(100)
293+
expect(stopPromiseResolved).toBe(false)
294+
dispatchCbs[1]({ statusCode: 204 })
295+
await stopPromise
296+
expect(stopPromiseResolved).toBe(true)
297+
})
264298
})
265299

266300
describe('when maxQueueSize = 1', () => {

packages/event-processor/src/eventProcessor.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2019, Optimizely
2+
* Copyright 2019-2020, Optimizely
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import { EventDispatcher, EventV1Request } from './eventDispatcher'
2020
import { EventQueue, DefaultEventQueue, SingleEventQueue } from './eventQueue'
2121
import { getLogger } from '@optimizely/js-sdk-logging'
2222
import { NOTIFICATION_TYPES, NotificationCenter } from '@optimizely/js-sdk-utils'
23+
import RequestTracker from './requestTracker';
2324

2425
const logger = getLogger('EventProcessor')
2526

@@ -38,6 +39,7 @@ export abstract class AbstractEventProcessor implements EventProcessor {
3839
protected dispatcher: EventDispatcher
3940
protected queue: EventQueue<ProcessableEvents>
4041
private notificationCenter?: NotificationCenter
42+
private requestTracker: RequestTracker
4143

4244
constructor({
4345
dispatcher,
@@ -81,10 +83,12 @@ export abstract class AbstractEventProcessor implements EventProcessor {
8183
})
8284
}
8385
this.notificationCenter = notificationCenter
86+
87+
this.requestTracker = new RequestTracker()
8488
}
8589

8690
drainQueue(buffer: ProcessableEvents[]): Promise<void> {
87-
return new Promise(resolve => {
91+
const reqPromise = new Promise<void>(resolve => {
8892
logger.debug('draining queue with %s events', buffer.length)
8993

9094
if (buffer.length === 0) {
@@ -103,16 +107,19 @@ export abstract class AbstractEventProcessor implements EventProcessor {
103107
)
104108
}
105109
})
110+
this.requestTracker.trackRequest(reqPromise)
111+
return reqPromise
106112
}
107113

108114
process(event: ProcessableEvents): void {
109115
this.queue.enqueue(event)
110116
}
111117

112118
stop(): Promise<any> {
119+
// swallow - an error stopping this queue shouldn't prevent this from stopping
113120
try {
114-
// swallow, an error stopping this queue should prevent this from stopping
115-
return this.queue.stop()
121+
this.queue.stop()
122+
return this.requestTracker.onRequestsComplete()
116123
} catch (e) {
117124
logger.error('Error stopping EventProcessor: "%s"', e.message, e)
118125
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Copyright 2020, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* RequestTracker keeps track of in-flight requests for EventProcessor using
19+
* an internal counter. It exposes methods for adding a new request to be
20+
* tracked, and getting a Promise representing the completion of currently
21+
* tracked requests.
22+
*/
23+
class RequestTracker {
24+
private reqsInFlightCount: number = 0
25+
private reqsCompleteResolvers: Array<() => void> = []
26+
27+
/**
28+
* Track the argument request (represented by a Promise). reqPromise will feed
29+
* into the state of Promises returned by onRequestsComplete.
30+
* @param {Promise<void>} reqPromise
31+
*/
32+
public trackRequest(reqPromise: Promise<void>): void {
33+
this.reqsInFlightCount++
34+
const onReqComplete = () => {
35+
this.reqsInFlightCount--
36+
if (this.reqsInFlightCount === 0) {
37+
this.reqsCompleteResolvers.forEach(resolver => resolver())
38+
this.reqsCompleteResolvers = []
39+
}
40+
}
41+
reqPromise.then(onReqComplete, onReqComplete)
42+
}
43+
44+
/**
45+
* Return a Promise that fulfills after all currently-tracked request promises
46+
* are resolved.
47+
* @return {Promise<void>}
48+
*/
49+
public onRequestsComplete(): Promise<void> {
50+
return new Promise(resolve => {
51+
if (this.reqsInFlightCount === 0) {
52+
resolve()
53+
} else {
54+
this.reqsCompleteResolvers.push(resolve)
55+
}
56+
})
57+
}
58+
}
59+
60+
export default RequestTracker

0 commit comments

Comments
 (0)