Skip to content

Commit 1b3123a

Browse files
authored
Encapsulate ChangeStreamOperation (#1029)
JAVA-4795
1 parent c09f865 commit 1b3123a

File tree

5 files changed

+63
-51
lines changed

5 files changed

+63
-51
lines changed

driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,16 @@
4040
import com.mongodb.client.model.ReplaceOptions;
4141
import com.mongodb.client.model.UpdateOptions;
4242
import com.mongodb.client.model.WriteModel;
43+
import com.mongodb.client.model.changestream.FullDocument;
44+
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
4345
import com.mongodb.internal.async.AsyncBatchCursor;
4446
import com.mongodb.internal.client.model.AggregationLevel;
4547
import com.mongodb.internal.client.model.FindOptions;
48+
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
49+
import org.bson.BsonDocument;
50+
import org.bson.BsonTimestamp;
4651
import org.bson.BsonValue;
52+
import org.bson.codecs.Decoder;
4753
import org.bson.codecs.configuration.CodecRegistry;
4854
import org.bson.conversions.Bson;
4955

@@ -271,4 +277,13 @@ public <TResult> AsyncReadOperation<AsyncBatchCursor<TResult>> listIndexes(final
271277
final long maxTimeMS, final BsonValue comment) {
272278
return operations.listIndexes(resultClass, batchSize, maxTimeMS, comment);
273279
}
280+
281+
public <TResult> AsyncReadOperation<AsyncBatchCursor<TResult>> changeStream(final FullDocument fullDocument,
282+
final FullDocumentBeforeChange fullDocumentBeforeChange, final List<? extends Bson> pipeline,
283+
final Decoder<TResult> decoder, final ChangeStreamLevel changeStreamLevel, final Integer batchSize, final Collation collation,
284+
final BsonValue comment, final long maxAwaitTimeMS, final BsonDocument resumeToken, final BsonTimestamp startAtOperationTime,
285+
final BsonDocument startAfter, final boolean showExpandedEvents) {
286+
return operations.changeStream(fullDocument, fullDocumentBeforeChange, pipeline, decoder, changeStreamLevel, batchSize,
287+
collation, comment, maxAwaitTimeMS, resumeToken, startAtOperationTime, startAfter, showExpandedEvents);
288+
}
274289
}

driver-core/src/main/com/mongodb/internal/operation/Operations.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,21 +46,26 @@
4646
import com.mongodb.client.model.UpdateOneModel;
4747
import com.mongodb.client.model.UpdateOptions;
4848
import com.mongodb.client.model.WriteModel;
49+
import com.mongodb.client.model.changestream.FullDocument;
50+
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
4951
import com.mongodb.internal.bulk.DeleteRequest;
5052
import com.mongodb.internal.bulk.IndexRequest;
5153
import com.mongodb.internal.bulk.InsertRequest;
5254
import com.mongodb.internal.bulk.UpdateRequest;
5355
import com.mongodb.internal.bulk.WriteRequest;
5456
import com.mongodb.internal.client.model.AggregationLevel;
5557
import com.mongodb.internal.client.model.FindOptions;
58+
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
5659
import org.bson.BsonArray;
5760
import org.bson.BsonDocument;
5861
import org.bson.BsonDocumentWrapper;
5962
import org.bson.BsonJavaScript;
6063
import org.bson.BsonString;
64+
import org.bson.BsonTimestamp;
6165
import org.bson.BsonValue;
6266
import org.bson.codecs.Codec;
6367
import org.bson.codecs.CollectibleCodec;
68+
import org.bson.codecs.Decoder;
6469
import org.bson.codecs.configuration.CodecRegistry;
6570
import org.bson.conversions.Bson;
6671

@@ -608,6 +613,24 @@ <TResult> ListIndexesOperation<TResult> listIndexes(final Class<TResult> resultC
608613
.comment(comment);
609614
}
610615

616+
<TResult> ChangeStreamOperation<TResult> changeStream(final FullDocument fullDocument,
617+
final FullDocumentBeforeChange fullDocumentBeforeChange, final List<? extends Bson> pipeline,
618+
final Decoder<TResult> decoder, final ChangeStreamLevel changeStreamLevel, final Integer batchSize,
619+
final Collation collation, final BsonValue comment, final long maxAwaitTimeMS, final BsonDocument resumeToken,
620+
final BsonTimestamp startAtOperationTime, final BsonDocument startAfter, final boolean showExpandedEvents) {
621+
return new ChangeStreamOperation<>(namespace, fullDocument, fullDocumentBeforeChange, toBsonDocumentList(pipeline), decoder,
622+
changeStreamLevel)
623+
.batchSize(batchSize)
624+
.collation(collation)
625+
.comment(comment)
626+
.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS)
627+
.resumeAfter(resumeToken)
628+
.startAtOperationTime(startAtOperationTime)
629+
.startAfter(startAfter)
630+
.showExpandedEvents(showExpandedEvents)
631+
.retryReads(retryReads);
632+
}
633+
611634
private Codec<TDocument> getCodec() {
612635
return codecRegistry.get(documentClass);
613636
}
@@ -626,6 +649,9 @@ private List<BsonDocument> toBsonDocumentList(final List<? extends Bson> bsonLis
626649
}
627650
List<BsonDocument> bsonDocumentList = new ArrayList<>(bsonList.size());
628651
for (Bson cur : bsonList) {
652+
if (cur == null) {
653+
throw new IllegalArgumentException("All documents in the list must be non-null");
654+
}
629655
bsonDocumentList.add(toBsonDocument(cur));
630656
}
631657
return bsonDocumentList;

driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,15 @@
4040
import com.mongodb.client.model.ReplaceOptions;
4141
import com.mongodb.client.model.UpdateOptions;
4242
import com.mongodb.client.model.WriteModel;
43+
import com.mongodb.client.model.changestream.FullDocument;
44+
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
4345
import com.mongodb.internal.client.model.AggregationLevel;
4446
import com.mongodb.internal.client.model.FindOptions;
47+
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
48+
import org.bson.BsonDocument;
49+
import org.bson.BsonTimestamp;
4550
import org.bson.BsonValue;
51+
import org.bson.codecs.Decoder;
4652
import org.bson.codecs.configuration.CodecRegistry;
4753
import org.bson.conversions.Bson;
4854

@@ -249,4 +255,13 @@ public <TResult> ReadOperation<BatchCursor<TResult>> listIndexes(final Class<TRe
249255
final long maxTimeMS, final BsonValue comment) {
250256
return operations.listIndexes(resultClass, batchSize, maxTimeMS, comment);
251257
}
258+
259+
public <TResult> ReadOperation<BatchCursor<TResult>> changeStream(final FullDocument fullDocument,
260+
final FullDocumentBeforeChange fullDocumentBeforeChange, final List<? extends Bson> pipeline, final Decoder<TResult> decoder,
261+
final ChangeStreamLevel changeStreamLevel, final Integer batchSize, final Collation collation, final BsonValue comment,
262+
final long maxAwaitTimeMS, final BsonDocument resumeToken, final BsonTimestamp startAtOperationTime,
263+
final BsonDocument startAfter, final boolean showExpandedEvents) {
264+
return operations.changeStream(fullDocument, fullDocumentBeforeChange, pipeline, decoder, changeStreamLevel, batchSize,
265+
collation, comment, maxAwaitTimeMS, resumeToken, startAtOperationTime, startAfter, showExpandedEvents);
266+
}
252267
}

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.mongodb.internal.async.AsyncBatchCursor;
2424
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
2525
import com.mongodb.internal.operation.AsyncReadOperation;
26-
import com.mongodb.internal.operation.ChangeStreamOperation;
2726
import com.mongodb.lang.Nullable;
2827
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
2928
import com.mongodb.reactivestreams.client.ClientSession;
@@ -35,7 +34,6 @@
3534
import org.bson.conversions.Bson;
3635
import org.reactivestreams.Publisher;
3736

38-
import java.util.ArrayList;
3937
import java.util.List;
4038
import java.util.concurrent.TimeUnit;
4139

@@ -169,27 +167,7 @@ AsyncReadOperation<AsyncBatchCursor<ChangeStreamDocument<T>>> asAsyncReadOperati
169167
}
170168

171169
private <S> AsyncReadOperation<AsyncBatchCursor<S>> createChangeStreamOperation(final Codec<S> codec, final int initialBatchSize) {
172-
return new ChangeStreamOperation<>(getNamespace(), fullDocument, fullDocumentBeforeChange,
173-
createBsonDocumentList(pipeline), codec, changeStreamLevel)
174-
.batchSize(initialBatchSize)
175-
.collation(collation)
176-
.comment(comment)
177-
.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS)
178-
.resumeAfter(resumeToken)
179-
.startAtOperationTime(startAtOperationTime)
180-
.startAfter(startAfter)
181-
.showExpandedEvents(showExpandedEvents)
182-
.retryReads(getRetryReads());
183-
}
184-
185-
private List<BsonDocument> createBsonDocumentList(final List<? extends Bson> pipeline) {
186-
List<BsonDocument> aggregateList = new ArrayList<>(pipeline.size());
187-
for (Bson obj : pipeline) {
188-
if (obj == null) {
189-
throw new IllegalArgumentException("pipeline can not contain a null value");
190-
}
191-
aggregateList.add(obj.toBsonDocument(BsonDocument.class, getCodecRegistry()));
192-
}
193-
return aggregateList;
170+
return getOperations().changeStream(fullDocument, fullDocumentBeforeChange, pipeline, codec, changeStreamLevel, initialBatchSize,
171+
collation, comment, maxAwaitTimeMS, resumeToken, startAtOperationTime, startAfter, showExpandedEvents);
194172
}
195173
}

driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
3131
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
3232
import com.mongodb.internal.operation.BatchCursor;
33-
import com.mongodb.internal.operation.ChangeStreamOperation;
3433
import com.mongodb.internal.operation.ReadOperation;
34+
import com.mongodb.internal.operation.SyncOperations;
3535
import com.mongodb.lang.Nullable;
3636
import org.bson.BsonDocument;
3737
import org.bson.BsonString;
@@ -43,7 +43,6 @@
4343
import org.bson.codecs.configuration.CodecRegistry;
4444
import org.bson.conversions.Bson;
4545

46-
import java.util.ArrayList;
4746
import java.util.List;
4847
import java.util.concurrent.TimeUnit;
4948

@@ -55,12 +54,11 @@
5554
*/
5655
public class ChangeStreamIterableImpl<TResult> extends MongoIterableImpl<ChangeStreamDocument<TResult>>
5756
implements ChangeStreamIterable<TResult> {
58-
private final MongoNamespace namespace;
5957
private final CodecRegistry codecRegistry;
6058
private final List<? extends Bson> pipeline;
6159
private final Codec<ChangeStreamDocument<TResult>> codec;
6260
private final ChangeStreamLevel changeStreamLevel;
63-
61+
private final SyncOperations<TResult> operations;
6462
private FullDocument fullDocument = FullDocument.DEFAULT;
6563
private FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT;
6664
private BsonDocument resumeToken;
@@ -84,11 +82,11 @@ public ChangeStreamIterableImpl(@Nullable final ClientSession clientSession, fin
8482
final OperationExecutor executor, final List<? extends Bson> pipeline, final Class<TResult> resultClass,
8583
final ChangeStreamLevel changeStreamLevel, final boolean retryReads) {
8684
super(clientSession, executor, readConcern, readPreference, retryReads);
87-
this.namespace = notNull("namespace", namespace);
8885
this.codecRegistry = notNull("codecRegistry", codecRegistry);
8986
this.pipeline = notNull("pipeline", pipeline);
9087
this.codec = ChangeStreamDocument.createCodec(notNull("resultClass", resultClass), codecRegistry);
9188
this.changeStreamLevel = notNull("changeStreamLevel", changeStreamLevel);
89+
this.operations = new SyncOperations<>(namespace, resultClass, readPreference, codecRegistry, retryReads);
9290
}
9391

9492
@Override
@@ -209,28 +207,8 @@ public ReadOperation<BatchCursor<ChangeStreamDocument<TResult>>> asReadOperation
209207
}
210208

211209
private ReadOperation<BatchCursor<RawBsonDocument>> createChangeStreamOperation() {
212-
return new ChangeStreamOperation<>(namespace, fullDocument, fullDocumentBeforeChange, createBsonDocumentList(pipeline),
213-
new RawBsonDocumentCodec(), changeStreamLevel)
214-
.batchSize(getBatchSize())
215-
.collation(collation)
216-
.comment(comment)
217-
.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS)
218-
.resumeAfter(resumeToken)
219-
.startAtOperationTime(startAtOperationTime)
220-
.startAfter(startAfter)
221-
.showExpandedEvents(showExpandedEvents)
222-
.retryReads(getRetryReads());
223-
}
224-
225-
private List<BsonDocument> createBsonDocumentList(final List<? extends Bson> pipeline) {
226-
List<BsonDocument> aggregateList = new ArrayList<BsonDocument>(pipeline.size());
227-
for (Bson obj : pipeline) {
228-
if (obj == null) {
229-
throw new IllegalArgumentException("pipeline cannot contain a null value");
230-
}
231-
aggregateList.add(obj.toBsonDocument(BsonDocument.class, codecRegistry));
232-
}
233-
return aggregateList;
210+
return operations.changeStream(fullDocument, fullDocumentBeforeChange, pipeline, new RawBsonDocumentCodec(), changeStreamLevel,
211+
getBatchSize(), collation, comment, maxAwaitTimeMS, resumeToken, startAtOperationTime, startAfter, showExpandedEvents);
234212
}
235213

236214
private BatchCursor<RawBsonDocument> execute() {

0 commit comments

Comments
 (0)