Skip to content

Commit

Permalink
feat: firestore iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Derek Burgman committed Jan 25, 2022
1 parent 77c1f7e commit 549466e
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 110 deletions.
201 changes: 104 additions & 97 deletions packages/dbx-firebase/src/lib/firestore/iterator.ts
Original file line number Diff line number Diff line change
@@ -1,111 +1,118 @@
import { Injectable } from '@angular/core';
import { ItemPageIterator, ItemPageIteratorIterationInstance, ItemPageIterationConfig, ItemPageIteratorDelegate, ItemPageIteratorRequest, ItemPageIteratorResult, PageItemIteration, AbstractMappedPageItemIteration } from '@dereekb/rxjs';
import { QueryDocumentSnapshot, query, startAt, CollectionReference, getDocs, QueryConstraint, limit, QuerySnapshot } from '@angular/fire/firestore';
import { Maybe, lastValue, mergeIntoArray, Destroyable } from '@dereekb/util';
import { from, Observable, of } from "rxjs";
import { exhaustMap } from "rxjs/operators";

export interface FirestoreItemPageIteratorFilter {
queryConstraints?: Maybe<QueryConstraint[]>;
}

import { QueryDocumentSnapshot, query, startAt, CollectionReference, getDocs, QueryConstraint, limit } from '@angular/fire/firestore';
import { Maybe } from '@dereekb/util';
import { BehaviorSubject, combineLatest, Observable } from "rxjs";
import { exhaustMap, first, switchMap, shareReplay, map, startWith, scan, delay, filter } from "rxjs/operators";

export abstract class AbstractDatastoreCollectionIterator<T> {

private readonly _next = new BehaviorSubject(0);

limit = 100;

constructor(readonly collection: CollectionReference<T>) { }

readonly pageResults$: Observable<QueryDocumentSnapshot<T>[]> = this._next.pipe(
exhaustMap(() => {
return combineLatest([this.hasNext$, this.pageResultsCursorDocument$]).pipe(
first(),
filter(([hasNext]) => hasNext),
switchMap(async ([_, cursor]) => {
const startsAtFilter = (cursor) ? startAt(cursor) : undefined;
const filters = [...this.buildQueryContraints()];

filters.push(limit(this.limit + ((cursor) ? 1 : 0)));

if (startsAtFilter) {
filters.push(startsAtFilter);
}
export interface FirestoreItemPageIterationConfig<T> extends ItemPageIterationConfig<FirestoreItemPageIteratorFilter> {
collection: CollectionReference<T>;
itemsPerPage: number;
}

const batchQuery = query<T>(this.collection, ...filters);
let docs = await getDocs(batchQuery).then(x => x.docs);
export interface FirestoreItemPageQueryResult<T> {
/**
* The relevant docs for this page result. This value will omit the cursor.
*/
docs: QueryDocumentSnapshot<T>[];
/**
* The raw snapshot returned from the query.
*/
snapshot: QuerySnapshot<T>;
}

if (cursor && docs[0].id === cursor.id) {
docs = docs.slice(1);
export type FirestoreItemPageIteratorDelegate<T> = ItemPageIteratorDelegate<FirestoreItemPageQueryResult<T>, FirestoreItemPageIteratorFilter, FirestoreItemPageIterationConfig<T>>;
export type InternalFirestoreItemPageIteratorIterationInstance<T> = ItemPageIteratorIterationInstance<FirestoreItemPageQueryResult<T>, FirestoreItemPageIteratorFilter, FirestoreItemPageIterationConfig<T>>;

export function makeFirestoreItemPageIteratorDelegate<T>(): FirestoreItemPageIteratorDelegate<T> {
return {
loadItemsForPage: (request: ItemPageIteratorRequest<FirestoreItemPageQueryResult<T>, FirestoreItemPageIteratorFilter, FirestoreItemPageIterationConfig<T>>): Observable<ItemPageIteratorResult<FirestoreItemPageQueryResult<T>>> => {
const { page, iteratorConfig } = request;
const lastQueryResult$: Observable<Maybe<FirestoreItemPageQueryResult<T>>> = (page > 0) ? request.lastItem$ : of(undefined);

const { collection, itemsPerPage, filter } = iteratorConfig;

return lastQueryResult$.pipe(
exhaustMap((lastResult) => {
if (lastResult?.snapshot.empty === true) { // TODO: Shouldn't happen. Remove this later.
return of<ItemPageIteratorResult<FirestoreItemPageQueryResult<T>>>({ end: true });
} else {
const constraints: QueryConstraint[] = [];

// Add filter constraints
if (filter?.queryConstraints) {
mergeIntoArray(constraints, filter.queryConstraints);
}

// Add cursor
const cursorDocument = (lastResult) ? lastValue(lastResult.docs) : undefined;
const startsAtFilter = (cursorDocument) ? startAt(cursorDocument) : undefined;

if (startsAtFilter) {
constraints.push(startsAtFilter);
}

// Add Limit
constraints.push(limit(itemsPerPage + ((startsAtFilter) ? 1 : 0))); // Add 1 for cursor, since results will start at our cursor.

const batchQuery = query<T>(collection, ...constraints);
const resultPromise: Promise<ItemPageIteratorResult<FirestoreItemPageQueryResult<T>>> = getDocs(batchQuery).then((snapshot) => {
let docs = snapshot.docs;

// Remove the cursor document from the results.
if (cursorDocument && docs[0].id === cursorDocument.id) {
docs = docs.slice(1);
}

const result: ItemPageIteratorResult<FirestoreItemPageQueryResult<T>> = {
value: {
docs,
snapshot
},
end: snapshot.empty
};

return result;
});
return from(resultPromise);
}

return docs;
})
);
}),
shareReplay(1)
);

/**
* The last document from pageResults$. It is used as a cursor.
*/
readonly pageResultsCursorDocument$: Observable<Maybe<QueryDocumentSnapshot<T>>> = this.pageResults$.pipe(
map(x => x[x.length - 1]),
startWith(undefined as Maybe<QueryDocumentSnapshot<T>>), // StartWith is provided to prevent waiting on pageResults$
shareReplay(1)
);

readonly hasNext$ = this.pageResultsCursorDocument$.pipe(
startWith(true),
scan((prev: QueryDocumentSnapshot<T> | false, curr: QueryDocumentSnapshot<T>) => {
if (prev === false || ((prev as any) !== true && curr == null)) {
return false;
} else if (prev && curr && prev.id === curr.id) {
return false;
} else {
return curr;
}
}),
map(x => x !== false),
shareReplay(1)
);

readonly loadedAll$ = this.hasNext$.pipe(map(x => !x), shareReplay(1));

readonly currentPageResultsData$: Observable<T[]> = this.pageResults$.pipe(
map(x => x.map(y => ({ ...y.data(), id: y.id }))),
shareReplay(1)
);

readonly results = this.pageResults$.pipe(
scan((acc, next) => acc.concat(next), [] as T[]),
shareReplay(1)
);

readonly resultsData$ = this.currentPageResultsData$.pipe(
scan((acc, next) => acc.concat(next), [] as T[]),
shareReplay(1)
);

buildQueryContraints(): QueryConstraint[] {
return [];
}
}
}

next(): void {
this._next.next(this._next.value + 1);
export const FIRESTORE_ITEM_PAGE_ITERATOR_DELEGATE: FirestoreItemPageIteratorDelegate<any> = makeFirestoreItemPageIteratorDelegate() as any;

/**
* Base iterator service used to generate FirestoreItemPageIteratorIterationInstances.
*/
@Injectable()
export class FirestoreItemPageIterator<T> {

private readonly _itemPageIterator = new ItemPageIterator<
FirestoreItemPageQueryResult<T>,
FirestoreItemPageIteratorFilter,
FirestoreItemPageIterationConfig<T>
>(FIRESTORE_ITEM_PAGE_ITERATOR_DELEGATE);

instance<T>(config: FirestoreItemPageIterationConfig<T>): FirestoreItemPageIteratorIterationInstance<T> {
// TODO: as any typings provided since angularfire has a rough time with collection typings sometimes.
// https://github.com/angular/angularfire/issues/2931
const iterator: InternalFirestoreItemPageIteratorIterationInstance<T> = this._itemPageIterator.instance(config as any) as any;
return new FirestoreItemPageIteratorIterationInstance<T>(iterator) as any;
}

async loadAll(): Promise<void> {
this.limit = 1000;
return new Promise((resolve) => {
const sub = this.hasNext$.pipe(delay(50)).subscribe((x) => {
if (x) {
this.next();
} else {
sub.unsubscribe();
resolve();
}
});
});
}
}

export class FirestoreItemPageIteratorIterationInstance<T> extends AbstractMappedPageItemIteration<FirestoreItemPageQueryResult<T>, QueryDocumentSnapshot<T>[], InternalFirestoreItemPageIteratorIterationInstance<T>> implements PageItemIteration<QueryDocumentSnapshot<T>[]>, Destroyable {

destroy() {
this._next.complete();
protected _mapStateValue(input: FirestoreItemPageQueryResult<T>): QueryDocumentSnapshot<T>[] {
return input.docs;
}

}
1 change: 1 addition & 0 deletions packages/rxjs/src/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './iterator';
export * from './loading';
export * from './lock';
export * from './object';
Expand Down
1 change: 1 addition & 0 deletions packages/rxjs/src/lib/iterator/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './iteration.next';
export * from './iteration.accumulator.rxjs';
export * from './iteration';
export * from './iterator.mapped';
export * from './iterator.page';
52 changes: 52 additions & 0 deletions packages/rxjs/src/lib/iterator/iterator.mapped.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { mapLoadingStateResults, PageLoadingState } from "../loading";
import { Destroyable } from "@dereekb/util";
import { map, Observable, shareReplay } from "rxjs";
import { ItemIteratorNextRequest, PageItemIteration } from "./iteration";
import { ItemPageIteratorIterationInstance } from "./iterator.page";


export abstract class AbstractMappedPageItemIteration<I, O, M extends ItemPageIteratorIterationInstance<I, any, any>> implements PageItemIteration<O>, Destroyable {

constructor(private readonly _instance: M) { }

get maxPageLoadLimit() {
return this._instance.maxPageLoadLimit;
}

set maxPageLoadLimit(maxPageLoadLimit: number) {
this._instance.maxPageLoadLimit = maxPageLoadLimit;
}

readonly hasNext$: Observable<boolean> = this._instance.hasNext$;
readonly canLoadMore$: Observable<boolean> = this._instance.canLoadMore$;
readonly latestLoadedPage$: Observable<number> = this._instance.latestLoadedPage$;

readonly latestState$: Observable<PageLoadingState<O>> = this._instance.latestState$.pipe(
map(state => mapLoadingStateResults(state, {
mapValue: (v: I) => this._mapStateValue(v)
})),
shareReplay(1)
);

readonly currentState$: Observable<PageLoadingState<O>> = this._instance.currentState$.pipe(
map(state => mapLoadingStateResults(state, {
mapValue: (v: I) => this._mapStateValue(v)
})),
shareReplay(1)
);

nextPage(request?: ItemIteratorNextRequest): Promise<number> {
return this._instance.nextPage(request);
}

next(request?: ItemIteratorNextRequest): void {
return this._instance.next(request);
}

destroy() {
this._instance.destroy();
}

protected abstract _mapStateValue(input: I): O;

}
6 changes: 3 additions & 3 deletions packages/rxjs/src/lib/iterator/iterator.page.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ export function makeTestPageIteratorDelegate<T>(makeResultsFn: (page: PageNumber
return {
loadItemsForPage: (request: ItemPageIteratorRequest<T, TestPageIteratorFilter>) => {
const result: ItemPageIteratorResult<T> = {
value: makeResultsFn(request.page.page)
value: makeResultsFn(request.page)
};

let resultObs: Observable<ItemPageIteratorResult<T>> = of(result);

if (request.page.filter) {
const { delayTime, resultError, end } = request.page.filter;
if (request.iteratorConfig.filter) {
const { delayTime, resultError, end } = request.iteratorConfig.filter ?? {};

if (delayTime) {
resultObs = resultObs.pipe(delay(delayTime));
Expand Down
25 changes: 15 additions & 10 deletions packages/rxjs/src/lib/iterator/iterator.page.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import { filterMaybe } from '../rxjs';
import { distinctUntilChanged, map, scan, startWith, catchError, skip, mergeMap, delay } from 'rxjs/operators';
import { PageLoadingState, loadingStateHasError, loadingStateHasFinishedLoading, loadingStateIsLoading, successPageResult, mapLoadingStateResults, beginLoading } from "../loading";
import { FIRST_PAGE, Destroyable, Filter, filteredPage, FilteredPage, getNextPageNumber, hasValueOrNotEmpty, Maybe, PageNumber } from "@dereekb/util";
import { FIRST_PAGE, Destroyable, Filter, filteredPage, getNextPageNumber, hasValueOrNotEmpty, Maybe, PageNumber, Page } from "@dereekb/util";
import { BehaviorSubject, combineLatest, exhaustMap, filter, first, Observable, of, OperatorFunction, shareReplay } from "rxjs";
import { ItemIteratorNextRequest, PageItemIteration } from './iteration';
import { iterationHasNextAndCanLoadMore } from './iteration.next';

export interface ItemPageIteratorRequest<V, F> {
export interface ItemPageIteratorRequest<V, F, C extends ItemPageIterationConfig<F> = ItemPageIterationConfig<F>> extends Page {
/**
* Suggested limit of items to load per request.
* The base iterator config.
*/
readonly limit?: number;
readonly iteratorConfig: C;
/**
* Page being loaded.
*/
readonly page: FilteredPage<F>;
readonly page: PageNumber;
/**
* Suggested limit of items to load per request.
*/
readonly limit?: number;
/**
* Returns the last successful item, if available.
*/
Expand Down Expand Up @@ -48,14 +52,14 @@ export interface ItemPageIteratorResult<V> {
end?: boolean;
}

export interface ItemPageIteratorDelegate<V, F> {
export interface ItemPageIteratorDelegate<V, F, C extends ItemPageIterationConfig<F> = ItemPageIterationConfig<F>> {

/**
* Returns an observable of items given the input request.
*
* If the input goes out of bounds, the result should be
*/
loadItemsForPage: (request: ItemPageIteratorRequest<V, F>) => Observable<ItemPageIteratorResult<V>>;
loadItemsForPage: (request: ItemPageIteratorRequest<V, F, C>) => Observable<ItemPageIteratorResult<V>>;

}

Expand All @@ -74,15 +78,15 @@ export const DEFAULT_ITEM_PAGE_ITERATOR_MAX = 100;
/**
* Used for generating new iterations.
*/
export class ItemPageIterator<V, F, C extends ItemPageIterationConfig<F> = ItemPageIterationConfig<F>> {
export class ItemPageIterator<V, F, C extends ItemPageIterationConfig<F> = any> {

private _maxPageLoadLimit = DEFAULT_ITEM_PAGE_ITERATOR_MAX;

get maxPageLoadLimit() {
return this._maxPageLoadLimit;
}

constructor(readonly delegate: ItemPageIteratorDelegate<V, F>) { }
constructor(readonly delegate: ItemPageIteratorDelegate<V, F, C>) { }

/**
* Creates a new instance based on the input config.
Expand Down Expand Up @@ -135,7 +139,8 @@ export class ItemPageIteratorIterationInstance<V, F, C extends ItemPageIteration
const page = filteredPage(nextPageNumber, this.config);

const iteratorResultObs = this.iterator.delegate.loadItemsForPage({
page,
iteratorConfig: this.config,
page: nextPageNumber,
lastItem$: this._lastFinishedPageResultItem$,
lastResult$: this._lastFinishedPageResult$,
lastState$: this._lastFinishedPageResultState$
Expand Down
2 changes: 2 additions & 0 deletions packages/util/src/lib/page/page.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export interface Page {

/**
* Current page number.
*
* // TODO: Rename to pageNumber
*/
page: PageNumber;

Expand Down

0 comments on commit 549466e

Please sign in to comment.