Skip to content

Commit

Permalink
FIX #5721 RxDB does not respond to collection.remove() changes
Browse files Browse the repository at this point in the history
  • Loading branch information
pubkey committed May 16, 2024
1 parent 56c0727 commit dc8c3a1
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# RxDB Changelog

<!-- CHANGELOG NEWEST -->
- FIX `collection.remove()` must end up with the correct RxCollection state across tabs. [5721](https://github.com/pubkey/rxdb/issues/5721)
- ADD `RxCollection.onRemove` hooks to detect the removing of a RxCollection across tabs.
- IMPROVE performance of insert to [IndexedDB](https://rxdb.info/rx-storage-indexeddb.html)

<!-- ADD new changes here! -->
Expand Down
12 changes: 12 additions & 0 deletions docs-src/docs/rx-collection.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,18 @@ Destroys the collection's object instance. This is to free up memory and stop al
await myCollection.destroy();
```

### onDestroy / onRemove()
With these you can add a function that is run when the collection was destroyed or removed.
This works even across multiple browser tabs so you can detect when another tab removes the collection
and you application can behave accordingly.

```js
await myCollection.onDestroy(() => console.log('I am destroyed'));
await myCollection.onRemove(() => console.log('I am removed'));
```




### isRxCollection
Returns true if the given object is an instance of RxCollection. Returns false if not.
Expand Down
1 change: 1 addition & 0 deletions src/plugins/dev-mode/error-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export const ERROR_MESSAGES = {
COL18: 'collection-method not allowed because fieldname is in the schema',
// removed in 14.0.0, use CONFLICT instead - COL19: 'Document update conflict. When changing a document you must work on the previous revision',
COL20: 'Storage write error',
COL21: 'The RxCollection is destroyed or removed already, either from this JavaScript realm or from another, like a browser tab',
CONFLICT: 'Document update conflict. When changing a document you must work on the previous revision',

// rx-document.js
Expand Down
18 changes: 18 additions & 0 deletions src/rx-collection-helper.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type {
HashFunction,
InternalStoreDocType,
RxCollection,
RxDatabase,
RxDocumentData,
RxJsonSchema,
Expand All @@ -24,6 +25,8 @@ import { runAsyncPluginHooks } from './hooks.ts';
import { getAllCollectionDocuments } from './rx-database-internal-store.ts';
import { flatCloneDocWithMeta } from './rx-storage-helper.ts';
import { overwritable } from './overwritable.ts';
import { RxCollectionBase } from './rx-collection.ts';
import { newRxError } from './rx-error.ts';

/**
* fills in the default data.
Expand Down Expand Up @@ -167,3 +170,18 @@ export async function removeCollectionStorages(
);
}
}


export function ensureRxCollectionIsNotDestroyed(
collection: RxCollection | RxCollectionBase<any, any, any, any, any>
) {
if (collection.destroyed) {
throw newRxError(
'COL21',
{
collection: collection.name,
version: collection.schema.version
}
);
}
}
42 changes: 39 additions & 3 deletions src/rx-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import {
import {
fillObjectDataBeforeInsert,
createRxCollectionStorageInstance,
removeCollectionStorages
removeCollectionStorages,
ensureRxCollectionIsNotDestroyed
} from './rx-collection-helper.ts';
import {
createRxQuery,
Expand Down Expand Up @@ -184,10 +185,12 @@ export class RxCollectionBase<
* these functions will be called an awaited.
* Used to automatically clean up stuff that
* belongs to this collection.
*/
*/
public onDestroy: (() => MaybePromise<any>)[] = [];
public destroyed = false;

public onRemove: (() => MaybePromise<any>)[] = [];

public async prepare(): Promise<void> {
this.storageInstance = getWrappedStorageInstance(
this.database,
Expand Down Expand Up @@ -224,8 +227,28 @@ export class RxCollectionBase<
}
);


const listenToRemoveSub = this.database.internalStore.changeStream().pipe(
filter(bulk => {
const key = this.name + '-' + this.schema.version;
const found = bulk.events.find(event => {
return (
event.documentData.context === 'collection' &&
event.documentData.key === key &&
event.operation === 'DELETE'
);
});
return !!found;
})
).subscribe(async () => {
await this.destroy();
await Promise.all(this.onRemove.map(fn => fn()));
});
this._subs.push(listenToRemoveSub);


/**
* Instead of resolving the EventBulk array here and spit it into
* TODO Instead of resolving the EventBulk array here and spit it into
* single events, we should fully work with event bulks internally
* to save performance.
*/
Expand Down Expand Up @@ -290,6 +313,7 @@ export class RxCollectionBase<
* @link https://rxdb.info/cleanup.html
*/
cleanup(_minimumDeletedTime?: number): Promise<boolean> {
ensureRxCollectionIsNotDestroyed(this);
throw pluginMissing('cleanup');
}

Expand All @@ -301,6 +325,7 @@ export class RxCollectionBase<
throw pluginMissing('migration-schema');
}
startMigration(batchSize: number = 10): Promise<void> {
ensureRxCollectionIsNotDestroyed(this);
return this.getMigrationState().startMigration(batchSize);
}
migratePromise(batchSize: number = 10): Promise<any> {
Expand All @@ -310,6 +335,7 @@ export class RxCollectionBase<
async insert(
json: RxDocumentType | RxDocument
): Promise<RxDocument<RxDocumentType, OrmMethods>> {
ensureRxCollectionIsNotDestroyed(this);
const writeResult = await this.bulkInsert([json as any]);

const isError = writeResult.error[0];
Expand All @@ -324,6 +350,7 @@ export class RxCollectionBase<
success: RxDocument<RxDocumentType, OrmMethods>[];
error: RxStorageWriteError<RxDocumentType>[];
}> {
ensureRxCollectionIsNotDestroyed(this);
/**
* Optimization shortcut,
* do nothing when called with an empty array
Expand Down Expand Up @@ -400,6 +427,7 @@ export class RxCollectionBase<
success: RxDocument<RxDocumentType, OrmMethods>[];
error: RxStorageWriteError<RxDocumentType>[];
}> {
ensureRxCollectionIsNotDestroyed(this);
const primaryPath = this.schema.primaryPath;
/**
* Optimization shortcut,
Expand Down Expand Up @@ -469,6 +497,7 @@ export class RxCollectionBase<
success: RxDocument<RxDocumentType, OrmMethods>[];
error: RxStorageWriteError<RxDocumentType>[];
}> {
ensureRxCollectionIsNotDestroyed(this);
const insertData: RxDocumentType[] = [];
const useJsonByDocId: Map<string, RxDocumentType> = new Map();
docsData.forEach(docData => {
Expand Down Expand Up @@ -514,6 +543,7 @@ export class RxCollectionBase<
* same as insert but overwrites existing document with same primary
*/
async upsert(json: Partial<RxDocumentType>): Promise<RxDocument<RxDocumentType, OrmMethods>> {
ensureRxCollectionIsNotDestroyed(this);
const bulkResult = await this.bulkUpsert([json]);
throwIfIsStorageWriteError<RxDocumentType>(
this.asRxCollection,
Expand All @@ -528,6 +558,7 @@ export class RxCollectionBase<
* upserts to a RxDocument, uses incrementalModify if document already exists
*/
incrementalUpsert(json: Partial<RxDocumentType>): Promise<RxDocument<RxDocumentType, OrmMethods>> {
ensureRxCollectionIsNotDestroyed(this);
const useJson = fillObjectDataBeforeInsert(this.schema, json);
const primary: string = useJson[this.schema.primaryPath] as any;
if (!primary) {
Expand Down Expand Up @@ -560,6 +591,7 @@ export class RxCollectionBase<
OrmMethods,
Reactivity
> {
ensureRxCollectionIsNotDestroyed(this);
if (typeof queryObj === 'string') {
throw newRxError('COL5', {
queryObj
Expand All @@ -582,6 +614,7 @@ export class RxCollectionBase<
OrmMethods,
Reactivity
> {
ensureRxCollectionIsNotDestroyed(this);

// TODO move this check to dev-mode plugin
if (
Expand Down Expand Up @@ -628,6 +661,7 @@ export class RxCollectionBase<
OrmMethods,
Reactivity
> {
ensureRxCollectionIsNotDestroyed(this);
if (!queryObj) {
queryObj = _getDefaultQuery();
}
Expand All @@ -647,6 +681,7 @@ export class RxCollectionBase<
OrmMethods,
Reactivity
> {
ensureRxCollectionIsNotDestroyed(this);
const mangoQuery: MangoQuery<RxDocumentType> = {
selector: {
[this.schema.primaryPath]: {
Expand Down Expand Up @@ -849,6 +884,7 @@ export class RxCollectionBase<
*/
async remove(): Promise<any> {
await this.destroy();
await Promise.all(this.onRemove.map(fn => fn()));
await removeCollectionStorages(
this.database.storage,
this.database.internalStore,
Expand Down
48 changes: 48 additions & 0 deletions test/unit/rx-collection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,54 @@ describe('rx-collection.test.ts', () => {

db2.destroy();
});
it('#5721 should remove the RxCollection instance accross tabs and emit the .$removed event', async () => {

Check failure on line 1123 in test/unit/rx-collection.test.ts

View workflow job for this annotation

GitHub Actions / test-code-style

accross ==> across
if (!config.storage.hasMultiInstance) {
return;
}

const dbName = randomCouchString();

async function createDb() {
const db = await createRxDatabase<{ humans: RxCollection<HumanDocumentType>; }>({
name: dbName,
storage: config.storage.getStorage(),
ignoreDuplicate: true
});
await db.addCollections({
humans: { schema: schemas.human }
});
await db.collections.humans.insert(schemaObjects.humanData());
return db;
}

const db1 = await createDb();
const db2 = await createDb();
const col1 = db1.humans;
const col2 = db2.humans;

// remember the emitted events
let emitted1 = false;
let emitted2 = false;
col1.onRemove.push(() => emitted1 = true);
col2.onRemove.push(() => emitted2 = true);

// remove collection2
await col2.remove();

await waitUntil(() => emitted1);
await waitUntil(() => emitted2);

// calling operations on other collection should also fail
await assertThrows(
() => col1.insert(schemaObjects.humanData()),
'RxError',
'COL21'
);

assert.deepStrictEqual(emitted1, emitted2);
db1.destroy();
db2.destroy();
});
});
describeParallel('.bulkRemove()', () => {
describe('positive', () => {
Expand Down

0 comments on commit dc8c3a1

Please sign in to comment.