40
40
#include " Firestore/core/src/firebase/firestore/model/document_key.h"
41
41
#include " Firestore/core/src/firebase/firestore/model/document_map.h"
42
42
#include " Firestore/core/src/firebase/firestore/model/document_set.h"
43
+ #include " Firestore/core/src/firebase/firestore/model/mutation_batch.h"
43
44
#include " Firestore/core/src/firebase/firestore/model/no_document.h"
44
45
#include " Firestore/core/src/firebase/firestore/model/snapshot_version.h"
45
46
#include " Firestore/core/src/firebase/firestore/remote/remote_event.h"
64
65
using firebase::firestore::model::DocumentKey;
65
66
using firebase::firestore::model::DocumentKeySet;
66
67
using firebase::firestore::model::DocumentMap;
68
+ using firebase::firestore::model::kBatchIdUnknown ;
67
69
using firebase::firestore::model::ListenSequenceNumber;
68
70
using firebase::firestore::model::MaybeDocumentMap;
69
71
using firebase::firestore::model::Mutation;
78
80
using firebase::firestore::util::AsyncQueue;
79
81
using firebase::firestore::util::MakeNSError;
80
82
using firebase::firestore::util::Status;
83
+ using firebase::firestore::util::StatusCallback;
81
84
82
85
NS_ASSUME_NONNULL_BEGIN
83
86
@@ -191,6 +194,9 @@ @implementation FSTSyncEngine {
191
194
std::unordered_map<User, NSMutableDictionary <NSNumber *, FSTVoidErrorBlock> *, HashUser>
192
195
_mutationCompletionBlocks;
193
196
197
+ /* * Stores user callbacks waiting for pending writes to be acknowledged. */
198
+ std::unordered_map<model::BatchId, std::vector<StatusCallback>> _pendingWritesCallbacks;
199
+
194
200
/* * FSTQueryViews for all active queries, indexed by query. */
195
201
std::unordered_map<Query, FSTQueryView *> _queryViewsByQuery;
196
202
@@ -290,6 +296,51 @@ - (void)writeMutations:(std::vector<Mutation> &&)mutations
290
296
_remoteStore->FillWritePipeline ();
291
297
}
292
298
299
+ - (void )registerPendingWritesCallback : (StatusCallback)callback {
300
+ if (!_remoteStore->CanUseNetwork ()) {
301
+ LOG_DEBUG (" The network is disabled. The task returned by 'awaitPendingWrites()' will not "
302
+ " complete until the network is enabled." );
303
+ }
304
+
305
+ int largestPendingBatchId = [self .localStore getHighestUnacknowledgedBatchId ];
306
+
307
+ if (largestPendingBatchId == kBatchIdUnknown ) {
308
+ // Trigger the callback right away if there is no pending writes at the moment.
309
+ callback (Status::OK ());
310
+ return ;
311
+ }
312
+
313
+ auto it = _pendingWritesCallbacks.find (largestPendingBatchId);
314
+ if (it != _pendingWritesCallbacks.end ()) {
315
+ it->second .push_back (std::move (callback));
316
+ } else {
317
+ _pendingWritesCallbacks.emplace (largestPendingBatchId,
318
+ std::vector<StatusCallback>({std::move (callback)}));
319
+ }
320
+ }
321
+
322
+ /* * Triggers callbacks waiting for this batch id to get acknowledged by server, if there are any. */
323
+ - (void )triggerPendingWriteCallbacksWithBatchId : (int )batchId {
324
+ auto it = _pendingWritesCallbacks.find (batchId);
325
+ if (it != _pendingWritesCallbacks.end ()) {
326
+ for (const auto &callback : it->second ) {
327
+ callback (Status::OK ());
328
+ }
329
+
330
+ _pendingWritesCallbacks.erase (it);
331
+ }
332
+ }
333
+
334
+ - (void )failOutstandingPendingWritesAwaitingCallbacks : (absl::string_view)errorMessage {
335
+ for (const auto &entry : _pendingWritesCallbacks) {
336
+ for (const auto &callback : entry.second ) {
337
+ callback (Status (Error::Cancelled, errorMessage));
338
+ }
339
+ }
340
+
341
+ _pendingWritesCallbacks.clear ();
342
+ }
343
+
293
344
- (void )addMutationCompletionBlock : (FSTVoidErrorBlock)completion batchID : (BatchId)batchID {
294
345
NSMutableDictionary <NSNumber *, FSTVoidErrorBlock> *completionBlocks =
295
346
_mutationCompletionBlocks[_currentUser];
@@ -454,6 +505,8 @@ - (void)applySuccessfulWriteWithResult:(FSTMutationBatchResult *)batchResult {
454
505
// consistently happen before listen events.
455
506
[self processUserCallbacksForBatchID: batchResult.batch.batchID error: nil ];
456
507
508
+ [self triggerPendingWriteCallbacksWithBatchId: batchResult.batch.batchID];
509
+
457
510
MaybeDocumentMap changes = [self .localStore acknowledgeBatchWithResult: batchResult];
458
511
[self emitNewSnapshotsAndNotifyLocalStoreWithChanges: changes remoteEvent:absl: :nullopt];
459
512
}
@@ -472,6 +525,8 @@ - (void)rejectFailedWriteWithBatchID:(BatchId)batchID error:(NSError *)error {
472
525
// consistently happen before listen events.
473
526
[self processUserCallbacksForBatchID: batchID error: error];
474
527
528
+ [self triggerPendingWriteCallbacksWithBatchId: batchID];
529
+
475
530
[self emitNewSnapshotsAndNotifyLocalStoreWithChanges: changes remoteEvent:absl: :nullopt];
476
531
}
477
532
@@ -623,6 +678,9 @@ - (void)credentialDidChangeWithUser:(const firebase::firestore::auth::User &)use
623
678
_currentUser = user;
624
679
625
680
if (userChanged) {
681
+ // Fails callbacks waiting for pending writes requested by previous user.
682
+ [self failOutstandingPendingWritesAwaitingCallbacks:
683
+ " 'waitForPendingWrites' callback is cancelled due to a user change." ];
626
684
// Notify local store and emit any resulting events from swapping out the mutation queue.
627
685
MaybeDocumentMap changes = [self .localStore userDidChange: user];
628
686
[self emitNewSnapshotsAndNotifyLocalStoreWithChanges: changes remoteEvent:absl: :nullopt];
0 commit comments