Skip to content

Encapsulate ChangeStreamOperation #1029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,16 @@
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.client.model.AggregationLevel;
import com.mongodb.internal.client.model.FindOptions;
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.codecs.Decoder;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;

Expand Down Expand Up @@ -271,4 +277,13 @@ public <TResult> AsyncReadOperation<AsyncBatchCursor<TResult>> listIndexes(final
final long maxTimeMS, final BsonValue comment) {
return operations.listIndexes(resultClass, batchSize, maxTimeMS, comment);
}

public <TResult> AsyncReadOperation<AsyncBatchCursor<TResult>> changeStream(final FullDocument fullDocument,
final FullDocumentBeforeChange fullDocumentBeforeChange, final List<? extends Bson> pipeline,
final Decoder<TResult> decoder, final ChangeStreamLevel changeStreamLevel, final Integer batchSize, final Collation collation,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's odd that we have to pass in the decoder rather than just a result class, but it turns out there's a slight difference in implementation between sync and async that requires this, and I didn't think this was the time to resolve the difference.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that we also pass a decoder to SyncOperations.changeStream for the same reason: ChangeStreamOperation needs it. And then I see ChangeStreamOperation.getDecoder being used in ChangeStreamBatchCursor, which tells me that the decoder is used by both sync and async code. I don't think I understand what you are pointing to.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if you want me to dig into this and explain it. I no longer remember.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know whether that was important or not. But given that the approach was to ignore that anyway, there is probably no reason to try to dig that out again.

final BsonValue comment, final long maxAwaitTimeMS, final BsonDocument resumeToken, final BsonTimestamp startAtOperationTime,
final BsonDocument startAfter, final boolean showExpandedEvents) {
return operations.changeStream(fullDocument, fullDocumentBeforeChange, pipeline, decoder, changeStreamLevel, batchSize,
collation, comment, maxAwaitTimeMS, resumeToken, startAtOperationTime, startAfter, showExpandedEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,26 @@
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.bulk.DeleteRequest;
import com.mongodb.internal.bulk.IndexRequest;
import com.mongodb.internal.bulk.InsertRequest;
import com.mongodb.internal.bulk.UpdateRequest;
import com.mongodb.internal.bulk.WriteRequest;
import com.mongodb.internal.client.model.AggregationLevel;
import com.mongodb.internal.client.model.FindOptions;
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonDocumentWrapper;
import org.bson.BsonJavaScript;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.codecs.Codec;
import org.bson.codecs.CollectibleCodec;
import org.bson.codecs.Decoder;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;

Expand Down Expand Up @@ -608,6 +613,24 @@ <TResult> ListIndexesOperation<TResult> listIndexes(final Class<TResult> resultC
.comment(comment);
}

<TResult> ChangeStreamOperation<TResult> changeStream(final FullDocument fullDocument,
        final FullDocumentBeforeChange fullDocumentBeforeChange, final List<? extends Bson> pipeline,
        final Decoder<TResult> decoder, final ChangeStreamLevel changeStreamLevel, final Integer batchSize,
        final Collation collation, final BsonValue comment, final long maxAwaitTimeMS, final BsonDocument resumeToken,
        final BsonTimestamp startAtOperationTime, final BsonDocument startAfter, final boolean showExpandedEvents) {
    // Construct the change stream operation for this namespace, then apply the
    // caller-supplied options one at a time (the fluent setters mutate and return
    // the same operation instance).
    ChangeStreamOperation<TResult> operation = new ChangeStreamOperation<>(namespace, fullDocument,
            fullDocumentBeforeChange, toBsonDocumentList(pipeline), decoder, changeStreamLevel);
    operation.batchSize(batchSize);
    operation.collation(collation);
    operation.comment(comment);
    operation.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS);
    operation.resumeAfter(resumeToken);
    operation.startAtOperationTime(startAtOperationTime);
    operation.startAfter(startAfter);
    operation.showExpandedEvents(showExpandedEvents);
    // Honor the retryable-reads setting configured on this Operations instance.
    operation.retryReads(retryReads);
    return operation;
}

// Resolves the codec for this Operations instance's document class from the
// configured codec registry.
private Codec<TDocument> getCodec() {
return codecRegistry.get(documentClass);
}
Expand All @@ -626,6 +649,9 @@ private List<BsonDocument> toBsonDocumentList(final List<? extends Bson> bsonLis
}
List<BsonDocument> bsonDocumentList = new ArrayList<>(bsonList.size());
for (Bson cur : bsonList) {
if (cur == null) {
throw new IllegalArgumentException("All documents in the list must be non-null");
}
bsonDocumentList.add(toBsonDocument(cur));
}
return bsonDocumentList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.client.model.AggregationLevel;
import com.mongodb.internal.client.model.FindOptions;
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.codecs.Decoder;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;

Expand Down Expand Up @@ -249,4 +255,13 @@ public <TResult> ReadOperation<BatchCursor<TResult>> listIndexes(final Class<TRe
final long maxTimeMS, final BsonValue comment) {
return operations.listIndexes(resultClass, batchSize, maxTimeMS, comment);
}

/**
 * Creates a read operation that opens a change stream cursor decoding events with the given decoder.
 * Delegates entirely to the shared {@code operations} helper, which builds and configures the
 * underlying {@code ChangeStreamOperation}.
 */
public <TResult> ReadOperation<BatchCursor<TResult>> changeStream(final FullDocument fullDocument,
final FullDocumentBeforeChange fullDocumentBeforeChange, final List<? extends Bson> pipeline, final Decoder<TResult> decoder,
final ChangeStreamLevel changeStreamLevel, final Integer batchSize, final Collation collation, final BsonValue comment,
final long maxAwaitTimeMS, final BsonDocument resumeToken, final BsonTimestamp startAtOperationTime,
final BsonDocument startAfter, final boolean showExpandedEvents) {
return operations.changeStream(fullDocument, fullDocumentBeforeChange, pipeline, decoder, changeStreamLevel, batchSize,
collation, comment, maxAwaitTimeMS, resumeToken, startAtOperationTime, startAfter, showExpandedEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
import com.mongodb.internal.operation.AsyncReadOperation;
import com.mongodb.internal.operation.ChangeStreamOperation;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.ClientSession;
Expand All @@ -35,7 +34,6 @@
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -169,27 +167,7 @@ AsyncReadOperation<AsyncBatchCursor<ChangeStreamDocument<T>>> asAsyncReadOperati
}

private <S> AsyncReadOperation<AsyncBatchCursor<S>> createChangeStreamOperation(final Codec<S> codec, final int initialBatchSize) {
return new ChangeStreamOperation<>(getNamespace(), fullDocument, fullDocumentBeforeChange,
createBsonDocumentList(pipeline), codec, changeStreamLevel)
.batchSize(initialBatchSize)
.collation(collation)
.comment(comment)
.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS)
.resumeAfter(resumeToken)
.startAtOperationTime(startAtOperationTime)
.startAfter(startAfter)
.showExpandedEvents(showExpandedEvents)
.retryReads(getRetryReads());
}

private List<BsonDocument> createBsonDocumentList(final List<? extends Bson> pipeline) {
List<BsonDocument> aggregateList = new ArrayList<>(pipeline.size());
for (Bson obj : pipeline) {
if (obj == null) {
throw new IllegalArgumentException("pipeline can not contain a null value");
}
aggregateList.add(obj.toBsonDocument(BsonDocument.class, getCodecRegistry()));
}
return aggregateList;
return getOperations().changeStream(fullDocument, fullDocumentBeforeChange, pipeline, codec, changeStreamLevel, initialBatchSize,
collation, comment, maxAwaitTimeMS, resumeToken, startAtOperationTime, startAfter, showExpandedEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
import com.mongodb.internal.operation.BatchCursor;
import com.mongodb.internal.operation.ChangeStreamOperation;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.SyncOperations;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonString;
Expand All @@ -43,7 +43,6 @@
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand All @@ -55,12 +54,11 @@
*/
public class ChangeStreamIterableImpl<TResult> extends MongoIterableImpl<ChangeStreamDocument<TResult>>
implements ChangeStreamIterable<TResult> {
private final MongoNamespace namespace;
private final CodecRegistry codecRegistry;
private final List<? extends Bson> pipeline;
private final Codec<ChangeStreamDocument<TResult>> codec;
private final ChangeStreamLevel changeStreamLevel;

private final SyncOperations<TResult> operations;
private FullDocument fullDocument = FullDocument.DEFAULT;
private FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT;
private BsonDocument resumeToken;
Expand All @@ -84,11 +82,11 @@ public ChangeStreamIterableImpl(@Nullable final ClientSession clientSession, fin
final OperationExecutor executor, final List<? extends Bson> pipeline, final Class<TResult> resultClass,
final ChangeStreamLevel changeStreamLevel, final boolean retryReads) {
super(clientSession, executor, readConcern, readPreference, retryReads);
this.namespace = notNull("namespace", namespace);
this.codecRegistry = notNull("codecRegistry", codecRegistry);
this.pipeline = notNull("pipeline", pipeline);
this.codec = ChangeStreamDocument.createCodec(notNull("resultClass", resultClass), codecRegistry);
this.changeStreamLevel = notNull("changeStreamLevel", changeStreamLevel);
this.operations = new SyncOperations<>(namespace, resultClass, readPreference, codecRegistry, retryReads);
}

@Override
Expand Down Expand Up @@ -209,28 +207,8 @@ public ReadOperation<BatchCursor<ChangeStreamDocument<TResult>>> asReadOperation
}

private ReadOperation<BatchCursor<RawBsonDocument>> createChangeStreamOperation() {
return new ChangeStreamOperation<>(namespace, fullDocument, fullDocumentBeforeChange, createBsonDocumentList(pipeline),
new RawBsonDocumentCodec(), changeStreamLevel)
.batchSize(getBatchSize())
.collation(collation)
.comment(comment)
.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS)
.resumeAfter(resumeToken)
.startAtOperationTime(startAtOperationTime)
.startAfter(startAfter)
.showExpandedEvents(showExpandedEvents)
.retryReads(getRetryReads());
}

private List<BsonDocument> createBsonDocumentList(final List<? extends Bson> pipeline) {
List<BsonDocument> aggregateList = new ArrayList<BsonDocument>(pipeline.size());
for (Bson obj : pipeline) {
if (obj == null) {
throw new IllegalArgumentException("pipeline cannot contain a null value");
}
aggregateList.add(obj.toBsonDocument(BsonDocument.class, codecRegistry));
}
return aggregateList;
return operations.changeStream(fullDocument, fullDocumentBeforeChange, pipeline, new RawBsonDocumentCodec(), changeStreamLevel,
getBatchSize(), collation, comment, maxAwaitTimeMS, resumeToken, startAtOperationTime, startAfter, showExpandedEvents);
}

private BatchCursor<RawBsonDocument> execute() {
Expand Down