Skip to content

Commit

Permalink
feat: added iteration accumulator
Browse files Browse the repository at this point in the history
- Separated accumulator from iterator to prevent iterator from being unable to iterate across large data sets due to caching of accumulated values.
  • Loading branch information
Derek Burgman committed Jan 25, 2022
1 parent f3220c7 commit aaf0390
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 182 deletions.
2 changes: 1 addition & 1 deletion packages/rxjs/src/lib/iterator/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from './iteration.next';
export * from './iteration.rxjs';
export * from './iteration.accumulator.rxjs';
export * from './iteration';
export * from './iterator.page';
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import { ItemPageIterator, ItemPageIteratorIterationInstance } from './iterator.page';
import { TestPageIteratorFilter, TEST_PAGE_ARRAY_ITERATOR_DELEGATE, TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE } from './iterator.page.spec';
import { iteratorNextPageUntilPage } from './iteration.next';
import { flattenIterationResultItemArray } from './iteration.rxjs';
import { flattenIterationResultItemArray } from './iteration.accumulator.rxjs';
import { first } from 'rxjs/operators';
import { PageItemIterationAccumulatorInstance } from './iteration.accumulator';

describe('iteration.rxjs', () => {

let iterator: ItemPageIterator<number[], TestPageIteratorFilter>;
let instance: ItemPageIteratorIterationInstance<number[], TestPageIteratorFilter>;
let accumulator: PageItemIterationAccumulatorInstance<number[]>;

beforeEach(() => {
iterator = new ItemPageIterator(TEST_PAGE_ARRAY_ITERATOR_DELEGATE);
instance = iterator.instance({});
accumulator = new PageItemIterationAccumulatorInstance(instance);
});

afterEach(() => {
Expand All @@ -27,7 +30,7 @@ describe('iteration.rxjs', () => {
iteratorNextPageUntilPage(instance, testPagesToLoad).then((page) => {
expect(page).toBe(testPagesToLoad - 1);

const obs = flattenIterationResultItemArray(instance);
const obs = flattenIterationResultItemArray(accumulator);

obs.pipe(first()).subscribe((values) => {
expect(values.length).toBe(testPagesToLoad * TEST_PAGE_ARRAY_ITERATOR_PAGE_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,21 @@ import { lastValue, flattenArray } from '@dereekb/util';
import { filterMaybe, scanBuildArray } from '../rxjs';
import { combineLatest, map, Observable, shareReplay, skipWhile } from 'rxjs';
import { mapLoadingStateResults, PageListLoadingState } from '../loading';
import { PageItemIteration } from './iteration';


/**
* A PageListLoadingState that captures all the values that have been loaded so far, and the current loading state of currentPageResult$.
*/
export function iterationCurrentPageListLoadingState<V>(iteration: PageItemIteration<V>): Observable<PageListLoadingState<V>> {
return combineLatest([iteration.currentPageState$, iteration.allItems$]).pipe(
map(([state, values]) => mapLoadingStateResults(state, {
mapValue: () => values
}) as PageListLoadingState<V>),
shareReplay(1)
);
}
import { ItemIterationAccumulator, PageItemIterationAccumulator } from './iteration.accumulator';

/**
* Used for PageItemIterations that have an array of results returned per page instead of a single item.
* Used for ItemIterationAccumulators that have an array of results returned per page instead of a single item.
*
* @param iteration
* @returns
*/
export function flattenIterationResultItemArray<T>(iteration: PageItemIteration<T[]>): Observable<T[]> {
export function flattenIterationResultItemArray<T>(iteration: ItemIterationAccumulator<T[]>): Observable<T[]> {
return iteration.allItems$.pipe(
scanBuildArray((allItems: T[][]) => {
const seed = flattenArray(allItems);
const latestItem = lastValue(allItems);

const accumulatorObs: Observable<T[]> = iteration.latestState$.pipe(
const accumulatorObs: Observable<T[]> = iteration.itemIteration.latestState$.pipe(
skipWhile(x => x.model === latestItem),
map(x => x.model),
filterMaybe()
Expand All @@ -42,3 +29,15 @@ export function flattenIterationResultItemArray<T>(iteration: PageItemIteration<
})
);
}

/**
* A PageListLoadingState that captures all the values that have been loaded so far, and the current loading state of currentPageResult$.
*/
export function iterationCurrentPageListLoadingState<V>(iteration: PageItemIterationAccumulator<V>): Observable<PageListLoadingState<V>> {
return combineLatest([iteration.itemIteration.currentState$, iteration.allItems$]).pipe(
map(([state, values]) => mapLoadingStateResults(state, {
mapValue: () => values
}) as PageListLoadingState<V>),
shareReplay(1)
);
}
130 changes: 130 additions & 0 deletions packages/rxjs/src/lib/iterator/iteration.accumulator.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { PageNumber, range } from '@dereekb/util';
import { skip } from 'rxjs/operators';
import { FIRST_PAGE } from '@dereekb/util';
import { ItemPageIterator, ItemPageIteratorDelegate, ItemPageIteratorIterationInstance, ItemPageIteratorRequest, ItemPageIteratorResult } from './iterator.page';
import { loadingStateHasFinishedLoading, loadingStateIsLoading } from '../loading';
import { delay, filter, first, of, Observable, tap } from 'rxjs';
import { iteratorNextPageUntilPage } from './iteration.next';
import { ItemIterationAccumulatorInstance } from './iteration.accumulator';
import { TestPageIteratorFilter, TEST_PAGE_ITERATOR_DELEGATE } from './iterator.page.spec';

describe('ItemPageIterator', () => {

let iterator: ItemPageIterator<number, TestPageIteratorFilter>;

beforeEach(() => {
iterator = new ItemPageIterator(TEST_PAGE_ITERATOR_DELEGATE);
});

describe('ItemIterationAccumulatorInstance', () => {

let instance: ItemPageIteratorIterationInstance<number, TestPageIteratorFilter>;
let accumulator: ItemIterationAccumulatorInstance<number>;

function initInstanceWithFilter(filter?: TestPageIteratorFilter) {
instance = iterator.instance({
filter: filter ?? {}
});
accumulator = new ItemIterationAccumulatorInstance(instance);
}

beforeEach(() => {
initInstanceWithFilter();
});

afterEach(() => {
instance.destroy();
accumulator.destroy();
});

describe('successfulLoadCount$', () => {

it('should return 1 after the first result has been loaded.', (done) => {

instance.currentPageResultState$.pipe(
filter(x => loadingStateHasFinishedLoading(x)),
first()
).subscribe(() => {

accumulator.successfulLoadCount$.pipe(
first()
).subscribe((count) => {
expect(count).toBe(1);
done();
});
});
});

});

describe('allItems$', () => {

it('should return all items after being subscribed to a few pages in.', (done) => {

const pagesToLoad = 5;

iteratorNextPageUntilPage(instance, pagesToLoad).then(() => {

instance.numberOfPagesLoaded$.subscribe((pagesLoaded) => {
expect(pagesLoaded).toBe(pagesToLoad);

accumulator.allItems$.subscribe((allItems) => {
expect(allItems).toBeDefined();
expect(allItems.length).toBe(pagesToLoad);

instance.destroy();
done();
});
})

});

});

it('should emit only after the first state has come through.', (done) => {

initInstanceWithFilter({
delayTime: 500
});

let emissions = 0;

// Should trigger first page to be loaded.
accumulator.allItems$.subscribe((allItems) => {
emissions += 1;

expect(allItems.length).toBe(1);

done();
});

expect(emissions).toBe(0);
});

it('should accumulate values as pages are loaded.', (done) => {

let emissions = 0;
let latestAllItems: number[];

// Should trigger first page to be loaded.
accumulator.allItems$.subscribe((allItems) => {
emissions += 1;
latestAllItems = allItems;
});

const page = 1;

// Load more pages
iteratorNextPageUntilPage(instance, page).then(() => {
expect(emissions).toBe(page);
expect(latestAllItems.length).toBe(page);
done();
});

});

});

});

});
97 changes: 97 additions & 0 deletions packages/rxjs/src/lib/iterator/iteration.accumulator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { SubscriptionObject } from '@dereekb/rxjs';
import { distinctUntilChanged, filter } from 'rxjs/operators';
import { distinctUntilArrayLengthChanges, scanBuildArray, filterMaybe, scanIntoArray } from "../rxjs";
import { lastValue, filterMaybeValues, Destroyable } from "@dereekb/util";
import { map, Observable, shareReplay, skipWhile } from "rxjs";
import { ItemIteration, PageItemIteration } from "./iteration";
import { LoadingState, loadingStateHasError } from '../loading';

/**
* An item iteration that exposes all accumulated values.
*/
export interface ItemIterationAccumulator<V> {

/**
* Iteration being accumulated.
*/
readonly itemIteration: ItemIteration<V>;

/**
* Returns all items loaded so far in the iteration in a single array.
*/
readonly allItems$: Observable<V[]>;

}

/**
* An item iteration that exposes all accumulated values.
*/
export interface PageItemIterationAccumulator<V> extends ItemIterationAccumulator<V> {

/**
* Iteration being accumulated.
*/
readonly itemIteration: PageItemIteration<V>;

}

export class ItemIterationAccumulatorInstance<V, I extends ItemIteration<V> = ItemIteration<V>> implements ItemIterationAccumulator<V>, Destroyable {

constructor(readonly itemIteration: I) { }

readonly latestSuccessfulState$: Observable<LoadingState<V>> = this.itemIteration.latestState$.pipe(
filter(x => !loadingStateHasError(x)),
distinctUntilChanged(),
shareReplay(1)
);

/**
* All successful page results in a single array.
*/
readonly allSuccessfulStates$: Observable<LoadingState<V>[]> = this.latestSuccessfulState$.pipe(
scanIntoArray({ immutable: false }),
distinctUntilArrayLengthChanges(),
shareReplay(1)
);

readonly successfulLoadCount$: Observable<number> = this.allSuccessfulStates$.pipe(
map(x => x.length),
shareReplay(1)
);

readonly allItems$: Observable<V[]> = this.allSuccessfulStates$.pipe(
scanBuildArray((allSuccessfulStates) => {
/*
We start with allSuccessfulPageResults$ since it contains all page results since the start of the iterator,
and subscription to allItems may not have started at the same time.
We use scan to add in all models coming in afterwards by pushing them into the accumulator.
This is to prevent performance issues with very large iteration sets, since we can
append onto the array, rather than concat/copy the array each time.
*/
const allPageResultsUpToFirstSubscription = allSuccessfulStates;
const firstLatestState = lastValue(allPageResultsUpToFirstSubscription);
const seed: V[] = filterMaybeValues(allPageResultsUpToFirstSubscription.map(x => x.model));

const accumulatorObs: Observable<V> = this.latestSuccessfulState$.pipe(
skipWhile(x => x === firstLatestState),
map(x => x.model),
filterMaybe()
);

return {
seed,
accumulatorObs
};
})
);

private _sub = new SubscriptionObject(this.allSuccessfulStates$.subscribe());

destroy() {
this._sub.destroy();
}

}

export class PageItemIterationAccumulatorInstance<V, I extends PageItemIteration<V> = PageItemIteration<V>> extends ItemIterationAccumulatorInstance<V, I> implements PageItemIterationAccumulator<V>, Destroyable { }
14 changes: 4 additions & 10 deletions packages/rxjs/src/lib/iterator/iteration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ export interface ItemIteration<V = any> extends Destroyable {
readonly canLoadMore$: Observable<boolean>;

/**
* Returns the latest items.
* The latest stable state that has finished loading.
*/
readonly latestState$: Observable<LoadingState<Maybe<V>>>;

/**
* Returns all items loaded so far in the iteration in a single array.
* The "current" page state.
*/
readonly allItems$: Observable<V[]>;
readonly currentState$: Observable<LoadingState<Maybe<V>>>;

/**
* Triggers a loading of the next page.
Expand Down Expand Up @@ -60,15 +60,9 @@ export interface PageItemIteration<V = any> extends ItemIteration<V> {
*/
nextPage(request?: ItemIteratorNextRequest): Promise<PageNumber>;

/**
* Returns the latest items.
*/
readonly latestState$: Observable<PageLoadingState<V>>;

/**
* The "current" page state.
*/
readonly currentPageState$: Observable<PageLoadingState<V>>;
readonly currentState$: Observable<PageLoadingState<V>>;

/**
* Returns the latest page that has been fully loaded.
Expand Down
Loading

0 comments on commit aaf0390

Please sign in to comment.