8
8
import org .bson .BsonDocument ;
9
9
import org .bson .codecs .configuration .CodecRegistry ;
10
10
import org .bson .codecs .pojo .PojoCodecProvider ;
11
+ import org .bson .conversions .Bson ;
11
12
13
+ import java .util .List ;
12
14
import java .util .function .Consumer ;
13
15
import java .util .logging .Level ;
14
16
import java .util .logging .Logger ;
15
17
16
18
import static com .mongodb .client .model .Aggregates .match ;
17
19
import static com .mongodb .client .model .Filters .eq ;
20
+ import static com .mongodb .client .model .Filters .in ;
21
+ import static com .mongodb .client .model .changestream .FullDocument .UPDATE_LOOKUP ;
18
22
import static java .util .Arrays .asList ;
23
+ import static java .util .Collections .singletonList ;
19
24
import static org .bson .codecs .configuration .CodecRegistries .fromProviders ;
20
25
import static org .bson .codecs .configuration .CodecRegistries .fromRegistries ;
21
26
@@ -34,28 +39,32 @@ public static void main(String[] args) {
34
39
try (MongoClient mongoClient = MongoClients .create (clientSettings )) {
35
40
MongoDatabase db = mongoClient .getDatabase ("sample_training" );
36
41
MongoCollection <Grade > grades = db .getCollection ("grades" , Grade .class );
42
+ List <Bson > pipeline ;
37
43
38
44
// Only uncomment one example at a time. Follow instructions for each individually then kill all remaining processes.
39
45
40
46
/** => Example 1: print all the write operations.
41
47
* => Start "ChangeStreams" then "MappingPOJOs" to see some change events.
42
48
*/
43
- grades .watch ().forEach (print ());
49
+ grades .watch ().forEach (printEvent ());
44
50
45
51
/** => Example 2: print only insert and delete operations.
46
52
* => Start "ChangeStreams" then "MappingPOJOs" to see some change events.
47
53
*/
48
- // grades.watch(asList(match(in("operationType", asList("insert", "delete"))))).forEach(print());
54
+ // pipeline = singletonList(match(in("operationType", asList("insert", "delete"))));
55
+ // grades.watch(pipeline).forEach(printEvent());
49
56
50
57
/** => Example 3: print only updates without fullDocument.
51
58
* => Start "ChangeStreams" then "Update" to see some change events (start "Create" before if not done earlier).
52
59
*/
53
- // grades.watch(asList(match(eq("operationType", "update")))).forEach(print());
60
+ // pipeline = singletonList(match(eq("operationType", "update")));
61
+ // grades.watch(pipeline).forEach(printEvent());
54
62
55
63
/** => Example 4: print only updates with fullDocument.
56
64
* => Start "ChangeStreams" then "Update" to see some change events.
57
65
*/
58
- // grades.watch(asList(match(eq("operationType", "update")))).fullDocument(UPDATE_LOOKUP).forEach(print());
66
+ // pipeline = singletonList(match(eq("operationType", "update")));
67
+ // grades.watch(pipeline).fullDocument(UPDATE_LOOKUP).forEach(printEvent());
59
68
60
69
/**
61
70
* => Example 5: iterating using a cursor and a while loop + remembering a resumeToken then restart the Change Streams.
@@ -66,8 +75,9 @@ public static void main(String[] args) {
66
75
}
67
76
68
77
private static void exampleWithResumeToken (MongoCollection <Grade > grades ) {
69
- MongoChangeStreamCursor <ChangeStreamDocument <Grade >> cursor = grades .watch (asList (match (eq ("operationType" , "update" ))))
70
- .cursor ();
78
+ List <Bson > pipeline = singletonList (match (eq ("operationType" , "update" )));
79
+ ChangeStreamIterable <Grade > changeStream = grades .watch (pipeline );
80
+ MongoChangeStreamCursor <ChangeStreamDocument <Grade >> cursor = changeStream .cursor ();
71
81
System .out .println ("==> Going through the stream a first time & record a resumeToken" );
72
82
int indexOfOperationToRestartFrom = 5 ;
73
83
int indexOfIncident = 8 ;
@@ -83,10 +93,11 @@ private static void exampleWithResumeToken(MongoCollection<Grade> grades) {
83
93
}
84
94
System .out .println ("==> Let's imagine something wrong happened and I need to restart my Change Stream." );
85
95
System .out .println ("==> Starting from resumeToken=" + resumeToken );
86
- grades .watch (asList (match (eq ("operationType" , "update" )))).resumeAfter (resumeToken ).forEach (print ());
96
+ assert resumeToken != null ;
97
+ grades .watch (pipeline ).resumeAfter (resumeToken ).forEach (printEvent ());
87
98
}
88
99
89
- private static Consumer <ChangeStreamDocument <Grade >> print () {
100
+ private static Consumer <ChangeStreamDocument <Grade >> printEvent () {
90
101
return System .out ::println ;
91
102
}
92
103
}
0 commit comments