-
-
Notifications
You must be signed in to change notification settings - Fork 99
/
watch_on_collection_insert.dart
92 lines (78 loc) · 2.95 KB
/
watch_on_collection_insert.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import 'package:mongo_dart/mongo_dart.dart';
/// Demonstrates watching a collection for `insert` change events.
///
/// Change streams do not work on standalone servers; they require a
/// replica set or a sharded cluster.
///
/// The current implementation of `watch` only reports document events,
/// not database or collection events (such as `drop`).
/// See also the
/// [MongoDB change streams manual](https://docs.mongodb.com/manual/changeStreams/).
///
/// Throws a [StateError] if the insert is not detected after eight
/// two-second waits. The subscription and the database connection are
/// released on both the success and the failure path.
Future<void> main() async {
  final db = Db('mongodb://127.0.0.1/testdb');
  await db.open();
  try {
    final collection = db.collection('watch-collection-insert');

    // Clean data if the example is run more than once.
    await collection.drop();
    await collection.insertMany([
      {'custId': 1, 'name': 'Jeremy'},
      {'custId': 2, 'name': 'Al'},
      {'custId': 3, 'name': 'John'},
    ]);

    // Only some stages can be used in the pipeline for a change stream:
    // - $addFields
    // - $match
    // - $project
    // - $replaceRoot
    // - $replaceWith (available starting in MongoDB 4.2)
    // - $redact
    // - $set (available starting in MongoDB 4.2)
    // - $unset (available starting in MongoDB 4.2)
    //
    // This example only matches inserts. If you watch for updates instead,
    // it is better to set "fullDocument" to "updateLookup", otherwise the
    // event carries only the changed fields (in this data set the 'custId'
    // field would be missing). To do so, also pass:
    //   changeStreamOptions: ChangeStreamOptions(fullDocument: 'updateLookup')
    final stream = collection.watch(<Map<String, Object>>[
      {
        r'$match': {'operationType': 'insert'}
      }
    ]);

    // Set by the listener once an insert event has been received.
    var insertDetected = false;

    // The stream does not end until it is cancelled, so do not use .toList()
    // on it or you will wait indefinitely.
    final subscription = stream.listen((changeEvent) {
      final Map<String, dynamic> fullDocument =
          changeEvent.fullDocument ?? <String, dynamic>{};
      print('Detected change for "custId" '
          '${fullDocument['custId']}: "${fullDocument['name']}"');
      insertDetected = true;
    });

    try {
      // The event is emitted only when the majority of the replicas has
      // acknowledged the change. This is the default behavior starting from
      // 4.2; in 4.0 and earlier you have to set the writeConcern to
      // 'majority' or the events will not be emitted.
      //
      // The update below does not match the 'insert' filter, so only the
      // following insert is expected to reach the listener.
      await collection.updateOne(
          where.eq('custId', 1), ModifierBuilder().set('name', 'Harry'),
          writeConcern: WriteConcern.majority);
      await collection.insertOne({'custId': 4, 'name': 'Nathan'},
          writeConcern: WriteConcern.majority);

      // Poll the flag, waiting at most eight times. The flag is re-checked
      // after the last wait, so an event that arrives during it still counts.
      for (var waitingCount = 0; !insertDetected; waitingCount++) {
        if (waitingCount > 7) {
          throw StateError('Something went wrong :-(');
        }
        print('Waiting for insert to be detected...');
        await Future.delayed(Duration(seconds: 2));
      }
      print('Insert detected, closing stream and db.');
    } finally {
      // This is the correct way to cancel the watch subscription.
      await subscription.cancel();
    }
  } finally {
    await db.close();
  }
}