Skip to content

Commit

Permalink
fix: fixed issue with allSuccessfulStates$ in itemAccumulatorInstance
Browse files Browse the repository at this point in the history
- fixed issue where allSuccessfulStates$ did not return until the first successful state returned. This was a problem in cases where the first state was an error. This observable never returned anything.
  • Loading branch information
dereekb committed Apr 27, 2022
1 parent b0f87e0 commit 0396ac5
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ describe('iteration.rxjs', () => {
describe('flattenIterationResultItemArray()', () => {

it(`should aggregate the array of results into a single array.`, (done) => {

const testPagesToLoad = 10;

iteratorNextPageUntilPage(iteration, testPagesToLoad).then((page) => {
Expand Down
39 changes: 31 additions & 8 deletions packages/rxjs/src/lib/iterator/iteration.accumulator.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ItemPageIterator, ItemPageIterationInstance } from './iterator.page';
import { loadingStateHasFinishedLoading } from '../loading';
import { filter, first } from 'rxjs';
import { filter, first, skip } from 'rxjs';
import { iteratorNextPageUntilPage } from './iteration.next';
import { itemAccumulator, ItemAccumulatorInstance } from './iteration.accumulator';
import { TestPageIteratorFilter, TEST_PAGE_ITERATOR_DELEGATE } from './iterator.page.spec';
Expand Down Expand Up @@ -57,6 +57,25 @@ describe('ItemPageIterator', () => {

describe('allItems$', () => {

describe('with error and no successes', () => {

beforeEach(() => {
initInstanceWithFilter({
resultError: new Error('test error')
});
});

it('should return an empty array.', (done) => {
accumulator.allItems$.pipe(first()).subscribe((items) => {
expect(items).toBeDefined();
expect(items.length).toBe(0);
done();
});

});

});

describe('with mapping', () => {

let mappedAccumulator: ItemAccumulatorInstance<string>;
Expand All @@ -71,7 +90,7 @@ describe('ItemPageIterator', () => {

it('should map the items', (done) => {

mappedAccumulator.allItems$.pipe(first()).subscribe((items) => {
mappedAccumulator.allItems$.pipe(filter(x => x.length > 0)).subscribe((items) => {
expect(items).toBeDefined();
expect(typeof items[0]).toBe('string');
done();
Expand Down Expand Up @@ -103,7 +122,7 @@ describe('ItemPageIterator', () => {

});

it('should emit only after the first state has come through.', (done) => {
it('should emit an empty array before the first state has come through.', (done) => {

initInstanceWithFilter({
delayTime: 500
Expand All @@ -115,12 +134,14 @@ describe('ItemPageIterator', () => {
accumulator.allItems$.subscribe((allItems) => {
emissions += 1;

expect(allItems.length).toBe(1);

done();
if (emissions === 1) {
expect(allItems.length).toBe(0);
} else if (emissions === 2) {
expect(allItems.length).toBe(1);
done();
}
});

expect(emissions).toBe(0);
});

it('should accumulate values as pages are loaded.', (done) => {
Expand All @@ -129,7 +150,9 @@ describe('ItemPageIterator', () => {
let latestAllItems: number[];

// Should trigger first page to be loaded.
accumulator.allItems$.subscribe((allItems) => {
accumulator.allItems$.pipe(
skip(1) // skip the first empty emission
).subscribe((allItems) => {
emissions += 1;
latestAllItems = allItems;
});
Expand Down
8 changes: 7 additions & 1 deletion packages/rxjs/src/lib/iterator/iteration.accumulator.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { startWith } from 'rxjs/operators';
import { SubscriptionObject } from '../subscription';
import { map, Observable, shareReplay, skipWhile, distinctUntilChanged, filter } from 'rxjs';
import { distinctUntilArrayLengthChanges, scanBuildArray, filterMaybe, scanIntoArray } from "../rxjs";
Expand Down Expand Up @@ -58,6 +59,10 @@ export class ItemAccumulatorInstance<O, I = any, N extends ItemIteration<I> = It
*/
readonly allSuccessfulStates$: Observable<LoadingState<I>[]> = this.latestSuccessfulState$.pipe(
scanIntoArray({ immutable: false }),
/**
* Don't wait for the first successful state in order to avoid never returning a value on immediate failures.
*/
startWith([] as LoadingState<I>[]),
distinctUntilArrayLengthChanges(),
shareReplay(1)
);
Expand Down Expand Up @@ -102,7 +107,8 @@ export class ItemAccumulatorInstance<O, I = any, N extends ItemIteration<I> = It
seed,
accumulatorObs
};
})
}),
shareReplay(1)
);

private _sub = new SubscriptionObject(this.allSuccessfulStates$.subscribe());
Expand Down

0 comments on commit 0396ac5

Please sign in to comment.