Skip to content

Commit e4f39e1

Browse files
authored
Support $changeStreamSplitLargeEvent (#1159)
JAVA-4955
1 parent b212fb8 commit e4f39e1

File tree

8 files changed

+263
-91
lines changed

8 files changed

+263
-91
lines changed

driver-core/src/main/com/mongodb/client/model/changestream/ChangeStreamDocument.java

Lines changed: 98 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public final class ChangeStreamDocument<TDocument> {
5959
private final BsonInt64 txnNumber;
6060
private final BsonDocument lsid;
6161
private final BsonDateTime wallTime;
62+
private final SplitEvent splitEvent;
6263
@BsonExtraElements
6364
private final BsonDocument extraElements;
6465

@@ -77,12 +78,14 @@ public final class ChangeStreamDocument<TDocument> {
7778
* @param txnNumber the transaction number
7879
* @param lsid the identifier for the session associated with the transaction
7980
* @param wallTime the wall time of the server at the moment the change occurred
81+
* @param splitEvent the split event
8082
* @param extraElements any extra elements that are part of the change stream document but not otherwise mapped to fields
8183
*
82-
* @since 4.7
84+
* @since 4.11
8385
*/
8486
@BsonCreator
85-
public ChangeStreamDocument(@BsonProperty("operationType") final String operationTypeString,
87+
public ChangeStreamDocument(
88+
@Nullable @BsonProperty("operationType") final String operationTypeString,
8689
@BsonProperty("resumeToken") final BsonDocument resumeToken,
8790
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument,
8891
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument,
@@ -94,6 +97,7 @@ public ChangeStreamDocument(@BsonProperty("operationType") final String operatio
9497
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber,
9598
@Nullable @BsonProperty("lsid") final BsonDocument lsid,
9699
@Nullable @BsonProperty("wallTime") final BsonDateTime wallTime,
100+
@Nullable @BsonProperty("splitEvent") final SplitEvent splitEvent,
97101
@Nullable @BsonProperty final BsonDocument extraElements) {
98102
this.resumeToken = resumeToken;
99103
this.namespaceDocument = namespaceDocument;
@@ -103,14 +107,52 @@ public ChangeStreamDocument(@BsonProperty("operationType") final String operatio
103107
this.fullDocument = fullDocument;
104108
this.clusterTime = clusterTime;
105109
this.operationTypeString = operationTypeString;
106-
this.operationType = OperationType.fromString(operationTypeString);
110+
this.operationType = operationTypeString == null ? null : OperationType.fromString(operationTypeString);
107111
this.updateDescription = updateDescription;
108112
this.txnNumber = txnNumber;
109113
this.lsid = lsid;
110114
this.wallTime = wallTime;
115+
this.splitEvent = splitEvent;
111116
this.extraElements = extraElements;
112117
}
113118

119+
/**
120+
* Creates a new instance
121+
*
122+
* @param operationTypeString the operation type
123+
* @param resumeToken the resume token
124+
* @param namespaceDocument the BsonDocument representing the namespace
125+
* @param destinationNamespaceDocument the BsonDocument representing the destinatation namespace
126+
* @param fullDocument the full document
127+
* @param fullDocumentBeforeChange the full document before change
128+
* @param documentKey a document containing the _id of the changed document
129+
* @param clusterTime the cluster time at which the change occured
130+
* @param updateDescription the update description
131+
* @param txnNumber the transaction number
132+
* @param lsid the identifier for the session associated with the transaction
133+
* @param wallTime the wall time of the server at the moment the change occurred
134+
* @param extraElements any extra elements that are part of the change stream document but not otherwise mapped to fields
135+
*
136+
* @since 4.7
137+
*/
138+
@Deprecated
139+
public ChangeStreamDocument(@BsonProperty("operationType") final String operationTypeString,
140+
@BsonProperty("resumeToken") final BsonDocument resumeToken,
141+
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument,
142+
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument,
143+
@Nullable @BsonProperty("fullDocument") final TDocument fullDocument,
144+
@Nullable @BsonProperty("fullDocumentBeforeChange") final TDocument fullDocumentBeforeChange,
145+
@Nullable @BsonProperty("documentKey") final BsonDocument documentKey,
146+
@Nullable @BsonProperty("clusterTime") final BsonTimestamp clusterTime,
147+
@Nullable @BsonProperty("updateDescription") final UpdateDescription updateDescription,
148+
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber,
149+
@Nullable @BsonProperty("lsid") final BsonDocument lsid,
150+
@Nullable @BsonProperty("wallTime") final BsonDateTime wallTime,
151+
@Nullable @BsonProperty final BsonDocument extraElements) {
152+
this(operationTypeString, resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, fullDocumentBeforeChange, documentKey,
153+
clusterTime, updateDescription, txnNumber, lsid, wallTime, null, extraElements);
154+
}
155+
114156
/**
115157
* Creates a new instance
116158
*
@@ -139,7 +181,7 @@ public ChangeStreamDocument(@BsonProperty("operationType") final String operatio
139181
@Nullable @BsonProperty("txnNumber") final BsonInt64 txnNumber,
140182
@Nullable @BsonProperty("lsid") final BsonDocument lsid) {
141183
this(operationTypeString, resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, null, documentKey,
142-
clusterTime, updateDescription, txnNumber, lsid, null, null);
184+
clusterTime, updateDescription, txnNumber, lsid, null, null, null);
143185
}
144186

145187
/**
@@ -170,7 +212,7 @@ public ChangeStreamDocument(final OperationType operationType,
170212
final BsonInt64 txnNumber,
171213
final BsonDocument lsid) {
172214
this(operationType.getValue(), resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, null, documentKey,
173-
clusterTime, updateDescription, txnNumber, lsid, null, null);
215+
clusterTime, updateDescription, txnNumber, lsid, null, null, null);
174216
}
175217

176218
/**
@@ -360,26 +402,30 @@ public BsonTimestamp getClusterTime() {
360402

361403
/**
362404
* Returns the operation type as a string.
363-
*
364405
* <p>
365406
* This method is useful when using a driver release that has not yet been updated to include a newer operation type in the
366407
* {@link OperationType} enum. In that case, {@link #getOperationType()} will return {@link OperationType#OTHER} and this method can
367408
* be used to retrieve the actual operation type as a string value.
368-
* </p>
409+
* <p>
410+
* May return null only if <code>$changeStreamSplitLargeEvent</code> is used.
369411
*
370412
* @return the operation type as a string
371413
* @since 4.6
372414
* @see #getOperationType()
373415
*/
416+
@Nullable
374417
public String getOperationTypeString() {
375418
return operationTypeString;
376419
}
377420

378421
/**
379-
* Returns the operationType
422+
* Returns the operationType.
423+
* <p>
424+
* May return null only if <code>$changeStreamSplitLargeEvent</code> is used.
380425
*
381426
* @return the operationType
382427
*/
428+
@Nullable
383429
public OperationType getOperationType() {
384430
return operationType;
385431
}
@@ -430,6 +476,18 @@ public BsonDateTime getWallTime() {
430476
return wallTime;
431477
}
432478

479+
/**
480+
* The split event.
481+
*
482+
* @return the split event
483+
* @since 4.11
484+
* @mongodb.server.release 6.0.9
485+
*/
486+
@Nullable
487+
public SplitEvent getSplitEvent() {
488+
return splitEvent;
489+
}
490+
433491
/**
434492
* Any extra elements that are part of the change stream document but not otherwise mapped to fields.
435493
*
@@ -462,64 +520,42 @@ public boolean equals(final Object o) {
462520
if (o == null || getClass() != o.getClass()) {
463521
return false;
464522
}
465-
466523
ChangeStreamDocument<?> that = (ChangeStreamDocument<?>) o;
467-
468-
if (!Objects.equals(resumeToken, that.resumeToken)) {
469-
return false;
470-
}
471-
if (!Objects.equals(namespaceDocument, that.namespaceDocument)) {
472-
return false;
473-
}
474-
if (!Objects.equals(destinationNamespaceDocument, that.destinationNamespaceDocument)) {
475-
return false;
476-
}
477-
if (!Objects.equals(fullDocument, that.fullDocument)) {
478-
return false;
479-
}
480-
if (!Objects.equals(fullDocumentBeforeChange, that.fullDocumentBeforeChange)) {
481-
return false;
482-
}
483-
if (!Objects.equals(documentKey, that.documentKey)) {
484-
return false;
485-
}
486-
if (!Objects.equals(operationTypeString, that.operationTypeString)) {
487-
return false;
488-
}
489-
if (!Objects.equals(clusterTime, that.clusterTime)) {
490-
return false;
491-
}
492-
if (!Objects.equals(updateDescription, that.updateDescription)) {
493-
return false;
494-
}
495-
if (!Objects.equals(txnNumber, that.txnNumber)) {
496-
return false;
497-
}
498-
if (!Objects.equals(lsid, that.lsid)) {
499-
return false;
500-
}
501-
if (!Objects.equals(wallTime, that.wallTime)) {
502-
return false;
503-
}
504-
505-
return true;
524+
return Objects.equals(resumeToken, that.resumeToken)
525+
&& Objects.equals(namespaceDocument, that.namespaceDocument)
526+
&& Objects.equals(destinationNamespaceDocument, that.destinationNamespaceDocument)
527+
&& Objects.equals(fullDocument, that.fullDocument)
528+
&& Objects.equals(fullDocumentBeforeChange, that.fullDocumentBeforeChange)
529+
&& Objects.equals(documentKey, that.documentKey)
530+
&& Objects.equals(clusterTime, that.clusterTime)
531+
&& Objects.equals(operationTypeString, that.operationTypeString)
532+
// operationType covered by operationTypeString
533+
&& Objects.equals(updateDescription, that.updateDescription)
534+
&& Objects.equals(txnNumber, that.txnNumber)
535+
&& Objects.equals(lsid, that.lsid)
536+
&& Objects.equals(wallTime, that.wallTime)
537+
&& Objects.equals(splitEvent, that.splitEvent)
538+
&& Objects.equals(extraElements, that.extraElements);
506539
}
507540

508541
@Override
509542
public int hashCode() {
510-
int result = resumeToken != null ? resumeToken.hashCode() : 0;
511-
result = 31 * result + (namespaceDocument != null ? namespaceDocument.hashCode() : 0);
512-
result = 31 * result + (destinationNamespaceDocument != null ? destinationNamespaceDocument.hashCode() : 0);
513-
result = 31 * result + (fullDocument != null ? fullDocument.hashCode() : 0);
514-
result = 31 * result + (fullDocumentBeforeChange != null ? fullDocumentBeforeChange.hashCode() : 0);
515-
result = 31 * result + (documentKey != null ? documentKey.hashCode() : 0);
516-
result = 31 * result + (clusterTime != null ? clusterTime.hashCode() : 0);
517-
result = 31 * result + (operationTypeString != null ? operationTypeString.hashCode() : 0);
518-
result = 31 * result + (updateDescription != null ? updateDescription.hashCode() : 0);
519-
result = 31 * result + (txnNumber != null ? txnNumber.hashCode() : 0);
520-
result = 31 * result + (lsid != null ? lsid.hashCode() : 0);
521-
result = 31 * result + (wallTime != null ? wallTime.hashCode() : 0);
522-
return result;
543+
return Objects.hash(
544+
resumeToken,
545+
namespaceDocument,
546+
destinationNamespaceDocument,
547+
fullDocument,
548+
fullDocumentBeforeChange,
549+
documentKey,
550+
clusterTime,
551+
operationTypeString,
552+
// operationType covered by operationTypeString
553+
updateDescription,
554+
txnNumber,
555+
lsid,
556+
wallTime,
557+
splitEvent,
558+
extraElements);
523559
}
524560

525561
@Override
@@ -536,6 +572,7 @@ public String toString() {
536572
+ ", updateDescription=" + updateDescription
537573
+ ", txnNumber=" + txnNumber
538574
+ ", lsid=" + lsid
575+
+ ", splitEvent=" + splitEvent
539576
+ ", wallTime=" + wallTime
540577
+ "}";
541578
}

driver-core/src/main/com/mongodb/client/model/changestream/ChangeStreamDocumentCodec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ final class ChangeStreamDocumentCodec<TResult> implements Codec<ChangeStreamDocu
5050
PojoCodecProvider provider = PojoCodecProvider.builder()
5151
.register(MongoNamespace.class)
5252
.register(UpdateDescription.class)
53+
.register(SplitEvent.class)
5354
.register(TruncatedArray.class)
5455
.register(changeStreamDocumentClassModel)
5556
.build();
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.client.model.changestream;
18+
19+
import org.bson.codecs.pojo.annotations.BsonCreator;
20+
import org.bson.codecs.pojo.annotations.BsonProperty;
21+
22+
import java.util.Objects;
23+
24+
/**
25+
* The current fragment, out of the total number of fragments.
26+
* When the change stream's backing aggregation pipeline contains the
27+
* <code>$changeStreamSplitLargeEvent</code> stage, events larger than 16MB
28+
* will be split into multiple events.
29+
*
30+
* @since 4.11
31+
* @mongodb.server.release 6.0.9
32+
* @mongodb.driver.manual reference/operator/aggregation/changeStreamSplitLargeEvent/ $changeStreamSplitLargeEvent
33+
*/
34+
public final class SplitEvent {
35+
private final int fragment;
36+
private final int of;
37+
38+
@BsonCreator
39+
public SplitEvent(
40+
@BsonProperty("fragment") final int fragment,
41+
@BsonProperty("of") final int of) {
42+
this.fragment = fragment;
43+
this.of = of;
44+
}
45+
46+
/**
47+
* Which 1-based fragment this is, out of the total number of fragments.
48+
* @return the fragment number
49+
*/
50+
public int getFragment() {
51+
return fragment;
52+
}
53+
54+
/**
55+
* The total number of fragments.
56+
* @return the total number of fragments.
57+
*/
58+
public int getOf() {
59+
return of;
60+
}
61+
62+
@Override
63+
public boolean equals(final Object o) {
64+
if (this == o) {
65+
return true;
66+
}
67+
if (o == null || getClass() != o.getClass()) {
68+
return false;
69+
}
70+
SplitEvent that = (SplitEvent) o;
71+
return fragment == that.fragment && of == that.of;
72+
}
73+
74+
@Override
75+
public int hashCode() {
76+
return Objects.hash(fragment, of);
77+
}
78+
79+
@Override
80+
public String toString() {
81+
return "SplitEvent{"
82+
+ "fragment=" + fragment
83+
+ ", of=" + of
84+
+ '}';
85+
}
86+
}

driver-core/src/test/functional/com/mongodb/client/CrudTestHelper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,12 @@ private static void replaceTypeAssertionWithActual(final BsonArray expected, fin
6565
}
6666
private CrudTestHelper() {
6767
}
68+
69+
public static String repeat(final int times, final String s) {
70+
StringBuilder builder = new StringBuilder(times);
71+
for (int i = 0; i < times; i++) {
72+
builder.append(s);
73+
}
74+
return builder.toString();
75+
}
6876
}

driver-core/src/test/functional/com/mongodb/internal/connection/ClientMetadataHelperProseTest.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.ArrayList;
3434
import java.util.List;
3535

36+
import static com.mongodb.client.CrudTestHelper.repeat;
3637
import static com.mongodb.client.WithWrapper.withWrapper;
3738
import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument;
3839
import static com.mongodb.internal.connection.ClientMetadataHelper.getOperatingSystemType;
@@ -321,12 +322,4 @@ private static String join(final String first, final List<String> rest) {
321322
result.addAll(rest);
322323
return String.join(separator, result);
323324
}
324-
325-
public static String repeat(final int times, final String s) {
326-
StringBuilder builder = new StringBuilder(times);
327-
for (int i = 0; i < times; i++) {
328-
builder.append(s);
329-
}
330-
return builder.toString();
331-
}
332325
}

0 commit comments

Comments
 (0)