@@ -54,10 +54,13 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
54
54
55
55
private isUploadingCrud : boolean ;
56
56
57
+ protected _isConnected : boolean ;
58
+
57
59
constructor ( options : AbstractStreamingSyncImplementationOptions ) {
58
60
super ( ) ;
59
61
this . options = { ...DEFAULT_STREAMING_SYNC_OPTIONS , ...options } ;
60
62
this . isUploadingCrud = false ;
63
+ this . _isConnected = false ;
61
64
}
62
65
63
66
get lastSyncedAt ( ) {
@@ -68,6 +71,10 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
68
71
return this . options . logger ! ;
69
72
}
70
73
74
+ get isConnected ( ) {
75
+ return this . _isConnected ;
76
+ }
77
+
71
78
abstract obtainLock < T > ( lockOptions : LockOptions < T > ) : Promise < T > ;
72
79
73
80
async hasCompletedSync ( ) {
@@ -163,6 +170,9 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
163
170
} ,
164
171
signal
165
172
) ) {
173
+ // A connection is active and messages are being received
174
+ this . updateSyncStatus ( true ) ;
175
+
166
176
if ( isStreamingSyncCheckpoint ( line ) ) {
167
177
targetCheckpoint = line . checkpoint ;
168
178
const bucketsToDelete = new Set < string > ( bucketSet ) ;
@@ -289,7 +299,11 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
289
299
}
290
300
291
301
private updateSyncStatus ( connected : boolean , lastSyncedAt ?: Date ) {
302
+ const previousValues = [ this . _isConnected , this . _lastSyncedAt ?. valueOf ( ) ] ;
292
303
this . _lastSyncedAt = lastSyncedAt ?? this . lastSyncedAt ;
293
- this . iterateListeners ( ( cb ) => cb . statusChanged ?.( new SyncStatus ( connected , this . lastSyncedAt ) ) ) ;
304
+ this . _isConnected = connected ;
305
+ if ( ! _ . isEqual ( previousValues , [ this . isConnected , this . _lastSyncedAt ?. valueOf ( ) ] ) ) {
306
+ this . iterateListeners ( ( cb ) => cb . statusChanged ?.( new SyncStatus ( this . isConnected , this . lastSyncedAt ) ) ) ;
307
+ }
294
308
}
295
309
}
0 commit comments