Skip to content

Support $changeStreamSplitLargeEvent #1159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public final class ChangeStreamDocument<TDocument> {
private final BsonInt64 txnNumber;
private final BsonDocument lsid;
private final BsonDateTime wallTime;
private final SplitEvent splitEvent;
@BsonExtraElements
private final BsonDocument extraElements;

Expand All @@ -77,12 +78,14 @@ public final class ChangeStreamDocument<TDocument> {
* @param txnNumber the transaction number
* @param lsid the identifier for the session associated with the transaction
* @param wallTime the wall time of the server at the moment the change occurred
* @param splitEvent the split event
* @param extraElements any extra elements that are part of the change stream document but not otherwise mapped to fields
*
* @since 4.7
* @since 4.11
*/
@BsonCreator
public ChangeStreamDocument(@BsonProperty("operationType") final String operationTypeString,
public ChangeStreamDocument(
@Nullable @BsonProperty("operationType") final String operationTypeString,
@BsonProperty("resumeToken") final BsonDocument resumeToken,
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument,
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument,
Expand All @@ -94,6 +97,7 @@ public ChangeStreamDocument(@BsonProperty("operationType") final String operatio
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber,
@Nullable @BsonProperty("lsid") final BsonDocument lsid,
@Nullable @BsonProperty("wallTime") final BsonDateTime wallTime,
@Nullable @BsonProperty("splitEvent") final SplitEvent splitEvent,
@Nullable @BsonProperty final BsonDocument extraElements) {
this.resumeToken = resumeToken;
this.namespaceDocument = namespaceDocument;
Expand All @@ -103,14 +107,52 @@ public ChangeStreamDocument(@BsonProperty("operationType") final String operatio
this.fullDocument = fullDocument;
this.clusterTime = clusterTime;
this.operationTypeString = operationTypeString;
this.operationType = OperationType.fromString(operationTypeString);
this.operationType = operationTypeString == null ? null : OperationType.fromString(operationTypeString);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fromString does not take null; note that it is used in the OperationTypeCodec

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am glad we are not passing null to OperationType.fromString, even though it (arguably, incorrectly) has the code to handle nulls.

this.updateDescription = updateDescription;
this.txnNumber = txnNumber;
this.lsid = lsid;
this.wallTime = wallTime;
this.splitEvent = splitEvent;
this.extraElements = extraElements;
}

/**
* Creates a new instance
*
* @param operationTypeString the operation type
* @param resumeToken the resume token
* @param namespaceDocument the BsonDocument representing the namespace
* @param destinationNamespaceDocument the BsonDocument representing the destinatation namespace
* @param fullDocument the full document
* @param fullDocumentBeforeChange the full document before change
* @param documentKey a document containing the _id of the changed document
* @param clusterTime the cluster time at which the change occured
* @param updateDescription the update description
* @param txnNumber the transaction number
* @param lsid the identifier for the session associated with the transaction
* @param wallTime the wall time of the server at the moment the change occurred
* @param extraElements any extra elements that are part of the change stream document but not otherwise mapped to fields
*
* @since 4.7
*/
@Deprecated
public ChangeStreamDocument(@BsonProperty("operationType") final String operationTypeString,
Comment on lines +138 to +139
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unfortunate proliferation of constructors, but it seems in line with what was done previously in this class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we include a "@deprecated" tag to specify the reason for deprecation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the following just now,

     * @deprecated Prefer a non-deprecated constructor

but when I read it, it seemed too obvious, like "@deprecated use something that isn't deprecated". I think one convention (apart from not including the tag) is to link to the preferred replacement, but here, I do not think we should do this since there have already been N replacements, so it seemed like a link would become outdated.

If you still think this is worth including, I don't feel too strongly, but let me know the wording.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before the server change supported in this PR, the operationType field of a ChangeStreamDocument (it is called a "change event" in the MongoDB docs) was not optional: https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#response-format, and was set in all events (to see that one has to click and look through all events documented in https://www.mongodb.com/docs/v7.0/reference/change-events/, which is, no doubt, convenient). However, that field appear to become optional (a breaking change 🤷‍♂️) since MongoDB 7.0 based on the comments in https://jira.mongodb.org/browse/DRIVERS-2617. Given that this is not documented anywhere (neither in https://www.mongodb.com/docs/v7.0/reference/operator/aggregation/changeStreamSplitLargeEvent/ or https://www.mongodb.com/docs/v7.0/reference/operator/aggregation/changeStream/, nor in https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst, nor anywhere in https://www.mongodb.com/docs/v7.0/reference/change-events/), this should be clarified. If the field indeed becomes optional, we will need to add @Nullable annotations appropriately to ChangeStreamDocument, and either change the documentation of OperationType.OTHER, or add a new OMITTED value.1


1 Bizarrely, OperationType.fromString already handles null references (they correspond to missing operationType fields), even though they could not have been missing, and if they were missing, that would have rendered ChangeStreamDocument.getOperationTypeString broken, because the method must not return null (note how ChangeStreamDocument methods that may return null are annotated with @Nullable, for example, getFullDocument).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on https://mongodb.slack.com/archives/C72LB5RPV/p1689609952334209, it looks like the work required to start supporting $changeStreamSplitLargeEvent in drivers was not done, even though it was supposed to be.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll pause on this part until the changes are completed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I resolve this, or is there something you are still waiting for?

@BsonProperty("resumeToken") final BsonDocument resumeToken,
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument,
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument,
@Nullable @BsonProperty("fullDocument") final TDocument fullDocument,
@Nullable @BsonProperty("fullDocumentBeforeChange") final TDocument fullDocumentBeforeChange,
@Nullable @BsonProperty("documentKey") final BsonDocument documentKey,
@Nullable @BsonProperty("clusterTime") final BsonTimestamp clusterTime,
@Nullable @BsonProperty("updateDescription") final UpdateDescription updateDescription,
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber,
@Nullable @BsonProperty("lsid") final BsonDocument lsid,
@Nullable @BsonProperty("wallTime") final BsonDateTime wallTime,
@Nullable @BsonProperty final BsonDocument extraElements) {
this(operationTypeString, resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, fullDocumentBeforeChange, documentKey,
clusterTime, updateDescription, txnNumber, lsid, wallTime, null, extraElements);
}

/**
* Creates a new instance
*
Expand Down Expand Up @@ -139,7 +181,7 @@ public ChangeStreamDocument(@BsonProperty("operationType") final String operatio
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber,
@Nullable @BsonProperty("lsid") final BsonDocument lsid) {
this(operationTypeString, resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, null, documentKey,
clusterTime, updateDescription, txnNumber, lsid, null, null);
clusterTime, updateDescription, txnNumber, lsid, null, null, null);
}

/**
Expand Down Expand Up @@ -170,7 +212,7 @@ public ChangeStreamDocument(final OperationType operationType,
final BsonInt64 txnNumber,
final BsonDocument lsid) {
this(operationType.getValue(), resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, null, documentKey,
clusterTime, updateDescription, txnNumber, lsid, null, null);
clusterTime, updateDescription, txnNumber, lsid, null, null, null);
}

/**
Expand Down Expand Up @@ -360,26 +402,30 @@ public BsonTimestamp getClusterTime() {

/**
* Returns the operation type as a string.
*
* <p>
* This method is useful when using a driver release that has not yet been updated to include a newer operation type in the
* {@link OperationType} enum. In that case, {@link #getOperationType()} will return {@link OperationType#OTHER} and this method can
* be used to retrieve the actual operation type as a string value.
* </p>
* <p>
* May return null only if <code>$changeStreamSplitLargeEvent</code> is used.
*
* @return the operation type as a string
* @since 4.6
* @see #getOperationType()
*/
@Nullable
public String getOperationTypeString() {
return operationTypeString;
}

/**
* Returns the operationType
* Returns the operationType.
* <p>
* May return null only if <code>$changeStreamSplitLargeEvent</code> is used.
*
* @return the operationType
*/
@Nullable
public OperationType getOperationType() {
return operationType;
}
Expand Down Expand Up @@ -430,6 +476,18 @@ public BsonDateTime getWallTime() {
return wallTime;
}

/**
* The split event.
*
* @return the split event
* @since 4.11
* @mongodb.server.release 6.0.9
*/
@Nullable
public SplitEvent getSplitEvent() {
return splitEvent;
}

/**
* Any extra elements that are part of the change stream document but not otherwise mapped to fields.
*
Expand Down Expand Up @@ -462,64 +520,42 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

ChangeStreamDocument<?> that = (ChangeStreamDocument<?>) o;

if (!Objects.equals(resumeToken, that.resumeToken)) {
return false;
}
if (!Objects.equals(namespaceDocument, that.namespaceDocument)) {
return false;
}
if (!Objects.equals(destinationNamespaceDocument, that.destinationNamespaceDocument)) {
return false;
}
if (!Objects.equals(fullDocument, that.fullDocument)) {
return false;
}
if (!Objects.equals(fullDocumentBeforeChange, that.fullDocumentBeforeChange)) {
return false;
}
if (!Objects.equals(documentKey, that.documentKey)) {
return false;
}
if (!Objects.equals(operationTypeString, that.operationTypeString)) {
return false;
}
if (!Objects.equals(clusterTime, that.clusterTime)) {
return false;
}
if (!Objects.equals(updateDescription, that.updateDescription)) {
return false;
}
if (!Objects.equals(txnNumber, that.txnNumber)) {
return false;
}
if (!Objects.equals(lsid, that.lsid)) {
return false;
}
if (!Objects.equals(wallTime, that.wallTime)) {
return false;
}

return true;
return Objects.equals(resumeToken, that.resumeToken)
&& Objects.equals(namespaceDocument, that.namespaceDocument)
&& Objects.equals(destinationNamespaceDocument, that.destinationNamespaceDocument)
&& Objects.equals(fullDocument, that.fullDocument)
&& Objects.equals(fullDocumentBeforeChange, that.fullDocumentBeforeChange)
&& Objects.equals(documentKey, that.documentKey)
&& Objects.equals(clusterTime, that.clusterTime)
&& Objects.equals(operationTypeString, that.operationTypeString)
// operationType covered by operationTypeString
&& Objects.equals(updateDescription, that.updateDescription)
&& Objects.equals(txnNumber, that.txnNumber)
&& Objects.equals(lsid, that.lsid)
&& Objects.equals(wallTime, that.wallTime)
&& Objects.equals(splitEvent, that.splitEvent)
&& Objects.equals(extraElements, that.extraElements);
}

@Override
public int hashCode() {
int result = resumeToken != null ? resumeToken.hashCode() : 0;
result = 31 * result + (namespaceDocument != null ? namespaceDocument.hashCode() : 0);
result = 31 * result + (destinationNamespaceDocument != null ? destinationNamespaceDocument.hashCode() : 0);
result = 31 * result + (fullDocument != null ? fullDocument.hashCode() : 0);
result = 31 * result + (fullDocumentBeforeChange != null ? fullDocumentBeforeChange.hashCode() : 0);
result = 31 * result + (documentKey != null ? documentKey.hashCode() : 0);
result = 31 * result + (clusterTime != null ? clusterTime.hashCode() : 0);
result = 31 * result + (operationTypeString != null ? operationTypeString.hashCode() : 0);
result = 31 * result + (updateDescription != null ? updateDescription.hashCode() : 0);
result = 31 * result + (txnNumber != null ? txnNumber.hashCode() : 0);
result = 31 * result + (lsid != null ? lsid.hashCode() : 0);
result = 31 * result + (wallTime != null ? wallTime.hashCode() : 0);
return result;
return Objects.hash(
resumeToken,
namespaceDocument,
destinationNamespaceDocument,
fullDocument,
fullDocumentBeforeChange,
documentKey,
clusterTime,
operationTypeString,
// operationType covered by operationTypeString
updateDescription,
txnNumber,
lsid,
wallTime,
splitEvent,
extraElements);
}

@Override
Expand All @@ -536,6 +572,7 @@ public String toString() {
+ ", updateDescription=" + updateDescription
+ ", txnNumber=" + txnNumber
+ ", lsid=" + lsid
+ ", splitEvent=" + splitEvent
+ ", wallTime=" + wallTime
+ "}";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ final class ChangeStreamDocumentCodec<TResult> implements Codec<ChangeStreamDocu
PojoCodecProvider provider = PojoCodecProvider.builder()
.register(MongoNamespace.class)
.register(UpdateDescription.class)
.register(SplitEvent.class)
.register(TruncatedArray.class)
.register(changeStreamDocumentClassModel)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.client.model.changestream;

import org.bson.codecs.pojo.annotations.BsonCreator;
import org.bson.codecs.pojo.annotations.BsonProperty;

import java.util.Objects;

/**
* The current fragment, out of the total number of fragments.
* When the change stream's backing aggregation pipeline contains the
* <code>$changeStreamSplitLargeEvent</code> stage, events larger than 16MB
* will be split into multiple events.
*
* @since 4.11
* @mongodb.server.release 6.0.9
* @mongodb.driver.manual reference/operator/aggregation/changeStreamSplitLargeEvent/ $changeStreamSplitLargeEvent
*/
public final class SplitEvent {
private final int fragment;
private final int of;

@BsonCreator
public SplitEvent(
@BsonProperty("fragment") final int fragment,
@BsonProperty("of") final int of) {
this.fragment = fragment;
this.of = of;
}

/**
* Which 1-based fragment this is, out of the total number of fragments.
* @return the fragment number
*/
public int getFragment() {
return fragment;
}

/**
* The total number of fragments.
* @return the total number of fragments.
*/
public int getOf() {
return of;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SplitEvent that = (SplitEvent) o;
return fragment == that.fragment && of == that.of;
}

@Override
public int hashCode() {
return Objects.hash(fragment, of);
}

@Override
public String toString() {
return "SplitEvent{"
+ "fragment=" + fragment
+ ", of=" + of
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,12 @@ private static void replaceTypeAssertionWithActual(final BsonArray expected, fin
}
private CrudTestHelper() {
}

public static String repeat(final int times, final String s) {
StringBuilder builder = new StringBuilder(times);
for (int i = 0; i < times; i++) {
builder.append(s);
}
return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.ArrayList;
import java.util.List;

import static com.mongodb.client.CrudTestHelper.repeat;
import static com.mongodb.client.WithWrapper.withWrapper;
import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument;
import static com.mongodb.internal.connection.ClientMetadataHelper.getOperatingSystemType;
Expand Down Expand Up @@ -321,12 +322,4 @@ private static String join(final String first, final List<String> rest) {
result.addAll(rest);
return String.join(separator, result);
}

public static String repeat(final int times, final String s) {
StringBuilder builder = new StringBuilder(times);
for (int i = 0; i < times; i++) {
builder.append(s);
}
return builder.toString();
}
}
Loading