Skip to content

Commit

Permalink
feat: added flattenIterationResultItemArray
Browse files Browse the repository at this point in the history
- Updated how maxPageLoadLimit is used, and how iteratorNextPageUntilPage functions. It now loads up to that page, which is the total number of pages loaded. Iterations now are 0-indexed.
  • Loading branch information
Derek Burgman committed Jan 25, 2022
1 parent ac4fd34 commit f3220c7
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 75 deletions.
27 changes: 13 additions & 14 deletions packages/rxjs/src/lib/iterator/iteration.next.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ describe('iteration.next', () => {
describe('nextUntilPage()', () => {

it('should call next up until the given page is reached.', (done) => {
const targetPage = 10;

iteratorNextPageUntilPage(instance, targetPage).then((page) => {
expect(page).toBe(targetPage);
const targetPagesToLoad = 10;

instance.latestLoadedPage$.pipe(first()).subscribe((latestPage) => {
iteratorNextPageUntilPage(instance, targetPagesToLoad).then((page) => {
expect(page + 1).toBe(targetPagesToLoad);

expect(latestPage).toBe(targetPage);
instance.numberOfPagesLoaded$.pipe(first()).subscribe((latestPage) => {
expect(latestPage).toBe(targetPagesToLoad);
done();
});
});
Expand All @@ -36,14 +35,14 @@ describe('iteration.next', () => {

it(`should call next up until the iterator's limit is reached, even if target page is after.`, (done) => {

const testLimit = 5;
const testMaxPagesToLoad = 5;
const targetPage = 10;
instance.maxPageLoadLimit = testLimit;
instance.maxPageLoadLimit = testMaxPagesToLoad;

iteratorNextPageUntilPage(instance, targetPage).then((page) => {
expect(page).toBe(testLimit);
expect(page).toBe(testMaxPagesToLoad - 1);

instance.latestLoadedPage$.pipe(first()).subscribe((page) => {
instance.numberOfPagesLoaded$.pipe(first()).subscribe((page) => {
expect(page).toBe(instance.maxPageLoadLimit);
done();
});
Expand Down Expand Up @@ -81,13 +80,13 @@ describe('iteration.next', () => {

it(`should call next up until the iterator's limit is reached.`, (done) => {

const testLimit = 15;
instance.maxPageLoadLimit = testLimit;
const testMaxPagesToLoad = 15;
instance.maxPageLoadLimit = testMaxPagesToLoad;

iteratorNextPageUntilMaxPageLoadLimit(instance).then((page) => {
expect(page).toBe(testLimit);
expect(page).toBe(testMaxPagesToLoad - 1);

instance.latestLoadedPage$.pipe(first()).subscribe((page) => {
instance.numberOfPagesLoaded$.pipe(first()).subscribe((page) => {
expect(page).toBe(instance.maxPageLoadLimit);
done();
});
Expand Down
6 changes: 3 additions & 3 deletions packages/rxjs/src/lib/iterator/iteration.next.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { map, filter, distinctUntilChanged, delay, switchMap, tap, exhaustMap, first, Observable, combineLatest, shareReplay } from "rxjs";
import { map, first, Observable, combineLatest, shareReplay } from "rxjs";
import { ItemIteration, PageItemIteration } from "./iteration";
import { performTaskLoop, reduceBooleansWithAndFn } from '@dereekb/util';

Expand Down Expand Up @@ -27,7 +27,7 @@ export function iteratorNextPageUntilMaxPageLoadLimit(iterator: PageItemIteratio
}

/**
* Automatically calls next on the PageItemIteration up to the target page.
* Automatically calls next on the PageItemIteration up to the target page, the number of total pages that should be loaded.
*
* The promise will reject with an error if an error is encountered.
*
Expand All @@ -40,7 +40,7 @@ export function iteratorNextPageUntilPage(iteration: PageItemIteration, page: nu

function checkPageLimit(page) {
const pageLimit = getPageLimit();
return page < Math.min(pageLimit, iteration.maxPageLoadLimit);
return (page + 1) < Math.min(pageLimit, iteration.maxPageLoadLimit);
}

return new Promise((resolve) => {
Expand Down
43 changes: 43 additions & 0 deletions packages/rxjs/src/lib/iterator/iteration.rxjs.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { ItemPageIterator, ItemPageIteratorIterationInstance } from './iterator.page';
import { TestPageIteratorFilter, TEST_PAGE_ARRAY_ITERATOR_DELEGATE, TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE } from './iterator.page.spec';
import { iteratorNextPageUntilPage } from './iteration.next';
import { flattenIterationResultItemArray } from './iteration.rxjs';
import { first } from 'rxjs/operators';

describe('iteration.rxjs', () => {

let iterator: ItemPageIterator<number[], TestPageIteratorFilter>;
let instance: ItemPageIteratorIterationInstance<number[], TestPageIteratorFilter>;

beforeEach(() => {
iterator = new ItemPageIterator(TEST_PAGE_ARRAY_ITERATOR_DELEGATE);
instance = iterator.instance({});
});

afterEach(() => {
instance.destroy();
});

describe('flattenIterationResultItemArray()', () => {

it(`should aggregate the array of results into a single array.`, (done) => {

const testPagesToLoad = 10;

iteratorNextPageUntilPage(instance, testPagesToLoad).then((page) => {
expect(page).toBe(testPagesToLoad - 1);

const obs = flattenIterationResultItemArray(instance);

obs.pipe(first()).subscribe((values) => {
expect(values.length).toBe(testPagesToLoad * TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE);
done();
});

});

});

});

});
33 changes: 30 additions & 3 deletions packages/rxjs/src/lib/iterator/iteration.rxjs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { reduceBooleansWithAndFn } from '@dereekb/util';
import { combineLatest, map, Observable, shareReplay } from 'rxjs';
import { lastValue, flattenArray } from '@dereekb/util';
import { filterMaybe, scanBuildArray } from '../rxjs';
import { combineLatest, map, Observable, shareReplay, skipWhile } from 'rxjs';
import { mapLoadingStateResults, PageListLoadingState } from '../loading';
import { ItemIteration, PageItemIteration } from './iteration';
import { PageItemIteration } from './iteration';


/**
Expand All @@ -15,3 +16,29 @@ export function iterationCurrentPageListLoadingState<V>(iteration: PageItemItera
shareReplay(1)
);
}

/**
* Used for PageItemIterations that have an array of results returned per page instead of a single item.
*
* @param iteration
* @returns
*/
export function flattenIterationResultItemArray<T>(iteration: PageItemIteration<T[]>): Observable<T[]> {
return iteration.allItems$.pipe(
scanBuildArray((allItems: T[][]) => {
const seed = flattenArray(allItems);
const latestItem = lastValue(allItems);

const accumulatorObs: Observable<T[]> = iteration.latestState$.pipe(
skipWhile(x => x.model === latestItem),
map(x => x.model),
filterMaybe()
);

return {
seed,
accumulatorObs
} as any;
})
);
}
4 changes: 3 additions & 1 deletion packages/rxjs/src/lib/iterator/iteration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ export interface ItemIteration<V = any> extends Destroyable {
export interface PageItemIteration<V = any> extends ItemIteration<V> {

/**
* Current page load limit.
* The maximum number of pages allowed to be loaded.
*
* A page of 15 means that pages 0-14 can be loaded, but not page 15.
*/
maxPageLoadLimit: PageNumber;

Expand Down
67 changes: 39 additions & 28 deletions packages/rxjs/src/lib/iterator/iterator.page.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { PageNumber, range } from '@dereekb/util';
import { skip } from 'rxjs/operators';
import { FIRST_PAGE } from '@dereekb/util';
import { ItemPageIterator, ItemPageIteratorDelegate, ItemPageIteratorIterationInstance, ItemPageIteratorRequest, ItemPageIteratorResult } from './iterator.page';
Expand All @@ -11,33 +12,43 @@ export interface TestPageIteratorFilter {
resultError?: any;
}

export const TEST_PAGE_ITERATOR_DELEGATE: ItemPageIteratorDelegate<number, TestPageIteratorFilter> = {
loadItemsForPage: (request: ItemPageIteratorRequest<number, TestPageIteratorFilter>) => {
const result: ItemPageIteratorResult<number> = {
value: request.page.page
};
export function makeTestPageIteratorDelegate<T>(makeResultsFn: (page: PageNumber) => T): ItemPageIteratorDelegate<T, TestPageIteratorFilter> {
return {
loadItemsForPage: (request: ItemPageIteratorRequest<T, TestPageIteratorFilter>) => {
const result: ItemPageIteratorResult<T> = {
value: makeResultsFn(request.page.page)
};

let resultObs: Observable<ItemPageIteratorResult<number>> = of(result);
let resultObs: Observable<ItemPageIteratorResult<T>> = of(result);

if (request.page.filter) {
const { delayTime, resultError, end } = request.page.filter;
if (request.page.filter) {
const { delayTime, resultError, end } = request.page.filter;

if (delayTime) {
resultObs = resultObs.pipe(delay(delayTime));
} else if (resultError) {
resultObs = of({
error: resultError
});
} else if (end) {
resultObs = of({
end: true
});
if (delayTime) {
resultObs = resultObs.pipe(delay(delayTime));
} else if (resultError) {
resultObs = of({
error: resultError
});
} else if (end) {
resultObs = of({
end: true
});
}
}
}

return resultObs;
return resultObs;
}
}
};
}

export const TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE = 10;
export const TEST_PAGE_ITERATOR_DELEGATE: ItemPageIteratorDelegate<number, TestPageIteratorFilter> = makeTestPageIteratorDelegate((page) => page);
export const TEST_PAGE_ARRAY_ITERATOR_DELEGATE: ItemPageIteratorDelegate<number[], TestPageIteratorFilter> = makeTestPageIteratorDelegate((page) => {
const start = page * TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE;
const end = start + TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE;
return range({ start, end });
});

describe('ItemPageIterator', () => {

Expand Down Expand Up @@ -195,16 +206,16 @@ describe('ItemPageIterator', () => {

it('should return all items after being subscribed to a few pages in.', (done) => {

const loadPages = 5;
const pagesToLoad = 5;

iteratorNextPageUntilPage(instance, loadPages).then(() => {
iteratorNextPageUntilPage(instance, pagesToLoad).then(() => {

instance.latestPageResultState$.subscribe((latestPage) => {
expect(latestPage.page).toBe(loadPages);
instance.numberOfPagesLoaded$.subscribe((pagesLoaded) => {
expect(pagesLoaded).toBe(pagesToLoad);

instance.allItems$.subscribe((allItems) => {
expect(allItems).toBeDefined();
expect(allItems.length).toBe(loadPages + 1);
expect(allItems.length).toBe(pagesToLoad);

instance.destroy();
done();
Expand Down Expand Up @@ -250,8 +261,8 @@ describe('ItemPageIterator', () => {

// Load more pages
iteratorNextPageUntilPage(instance, page).then(() => {
expect(emissions).toBe(page + 1);
expect(latestAllItems.length).toBe(page + 1);
expect(emissions).toBe(page);
expect(latestAllItems.length).toBe(page);
done();
});

Expand Down
50 changes: 26 additions & 24 deletions packages/rxjs/src/lib/iterator/iterator.page.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { distinctUntilArrayLengthChanges, filterMaybe } from '../rxjs';
import { distinctUntilArrayLengthChanges, filterMaybe, scanBuildArray } from '../rxjs';
import { distinctUntilChanged, map, scan, startWith, catchError, skip, skipWhile, tap, take, mergeMap, delay } from 'rxjs/operators';
import { PageLoadingState, loadingStateHasError, loadingStateHasFinishedLoading, loadingStateIsLoading, errorPageResult, successPageResult, mapLoadingStateResults, beginLoading } from "../loading";
import { FIRST_PAGE, UNLOADED_PAGE, Destroyable, Filter, filteredPage, FilteredPage, getNextPageNumber, hasValueOrNotEmpty, Maybe, PageNumber, filterMaybeValues, lastValue } from "@dereekb/util";
Expand Down Expand Up @@ -338,6 +338,11 @@ export class ItemPageIteratorIterationInstance<V, F, C extends ItemPageIteration
shareReplay(1)
);

readonly numberOfPagesLoaded$: Observable<PageNumber> = this.latestLoadedPage$.pipe(
map(x => x + 1),
shareReplay(1)
);

// MARK: ItemIteration
/**
* Whether or not there are more results to load.
Expand All @@ -350,8 +355,8 @@ export class ItemPageIteratorIterationInstance<V, F, C extends ItemPageIteration
/**
* Whether or not the successfulPageResultsCount has passed the maxPageLoadLimit
*/
readonly canLoadMore$: Observable<boolean> = combineLatest([this._maxPageLoadLimit, this.latestLoadedPage$.pipe(startWith(UNLOADED_PAGE))]).pipe(
map(([limit, count]) => count < limit),
readonly canLoadMore$: Observable<boolean> = combineLatest([this._maxPageLoadLimit, this.numberOfPagesLoaded$.pipe(startWith(0))]).pipe(
map(([maxPageLoadLimit, numberOfPagesLoaded]) => numberOfPagesLoaded < maxPageLoadLimit),
distinctUntilChanged(),
shareReplay(1)
);
Expand All @@ -375,33 +380,30 @@ export class ItemPageIteratorIterationInstance<V, F, C extends ItemPageIteration
readonly allItems$: Observable<V[]> = this.state$.pipe(
skipWhile(x => !x.latestFinished), // Do not emit until the first finished state occurs.
distinctUntilArrayLengthChanges((x) => x.allSuccessful),
/*
We start with allSuccessfulPageResults$ since it contains all page results since the start of the iterator,
and subscription to allItems may not have started at the same time.
We use scan to add in all models coming in afterwards by pushing them into the accumulator.
This is to prevent performance issues with very large iteration sets, since we can
append onto the array, rather than concat/copy the array each time.
*/
exhaustMap((state) => {
scanBuildArray((state) => {
/*
We start with allSuccessfulPageResults$ since it contains all page results since the start of the iterator,
and subscription to allItems may not have started at the same time.
We use scan to add in all models coming in afterwards by pushing them into the accumulator.
This is to prevent performance issues with very large iteration sets, since we can
append onto the array, rather than concat/copy the array each time.
*/
const allPageResultsUpToFirstSubscription = state.allSuccessful;
const firstLatestState = lastValue(allPageResultsUpToFirstSubscription);
const seed: V[] = filterMaybeValues(allPageResultsUpToFirstSubscription.map(x => x.model?.value));

return this.latestPageResultState$.pipe(
const accumulatorObs: Observable<V> = this.latestPageResultState$.pipe(
skipWhile(x => x === firstLatestState),
startWith(beginLoading()), // Start with to prevent waiting on emissions from skip.
scan((acc: V[], next: PageLoadingState<ItemPageIteratorResult<V>>) => {
if (next.model?.value != null) {
acc.push(next.model.value);
}
map(x => x.model?.value),
filterMaybe()
);

return acc;
}, seed)
)
}),
distinctUntilArrayLengthChanges(),
shareReplay(1)
return {
seed,
accumulatorObs
};
})
);

next(request: ItemIteratorNextRequest = {}): void {
Expand Down
Loading

0 comments on commit f3220c7

Please sign in to comment.