diff --git a/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts b/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts index a3edf1d2..849fcd11 100644 --- a/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts @@ -135,7 +135,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { const r = await this.validateChecksums(checkpoint); if (!r.checkpointValid) { - this.logger.error('Checksums failed for', r.failures); - for (const b of r.failures ?? []) { + this.logger.error('Checksums failed for', r.checkpointFailures); + for (const b of r.checkpointFailures ?? []) { await this.deleteBucket(b); } - return { ready: false, checkpointValid: false, failures: r.failures }; + return { ready: false, checkpointValid: false, checkpointFailures: r.checkpointFailures }; } const bucketNames = checkpoint.buckets.map((b) => b.bucket); @@ -178,7 +178,7 @@ export class SqliteBucketStorage implements BucketStorageAdapter { return { checkpointValid: false, ready: false, - failures: [] + checkpointFailures: [] }; } @@ -190,7 +190,7 @@ export class SqliteBucketStorage implements BucketStorageAdapter { return { checkpointValid: false, ready: false, - failures: result['failed_buckets'] + checkpointFailures: result['failed_buckets'] }; } } diff --git a/packages/powersync-sdk-common/src/client/sync/bucket/SyncDataBucket.ts b/packages/powersync-sdk-common/src/client/sync/bucket/SyncDataBucket.ts index 3f5411ef..5f6a9c7b 100644 --- a/packages/powersync-sdk-common/src/client/sync/bucket/SyncDataBucket.ts +++ b/packages/powersync-sdk-common/src/client/sync/bucket/SyncDataBucket.ts @@ -32,11 +32,11 @@ export class SyncDataBucket { /** * The `after` specified in the request. */ - public after: OpId, + public after?: OpId, /** * Use this for the next request. 
*/ - public next_after: OpId + public next_after?: OpId ) {} toJSON(): SyncDataBucketJSON { diff --git a/packages/powersync-sdk-common/src/index.ts b/packages/powersync-sdk-common/src/index.ts index 531aae0c..7ebfb1a2 100644 --- a/packages/powersync-sdk-common/src/index.ts +++ b/packages/powersync-sdk-common/src/index.ts @@ -10,6 +10,7 @@ export * from './client/sync/bucket/CrudTransaction'; export * from './client/sync/bucket/SyncDataBatch'; export * from './client/sync/bucket/SyncDataBucket'; export * from './client/sync/bucket/OpType'; +export * from './client/sync/bucket/OplogEntry'; export * from './client/sync/stream/AbstractRemote'; export * from './client/sync/stream/AbstractStreamingSyncImplementation'; export * from './client/sync/stream/streaming-sync-types'; diff --git a/packages/powersync-sdk-web/tests/bucket_storage.test.ts b/packages/powersync-sdk-web/tests/bucket_storage.test.ts new file mode 100644 index 00000000..67649ee0 --- /dev/null +++ b/packages/powersync-sdk-web/tests/bucket_storage.test.ts @@ -0,0 +1,809 @@ +import { + BucketStorageAdapter, + OpType, + OpTypeEnum, + OplogEntry, + Schema, + SqliteBucketStorage, + SyncDataBatch, + SyncDataBucket +} from '@journeyapps/powersync-sdk-common'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { AbstractPowerSyncDatabase, Checkpoint } from '@journeyapps/powersync-sdk-common'; +import { WASQLitePowerSyncDatabaseOpenFactory } from '@journeyapps/powersync-sdk-web'; +import { Mutex } from 'async-mutex'; +import { testSchema } from './test_schema'; + +const putAsset1_1 = OplogEntry.fromRow({ + op_id: '1', + op: new OpType(OpTypeEnum.PUT).toJSON(), + object_type: 'assets', + object_id: 'O1', + data: '{"description": "bar"}', + checksum: 1 +}); + +const putAsset2_2 = OplogEntry.fromRow({ + op_id: '2', + op: new OpType(OpTypeEnum.PUT).toJSON(), + object_type: 'assets', + object_id: 'O2', + data: '{"description": "bar"}', + checksum: 2 +}); + +const putAsset1_3 = 
OplogEntry.fromRow({ + op_id: '3', + op: new OpType(OpTypeEnum.PUT).toJSON(), + object_type: 'assets', + object_id: 'O1', + data: '{"description": "bard"}', + checksum: 3 +}); + +const removeAsset1_4 = OplogEntry.fromRow({ + op_id: '4', + op: new OpType(OpTypeEnum.REMOVE).toJSON(), + object_type: 'assets', + object_id: 'O1', + checksum: 4 +}); + +const removeAsset1_5 = OplogEntry.fromRow({ + op_id: '5', + op: new OpType(OpTypeEnum.REMOVE).toJSON(), + object_type: 'assets', + object_id: 'O1', + checksum: 5 +}); + +describe('Bucket Storage', () => { + const factory = new WASQLitePowerSyncDatabaseOpenFactory({ + dbFilename: 'test-bucket-storage.db', + flags: { + enableMultiTabs: false + }, + schema: testSchema + }); + + let db: AbstractPowerSyncDatabase; + let bucketStorage: BucketStorageAdapter; + + beforeEach(async () => { + db = factory.getInstance(); + await db.waitForReady(); + bucketStorage = new SqliteBucketStorage(db.database, new Mutex()); + }); + + afterEach(async () => { + await db.disconnectAndClear(); + await db.close(); + }); + + async function syncLocalChecked(checkpoint: Checkpoint) { + var result = await bucketStorage.syncLocalDatabase(checkpoint); + expect(result).deep.equals({ ready: true, checkpointValid: true }); + } + + async function expectAsset1_3(database = db) { + expect(await database.getAll("SELECT id, description, make FROM assets WHERE id = 'O1'")).deep.equals([ + { id: 'O1', description: 'bard', make: null } + ]); + } + + async function expectNoAsset1() { + expect(await db.getAll("SELECT id, description, make FROM assets WHERE id = 'O1'")).deep.equals([]); + } + + async function expectNoAssets() { + expect(await db.getAll('SELECT id, description, make FROM assets')).deep.equals([]); + } + + it('Basic Setup', async () => { + await db.waitForReady(); + + expect(await bucketStorage.getBucketStates()).empty; + + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], 
false)]) + ); + + const bucketStates = await bucketStorage.getBucketStates(); + + expect(bucketStates).deep.equals([ + { + bucket: 'bucket1', + op_id: '3' + } + ]); + + await syncLocalChecked({ + last_op_id: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + await expectAsset1_3(); + }); + + it('should get an object from multiple buckets', async () => { + await bucketStorage.saveSyncData( + new SyncDataBatch([ + new SyncDataBucket('bucket1', [putAsset1_3], false), + new SyncDataBucket('bucket2', [putAsset1_3], false) + ]) + ); + + await syncLocalChecked({ + last_op_id: '3', + buckets: [ + { bucket: 'bucket1', checksum: 3 }, + { bucket: 'bucket2', checksum: 3 } + ] + }); + await expectAsset1_3(); + }); + + it('should prioritize later updates', async () => { + // Test behaviour when the same object is present in multiple buckets. + // In this case, there are two different versions in the different buckets. + // While we should not get this with our server implementation, the client still specifies this behaviour: + // The largest op_id wins. + await bucketStorage.saveSyncData( + new SyncDataBatch([ + new SyncDataBucket('bucket1', [putAsset1_3], false), + new SyncDataBucket('bucket2', [putAsset1_1], false) + ]) + ); + + await syncLocalChecked({ + last_op_id: '3', + buckets: [ + { bucket: 'bucket1', checksum: 3 }, + { bucket: 'bucket2', checksum: 1 } + ] + }); + await expectAsset1_3(); + }); + + it('should ignore a remove from one bucket', async () => { + // When we have 1 PUT and 1 REMOVE, the object must be kept. 
+ await bucketStorage.saveSyncData( + new SyncDataBatch([ + new SyncDataBucket('bucket1', [putAsset1_3], false), + new SyncDataBucket('bucket2', [putAsset1_3, removeAsset1_4], false) + ]) + ); + + await syncLocalChecked({ + last_op_id: '4', + buckets: [ + { bucket: 'bucket1', checksum: 3 }, + { bucket: 'bucket2', checksum: 7 } + ] + }); + await expectAsset1_3(); + }); + + it('should remove when removed from all buckets', async () => { + // When we only have REMOVE left for an object, it must be deleted. + await bucketStorage.saveSyncData( + new SyncDataBatch([ + new SyncDataBucket('bucket1', [putAsset1_3, removeAsset1_5], false), + new SyncDataBucket('bucket2', [putAsset1_3, removeAsset1_4], false) + ]) + ); + + await syncLocalChecked({ + last_op_id: '5', + buckets: [ + { bucket: 'bucket1', checksum: 8 }, + { bucket: 'bucket2', checksum: 7 } + ] + }); + + await expectNoAssets(); + }); + + it('should use subkeys', async () => { + // subkeys cause this to be treated as a separate entity in the oplog, + // but same entity in the local db. 
+ var put4 = OplogEntry.fromRow({ + op_id: '4', + op: new OpType(OpTypeEnum.PUT).toJSON(), + subkey: 'b', + object_type: 'assets', + object_id: 'O1', + data: '{"description": "B"}', + checksum: 4 + }); + + var remove5 = OplogEntry.fromRow({ + op_id: '5', + op: new OpType(OpTypeEnum.REMOVE).toJSON(), + subkey: 'b', + object_type: 'assets', + object_id: 'O1', + checksum: 5 + }); + + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset1_3, put4], false)]) + ); + + await syncLocalChecked({ + last_op_id: '4', + buckets: [{ bucket: 'bucket1', checksum: 8 }] + }); + + expect(await db.getAll("SELECT id, description, make FROM assets WHERE id = 'O1'")).deep.equals([ + { id: 'O1', description: 'B', make: null } + ]); + + await bucketStorage.saveSyncData(new SyncDataBatch([new SyncDataBucket('bucket1', [remove5], false)])); + + await syncLocalChecked({ + last_op_id: '5', + buckets: [{ bucket: 'bucket1', checksum: 13 }] + }); + + await expectAsset1_3(); + }); + + it('should fail checksum validation', async () => { + // Simple checksum validation + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) + ); + + const result = await bucketStorage.syncLocalDatabase({ + last_op_id: '3', + buckets: [ + { bucket: 'bucket1', checksum: 10 }, + { bucket: 'bucket2', checksum: 1 } + ] + }); + + expect(result).deep.equals({ + ready: false, + checkpointValid: false, + checkpointFailures: ['bucket1', 'bucket2'] + }); + + await expectNoAssets(); + }); + + it('should delete buckets', async () => { + await bucketStorage.saveSyncData( + new SyncDataBatch([ + new SyncDataBucket('bucket1', [putAsset1_3], false), + new SyncDataBucket('bucket2', [putAsset1_3], false) + ]) + ); + + await bucketStorage.removeBuckets(['bucket2']); + // The delete only takes effect after syncLocal. 
+ + await syncLocalChecked({ + last_op_id: '3', + buckets: [{ bucket: 'bucket1', checksum: 3 }] + }); + + // Bucket is deleted, but object is still present in other buckets. + await expectAsset1_3(); + + await bucketStorage.removeBuckets(['bucket1']); + await syncLocalChecked({ last_op_id: '3', buckets: [] }); + // Both buckets deleted - object removed. + await expectNoAssets(); + }); + + it('should delete and re-create buckets', async () => { + // Save some data + await bucketStorage.saveSyncData(new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1], false)])); + + // Delete the bucket + await bucketStorage.removeBuckets(['bucket1']); + + // Save some data again + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset1_3], false)]) + ); + // Delete again + await bucketStorage.removeBuckets(['bucket1']); + + // Final save of data + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset1_3], false)]) + ); + + // Check that the data is there + await syncLocalChecked({ + last_op_id: '3', + buckets: [{ bucket: 'bucket1', checksum: 4 }] + }); + await expectAsset1_3(); + + // Now final delete + await bucketStorage.removeBuckets(['bucket1']); + await syncLocalChecked({ last_op_id: '3', buckets: [] }); + await expectNoAssets(); + }); + + it('should handle MOVE', async () => { + await bucketStorage.saveSyncData( + new SyncDataBatch([ + new SyncDataBucket( + 'bucket1', + [ + OplogEntry.fromRow({ + op_id: '1', + op: new OpType(OpTypeEnum.MOVE).toJSON(), + checksum: 1, + data: '{"target": "3"}' + }) + ], + false + ) + ]) + ); + + // At this point, we have target: 3, but don't have that op yet, so we cannot sync. 
+ var result = await bucketStorage.syncLocalDatabase({ + last_op_id: '2', + buckets: [{ bucket: 'bucket1', checksum: 1 }] + }); + // Checksum passes, but we don't have a complete checkpoint + expect(result).deep.equals({ ready: false, checkpointValid: true }); + + await bucketStorage.saveSyncData(new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_3], false)])); + + await syncLocalChecked({ + last_op_id: '3', + buckets: [{ bucket: 'bucket1', checksum: 4 }] + }); + + await expectAsset1_3(); + }); + + it('should handle CLEAR', async () => { + // Save some data + await bucketStorage.saveSyncData(new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1], false)])); + + await syncLocalChecked({ + last_op_id: '1', + buckets: [{ bucket: 'bucket1', checksum: 1 }] + }); + + // CLEAR, then save new data + await bucketStorage.saveSyncData( + new SyncDataBatch([ + new SyncDataBucket( + 'bucket1', + [ + OplogEntry.fromRow({ + op_id: '2', + op: new OpType(OpTypeEnum.CLEAR).toJSON(), + checksum: 2 + }), + OplogEntry.fromRow({ + op_id: '3', + op: new OpType(OpTypeEnum.PUT).toJSON(), + checksum: 3, + data: putAsset2_2.data, + object_id: putAsset2_2.object_id, + object_type: putAsset2_2.object_type + }) + ], + false + ) + ]) + ); + + await syncLocalChecked({ + last_op_id: '3', + // 2 + 3. 1 is replaced with 2. + buckets: [{ bucket: 'bucket1', checksum: 5 }] + }); + + await expectNoAsset1(); + console.log(await db.getAll(`SELECT id, description FROM assets WHERE id = 'O2'`)); + expect(await db.get("SELECT id, description FROM assets WHERE id = 'O2'")).deep.equals({ + id: 'O2', + description: 'bar' + }); + }); + + it('update with new types', async () => { + const dbName = `test-bucket-storage-new-types.db`; + // Test case where a type is added to the schema after we already have the data. 
+ const factory = new WASQLitePowerSyncDatabaseOpenFactory({ + dbFilename: dbName, + flags: { + enableMultiTabs: false + }, + schema: new Schema([]) + }); + + let powersync = factory.getInstance(); + await powersync.waitForReady(); + bucketStorage = new SqliteBucketStorage(powersync.database, new Mutex()); + + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) + ); + + await syncLocalChecked({ + last_op_id: '4', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + await expect(powersync.getAll('SELECT * FROM assets')).rejects.toThrow('no such table'); + await powersync.close(); + + // Now open another instance with new schema + const factory2 = new WASQLitePowerSyncDatabaseOpenFactory({ + dbFilename: dbName, + flags: { + enableMultiTabs: false + }, + schema: testSchema + }); + + powersync = factory2.getInstance(); + + await expectAsset1_3(powersync); + + await powersync.disconnectAndClear(); + await powersync.close(); + }); + + it('should remove types', async () => { + const dbName = `test-bucket-storage-remove-types.db`; + // Test case where a type is removed from the schema after we already have the data. 
+ const factory = new WASQLitePowerSyncDatabaseOpenFactory({ + dbFilename: dbName, + flags: { + enableMultiTabs: false + }, + schema: testSchema + }); + + let powersync = factory.getInstance(); + await powersync.waitForReady(); + bucketStorage = new SqliteBucketStorage(powersync.database, new Mutex()); + + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) + ); + + await syncLocalChecked({ + last_op_id: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + await expectAsset1_3(powersync); + + await powersync.close(); + + // Now open another instance with new schema + const factory2 = new WASQLitePowerSyncDatabaseOpenFactory({ + dbFilename: dbName, + flags: { + enableMultiTabs: false + }, + schema: new Schema([]) + }); + + powersync = factory2.getInstance(); + + await expect(powersync.execute('SELECT * FROM assets')).rejects.toThrowError('no such table'); + await powersync.close(); + + // Add schema again + powersync = factory.getInstance(); + + await expectAsset1_3(powersync); + + await powersync.disconnectAndClear(); + await powersync.close(); + }); + + it('should compact', async () => { + // Test compacting behaviour. + // This test relies heavily on internals, and will have to be updated when the compact implementation is updated. 
+ + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, removeAsset1_4], false)]) + ); + + await syncLocalChecked({ + last_op_id: '4', + write_checkpoint: '4', + buckets: [{ bucket: 'bucket1', checksum: 7 }] + }); + + await bucketStorage.forceCompact(); + + await syncLocalChecked({ + last_op_id: '4', + write_checkpoint: '4', + buckets: [{ bucket: 'bucket1', checksum: 7 }] + }); + + const stats = await db.getAll( + 'SELECT row_type as type, row_id as id, count(*) as count FROM ps_oplog GROUP BY row_type, row_id ORDER BY row_type, row_id' + ); + expect(stats).deep.equals([{ type: 'assets', id: 'O2', count: 1 }]); + }); + + it('should not sync local db with pending crud - server removed', async () => { + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) + ); + + await syncLocalChecked({ + last_op_id: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + // Local save + await db.execute('INSERT INTO assets(id) VALUES(?)', ['O3']); + expect(await db.getAll("SELECT id FROM assets WHERE id = 'O3'")).deep.equals([{ id: 'O3' }]); + + // At this point, we have data in the crud table, and are not able to sync the local db. + const result = await bucketStorage.syncLocalDatabase({ + last_op_id: '3', + write_checkpoint: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + expect(result).deep.equals({ ready: false, checkpointValid: true }); + + const batch = await bucketStorage.getCrudBatch(); + await batch!.complete(); + await bucketStorage.updateLocalTarget(async () => { + return '4'; + }); + + // At this point, the data has been uploaded, but not synced back yet. 
+ const result3 = await bucketStorage.syncLocalDatabase({ + last_op_id: '3', + write_checkpoint: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + expect(result3).deep.equals({ ready: false, checkpointValid: true }); + + // The data must still be present locally. + expect(await db.getAll("SELECT id FROM assets WHERE id = 'O3'")).deep.equals([{ id: 'O3' }]); + + await bucketStorage.saveSyncData(new SyncDataBatch([new SyncDataBucket('bucket1', [], false)])); + + // Now we have synced the data back (or lack of data in this case), + // so we can do a local sync. + await syncLocalChecked({ + last_op_id: '5', + write_checkpoint: '5', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + // Since the object was not in the sync response, it is deleted. + expect(await db.getAll("SELECT id FROM assets WHERE id = 'O3'")).empty; + }); + + it('should not sync local db with pending crud when more crud is added (1)', async () => { + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) + ); + + await syncLocalChecked({ + last_op_id: '3', + write_checkpoint: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + // Local save + await db.execute('INSERT INTO assets(id) VALUES(?)', ['O3']); + + const batch = await bucketStorage.getCrudBatch(); + await batch!.complete(); + await bucketStorage.updateLocalTarget(async () => { + return '4'; + }); + + const result3 = await bucketStorage.syncLocalDatabase({ + last_op_id: '3', + write_checkpoint: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + expect(result3).deep.equals({ ready: false, checkpointValid: true }); + + await bucketStorage.saveSyncData(new SyncDataBatch([new SyncDataBucket('bucket1', [], false)])); + + // Add more data before syncLocalDatabase. 
+ await db.execute('INSERT INTO assets(id) VALUES(?)', ['O4']); + + const result4 = await bucketStorage.syncLocalDatabase({ + last_op_id: '5', + write_checkpoint: '5', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + expect(result4).deep.equals({ ready: false, checkpointValid: true }); + }); + + it('should not sync local db with pending crud when more crud is added (2)', async () => { + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) + ); + + await syncLocalChecked({ + last_op_id: '3', + write_checkpoint: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + // Local save + await db.execute('INSERT INTO assets(id) VALUES(?)', ['O3']); + const batch = await bucketStorage.getCrudBatch(); + // Add more data before the complete() call + + await db.execute('INSERT INTO assets(id) VALUES(?)', ['O4']); + await batch!.complete(); + await bucketStorage.updateLocalTarget(async () => { + return '4'; + }); + + await bucketStorage.saveSyncData(new SyncDataBatch([new SyncDataBucket('bucket1', [], false)])); + + const result4 = await bucketStorage.syncLocalDatabase({ + last_op_id: '5', + write_checkpoint: '5', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + expect(result4).deep.equals({ ready: false, checkpointValid: true }); + }); + + it('should not sync local db with pending crud - update on server', async () => { + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) + ); + + await syncLocalChecked({ + last_op_id: '3', + write_checkpoint: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + // Local save + await db.execute('INSERT INTO assets(id) VALUES(?)', ['O3']); + const batch = await bucketStorage.getCrudBatch(); + await batch!.complete(); + await bucketStorage.updateLocalTarget(async () => { + return '4'; + }); + + await bucketStorage.saveSyncData( + new SyncDataBatch([ + 
new SyncDataBucket( + 'bucket1', + [ + OplogEntry.fromRow({ + op_id: '5', + op: new OpType(OpTypeEnum.PUT).toJSON(), + object_type: 'assets', + object_id: 'O3', + checksum: 5, + data: '{"description": "server updated"}' + }) + ], + false + ) + ]) + ); + + await syncLocalChecked({ + last_op_id: '5', + write_checkpoint: '5', + buckets: [{ bucket: 'bucket1', checksum: 11 }] + }); + + expect(await db.getAll("SELECT description FROM assets WHERE id = 'O3'")).deep.equals([ + { description: 'server updated' } + ]); + }); + + it('should revert a failing insert', async () => { + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) + ); + + await syncLocalChecked({ + last_op_id: '3', + write_checkpoint: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + // Local insert, later rejected by server + await db.execute('INSERT INTO assets(id, description) VALUES(?, ?)', ['O3', 'inserted']); + const batch = await bucketStorage.getCrudBatch(); + await batch!.complete(); + await bucketStorage.updateLocalTarget(async () => { + return '4'; + }); + + expect(await db.getAll("SELECT description FROM assets WHERE id = 'O3'")).deep.equals([ + { description: 'inserted' } + ]); + + await syncLocalChecked({ + last_op_id: '3', + write_checkpoint: '4', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + expect(await db.getAll("SELECT description FROM assets WHERE id = 'O3'")).empty; + }); + + it('should revert a failing delete', async () => { + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) + ); + + await syncLocalChecked({ + last_op_id: '3', + write_checkpoint: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + // Local delete, later rejected by server + await db.execute('DELETE FROM assets WHERE id = ?', ['O2']); + + expect(await db.getAll("SELECT description FROM assets WHERE id = 'O2'")).empty; 
+ // Simulate a permissions error when uploading - data should be preserved. + const batch = await bucketStorage.getCrudBatch(); + await batch!.complete(); + + await bucketStorage.updateLocalTarget(async () => { + return '4'; + }); + + await syncLocalChecked({ + last_op_id: '3', + write_checkpoint: '4', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + expect(await db.getAll("SELECT description FROM assets WHERE id = 'O2'")).deep.equals([{ description: 'bar' }]); + }); + + it('should revert a failing update', async () => { + await bucketStorage.saveSyncData( + new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) + ); + + await syncLocalChecked({ + last_op_id: '3', + write_checkpoint: '3', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + // Local update, later rejected by server + await db.execute('UPDATE assets SET description = ? WHERE id = ?', ['updated', 'O2']); + + expect(await db.getAll(`SELECT description FROM assets WHERE id = 'O2'`)).deep.equals([{ description: 'updated' }]); + // Simulate a permissions error when uploading - data should be preserved. 
+ const batch = await bucketStorage.getCrudBatch(); + await batch!.complete(); + + await bucketStorage.updateLocalTarget(async () => { + return '4'; + }); + + await syncLocalChecked({ + last_op_id: '3', + write_checkpoint: '4', + buckets: [{ bucket: 'bucket1', checksum: 6 }] + }); + + expect(await db.getAll("SELECT description FROM assets WHERE id = 'O2'")).deep.equals([{ description: 'bar' }]); + }); +}); diff --git a/packages/powersync-sdk-web/tests/crud.test.ts b/packages/powersync-sdk-web/tests/crud.test.ts index 5c263b24..e1e95b1e 100644 --- a/packages/powersync-sdk-web/tests/crud.test.ts +++ b/packages/powersync-sdk-web/tests/crud.test.ts @@ -24,7 +24,7 @@ describe('CRUD Tests', () => { * Use a new DB for each run to keep CRUD counters * consistent */ - dbFilename: 'test.db' + uuid(), + dbFilename: `test-crud-${uuid()}.db`, schema: testSchema, flags: { enableMultiTabs: false diff --git a/packages/powersync-sdk-web/tests/stream.test.ts b/packages/powersync-sdk-web/tests/stream.test.ts new file mode 100644 index 00000000..00925406 --- /dev/null +++ b/packages/powersync-sdk-web/tests/stream.test.ts @@ -0,0 +1,179 @@ +import { beforeAll, describe, expect, it } from 'vitest'; +import { + AbstractPowerSyncDatabase, + AbstractRemote, + AbstractStreamingSyncImplementation, + Column, + ColumnType, + PowerSyncBackendConnector, + PowerSyncCredentials, + PowerSyncDatabaseOptions, + RemoteConnector, + Schema, + Table +} from '@journeyapps/powersync-sdk-common'; +import { + PowerSyncDatabase, + WASQLitePowerSyncDatabaseOpenFactory, + WebPowerSyncDatabaseOptions, + WebStreamingSyncImplementation +} from '@journeyapps/powersync-sdk-web'; +import Logger from 'js-logger'; +import { WebPowerSyncOpenFactoryOptions } from 'src/db/adapters/AbstractWebPowerSyncDatabaseOpenFactory'; +import { v4 as uuid } from 'uuid'; + +class TestConnector implements PowerSyncBackendConnector { + async fetchCredentials(): Promise { + return { + endpoint: '', + token: '' + }; + } + async 
uploadData(database: AbstractPowerSyncDatabase): Promise { + return; + } +} + +class MockRemote extends AbstractRemote { + streamController: ReadableStreamDefaultController | null; + + constructor( + connector: RemoteConnector, + protected onStreamRequested: () => void + ) { + super(connector); + this.streamController = null; + } + + post(path: string, data: any, headers?: Record | undefined): Promise { + throw new Error('Method not implemented.'); + } + get(path: string, headers?: Record | undefined): Promise { + throw new Error('Method not implemented.'); + } + async postStreaming(): Promise { + return new Response(this.generateStream()).body; + } + + private generateStream() { + return new ReadableStream({ + start: (controller) => { + this.streamController = controller; + this.onStreamRequested(); + } + }); + } +} + +class MockedStreamPowerSync extends PowerSyncDatabase { + constructor( + options: WebPowerSyncDatabaseOptions, + protected remote: AbstractRemote + ) { + super(options); + } + + protected generateSyncStreamImplementation( + connector: PowerSyncBackendConnector + ): AbstractStreamingSyncImplementation { + return new WebStreamingSyncImplementation({ + adapter: this.bucketStorageAdapter, + remote: this.remote, + uploadCrud: async () => { + await this.waitForReady(); + await connector.uploadData(this); + }, + identifier: this.options.database.name + }); + } +} + +class MockOpenFactory extends WASQLitePowerSyncDatabaseOpenFactory { + constructor( + options: WebPowerSyncOpenFactoryOptions, + protected remote: AbstractRemote + ) { + super(options); + } + generateInstance(options: PowerSyncDatabaseOptions): AbstractPowerSyncDatabase { + return new MockedStreamPowerSync( + { + ...options + }, + this.remote + ); + } +} + +describe('Stream test', () => { + beforeAll(() => Logger.useDefaults()); + + it('PowerSync reconnect', async () => { + /** + * Very basic implementation of a listener pattern. + * Required since we cannot extend multiple classes. 
+ */ + const callbacks: Map void> = new Map(); + const remote = new MockRemote(new TestConnector(), () => callbacks.forEach((c) => c())); + + const powersync = new MockOpenFactory( + { + dbFilename: 'test-stream-connection.db', + flags: { + enableMultiTabs: false + }, + schema: new Schema([ + new Table({ + name: 'users', + columns: [new Column({ name: 'name', type: ColumnType.TEXT })] + }) + ]) + }, + remote + ).getInstance(); + + const waitForStream = () => + new Promise((resolve) => { + const id = uuid(); + callbacks.set(id, () => { + resolve(); + callbacks.delete(id); + }); + }); + + const streamOpened = waitForStream(); + + powersync.connect(new TestConnector()); + + await streamOpened; + + remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n')); + + // Wait for connected to be true + await new Promise((resolve) => { + if (powersync.connected) { + resolve(); + } + const l = powersync.registerListener({ + statusChanged: (status) => { + if (status.connected) { + resolve(); + l?.(); + } + } + }); + }); + + expect(powersync.connected).true; + + // Close the stream + const newStream = waitForStream(); + remote.streamController?.close(); + + // A new stream should be requested + await newStream; + + await powersync.disconnectAndClear(); + await powersync.close(); + }); +}); diff --git a/packages/powersync-sdk-web/tests/watch.test.ts b/packages/powersync-sdk-web/tests/watch.test.ts new file mode 100644 index 00000000..5ca015e0 --- /dev/null +++ b/packages/powersync-sdk-web/tests/watch.test.ts @@ -0,0 +1,105 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +vi.useRealTimers(); +import { v4 as uuid } from 'uuid'; +import { AbstractPowerSyncDatabase, Schema } from '@journeyapps/powersync-sdk-common'; +import { WASQLitePowerSyncDatabaseOpenFactory } from '@journeyapps/powersync-sdk-web'; +import { testSchema } from './test_schema'; +import _ from 'lodash'; + +/** + * There seems to be an issue with Vitest 
browser mode's setTimeout and + * fake timer functionality. + * e.g. calling: + * await new Promise((resolve) => setTimeout(resolve, 10)); + * waits for 1 second instead of 10ms. + * Setting this to 1 second as a work around. + */ +const throttleDuration = 1000; + +describe('Watch Tests', () => { + const factory = new WASQLitePowerSyncDatabaseOpenFactory({ + dbFilename: 'test-watch.db', + schema: testSchema, + flags: { + enableMultiTabs: false + } + }); + + let powersync: AbstractPowerSyncDatabase; + + beforeEach(async () => { + powersync = factory.getInstance(); + }); + + afterEach(async () => { + await powersync.disconnectAndClear(); + await powersync.close(); + }); + + it('watch outside throttle limits', async () => { + const abortController = new AbortController(); + + const watch = powersync.watch( + 'SELECT count() AS count FROM assets INNER JOIN customers ON customers.id = assets.customer_id', + [], + { signal: abortController.signal, throttleMs: throttleDuration } + ); + + const updatesCount = 2; + let receivedUpdatesCount = 0; + + /** + * Promise which resolves once we received the same amount of update + * notifications as there are inserts. 
+ */ + const receivedUpdates = new Promise(async (resolve) => { + for await (const update of watch) { + receivedUpdatesCount++; + if (receivedUpdatesCount == updatesCount) { + abortController.abort(); + resolve(); + } + } + }); + + for (let updateCount = 0; updateCount < updatesCount; updateCount++) { + await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['test', uuid()]); + + // Wait the throttle duration, ensuring a watch update for each insert + await new Promise((resolve) => setTimeout(resolve, throttleDuration)); + } + + await receivedUpdates; + expect(receivedUpdatesCount).equals(updatesCount); + }); + + it('watch inside throttle limits', async () => { + const abortController = new AbortController(); + + const watch = powersync.watch( + 'SELECT count() AS count FROM assets INNER JOIN customers ON customers.id = assets.customer_id', + [], + { signal: abortController.signal, throttleMs: throttleDuration } + ); + + const updatesCount = 5; + let receivedUpdatesCount = 0; + // Listen to updates + (async () => { + for await (const update of watch) { + receivedUpdatesCount++; + } + })(); + + // Create the inserts as fast as possible + for (let updateCount = 0; updateCount < updatesCount; updateCount++) { + await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['test', uuid()]); + } + + await new Promise((resolve) => setTimeout(resolve, throttleDuration * 2)); + abortController.abort(); + + // There should be one initial result plus one throttled result + expect(receivedUpdatesCount).equals(2); + }); +}); diff --git a/packages/powersync-sdk-web/vitest.config.ts b/packages/powersync-sdk-web/vitest.config.ts index 1339e56f..30de1d5d 100644 --- a/packages/powersync-sdk-web/vitest.config.ts +++ b/packages/powersync-sdk-web/vitest.config.ts @@ -12,7 +12,7 @@ const config: UserConfigExport = { * first. This is required due to the format of Webworker URIs * they link to `.js` files. 
*/ - '@journeyapps/powersync-sdk-web': path.resolve(__dirname, './dist/src') + '@journeyapps/powersync-sdk-web': path.resolve(__dirname, './lib/src') } }, worker: { @@ -26,7 +26,6 @@ const config: UserConfigExport = { }, plugins: [wasm(), topLevelAwait()], test: { - isolate: false, globals: true, setupFiles: [], include: ['tests/**/*.test.ts'],