Skip to content

Commit

Permalink
Enhance pre-and post-image support for change streams (mongodb#926)
Browse files Browse the repository at this point in the history
Change stream watch helpers now accept "whenAvailable" and "required" for the fullDocument option.
Additionally, a new fullDocumentBeforeChange option is introduced, which accepts "whenAvailable" and
"required". Change events may now include a "fullDocumentBeforeChange" response field.

JAVA-4468
  • Loading branch information
jyemin authored Apr 28, 2022
1 parent b969b1a commit 3bbad88
Show file tree
Hide file tree
Showing 24 changed files with 1,325 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class ChangeStreamDocument<TDocument> {
private final BsonDocument namespaceDocument;
private final BsonDocument destinationNamespaceDocument;
private final TDocument fullDocument;
private final TDocument fullDocumentBeforeChange;
private final BsonDocument documentKey;
private final BsonTimestamp clusterTime;
@BsonProperty("operationType")
Expand All @@ -64,28 +65,31 @@ public final class ChangeStreamDocument<TDocument> {
* @param namespaceDocument the BsonDocument representing the namespace
* @param destinationNamespaceDocument the BsonDocument representing the destinatation namespace
* @param fullDocument the full document
* @param fullDocumentBeforeChange the full document before change
* @param documentKey a document containing the _id of the changed document
* @param clusterTime the cluster time at which the change occured
* @param updateDescription the update description
* @param txnNumber the transaction number
* @param lsid the identifier for the session associated with the transaction
*
* @since 4.6
* @since 4.7
*/
@BsonCreator
public ChangeStreamDocument(@BsonProperty("operationType") final String operationTypeString,
@BsonProperty("resumeToken") final BsonDocument resumeToken,
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument,
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument,
@Nullable @BsonProperty("fullDocument") final TDocument fullDocument,
@Nullable @BsonProperty("documentKey") final BsonDocument documentKey,
@Nullable @BsonProperty("clusterTime") final BsonTimestamp clusterTime,
@Nullable @BsonProperty("updateDescription") final UpdateDescription updateDescription,
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber,
@Nullable @BsonProperty("lsid") final BsonDocument lsid) {
@BsonProperty("resumeToken") final BsonDocument resumeToken,
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument,
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument,
@Nullable @BsonProperty("fullDocument") final TDocument fullDocument,
@Nullable @BsonProperty("fullDocumentBeforeChange") final TDocument fullDocumentBeforeChange,
@Nullable @BsonProperty("documentKey") final BsonDocument documentKey,
@Nullable @BsonProperty("clusterTime") final BsonTimestamp clusterTime,
@Nullable @BsonProperty("updateDescription") final UpdateDescription updateDescription,
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber,
@Nullable @BsonProperty("lsid") final BsonDocument lsid) {
this.resumeToken = resumeToken;
this.namespaceDocument = namespaceDocument;
this.destinationNamespaceDocument = destinationNamespaceDocument;
this.fullDocumentBeforeChange = fullDocumentBeforeChange;
this.documentKey = documentKey;
this.fullDocument = fullDocument;
this.clusterTime = clusterTime;
Expand All @@ -96,6 +100,37 @@ public ChangeStreamDocument(@BsonProperty("operationType") final String operatio
this.lsid = lsid;
}

/**
* Creates a new instance
*
* @param operationTypeString the operation type
* @param resumeToken the resume token
* @param namespaceDocument the BsonDocument representing the namespace
* @param destinationNamespaceDocument the BsonDocument representing the destinatation namespace
* @param fullDocument the full document
* @param documentKey a document containing the _id of the changed document
* @param clusterTime the cluster time at which the change occured
* @param updateDescription the update description
* @param txnNumber the transaction number
* @param lsid the identifier for the session associated with the transaction
*
* @since 4.6
*/
@Deprecated
public ChangeStreamDocument(@BsonProperty("operationType") final String operationTypeString,
@BsonProperty("resumeToken") final BsonDocument resumeToken,
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument,
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument,
@Nullable @BsonProperty("fullDocument") final TDocument fullDocument,
@Nullable @BsonProperty("documentKey") final BsonDocument documentKey,
@Nullable @BsonProperty("clusterTime") final BsonTimestamp clusterTime,
@Nullable @BsonProperty("updateDescription") final UpdateDescription updateDescription,
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber,
@Nullable @BsonProperty("lsid") final BsonDocument lsid) {
this(operationTypeString, resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, null, documentKey,
clusterTime, updateDescription, txnNumber, lsid);
}

/**
* Creates a new instance
*
Expand Down Expand Up @@ -123,7 +158,7 @@ public ChangeStreamDocument(final OperationType operationType,
final UpdateDescription updateDescription,
final BsonInt64 txnNumber,
final BsonDocument lsid) {
this(operationType.getValue(), resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, documentKey,
this(operationType.getValue(), resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, null, documentKey,
clusterTime, updateDescription, txnNumber, lsid);
}

Expand Down Expand Up @@ -232,7 +267,30 @@ public String getDatabaseName() {
}

/**
* Returns the fullDocument
* Returns the fullDocument.
*
* <p>
* Always present for operations of type {@link OperationType#INSERT} and {@link OperationType#REPLACE}. Also present for operations
* of type {@link OperationType#UPDATE} if the user has specified {@link FullDocument#UPDATE_LOOKUP} for the {@code fullDocument}
* option when creating the change stream.
* </p>
*
* <p>
* For operations of type {@link OperationType#INSERT} and {@link OperationType#REPLACE}, the value will contain the document being
* inserted or the new version of the document that is replacing the existing document, respectively.
* </p>
*
* <p>
* For operations of type {@link OperationType#UPDATE}, the value will contain a copy of the full version of the document from some
* point after the update occurred. If the document was deleted since the updated happened, the value may be null.
* </p>
*
* <p>
* Contains the point-in-time post-image of the modified document if the post-image is available and either
* {@link FullDocument#REQUIRED} or {@link FullDocument#WHEN_AVAILABLE} was specified for the {@code fullDocument} option when
* creating the change stream. A post-image is always available for {@link OperationType#INSERT} and {@link OperationType#REPLACE}
* events.
* </p>
*
* @return the fullDocument
*/
Expand All @@ -241,6 +299,25 @@ public TDocument getFullDocument() {
return fullDocument;
}

/**
* Returns the fullDocument before change
*
* <p>
* Contains the pre-image of the modified or deleted document if the pre-image is available for the change event and either
* {@link FullDocumentBeforeChange#REQUIRED} or {@link FullDocumentBeforeChange#WHEN_AVAILABLE} was specified for the
* {@code fullDocumentBeforeChange} option when creating the change stream. If {@link FullDocumentBeforeChange#WHEN_AVAILABLE} was
* specified but the pre-image is unavailable, the value will be null.
* </p>
*
* @return the fulDocument before change
* @since 4.7
* @mongodb.server.release 6.0
*/
@Nullable
public TDocument getFullDocumentBeforeChange() {
return fullDocumentBeforeChange;
}

/**
* Returns a document containing just the _id of the changed document.
* <p>
Expand Down Expand Up @@ -368,6 +445,10 @@ public boolean equals(final Object o) {
if (fullDocument != null ? !fullDocument.equals(that.fullDocument) : that.fullDocument != null) {
return false;
}
if (fullDocumentBeforeChange != null ? !fullDocumentBeforeChange.equals(that.fullDocumentBeforeChange)
: that.fullDocumentBeforeChange != null) {
return false;
}
if (documentKey != null ? !documentKey.equals(that.documentKey) : that.documentKey != null) {
return false;
}
Expand Down Expand Up @@ -396,6 +477,7 @@ public int hashCode() {
result = 31 * result + (namespaceDocument != null ? namespaceDocument.hashCode() : 0);
result = 31 * result + (destinationNamespaceDocument != null ? destinationNamespaceDocument.hashCode() : 0);
result = 31 * result + (fullDocument != null ? fullDocument.hashCode() : 0);
result = 31 * result + (fullDocumentBeforeChange != null ? fullDocumentBeforeChange.hashCode() : 0);
result = 31 * result + (documentKey != null ? documentKey.hashCode() : 0);
result = 31 * result + (clusterTime != null ? clusterTime.hashCode() : 0);
result = 31 * result + (operationTypeString != null ? operationTypeString.hashCode() : 0);
Expand All @@ -413,6 +495,7 @@ public String toString() {
+ ", namespace=" + getNamespace()
+ ", destinationNamespace=" + getDestinationNamespace()
+ ", fullDocument=" + fullDocument
+ ", fullDocumentBeforeChange=" + fullDocumentBeforeChange
+ ", documentKey=" + documentKey
+ ", clusterTime=" + clusterTime
+ ", updateDescription=" + updateDescription
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ final class ChangeStreamDocumentCodec<TResult> implements Codec<ChangeStreamDocu

ClassModelBuilder<ChangeStreamDocument> classModelBuilder = ClassModel.builder(ChangeStreamDocument.class);
((PropertyModelBuilder<TResult>) classModelBuilder.getProperty("fullDocument")).codec(codecRegistry.get(fullDocumentClass));
((PropertyModelBuilder<TResult>) classModelBuilder.getProperty("fullDocumentBeforeChange"))
.codec(codecRegistry.get(fullDocumentClass));
ClassModel<ChangeStreamDocument> changeStreamDocumentClassModel = classModelBuilder.build();

PojoCodecProvider provider = PojoCodecProvider.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,25 @@ public enum FullDocument {
* <p>The change stream for partial updates will include both a delta describing the changes to the document as well as a copy of the
* entire document that was changed from <em>some time</em> after the change occurred.</p>
*/
UPDATE_LOOKUP("updateLookup");
UPDATE_LOOKUP("updateLookup"),

/**
* Configures the change stream to return the post-image of the modified document for replace and update change events, if it
* is available.
*
* @since 4.7
* @mongodb.server.release 6.0
*/
WHEN_AVAILABLE("whenAvailable"),

/**
* The same behavior as {@link #WHEN_AVAILABLE} except that an error is raised if the post-image is not available.
*
* @since 4.7
* @mongodb.server.release 6.0
*/
REQUIRED("required");


private final String value;
FullDocument(final String caseFirst) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.client.model.changestream;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static java.lang.String.format;

/**
* Change Stream fullDocumentBeforeChange configuration.
*
* <p>
* Determines what to return for update operations when using a Change Stream. Defaults to {@link FullDocumentBeforeChange#DEFAULT}.
* </p>
*
* @since 4.7
* @mongodb.server.release 6.0
*/
public enum FullDocumentBeforeChange {
/**
* The default value
*/
DEFAULT("default"),

/**
* Configures the change stream to not include the pre-image of the modified document.
*/
OFF("off"),

/**
* Configures the change stream to return the pre-image of the modified document for replace, update, and delete change events if it
* is available.
*/
WHEN_AVAILABLE("whenAvailable"),

/**
* The same behavior as {@link #WHEN_AVAILABLE} except that an error is raised by the server if the pre-image is not available.
*/
REQUIRED("required");


private final String value;

/**
* The string value.
*
* @return the string value
*/
public String getValue() {
return value;
}

FullDocumentBeforeChange(final String value) {
this.value = value;
}

/**
* Returns the FullDocumentBeforeChange from the string value.
*
* @param value the string value.
* @return the full document before change
*/
public static FullDocumentBeforeChange fromString(final String value) {
assertNotNull(value);
for (FullDocumentBeforeChange fullDocumentBeforeChange : FullDocumentBeforeChange.values()) {
if (value.equals(fullDocumentBeforeChange.value)) {
return fullDocumentBeforeChange;
}
}
throw new IllegalArgumentException(format("'%s' is not a valid FullDocumentBeforeChange", value));
}}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
Expand Down Expand Up @@ -59,6 +60,7 @@ public class ChangeStreamOperation<T> implements AsyncReadOperation<AsyncBatchCu
private static final RawBsonDocumentCodec RAW_BSON_DOCUMENT_CODEC = new RawBsonDocumentCodec();
private final AggregateOperationImpl<RawBsonDocument> wrapped;
private final FullDocument fullDocument;
private final FullDocumentBeforeChange fullDocumentBeforeChange;
private final Decoder<T> decoder;
private final ChangeStreamLevel changeStreamLevel;

Expand All @@ -74,9 +76,9 @@ public class ChangeStreamOperation<T> implements AsyncReadOperation<AsyncBatchCu
* @param pipeline the aggregation pipeline.
* @param decoder the decoder for the result documents.
*/
public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument fullDocument, final List<BsonDocument> pipeline,
final Decoder<T> decoder) {
this(namespace, fullDocument, pipeline, decoder, ChangeStreamLevel.COLLECTION);
public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument fullDocument,
final FullDocumentBeforeChange fullDocumentBeforeChange, final List<BsonDocument> pipeline, final Decoder<T> decoder) {
this(namespace, fullDocument, fullDocumentBeforeChange, pipeline, decoder, ChangeStreamLevel.COLLECTION);
}

/**
Expand All @@ -90,11 +92,13 @@ public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument
*
* @since 3.8
*/
public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument fullDocument, final List<BsonDocument> pipeline,
final Decoder<T> decoder, final ChangeStreamLevel changeStreamLevel) {
public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument fullDocument,
final FullDocumentBeforeChange fullDocumentBeforeChange, final List<BsonDocument> pipeline,
final Decoder<T> decoder, final ChangeStreamLevel changeStreamLevel) {
this.wrapped = new AggregateOperationImpl<RawBsonDocument>(namespace, pipeline, RAW_BSON_DOCUMENT_CODEC,
getAggregateTarget(), getPipelineCreator());
this.fullDocument = notNull("fullDocument", fullDocument);
this.fullDocumentBeforeChange = notNull("fullDocumentBeforeChange", fullDocumentBeforeChange);
this.decoder = notNull("decoder", decoder);
this.changeStreamLevel = notNull("changeStreamLevel", changeStreamLevel);
}
Expand Down Expand Up @@ -432,6 +436,9 @@ public BsonArray create() {
if (fullDocument != FullDocument.DEFAULT) {
changeStream.append("fullDocument", new BsonString(fullDocument.getValue()));
}
if (fullDocumentBeforeChange != FullDocumentBeforeChange.DEFAULT) {
changeStream.append("fullDocumentBeforeChange", new BsonString(fullDocumentBeforeChange.getValue()));
}

if (changeStreamLevel == ChangeStreamLevel.CLIENT) {
changeStream.append("allChangesForCluster", BsonBoolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.mongodb.OperationFunctionalSpecification
import com.mongodb.WriteConcern
import com.mongodb.client.model.CreateCollectionOptions
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.client.model.changestream.FullDocumentBeforeChange
import com.mongodb.client.test.CollectionHelper
import org.bson.BsonArray
import org.bson.BsonDocument
Expand Down Expand Up @@ -52,7 +53,8 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
given:
def helper = getHelper()
def pipeline = [BsonDocument.parse('{$project: {"_id": 0}}')]
def operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, pipeline, CODEC)
def operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT,
FullDocumentBeforeChange.DEFAULT, pipeline, CODEC)

when:
def cursor = execute(operation, async)
Expand Down Expand Up @@ -88,7 +90,8 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe

def pipeline = [BsonDocument.parse('{$match: {operationType: "insert"}}')]
def failPointDocument = createFailPointDocument('getMore', 10107)
def operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, pipeline, CODEC)
def operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT,
FullDocumentBeforeChange.DEFAULT, pipeline, CODEC)

def cursor = execute(operation, async)

Expand Down Expand Up @@ -120,7 +123,8 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
def 'should not resume for aggregation errors'() {
given:
def pipeline = [BsonDocument.parse('{$unsupportedStage: {_id: 0}}')]
def operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, pipeline, CODEC)
def operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT,
FullDocumentBeforeChange.DEFAULT, pipeline, CODEC)

when:
def cursor = execute(operation, async)
Expand Down
Loading

0 comments on commit 3bbad88

Please sign in to comment.