-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Support $changeStreamSplitLargeEvent #1159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,6 +59,7 @@ public final class ChangeStreamDocument<TDocument> { | |
private final BsonInt64 txnNumber; | ||
private final BsonDocument lsid; | ||
private final BsonDateTime wallTime; | ||
private final SplitEvent splitEvent; | ||
@BsonExtraElements | ||
private final BsonDocument extraElements; | ||
|
||
|
@@ -77,12 +78,14 @@ public final class ChangeStreamDocument<TDocument> { | |
* @param txnNumber the transaction number | ||
* @param lsid the identifier for the session associated with the transaction | ||
* @param wallTime the wall time of the server at the moment the change occurred | ||
* @param splitEvent the split event | ||
* @param extraElements any extra elements that are part of the change stream document but not otherwise mapped to fields | ||
* | ||
* @since 4.7 | ||
* @since 4.11 | ||
*/ | ||
@BsonCreator | ||
public ChangeStreamDocument(@BsonProperty("operationType") final String operationTypeString, | ||
public ChangeStreamDocument( | ||
@Nullable @BsonProperty("operationType") final String operationTypeString, | ||
@BsonProperty("resumeToken") final BsonDocument resumeToken, | ||
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument, | ||
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument, | ||
|
@@ -94,6 +97,7 @@ public ChangeStreamDocument(@BsonProperty("operationType") final String operatio | |
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber, | ||
@Nullable @BsonProperty("lsid") final BsonDocument lsid, | ||
@Nullable @BsonProperty("wallTime") final BsonDateTime wallTime, | ||
@Nullable @BsonProperty("splitEvent") final SplitEvent splitEvent, | ||
@Nullable @BsonProperty final BsonDocument extraElements) { | ||
this.resumeToken = resumeToken; | ||
this.namespaceDocument = namespaceDocument; | ||
|
@@ -103,14 +107,52 @@ public ChangeStreamDocument(@BsonProperty("operationType") final String operatio | |
this.fullDocument = fullDocument; | ||
this.clusterTime = clusterTime; | ||
this.operationTypeString = operationTypeString; | ||
this.operationType = OperationType.fromString(operationTypeString); | ||
this.operationType = operationTypeString == null ? null : OperationType.fromString(operationTypeString); | ||
this.updateDescription = updateDescription; | ||
this.txnNumber = txnNumber; | ||
this.lsid = lsid; | ||
this.wallTime = wallTime; | ||
this.splitEvent = splitEvent; | ||
this.extraElements = extraElements; | ||
} | ||
|
||
/** | ||
* Creates a new instance | ||
* | ||
* @param operationTypeString the operation type | ||
* @param resumeToken the resume token | ||
* @param namespaceDocument the BsonDocument representing the namespace | ||
* @param destinationNamespaceDocument the BsonDocument representing the destinatation namespace | ||
* @param fullDocument the full document | ||
* @param fullDocumentBeforeChange the full document before change | ||
* @param documentKey a document containing the _id of the changed document | ||
* @param clusterTime the cluster time at which the change occured | ||
* @param updateDescription the update description | ||
* @param txnNumber the transaction number | ||
* @param lsid the identifier for the session associated with the transaction | ||
* @param wallTime the wall time of the server at the moment the change occurred | ||
* @param extraElements any extra elements that are part of the change stream document but not otherwise mapped to fields | ||
* | ||
* @since 4.7 | ||
*/ | ||
@Deprecated | ||
public ChangeStreamDocument(@BsonProperty("operationType") final String operationTypeString, | ||
Comment on lines
+138
to
+139
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an unfortunate proliferation of constructors, but it seems in line with what was done previously in this class. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we include a "@deprecated" tag to specify the reason for deprecation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the following just now,
but when I read it, it seemed too obvious, like "@deprecated use something that isn't deprecated". I think one convention (apart from not including the tag) is to link to the preferred replacement, but here, I do not think we should do this since there have already been N replacements, so it seemed like a link would become outdated. If you still think this is worth including, I don't feel too strongly, but let me know the wording. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before the server change supported in this PR, the 1 Bizarrely, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on https://mongodb.slack.com/archives/C72LB5RPV/p1689609952334209, it looks like the work required to start supporting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll pause on this part until the changes are completed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should I resolve this, or is there something you are still waiting for? |
||
@BsonProperty("resumeToken") final BsonDocument resumeToken, | ||
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument, | ||
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument, | ||
@Nullable @BsonProperty("fullDocument") final TDocument fullDocument, | ||
@Nullable @BsonProperty("fullDocumentBeforeChange") final TDocument fullDocumentBeforeChange, | ||
@Nullable @BsonProperty("documentKey") final BsonDocument documentKey, | ||
@Nullable @BsonProperty("clusterTime") final BsonTimestamp clusterTime, | ||
@Nullable @BsonProperty("updateDescription") final UpdateDescription updateDescription, | ||
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber, | ||
@Nullable @BsonProperty("lsid") final BsonDocument lsid, | ||
@Nullable @BsonProperty("wallTime") final BsonDateTime wallTime, | ||
@Nullable @BsonProperty final BsonDocument extraElements) { | ||
this(operationTypeString, resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, fullDocumentBeforeChange, documentKey, | ||
clusterTime, updateDescription, txnNumber, lsid, wallTime, null, extraElements); | ||
} | ||
|
||
/** | ||
* Creates a new instance | ||
* | ||
|
@@ -139,7 +181,7 @@ public ChangeStreamDocument(@BsonProperty("operationType") final String operatio | |
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber, | ||
@Nullable @BsonProperty("lsid") final BsonDocument lsid) { | ||
this(operationTypeString, resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, null, documentKey, | ||
clusterTime, updateDescription, txnNumber, lsid, null, null); | ||
clusterTime, updateDescription, txnNumber, lsid, null, null, null); | ||
} | ||
|
||
/** | ||
|
@@ -170,7 +212,7 @@ public ChangeStreamDocument(final OperationType operationType, | |
final BsonInt64 txnNumber, | ||
final BsonDocument lsid) { | ||
this(operationType.getValue(), resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, null, documentKey, | ||
clusterTime, updateDescription, txnNumber, lsid, null, null); | ||
clusterTime, updateDescription, txnNumber, lsid, null, null, null); | ||
} | ||
|
||
/** | ||
|
@@ -360,26 +402,30 @@ public BsonTimestamp getClusterTime() { | |
|
||
/** | ||
* Returns the operation type as a string. | ||
* | ||
* <p> | ||
* This method is useful when using a driver release that has not yet been updated to include a newer operation type in the | ||
* {@link OperationType} enum. In that case, {@link #getOperationType()} will return {@link OperationType#OTHER} and this method can | ||
* be used to retrieve the actual operation type as a string value. | ||
* </p> | ||
* <p> | ||
* May return null only if <code>$changeStreamSplitLargeEvent</code> is used. | ||
* | ||
* @return the operation type as a string | ||
* @since 4.6 | ||
* @see #getOperationType() | ||
*/ | ||
@Nullable | ||
public String getOperationTypeString() { | ||
return operationTypeString; | ||
} | ||
|
||
/** | ||
* Returns the operationType | ||
* Returns the operationType. | ||
* <p> | ||
* May return null only if <code>$changeStreamSplitLargeEvent</code> is used. | ||
* | ||
* @return the operationType | ||
*/ | ||
@Nullable | ||
stIncMale marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public OperationType getOperationType() { | ||
return operationType; | ||
} | ||
|
@@ -430,6 +476,18 @@ public BsonDateTime getWallTime() { | |
return wallTime; | ||
} | ||
|
||
/** | ||
* The split event. | ||
* | ||
* @return the split event | ||
* @since 4.11 | ||
* @mongodb.server.release 6.0.9 | ||
*/ | ||
@Nullable | ||
public SplitEvent getSplitEvent() { | ||
return splitEvent; | ||
} | ||
|
||
/** | ||
* Any extra elements that are part of the change stream document but not otherwise mapped to fields. | ||
* | ||
|
@@ -462,64 +520,42 @@ public boolean equals(final Object o) { | |
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
|
||
ChangeStreamDocument<?> that = (ChangeStreamDocument<?>) o; | ||
|
||
if (!Objects.equals(resumeToken, that.resumeToken)) { | ||
return false; | ||
} | ||
if (!Objects.equals(namespaceDocument, that.namespaceDocument)) { | ||
return false; | ||
} | ||
if (!Objects.equals(destinationNamespaceDocument, that.destinationNamespaceDocument)) { | ||
return false; | ||
} | ||
if (!Objects.equals(fullDocument, that.fullDocument)) { | ||
return false; | ||
} | ||
if (!Objects.equals(fullDocumentBeforeChange, that.fullDocumentBeforeChange)) { | ||
return false; | ||
} | ||
if (!Objects.equals(documentKey, that.documentKey)) { | ||
return false; | ||
} | ||
if (!Objects.equals(operationTypeString, that.operationTypeString)) { | ||
return false; | ||
} | ||
if (!Objects.equals(clusterTime, that.clusterTime)) { | ||
return false; | ||
} | ||
if (!Objects.equals(updateDescription, that.updateDescription)) { | ||
return false; | ||
} | ||
if (!Objects.equals(txnNumber, that.txnNumber)) { | ||
return false; | ||
} | ||
if (!Objects.equals(lsid, that.lsid)) { | ||
return false; | ||
} | ||
if (!Objects.equals(wallTime, that.wallTime)) { | ||
return false; | ||
} | ||
|
||
return true; | ||
return Objects.equals(resumeToken, that.resumeToken) | ||
&& Objects.equals(namespaceDocument, that.namespaceDocument) | ||
&& Objects.equals(destinationNamespaceDocument, that.destinationNamespaceDocument) | ||
&& Objects.equals(fullDocument, that.fullDocument) | ||
&& Objects.equals(fullDocumentBeforeChange, that.fullDocumentBeforeChange) | ||
&& Objects.equals(documentKey, that.documentKey) | ||
&& Objects.equals(clusterTime, that.clusterTime) | ||
&& Objects.equals(operationTypeString, that.operationTypeString) | ||
// operationType covered by operationTypeString | ||
&& Objects.equals(updateDescription, that.updateDescription) | ||
&& Objects.equals(txnNumber, that.txnNumber) | ||
&& Objects.equals(lsid, that.lsid) | ||
&& Objects.equals(wallTime, that.wallTime) | ||
&& Objects.equals(splitEvent, that.splitEvent) | ||
&& Objects.equals(extraElements, that.extraElements); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
int result = resumeToken != null ? resumeToken.hashCode() : 0; | ||
result = 31 * result + (namespaceDocument != null ? namespaceDocument.hashCode() : 0); | ||
result = 31 * result + (destinationNamespaceDocument != null ? destinationNamespaceDocument.hashCode() : 0); | ||
result = 31 * result + (fullDocument != null ? fullDocument.hashCode() : 0); | ||
result = 31 * result + (fullDocumentBeforeChange != null ? fullDocumentBeforeChange.hashCode() : 0); | ||
result = 31 * result + (documentKey != null ? documentKey.hashCode() : 0); | ||
result = 31 * result + (clusterTime != null ? clusterTime.hashCode() : 0); | ||
result = 31 * result + (operationTypeString != null ? operationTypeString.hashCode() : 0); | ||
result = 31 * result + (updateDescription != null ? updateDescription.hashCode() : 0); | ||
result = 31 * result + (txnNumber != null ? txnNumber.hashCode() : 0); | ||
result = 31 * result + (lsid != null ? lsid.hashCode() : 0); | ||
result = 31 * result + (wallTime != null ? wallTime.hashCode() : 0); | ||
return result; | ||
return Objects.hash( | ||
resumeToken, | ||
namespaceDocument, | ||
destinationNamespaceDocument, | ||
fullDocument, | ||
fullDocumentBeforeChange, | ||
documentKey, | ||
clusterTime, | ||
operationTypeString, | ||
// operationType covered by operationTypeString | ||
updateDescription, | ||
txnNumber, | ||
lsid, | ||
wallTime, | ||
splitEvent, | ||
extraElements); | ||
} | ||
|
||
@Override | ||
|
@@ -536,6 +572,7 @@ public String toString() { | |
+ ", updateDescription=" + updateDescription | ||
+ ", txnNumber=" + txnNumber | ||
+ ", lsid=" + lsid | ||
+ ", splitEvent=" + splitEvent | ||
+ ", wallTime=" + wallTime | ||
+ "}"; | ||
} | ||
|
stIncMale marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
/* | ||
* Copyright 2008-present MongoDB, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.mongodb.client.model.changestream; | ||
|
||
import org.bson.codecs.pojo.annotations.BsonCreator; | ||
import org.bson.codecs.pojo.annotations.BsonProperty; | ||
|
||
import java.util.Objects; | ||
|
||
/** | ||
* The current fragment, out of the total number of fragments. | ||
* When the change stream's backing aggregation pipeline contains the | ||
* <code>$changeStreamSplitLargeEvent</code> stage, events larger than 16MB | ||
* will be split into multiple events. | ||
* | ||
* @since 4.11 | ||
* @mongodb.server.release 6.0.9 | ||
* @mongodb.driver.manual reference/operator/aggregation/changeStreamSplitLargeEvent/ $changeStreamSplitLargeEvent | ||
*/ | ||
public final class SplitEvent { | ||
private final int fragment; | ||
private final int of; | ||
|
||
@BsonCreator | ||
public SplitEvent( | ||
@BsonProperty("fragment") final int fragment, | ||
@BsonProperty("of") final int of) { | ||
this.fragment = fragment; | ||
this.of = of; | ||
} | ||
|
||
/** | ||
* Which 1-based fragment this is, out of the total number of fragments. | ||
* @return the fragment number | ||
*/ | ||
public int getFragment() { | ||
return fragment; | ||
} | ||
|
||
/** | ||
* The total number of fragments. | ||
* @return the total number of fragments. | ||
*/ | ||
public int getOf() { | ||
return of; | ||
} | ||
|
||
@Override | ||
public boolean equals(final Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
SplitEvent that = (SplitEvent) o; | ||
return fragment == that.fragment && of == that.of; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(fragment, of); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "SplitEvent{" | ||
+ "fragment=" + fragment | ||
+ ", of=" + of | ||
+ '}'; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fromString does not take null; note that it is used in the OperationTypeCodec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am glad we are not passing
null
toOperationType.fromString
, even though it (arguably, incorrectly) has the code to handlenull
s.