Skip to content

Commit

Permalink
feat: added IterationQueryChangeWatcher
Browse files Browse the repository at this point in the history
- added IterationQueryChangeWatcher to track changes parallel to a FirestoreItemPageIterationInstance
  • Loading branch information
dereekb committed May 12, 2022
1 parent c5bfe24 commit f5b2474
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PageListLoadingState, cleanupDestroyable, filterMaybe, useFirst, SubscriptionObject, accumulatorFlattenPageListLoadingState, tapLog } from '@dereekb/rxjs';
import { BehaviorSubject, combineLatest, map, shareReplay, distinctUntilChanged, Subject, throttleTime, switchMap, Observable, tap, startWith, NEVER, of, filter } from 'rxjs';
import { FirebaseQueryItemAccumulator, firebaseQueryItemAccumulator, FirestoreCollection, FirestoreDocument, FirestoreItemPageIterationInstance, FirestoreItemPageIteratorFilter, FirestoreQueryConstraint } from '@dereekb/firebase';
import { PageListLoadingState, cleanupDestroyable, filterMaybe, useFirst, SubscriptionObject, accumulatorFlattenPageListLoadingState } from '@dereekb/rxjs';
import { BehaviorSubject, combineLatest, map, shareReplay, distinctUntilChanged, Subject, throttleTime, switchMap, Observable, tap, startWith, NEVER } from 'rxjs';
import { FirebaseQueryItemAccumulator, firebaseQueryItemAccumulator, FirestoreCollection, FirestoreDocument, FirestoreItemPageIterationInstance, FirestoreItemPageIteratorFilter, FirestoreQueryConstraint, IterationQueryChangeWatcher, iterationQueryChangeWatcher } from '@dereekb/firebase';
import { ArrayOrValue, Destroyable, Initialized, Maybe } from '@dereekb/util';
import { DbxFirebaseCollectionLoader } from './collection.loader';

Expand Down Expand Up @@ -59,6 +59,10 @@ export class DbxFirebaseCollectionLoaderInstance<T, D extends FirestoreDocument<
shareReplay(1)
);

readonly queryChangeWatcher$: Observable<IterationQueryChangeWatcher<T>> = this.firestoreIteration$.pipe(
map(instance => iterationQueryChangeWatcher({ instance }))
);

readonly accumulator$: Observable<FirebaseQueryItemAccumulator<T>> = this.firestoreIteration$.pipe(
map(x => firebaseQueryItemAccumulator<T>(x)),
cleanupDestroyable(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { Directive, forwardRef, Input, Provider, Type } from '@angular/core';
import { FirestoreDocument, FirestoreQueryConstraint } from "@dereekb/firebase";
import { Maybe, ArrayOrValue } from '@dereekb/util';
import { DbxFirebaseCollectionStore } from "./store.collection";

/**
* Abstract directive that contains a DbxFirebaseCollectionStore and provides an interface for communicating with other directives.
*/
@Directive()
export abstract class DbxFirebaseCollectionStoreDirective<T, D extends FirestoreDocument<T> = FirestoreDocument<T>, S extends DbxFirebaseCollectionStore<T, D> = DbxFirebaseCollectionStore<T, D>> {

constructor(readonly store: S) { }

readonly pageLoadingState$ = this.store.pageLoadingState$;

// MARK: Inputs
@Input()
set maxPages(maxPages: Maybe<number>) {
this.store.setMaxPages(maxPages);
}

@Input()
set itemsPerPage(itemsPerPage: Maybe<number>) {
this.store.setItemsPerPage(itemsPerPage);
}

@Input()
set constraints(constraints: Maybe<ArrayOrValue<FirestoreQueryConstraint>>) {
this.store.setConstraints(constraints);
}

next() {
this.store.next();
}

restart() {
this.store.restart();
}

setConstraints(constraints: Maybe<ArrayOrValue<FirestoreQueryConstraint>>) {
this.store.setConstraints(constraints);
}

}

/**
* Configures providers for a DbxFirebaseCollectionStoreDirective.
*
* Can optionally also provide the actual store type to include in the providers array so it is instantiated by Angular.
*
* @param sourceType
*/
export function provideDbxFirebaseCollectionStoreDirective<S extends DbxFirebaseCollectionStoreDirective<any, any, any>>(sourceType: Type<S>): Provider[];
export function provideDbxFirebaseCollectionStoreDirective<S extends DbxFirebaseCollectionStore<any, any>, C extends DbxFirebaseCollectionStoreDirective<any, any, S> = DbxFirebaseCollectionStoreDirective<any, any, S>>(sourceType: Type<C>, storeType: Type<S>): Provider[];
export function provideDbxFirebaseCollectionStoreDirective<S extends DbxFirebaseCollectionStore<any, any>, C extends DbxFirebaseCollectionStoreDirective<any, any, S> = DbxFirebaseCollectionStoreDirective<any, any, S>>(sourceType: Type<C>, storeType?: Type<S>): Provider[] {
const providers: Provider[] = [{
provide: DbxFirebaseCollectionStoreDirective,
useExisting: forwardRef(() => sourceType)
}];

if (storeType) {
providers.push(storeType);
}

return providers;
}
5 changes: 3 additions & 2 deletions packages/dbx-firebase/src/lib/model/store/store.collection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable } from '@angular/core';
import { Observable, shareReplay, distinctUntilChanged, Subscription, exhaustMap, first, map, switchMap, tap } from 'rxjs';
import { FirebaseQueryItemAccumulator, FirestoreCollection, FirestoreDocument, FirestoreItemPageIterationInstance, FirestoreQueryConstraint } from '@dereekb/firebase';
import { FirebaseQueryItemAccumulator, FirestoreCollection, FirestoreDocument, FirestoreItemPageIterationInstance, FirestoreQueryConstraint, IterationQueryChangeWatcher } from '@dereekb/firebase';
import { ObservableOrValue, cleanupDestroyable, PageListLoadingState, filterMaybe } from '@dereekb/rxjs';
import { ArrayOrValue, Maybe } from '@dereekb/util';
import { LockSetComponentStore } from '@dereekb/dbx-core';
Expand All @@ -15,7 +15,7 @@ export interface DbxFirebaseCollectionStore<T, D extends FirestoreDocument<T> =
setConstraints(observableOrValue: ObservableOrValue<Maybe<ArrayOrValue<FirestoreQueryConstraint<T>>>>): Subscription;
next(observableOrValue: ObservableOrValue<void>): void;
restart(observableOrValue: ObservableOrValue<void>): void;

readonly setFirestoreCollection: (() => void) | ((observableOrValue: ObservableOrValue<Maybe<FirestoreCollection<T, D>>>) => Subscription);
}

Expand Down Expand Up @@ -102,6 +102,7 @@ export class AbstractDbxFirebaseCollectionStore<T, D extends FirestoreDocument<T
);

readonly firestoreIteration$: Observable<FirestoreItemPageIterationInstance<T>> = this.loader$.pipe(switchMap(x => x.firestoreIteration$));
readonly queryChangeWatcher$: Observable<IterationQueryChangeWatcher<T>> = this.loader$.pipe(switchMap(x => x.queryChangeWatcher$));
readonly accumulator$: Observable<FirebaseQueryItemAccumulator<T>> = this.loader$.pipe(switchMap(x => x.accumulator$));
readonly pageLoadingState$: Observable<PageListLoadingState<T>> = this.loader$.pipe(switchMap(x => x.pageLoadingState$));

Expand Down
10 changes: 5 additions & 5 deletions packages/firebase/src/lib/client/firestore/driver.query.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Observable } from 'rxjs';
import { ArrayOrValue } from '@dereekb/util';
import { DocumentSnapshot, getDocs, limit, query, QueryConstraint, startAt, Query as FirebaseFirestoreQuery, where, startAfter, orderBy, limitToLast, endBefore, endAt, onSnapshot, Transaction as FirebaseFirestoreTransaction } from "firebase/firestore";
import { ArrayOrValue, Maybe } from '@dereekb/util';
import { DocumentSnapshot, getDocs, limit, query, QueryConstraint, startAt, Query as FirebaseFirestoreQuery, where, startAfter, orderBy, limitToLast, endBefore, endAt, onSnapshot } from "firebase/firestore";
import { FIRESTORE_LIMIT_QUERY_CONSTRAINT_TYPE, FIRESTORE_START_AFTER_QUERY_CONSTRAINT_TYPE, FIRESTORE_START_AT_QUERY_CONSTRAINT_TYPE, FIRESTORE_WHERE_QUERY_CONSTRAINT_TYPE, FIRESTORE_LIMIT_TO_LAST_QUERY_CONSTRAINT_TYPE, FIRESTORE_ORDER_BY_QUERY_CONSTRAINT_TYPE, FullFirestoreQueryConstraintHandlersMapping, FIRESTORE_OFFSET_QUERY_CONSTRAINT_TYPE, FIRESTORE_END_AT_QUERY_CONSTRAINT_TYPE, FIRESTORE_END_BEFORE_QUERY_CONSTRAINT_TYPE } from './../../common/firestore/query/constraint';
import { makeFirestoreQueryConstraintFunctionsDriver } from '../../common/firestore/driver/query.handler';
import { FirestoreQueryConstraintFunctionsDriver, FirestoreQueryDriver } from "../../common/firestore/driver/query";
import { Query, QuerySnapshot, Transaction } from "../../common/firestore/types";
import { Query, QuerySnapshot, SnapshotListenOptions, Transaction } from "../../common/firestore/types";
import { streamFromOnSnapshot } from '../../common/firestore/query/query.util';

export interface FirebaseFirestoreQueryBuilder<T = any> {
Expand Down Expand Up @@ -50,8 +50,8 @@ export function firebaseFirestoreQueryDriver(): FirestoreQueryDriver {

return getDocs(query as FirebaseFirestoreQuery<T>);
},
streamDocs<T>(query: Query<T>): Observable<QuerySnapshot<T>> {
return streamFromOnSnapshot((obs) => onSnapshot((query as FirebaseFirestoreQuery<T>), obs));
streamDocs<T>(query: Query<T>, options?: Maybe<SnapshotListenOptions>): Observable<QuerySnapshot<T>> {
return streamFromOnSnapshot((obs) => (options) ? onSnapshot((query as FirebaseFirestoreQuery<T>), options, obs) : onSnapshot((query as FirebaseFirestoreQuery<T>), obs));
}
};
}
5 changes: 3 additions & 2 deletions packages/firebase/src/lib/common/firestore/driver/query.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Transaction } from './../types';
import { SnapshotListenOptions, Transaction } from './../types';
import { Observable } from 'rxjs';
import { Query, QuerySnapshot } from "../types";
import { FirestoreQueryConstraint } from "../query/constraint";
import { Maybe } from '@dereekb/util';

export type FirestoreQueryDriverQueryFunction = <T>(query: Query<T>, ...queryConstraints: FirestoreQueryConstraint[]) => Query<T>;

Expand All @@ -23,7 +24,7 @@ export interface FirestoreQueryDriver extends FirestoreQueryConstraintFunctionsD
* @param transaction
*/
getDocs<T>(query: Query<T>, transaction?: Transaction): Promise<QuerySnapshot<T>>;
streamDocs<T>(query: Query<T>): Observable<QuerySnapshot<T>>;
streamDocs<T>(query: Query<T>, options?: Maybe<SnapshotListenOptions>): Observable<QuerySnapshot<T>>;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ export function firebaseQuerySnapshotAccumulator<O, T>(iteration: FirestoreItemP
export function firebaseQueryItemAccumulator<T>(iteration: FirestoreItemPageIterationInstance<T>): FirebaseQueryItemAccumulator<T>;
export function firebaseQueryItemAccumulator<U, T>(iteration: FirestoreItemPageIterationInstance<T>, mapItem?: MapFunction<DocumentDataWithId<T>, U>): MappedFirebaseQuerySnapshotAccumulator<U[], T>;
export function firebaseQueryItemAccumulator<U, T>(iteration: FirestoreItemPageIterationInstance<T>, mapItem?: MapFunction<DocumentDataWithId<T>, U>): MappedFirebaseQuerySnapshotAccumulator<U[], T> {

mapItem = mapItem ?? (((x: DocumentDataWithId<T>) => x) as unknown as MapFunction<DocumentDataWithId<T>, U>);

const mapFn: ItemAccumulatorMapFunction<U[], QueryDocumentSnapshotArray<T>> = (x: QueryDocumentSnapshotArray<T>) => {
Expand Down
1 change: 1 addition & 0 deletions packages/firebase/src/lib/common/firestore/query/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from './constraint';
export * from './iterator';
export * from './query';
export * from './query.util';
export * from './watcher';
52 changes: 37 additions & 15 deletions packages/firebase/src/lib/common/firestore/query/iterator.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { PageLoadingState, ItemPageIterator, ItemPageIterationInstance, ItemPageIterationConfig, ItemPageIteratorDelegate, ItemPageIteratorRequest, ItemPageIteratorResult, MappedPageItemIterationInstance, ItemPageLimit } from '@dereekb/rxjs';
import { QueryDocumentSnapshotArray, QuerySnapshot } from "../types";
import { QueryDocumentSnapshotArray, QuerySnapshot, SnapshotListenOptions } from "../types";
import { asArray, Maybe, lastValue, mergeIntoArray, ArrayOrValue } from '@dereekb/util';
import { from, Observable, of, exhaustMap } from "rxjs";
import { from, Observable, of, exhaustMap, map } from "rxjs";
import { CollectionReferenceRef } from '../reference';
import { FirestoreQueryDriverRef } from '../driver/query';
import { FIRESTORE_LIMIT_QUERY_CONSTRAINT_TYPE, FirestoreQueryConstraint, limit, startAfter } from './constraint';
Expand All @@ -24,17 +24,33 @@ export interface FirestoreItemPageIterationBaseConfig<T> extends CollectionRefer
export interface FirestoreItemPageIterationConfig<T> extends FirestoreItemPageIterationBaseConfig<T>, ItemPageIterationConfig<FirestoreItemPageIteratorFilter> { }

export interface FirestoreItemPageQueryResult<T> {
/**
* Time the result was read at.
*/
readonly time: Date;
/**
* The relevant docs for this page result. This value will omit the cursor.
*/
docs: QueryDocumentSnapshotArray<T>;
readonly docs: QueryDocumentSnapshotArray<T>;
/**
* The raw snapshot returned from the query.
*/
snapshot: QuerySnapshot<T>;
readonly snapshot: QuerySnapshot<T>;
/**
* Reloads these results as a snapshot.
*/
reload(): Promise<QuerySnapshot<T>>;
/**
* Streams these results.
*/
stream(options?: FirestoreItemPageQueryResultStreamOptions<T>): Observable<QuerySnapshot<T>>;
}

export interface FirestoreItemPageQueryResultStreamOptions<T> {
options?: Maybe<SnapshotListenOptions>
}

export type FirestoreItemPageIteratorDelegate<T> = ItemPageIteratorDelegate<FirestoreItemPageQueryResult<T>, FirestoreItemPageIteratorFilter, FirestoreItemPageIterationConfig<T>>;
export type FirestoreItemPageIteratorDelegate<T> = ItemPageIteratorDelegate<FirestoreItemPageQueryResult<T>, FirestoreItemPageIteratorFilter, FirestoreItemPageIterationConfig<T>>
export type InternalFirestoreItemPageIterationInstance<T> = ItemPageIterationInstance<FirestoreItemPageQueryResult<T>, FirestoreItemPageIteratorFilter, FirestoreItemPageIterationConfig<T>>;

export function filterDisallowedFirestoreItemPageIteratorInputContraints(constraints: FirestoreQueryConstraint[]): FirestoreQueryConstraint[] {
Expand Down Expand Up @@ -72,22 +88,28 @@ export function makeFirestoreItemPageIteratorDelegate<T>(): FirestoreItemPageIte
}

// Add Limit
const limitCount = filter?.limit ?? itemsPerPage + ((startAfterFilter) ? 1 : 0);
constraints.push(limit(limitCount)); // Add 1 for cursor, since results will start at our cursor.
const limitCount = filterLimit ?? itemsPerPage + ((startAfterFilter) ? 1 : 0); // todo: may not be needed.
const limitConstraint = limit(limitCount);
const constraintsWithLimit = [...constraints, limitConstraint];

const batchQuery = driver.query<T>(collection, ...constraints);
// make query
const batchQuery = driver.query<T>(collection, ...constraintsWithLimit);
const resultPromise: Promise<ItemPageIteratorResult<FirestoreItemPageQueryResult<T>>> = driver.getDocs(batchQuery).then((snapshot) => {
let docs = snapshot.docs;

// Remove the cursor document from the results.
if (cursorDocument && docs[0].id === cursorDocument.id) {
docs = docs.slice(1);
}
const time = new Date();
const docs = snapshot.docs;

const result: ItemPageIteratorResult<FirestoreItemPageQueryResult<T>> = {
value: {
time,
docs,
snapshot
snapshot,
reload() {
return driver.getDocs(batchQuery);
},
stream(options?: FirestoreItemPageQueryResultStreamOptions<T>) {
// todo: consider allowing limit to be changed here to stream a subset. This will be useful for detecting collection changes.
return driver.streamDocs(batchQuery, options?.options);
}
},
end: snapshot.empty
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DocumentReference, QueryDocumentSnapshot, QuerySnapshot } from './../types';
import { DocumentReference, QuerySnapshot } from './../types';
import { Observable } from "rxjs";

// MARK: OnSnapshot
Expand Down
99 changes: 99 additions & 0 deletions packages/firebase/src/lib/common/firestore/query/watcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { groupValues } from '@dereekb/util';
import { timeHasExpired } from '@dereekb/date';
import { filter, map, Observable, skip, switchMap } from 'rxjs';
import { DocumentChange, QuerySnapshot } from '../types';
import { FirestoreItemPageIterationInstance } from "./iterator";

export const DEFAULT_QUERY_CHANGE_WATCHER_DELAY = 1000 * 8;

export interface IterationQueryChangeWatcherConfig<T> {
readonly instance: FirestoreItemPageIterationInstance<T>;
readonly delay?: number;
}

export interface IterationQueryChangeWatcher<T> {

/**
* Streams all subsequent query changes.
*/
readonly stream$: Observable<QuerySnapshot<T>>;

/**
* Event stream
*/
readonly event$: Observable<IterationQueryChangeWatcherEvent<T>>;

/**
* Change
*/
readonly change$: Observable<IterationQueryChangeWatcherChangeType>;

}

export interface IterationQueryChangeWatcherEvent<T> extends IterationQueryChangeWatcherChangeGroup<T> {
readonly changes: DocumentChange<T>[];
readonly type: IterationQueryChangeWatcherChangeType;
}

export interface IterationQueryChangeWatcherChangeGroup<T> {
readonly added: DocumentChange<T>[];
readonly removed: DocumentChange<T>[];
readonly modified: DocumentChange<T>[];
}

export type IterationQueryChangeWatcherChangeType = 'addedAndRemoved' | 'added' | 'removed' | 'modified' | 'none';

export function iterationQueryChangeWatcher<T>(config: IterationQueryChangeWatcherConfig<T>): IterationQueryChangeWatcher<T> {
const { instance, delay: timeUntilActive = DEFAULT_QUERY_CHANGE_WATCHER_DELAY } = config;
const stream$ = instance.snapshotIteration.firstSuccessfulPageResults$.pipe(switchMap((first) => {
const { time, stream } = first.value!.value!;

// todo: capture the change type.

return stream().pipe(
skip(1), // skip the first value.
filter(() => timeHasExpired(time, timeUntilActive))
);
}));

const event$ = stream$.pipe(map(event => {
const changes = event.docChanges();

const results: IterationQueryChangeWatcherChangeGroup<T> = groupValues(changes, (x) => x.type);
(results as any).changes = changes;
(results as any).added = results.added ?? [];
(results as any).removed = results.removed ?? [];
(results as any).modified = results.modified ?? [];
(results as any).type = iterationQueryChangeWatcherChangeTypeForGroup(results);

return results as IterationQueryChangeWatcherEvent<T>;
}));

const change$ = event$.pipe(map(x => x.type));

return {
stream$,
change$,
event$
};
}

export function iterationQueryChangeWatcherChangeTypeForGroup<T>(group: IterationQueryChangeWatcherChangeGroup<T>): IterationQueryChangeWatcherChangeType {
const hasAdded = group.added.length > 0;
const hasRemoved = group.removed.length > 0;
let type: IterationQueryChangeWatcherChangeType;

if (hasAdded && hasRemoved) {
type = 'addedAndRemoved';
} else if (hasAdded) {
type = 'added';
} else if (hasRemoved) {
type = 'removed';
} else if (group.modified.length > 0) {
type = 'modified';
} else {
type = 'none';
}

return type;
}
4 changes: 4 additions & 0 deletions packages/firebase/src/lib/common/firestore/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ export interface SnapshotOptions {
readonly serverTimestamps?: 'estimate' | 'previous' | 'none';
}

export interface SnapshotListenOptions {
readonly includeMetadataChanges?: boolean;
}

// MARK: Converter
/**
* Mirrors the types/methods of FirestoreDataConverter.
Expand Down
Loading

0 comments on commit f5b2474

Please sign in to comment.