Skip to content

Commit

Permalink
fix: fixed issue with AbstractFirestoreDocument stream$
Browse files Browse the repository at this point in the history
- stream$ and data$ immediately created observables instead of waiting for a request. This in turn caused transactions to sometimes fail.
- added tests for this use case
  • Loading branch information
dereekb committed Aug 5, 2022
1 parent c051175 commit 3752d11
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*eslint @typescript-eslint/no-explicit-any:"off"*/
// any is used with intent here, as the recursive AbstractFirestoreDocument requires its use to terminate.

import { lazyFrom } from '@dereekb/rxjs';
import { Observable } from 'rxjs';
import { FirestoreAccessorDriverRef } from '../driver/accessor';
import { FirestoreCollectionNameRef, FirestoreModelId, FirestoreModelIdentityCollectionName, FirestoreModelIdentityModelType, FirestoreModelIdentityRef, FirestoreModelIdRef, FirestoreModelKey, FirestoreModelKeyRef } from './../collection/collection';
Expand Down Expand Up @@ -33,8 +34,8 @@ export type FirestoreDocumentData<D extends FirestoreDocument<any>> = D extends
* Abstract FirestoreDocument implementation that extends a FirestoreDocumentDataAccessor.
*/
export abstract class AbstractFirestoreDocument<T, D extends AbstractFirestoreDocument<T, any, I>, I extends FirestoreModelIdentity = FirestoreModelIdentity> implements FirestoreDocument<T>, LimitedFirestoreDocumentAccessorRef<T, D>, CollectionReferenceRef<T> {
readonly stream$ = this.accessor.stream();
readonly data$: Observable<T> = dataFromSnapshotStream(this.stream$);
readonly stream$ = lazyFrom(() => this.accessor.stream());
readonly data$: Observable<T> = lazyFrom(() => dataFromSnapshotStream(this.stream$));

constructor(readonly accessor: FirestoreDocumentDataAccessor<T>, readonly documentAccessor: LimitedFirestoreDocumentAccessor<T, D>) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { itShouldFail, expectFail } from '@dereekb/util/test';
import { firstValueFrom } from 'rxjs';
import { firstValueFrom, first } from 'rxjs';
import { SubscriptionObject } from '@dereekb/rxjs';
import { Transaction, DocumentReference, WriteBatch, FirestoreDocumentAccessor, makeDocuments, FirestoreDocumentDataAccessor, FirestoreContext, FirestoreDocument, RunTransaction, FirebaseAuthUserId, DocumentSnapshot, FirestoreDataConverter, getDocumentSnapshotPairs, useDocumentSnapshot, useDocumentSnapshotData } from '@dereekb/firebase';
import { Transaction, DocumentReference, WriteBatch, FirestoreDocumentAccessor, makeDocuments, FirestoreDocumentDataAccessor, FirestoreContext, FirestoreDocument, RunTransaction, FirebaseAuthUserId, DocumentSnapshot, FirestoreDataConverter, getDocumentSnapshotPairs, useDocumentSnapshot, useDocumentSnapshotData, AbstractFirestoreDocument } from '@dereekb/firebase';
import { MockItemCollectionFixture, MockItemDocument, MockItem, MockItemPrivateDocument, MockItemPrivateFirestoreCollection, MockItemPrivate, MockItemSubItem, MockItemSubItemDocument, MockItemSubItemFirestoreCollection, MockItemSubItemFirestoreCollectionGroup, MockItemUserFirestoreCollection, MockItemUserDocument, MockItemUser, mockItemConverter } from '../mock';
import { Getter } from '@dereekb/util';

Expand Down Expand Up @@ -344,8 +344,8 @@ export interface DescribeAccessorTests<T> {
firestoreDocument: Getter<FirestoreDocument<T>>;
dataForUpdate: () => Partial<T>;
hasDataFromUpdate: (data: T) => boolean;
loadDocumentForTransaction: (transaction: Transaction, ref?: DocumentReference<T>) => FirestoreDocument<T>;
loadDocumentForWriteBatch: (writeBatch: WriteBatch, ref?: DocumentReference<T>) => FirestoreDocument<T>;
loadDocumentForTransaction: (transaction: Transaction, ref?: DocumentReference<T>) => AbstractFirestoreDocument<T, any>;
loadDocumentForWriteBatch: (writeBatch: WriteBatch, ref?: DocumentReference<T>) => FirestoreDocument<T, any>;
}

export function describeFirestoreDocumentAccessorTests<T>(init: () => DescribeAccessorTests<T>) {
Expand Down Expand Up @@ -502,6 +502,41 @@ export function describeFirestoreDocumentAccessorTests<T>(init: () => DescribeAc
});

describe('transaction', () => {
describe('stream$', () => {
it('should not cause the transaction to fail if the document is loaded after changes have begun.', async () => {
await c.context.runTransaction(async (transaction) => {
const transactionDocument = await c.loadDocumentForTransaction(transaction, firestoreDocument.documentRef);

const currentData = await transactionDocument.snapshotData();
expect(currentData).toBeDefined();

const data = c.dataForUpdate();
await transactionDocument.update(data);

// stream$ and data$ do not call stream() until called directly.
const secondLoading = await c.loadDocumentForTransaction(transaction, firestoreDocument.documentRef);
expect(secondLoading).toBeDefined();
});
});

itShouldFail('if stream$ is called after an update has occured in the transaction', async () => {
await expectFail(() =>
c.context.runTransaction(async (transaction) => {
const transactionDocument = await c.loadDocumentForTransaction(transaction, firestoreDocument.documentRef);

const currentData = await transactionDocument.snapshotData();
expect(currentData).toBeDefined();

const data = c.dataForUpdate();
await transactionDocument.update(data);

// read the stream using a promise so the error is captured
await firstValueFrom(c.loadDocumentForTransaction(transaction, firestoreDocument.documentRef).stream$);
})
);
});
});

describe('update()', () => {
it('should update the data if the document exists.', async () => {
await c.context.runTransaction(async (transaction) => {
Expand Down
6 changes: 3 additions & 3 deletions packages/rxjs/src/lib/rxjs/rxjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ export function preventComplete<T>(obs: Observable<T>): Observable<T> {
}

/**
* Similar to from, but uses a Getter to keeps the Observable cold until it is subscribed to, then calls the promise.
* Similar to from, but uses a Getter to keeps the Observable cold until it is subscribed to, then calls the promise or observable.
*
* The promise is shared, so it can be called at max a
* The result value of the promise or the latest value of the observable is shared.
*
* @param getter
* @returns
*/
export function lazyFrom<T>(getter: Getter<Promise<T>>): Observable<T> {
export function lazyFrom<T>(getter: Getter<Promise<T> | Observable<T>>): Observable<T> {
return of(undefined).pipe(
mergeMap(() => from(getter())),
shareReplay(1)
Expand Down

0 comments on commit 3752d11

Please sign in to comment.