From cf45540254a5c035c351aab70a0d6d62df6234f3 Mon Sep 17 00:00:00 2001 From: Thiago Figueredo Cardoso Date: Tue, 29 Mar 2016 20:23:28 -0300 Subject: [PATCH] feat(bufferTime): add `maxBufferSize` optional argument Close #1295. --- spec/operators/bufferTime-spec.ts | 81 ++++++++++++++--- src/operator/bufferTime.ts | 139 +++++++++++++++++++++--------- 2 files changed, 167 insertions(+), 53 deletions(-) diff --git a/spec/operators/bufferTime-spec.ts b/spec/operators/bufferTime-spec.ts index 7ebcd7546c..2f5b8010d9 100644 --- a/spec/operators/bufferTime-spec.ts +++ b/spec/operators/bufferTime-spec.ts @@ -18,7 +18,7 @@ describe('Observable.prototype.bufferTime', () => { z: [] }; - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(subs); @@ -34,11 +34,47 @@ describe('Observable.prototype.bufferTime', () => { z: [] }; - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); }); + it('should emit buffers at intervals or when the buffer is full', () => { + const e1 = hot('---a---b---c---d---e---f---g-----|'); + const subs = '^ !'; + const t = time( '----------|'); + const expected = '-------w-------x-------y---------(z|)'; + const values = { + w: ['a', 'b'], + x: ['c', 'd'], + y: ['e', 'f'], + z: ['g'] + }; + + const result = e1.bufferTime(t, null, 2, rxTestScheduler); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should emit buffers at intervals or when the buffer is full test 2', () => { + const e1 = hot('---a---b---c---d---e---f---g-----|'); + const subs = '^ !'; + const t = time( '----------|'); + const expected = '----------w--------x---------y---(z|)'; + const values = { + w: ['a', 'b'], + x: ['c', 'd', 'e'], + y: ['f', 'g'], + z: [] + }; + + const result = e1.bufferTime(t, null, 3, rxTestScheduler); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + it('should emit buffers that have been created at intervals and close after the specified delay', () => { const e1 = hot('---a---b---c----d----e----f----g----h----i----(k|)'); // --------------------*--------------------*---- start interval @@ -54,7 +90,28 @@ describe('Observable.prototype.bufferTime', () => { z: ['i', 'k'] }; - const result = e1.bufferTime(t, interval, rxTestScheduler); + const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler); + + expectObservable(result).toBe(expected, values); + }); + + it('should emit buffers that have been created at intervals and close after the specified delay ' + + 'or when the buffer is full', () => { + const e1 = hot('---a---b---c----d----e----f----g----h----i----(k|)'); + // --------------------*--------------------*---- start interval + // ---------------------| timespans + // ---------------------| + // -----| + const t = time( '---------------------|'); + const interval = time( '--------------------|'); + const expected = '----------------x-------------------y---------(z|)'; + const values = { + x: ['a', 'b', 'c', 'd'], + y: ['e', 'f', 'g', 'h'], + z: ['i', 'k'] + }; + + const result = e1.bufferTime(t, interval, 4, rxTestScheduler); expectObservable(result).toBe(expected, values); }); @@ -81,7 +138,7 @@ describe('Observable.prototype.bufferTime', () => { f: [] }; - const result = e1.bufferTime(t, interval, rxTestScheduler); + const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -107,7 +164,7 @@ describe('Observable.prototype.bufferTime', () => { e: [] }; - const result = e1.bufferTime(t, interval, rxTestScheduler); + const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); }); @@ -127,7 +184,7 @@ describe('Observable.prototype.bufferTime', () => { a: ['2', '3', '4'] }; - const result = e1.bufferTime(t, interval, rxTestScheduler); + const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result, unsub).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(subs); @@ -150,7 +207,7 @@ describe('Observable.prototype.bufferTime', () => { const result = e1 .mergeMap((x: any) => Observable.of(x)) - .bufferTime(t, interval, rxTestScheduler) + .bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler) .mergeMap((x: any) => Observable.of(x)); expectObservable(result, unsub).toBe(expected, values); @@ -164,7 +221,7 @@ describe('Observable.prototype.bufferTime', () => { const values = { b: [] }; const t = time('----------|'); - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -176,7 +233,7 @@ describe('Observable.prototype.bufferTime', () => { const t = time( '----------|'); const expected = '----------a---------a---------a---------a----'; - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result, unsub).toBe(expected, { a: [] }); }); @@ -186,7 +243,7 @@ describe('Observable.prototype.bufferTime', () => { const expected = '#'; const t = time('----------|'); - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, undefined, new Error('haha')); }); @@ -200,7 +257,7 @@ describe('Observable.prototype.bufferTime', () => { w: ['a', 'b'] }; - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -222,7 +279,7 @@ describe('Observable.prototype.bufferTime', () => { y: ['e', 'f', 'g', 'h', 'i'] }; - const result = e1.bufferTime(t, interval, rxTestScheduler); + const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); diff --git a/src/operator/bufferTime.ts b/src/operator/bufferTime.ts index d8531e58a7..cbd4646759 100644 --- a/src/operator/bufferTime.ts +++ b/src/operator/bufferTime.ts @@ -1,9 +1,11 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; import {Observable} from '../Observable'; import {Scheduler} from '../Scheduler'; import {Action} from '../scheduler/Action'; import {async} from '../scheduler/async'; +import {isScheduler} from '../util/isScheduler'; /** * Buffers the source Observable values for a specific time period. @@ -18,7 +20,9 @@ import {async} from '../scheduler/async'; * resets the buffer every `bufferTimeSpan` milliseconds. If * `bufferCreationInterval` is given, this operator opens the buffer every * `bufferCreationInterval` milliseconds and closes (emits and resets) the - * buffer every `bufferTimeSpan` milliseconds. + * buffer every `bufferTimeSpan` milliseconds. When the optional argument + * `maxBufferSize` is specified, the buffer will be closed either after + * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements. * * @example Every second, emit an array of the recent click events * var clicks = Rx.Observable.fromEvent(document, 'click'); @@ -39,35 +43,60 @@ import {async} from '../scheduler/async'; * @param {number} bufferTimeSpan The amount of time to fill each buffer array. * @param {number} [bufferCreationInterval] The interval at which to start new * buffers. + * @param {number} [maxBufferSize] The maximum buffer size. * @param {Scheduler} [scheduler=async] The scheduler on which to schedule the * intervals that determine buffer boundaries. * @return {Observable} An observable of arrays of buffered values. * @method bufferTime * @owner Observable */ -export function bufferTime(bufferTimeSpan: number, - bufferCreationInterval: number = null, - scheduler: Scheduler = async): Observable { - return this.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, scheduler)); +export function bufferTime(bufferTimeSpan: number): Observable { + let length: number = arguments.length; + + let scheduler: Scheduler = async; + if (isScheduler(arguments[arguments.length - 1])) { + scheduler = arguments[arguments.length - 1]; + length--; + } + + let bufferCreationInterval: number = null; + if (length >= 2) { + bufferCreationInterval = arguments[1]; + } + + let maxBufferSize: number = Number.POSITIVE_INFINITY; + if (length >= 3) { + maxBufferSize = arguments[2]; + } + + return this.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler)); } export interface BufferTimeSignature { - (bufferTimeSpan: number, bufferCreationInterval?: number, scheduler?: Scheduler): Observable; + (bufferTimeSpan: number, scheduler?: Scheduler): Observable; + (bufferTimeSpan: number, bufferCreationInterval: number, scheduler?: Scheduler): Observable; + (bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: Scheduler): Observable; } class BufferTimeOperator implements Operator { constructor(private bufferTimeSpan: number, private bufferCreationInterval: number, + private maxBufferSize: number, private scheduler: Scheduler) { } call(subscriber: Subscriber, source: any): any { return source._subscribe(new BufferTimeSubscriber( - subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.scheduler + subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.maxBufferSize, this.scheduler )); } } +class Context { + buffer: T[] = []; + closeAction: Subscription; +} + type CreationState = { bufferTimeSpan: number; bufferCreationInterval: number, @@ -81,93 +110,121 @@ type CreationState = { * @extends {Ignored} */ class BufferTimeSubscriber extends Subscriber { - private buffers: Array = []; + private contexts: Array> = []; + private timespanOnly: boolean; constructor(destination: Subscriber, private bufferTimeSpan: number, private bufferCreationInterval: number, + private maxBufferSize: number, private scheduler: Scheduler) { super(destination); - const buffer = this.openBuffer(); - if (bufferCreationInterval !== null && bufferCreationInterval >= 0) { - const closeState = { subscriber: this, buffer }; + const context = this.openContext(); + this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0; + if (this.timespanOnly) { + const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan }; + this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState)); + } else { + const closeState = { subscriber: this, context }; const creationState: CreationState = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler }; - this.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState)); + this.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState)); this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, creationState)); - } else { - const timeSpanOnlyState = { subscriber: this, buffer, bufferTimeSpan }; - this.add(scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState)); } } protected _next(value: T) { - const buffers = this.buffers; - const len = buffers.length; + const contexts = this.contexts; + const len = contexts.length; + let filledBufferContext: Context; for (let i = 0; i < len; i++) { - buffers[i].push(value); + const context = contexts[i]; + const buffer = context.buffer; + buffer.push(value); + if (buffer.length == this.maxBufferSize) { + filledBufferContext = context; + } + } + + if (filledBufferContext) { + this.onBufferFull(filledBufferContext); } } protected _error(err: any) { - this.buffers.length = 0; + this.contexts.length = 0; super._error(err); } protected _complete() { - const { buffers, destination } = this; - while (buffers.length > 0) { - destination.next(buffers.shift()); + const { contexts, destination } = this; + while (contexts.length > 0) { + const context = contexts.shift(); + destination.next(context.buffer); } super._complete(); } protected _unsubscribe() { - this.buffers = null; + this.contexts = null; + } + + protected onBufferFull(context: Context) { + this.closeContext(context); + const closeAction = context.closeAction; + closeAction.unsubscribe(); + this.remove(closeAction); + + if (this.timespanOnly) { + context = this.openContext(); + const bufferTimeSpan = this.bufferTimeSpan; + const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan }; + this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState)); + } } - openBuffer(): T[] { - let buffer: T[] = []; - this.buffers.push(buffer); - return buffer; + openContext(): Context { + let context: Context = new Context(); + this.contexts.push(context); + return context; } - closeBuffer(buffer: T[]) { - this.destination.next(buffer); - const buffers = this.buffers; - buffers.splice(buffers.indexOf(buffer), 1); + closeContext(context: Context) { + this.destination.next(context.buffer); + const contexts = this.contexts; + contexts.splice(contexts.indexOf(context), 1); } } function dispatchBufferTimeSpanOnly(state: any) { const subscriber: BufferTimeSubscriber = state.subscriber; - const prevBuffer = state.buffer; - if (prevBuffer) { - subscriber.closeBuffer(prevBuffer); + const prevContext = state.context; + if (prevContext) { + subscriber.closeContext(prevContext); } - state.buffer = subscriber.openBuffer(); + state.context = subscriber.openContext(); if (!subscriber.isUnsubscribed) { - (this).schedule(state, state.bufferTimeSpan); + state.context.closeAction = (this).schedule(state, state.bufferTimeSpan); } } interface DispatchArg { subscriber: BufferTimeSubscriber; - buffer: Array; + context: Context; } function dispatchBufferCreation(state: CreationState) { const { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state; - const buffer = subscriber.openBuffer(); + const context = subscriber.openContext(); const action = >>this; if (!subscriber.isUnsubscribed) { - action.add(scheduler.schedule>(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer })); + subscriber.add(context.closeAction = scheduler.schedule>(dispatchBufferClose, bufferTimeSpan, { subscriber, context })); action.schedule(state, bufferCreationInterval); } } function dispatchBufferClose(arg: DispatchArg) { - const { subscriber, buffer } = arg; - subscriber.closeBuffer(buffer); + const { subscriber, context } = arg; + subscriber.closeContext(context); }