diff --git a/packages/rxjs/src/lib/iterator/iteration.next.spec.ts b/packages/rxjs/src/lib/iterator/iteration.next.spec.ts index 8f4223b14..5f9ed0d5b 100644 --- a/packages/rxjs/src/lib/iterator/iteration.next.spec.ts +++ b/packages/rxjs/src/lib/iterator/iteration.next.spec.ts @@ -20,14 +20,13 @@ describe('iteration.next', () => { describe('nextUntilPage()', () => { it('should call next up until the given page is reached.', (done) => { - const targetPage = 10; - - iteratorNextPageUntilPage(instance, targetPage).then((page) => { - expect(page).toBe(targetPage); + const targetPagesToLoad = 10; - instance.latestLoadedPage$.pipe(first()).subscribe((latestPage) => { + iteratorNextPageUntilPage(instance, targetPagesToLoad).then((page) => { + expect(page + 1).toBe(targetPagesToLoad); - expect(latestPage).toBe(targetPage); + instance.numberOfPagesLoaded$.pipe(first()).subscribe((latestPage) => { + expect(latestPage).toBe(targetPagesToLoad); done(); }); }); @@ -36,14 +35,14 @@ describe('iteration.next', () => { it(`should call next up until the iterator's limit is reached, even if target page is after.`, (done) => { - const testLimit = 5; + const testMaxPagesToLoad = 5; const targetPage = 10; - instance.maxPageLoadLimit = testLimit; + instance.maxPageLoadLimit = testMaxPagesToLoad; iteratorNextPageUntilPage(instance, targetPage).then((page) => { - expect(page).toBe(testLimit); + expect(page).toBe(testMaxPagesToLoad - 1); - instance.latestLoadedPage$.pipe(first()).subscribe((page) => { + instance.numberOfPagesLoaded$.pipe(first()).subscribe((page) => { expect(page).toBe(instance.maxPageLoadLimit); done(); }); @@ -81,13 +80,13 @@ describe('iteration.next', () => { it(`should call next up until the iterator's limit is reached.`, (done) => { - const testLimit = 15; - instance.maxPageLoadLimit = testLimit; + const testMaxPagesToLoad = 15; + instance.maxPageLoadLimit = testMaxPagesToLoad; iteratorNextPageUntilMaxPageLoadLimit(instance).then((page) => { - expect(page).toBe(testLimit); + expect(page).toBe(testMaxPagesToLoad - 1); - instance.latestLoadedPage$.pipe(first()).subscribe((page) => { + instance.numberOfPagesLoaded$.pipe(first()).subscribe((page) => { expect(page).toBe(instance.maxPageLoadLimit); done(); }); diff --git a/packages/rxjs/src/lib/iterator/iteration.next.ts b/packages/rxjs/src/lib/iterator/iteration.next.ts index 07f7ea194..632cf3658 100644 --- a/packages/rxjs/src/lib/iterator/iteration.next.ts +++ b/packages/rxjs/src/lib/iterator/iteration.next.ts @@ -1,4 +1,4 @@ -import { map, filter, distinctUntilChanged, delay, switchMap, tap, exhaustMap, first, Observable, combineLatest, shareReplay } from "rxjs"; +import { map, first, Observable, combineLatest, shareReplay } from "rxjs"; import { ItemIteration, PageItemIteration } from "./iteration"; import { performTaskLoop, reduceBooleansWithAndFn } from '@dereekb/util'; @@ -27,7 +27,7 @@ export function iteratorNextPageUntilMaxPageLoadLimit(iterator: PageItemIteratio } /** - * Automatically calls next on the PageItemIteration up to the target page. + * Automatically calls next on the PageItemIteration up to the target page, the number of total pages that should be loaded. * * The promise will reject with an error if an error is encountered. * @@ -40,7 +40,7 @@ export function iteratorNextPageUntilPage(iteration: PageItemIteration, page: nu function checkPageLimit(page) { const pageLimit = getPageLimit(); - return page < Math.min(pageLimit, iteration.maxPageLoadLimit); + return (page + 1) < Math.min(pageLimit, iteration.maxPageLoadLimit); } return new Promise((resolve) => { diff --git a/packages/rxjs/src/lib/iterator/iteration.rxjs.spec.ts b/packages/rxjs/src/lib/iterator/iteration.rxjs.spec.ts new file mode 100644 index 000000000..897f1e312 --- /dev/null +++ b/packages/rxjs/src/lib/iterator/iteration.rxjs.spec.ts @@ -0,0 +1,43 @@ +import { ItemPageIterator, ItemPageIteratorIterationInstance } from './iterator.page'; +import { TestPageIteratorFilter, TEST_PAGE_ARRAY_ITERATOR_DELEGATE, TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE } from './iterator.page.spec'; +import { iteratorNextPageUntilPage } from './iteration.next'; +import { flattenIterationResultItemArray } from './iteration.rxjs'; +import { first } from 'rxjs/operators'; + +describe('iteration.rxjs', () => { + + let iterator: ItemPageIterator; + let instance: ItemPageIteratorIterationInstance; + + beforeEach(() => { + iterator = new ItemPageIterator(TEST_PAGE_ARRAY_ITERATOR_DELEGATE); + instance = iterator.instance({}); + }); + + afterEach(() => { + instance.destroy(); + }); + + describe('flattenIterationResultItemArray()', () => { + + it(`should aggregate the array of results into a single array.`, (done) => { + + const testPagesToLoad = 10; + + iteratorNextPageUntilPage(instance, testPagesToLoad).then((page) => { + expect(page).toBe(testPagesToLoad - 1); + + const obs = flattenIterationResultItemArray(instance); + + obs.pipe(first()).subscribe((values) => { + expect(values.length).toBe(testPagesToLoad * TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE); + done(); + }); + + }); + + }); + + }); + +}); diff --git a/packages/rxjs/src/lib/iterator/iteration.rxjs.ts b/packages/rxjs/src/lib/iterator/iteration.rxjs.ts index 83800c201..0efa750ca 100644 --- a/packages/rxjs/src/lib/iterator/iteration.rxjs.ts +++ b/packages/rxjs/src/lib/iterator/iteration.rxjs.ts @@ -1,7 +1,8 @@ -import { reduceBooleansWithAndFn } from '@dereekb/util'; -import { combineLatest, map, Observable, shareReplay } from 'rxjs'; +import { lastValue, flattenArray } from '@dereekb/util'; +import { filterMaybe, scanBuildArray } from '../rxjs'; +import { combineLatest, map, Observable, shareReplay, skipWhile } from 'rxjs'; import { mapLoadingStateResults, PageListLoadingState } from '../loading'; -import { ItemIteration, PageItemIteration } from './iteration'; +import { PageItemIteration } from './iteration'; /** @@ -15,3 +16,29 @@ export function iterationCurrentPageListLoadingState(iteration: PageItemItera shareReplay(1) ); } + +/** + * Used for PageItemIterations that have an array of results returned per page instead of a single item. + * + * @param iteration + * @returns + */ +export function flattenIterationResultItemArray(iteration: PageItemIteration): Observable { + return iteration.allItems$.pipe( + scanBuildArray((allItems: T[][]) => { + const seed = flattenArray(allItems); + const latestItem = lastValue(allItems); + + const accumulatorObs: Observable = iteration.latestState$.pipe( + skipWhile(x => x.model === latestItem), + map(x => x.model), + filterMaybe() + ); + + return { + seed, + accumulatorObs + } as any; + }) + ); +} diff --git a/packages/rxjs/src/lib/iterator/iteration.ts b/packages/rxjs/src/lib/iterator/iteration.ts index 307d8bca9..d584478a4 100644 --- a/packages/rxjs/src/lib/iterator/iteration.ts +++ b/packages/rxjs/src/lib/iterator/iteration.ts @@ -43,7 +43,9 @@ export interface ItemIteration extends Destroyable { export interface PageItemIteration extends ItemIteration { /** - * Current page load limit. + * The maximum number of pages allowed to be loaded. + * + * A page of 15 means that pages 0-14 can be loaded, but not page 15. */ maxPageLoadLimit: PageNumber; diff --git a/packages/rxjs/src/lib/iterator/iterator.page.spec.ts b/packages/rxjs/src/lib/iterator/iterator.page.spec.ts index 1ce231674..59abb5571 100644 --- a/packages/rxjs/src/lib/iterator/iterator.page.spec.ts +++ b/packages/rxjs/src/lib/iterator/iterator.page.spec.ts @@ -1,3 +1,4 @@ +import { PageNumber, range } from '@dereekb/util'; import { skip } from 'rxjs/operators'; import { FIRST_PAGE } from '@dereekb/util'; import { ItemPageIterator, ItemPageIteratorDelegate, ItemPageIteratorIterationInstance, ItemPageIteratorRequest, ItemPageIteratorResult } from './iterator.page'; @@ -11,33 +12,43 @@ export interface TestPageIteratorFilter { resultError?: any; } -export const TEST_PAGE_ITERATOR_DELEGATE: ItemPageIteratorDelegate = { - loadItemsForPage: (request: ItemPageIteratorRequest) => { - const result: ItemPageIteratorResult = { - value: request.page.page - }; +export function makeTestPageIteratorDelegate(makeResultsFn: (page: PageNumber) => T): ItemPageIteratorDelegate { + return { + loadItemsForPage: (request: ItemPageIteratorRequest) => { + const result: ItemPageIteratorResult = { + value: makeResultsFn(request.page.page) + }; - let resultObs: Observable> = of(result); + let resultObs: Observable> = of(result); - if (request.page.filter) { - const { delayTime, resultError, end } = request.page.filter; + if (request.page.filter) { + const { delayTime, resultError, end } = request.page.filter; - if (delayTime) { - resultObs = resultObs.pipe(delay(delayTime)); - } else if (resultError) { - resultObs = of({ - error: resultError - }); - } else if (end) { - resultObs = of({ - end: true - }); + if (delayTime) { + resultObs = resultObs.pipe(delay(delayTime)); + } else if (resultError) { + resultObs = of({ + error: resultError + }); + } else if (end) { + resultObs = of({ + end: true + }); + } } - } - return resultObs; + return resultObs; + } } -}; +} + +export const TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE = 10; +export const TEST_PAGE_ITERATOR_DELEGATE: ItemPageIteratorDelegate = makeTestPageIteratorDelegate((page) => page); +export const TEST_PAGE_ARRAY_ITERATOR_DELEGATE: ItemPageIteratorDelegate = makeTestPageIteratorDelegate((page) => { + const start = page * TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE; + const end = start + TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE; + return range({ start, end }); +}); describe('ItemPageIterator', () => { @@ -195,16 +206,16 @@ describe('ItemPageIterator', () => { it('should return all items after being subscribed to a few pages in.', (done) => { - const loadPages = 5; + const pagesToLoad = 5; - iteratorNextPageUntilPage(instance, loadPages).then(() => { + iteratorNextPageUntilPage(instance, pagesToLoad).then(() => { - instance.latestPageResultState$.subscribe((latestPage) => { - expect(latestPage.page).toBe(loadPages); + instance.numberOfPagesLoaded$.subscribe((pagesLoaded) => { + expect(pagesLoaded).toBe(pagesToLoad); instance.allItems$.subscribe((allItems) => { expect(allItems).toBeDefined(); - expect(allItems.length).toBe(loadPages + 1); + expect(allItems.length).toBe(pagesToLoad); instance.destroy(); done(); @@ -250,8 +261,8 @@ describe('ItemPageIterator', () => { // Load more pages iteratorNextPageUntilPage(instance, page).then(() => { - expect(emissions).toBe(page + 1); - expect(latestAllItems.length).toBe(page + 1); + expect(emissions).toBe(page); + expect(latestAllItems.length).toBe(page); done(); }); diff --git a/packages/rxjs/src/lib/iterator/iterator.page.ts b/packages/rxjs/src/lib/iterator/iterator.page.ts index a20b3d53c..ccced2f11 100644 --- a/packages/rxjs/src/lib/iterator/iterator.page.ts +++ b/packages/rxjs/src/lib/iterator/iterator.page.ts @@ -1,4 +1,4 @@ -import { distinctUntilArrayLengthChanges, filterMaybe } from '../rxjs'; +import { distinctUntilArrayLengthChanges, filterMaybe, scanBuildArray } from '../rxjs'; import { distinctUntilChanged, map, scan, startWith, catchError, skip, skipWhile, tap, take, mergeMap, delay } from 'rxjs/operators'; import { PageLoadingState, loadingStateHasError, loadingStateHasFinishedLoading, loadingStateIsLoading, errorPageResult, successPageResult, mapLoadingStateResults, beginLoading } from "../loading"; import { FIRST_PAGE, UNLOADED_PAGE, Destroyable, Filter, filteredPage, FilteredPage, getNextPageNumber, hasValueOrNotEmpty, Maybe, PageNumber, filterMaybeValues, lastValue } from "@dereekb/util"; @@ -338,6 +338,11 @@ export class ItemPageIteratorIterationInstance = this.latestLoadedPage$.pipe( + map(x => x + 1), + shareReplay(1) + ); + // MARK: ItemIteration /** * Whether or not there are more results to load. @@ -350,8 +355,8 @@ export class ItemPageIteratorIterationInstance = combineLatest([this._maxPageLoadLimit, this.latestLoadedPage$.pipe(startWith(UNLOADED_PAGE))]).pipe( - map(([limit, count]) => count < limit), + readonly canLoadMore$: Observable = combineLatest([this._maxPageLoadLimit, this.numberOfPagesLoaded$.pipe(startWith(0))]).pipe( + map(([maxPageLoadLimit, numberOfPagesLoaded]) => numberOfPagesLoaded < maxPageLoadLimit), distinctUntilChanged(), shareReplay(1) ); @@ -375,33 +380,30 @@ export class ItemPageIteratorIterationInstance = this.state$.pipe( skipWhile(x => !x.latestFinished), // Do not emit until the first finished state occurs. distinctUntilArrayLengthChanges((x) => x.allSuccessful), - /* - We start with allSuccessfulPageResults$ since it contains all page results since the start of the iterator, - and subscription to allItems may not have started at the same time. - - We use scan to add in all models coming in afterwards by pushing them into the accumulator. - This is to prevent performance issues with very large iteration sets, since we can - append onto the array, rather than concat/copy the array each time. - */ - exhaustMap((state) => { + scanBuildArray((state) => { + /* + We start with allSuccessfulPageResults$ since it contains all page results since the start of the iterator, + and subscription to allItems may not have started at the same time. + + We use scan to add in all models coming in afterwards by pushing them into the accumulator. + This is to prevent performance issues with very large iteration sets, since we can + append onto the array, rather than concat/copy the array each time. + */ const allPageResultsUpToFirstSubscription = state.allSuccessful; const firstLatestState = lastValue(allPageResultsUpToFirstSubscription); const seed: V[] = filterMaybeValues(allPageResultsUpToFirstSubscription.map(x => x.model?.value)); - return this.latestPageResultState$.pipe( + const accumulatorObs: Observable = this.latestPageResultState$.pipe( skipWhile(x => x === firstLatestState), - startWith(beginLoading()), // Start with to prevent waiting on emissions from skip. - scan((acc: V[], next: PageLoadingState>) => { - if (next.model?.value != null) { - acc.push(next.model.value); - } + map(x => x.model?.value), + filterMaybe() + ); - return acc; - }, seed) - ) - }), - distinctUntilArrayLengthChanges(), - shareReplay(1) + return { + seed, + accumulatorObs + }; + }) ); next(request: ItemIteratorNextRequest = {}): void { diff --git a/packages/rxjs/src/lib/rxjs/array.ts b/packages/rxjs/src/lib/rxjs/array.ts index e9921eaf0..f9834ab99 100644 --- a/packages/rxjs/src/lib/rxjs/array.ts +++ b/packages/rxjs/src/lib/rxjs/array.ts @@ -1,4 +1,6 @@ -import { distinctUntilChanged, MonoTypeOperatorFunction } from "rxjs"; +import { exhaustMap, scan, shareReplay, startWith } from 'rxjs/operators'; +import { distinctUntilChanged, MonoTypeOperatorFunction, Observable, OperatorFunction } from "rxjs"; +import { Maybe, ArrayOrValue, mergeArrayOrValueIntoArray } from '@dereekb/util'; export function distinctUntilArrayLengthChanges(getArray: (value: A) => any[]): MonoTypeOperatorFunction; export function distinctUntilArrayLengthChanges(): MonoTypeOperatorFunction; @@ -9,3 +11,41 @@ export function distinctUntilArrayLengthChanges(getArray?: (value: A) => any[ return distinctUntilChanged((a, b) => a === b, (x) => getArray(x).length); } + + +export interface ScanBuildArrayConfig { + seed?: Maybe; + accumulatorObs: Observable>; +}; + +export type ScanBuildArrayConfigFn = (seedState: S) => ScanBuildArrayConfig; + +/** + * Used to lazy build an array from two observables. + * + * The piped observable is for retrieving the seed value, and the accumulatorObs observable is used for + * retrieving values going forward. + * + * This is useful in cases where values are very large. + * + * @param param0 + * @returns + */ +export function scanBuildArray(init: ScanBuildArrayConfigFn): OperatorFunction { + return exhaustMap((seedState: S) => { + const { seed = [], accumulatorObs } = init(seedState); + + return accumulatorObs.pipe( + startWith(undefined), // Start with to not wait for the accumulator to pass a value. + scan((acc: T[], next: Maybe>) => { + if (next != null) { + mergeArrayOrValueIntoArray(acc, next); + } + + return acc; + }, seed), + distinctUntilArrayLengthChanges(), + shareReplay(1) + ); + }); +} diff --git a/packages/util/src/lib/array/array.number.ts b/packages/util/src/lib/array/array.number.ts index b9cf4e007..ece343ed0 100644 --- a/packages/util/src/lib/array/array.number.ts +++ b/packages/util/src/lib/array/array.number.ts @@ -41,6 +41,8 @@ export function reduceNumbersFn(reduceFn: ((a: number, b: numb /** * Generates an array containing the range of numbers specified. * + * The end value is not included. + * * @param param0 * @returns */ diff --git a/packages/util/src/lib/array/array.ts b/packages/util/src/lib/array/array.ts index 07743eb35..2d5f6edae 100644 --- a/packages/util/src/lib/array/array.ts +++ b/packages/util/src/lib/array/array.ts @@ -81,7 +81,7 @@ export function mergeIntoArray(target: Maybe, ...arrays: T[][]) { if (!target) { target = []; } - + arrays.forEach((array) => { mergeArrayIntoArray(target, array); }); @@ -89,6 +89,15 @@ export function mergeIntoArray(target: Maybe, ...arrays: T[][]) { return target; } +export function mergeArrayOrValueIntoArray(target: T[], value: ArrayOrValue): T[] { + if (Array.isArray(value)) { + return mergeArrayIntoArray(target, value); + } else { + target.push(value); + return target; + } +} + /** * Merges all the values from the second array into the first using push. *