Skip to content

Add namespace type support to ChangeStreamDocument. #1736

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public final class ChangeStreamDocument<TDocument> {
@BsonId()
private final BsonDocument resumeToken;
private final BsonDocument namespaceDocument;

@BsonProperty("nsType")
private final String namespaceTypeString;
@BsonIgnore
private final NamespaceType namespaceType;
private final BsonDocument destinationNamespaceDocument;
private final TDocument fullDocument;
private final TDocument fullDocumentBeforeChange;
Expand All @@ -66,9 +71,10 @@ public final class ChangeStreamDocument<TDocument> {
/**
* Creates a new instance
*
* @param operationTypeString the operation type
* @param operationType the operation type
* @param resumeToken the resume token
* @param namespaceDocument the BsonDocument representing the namespace
* @param namespaceType the namespace type
* @param destinationNamespaceDocument the BsonDocument representing the destinatation namespace
* @param fullDocument the full document
* @param fullDocumentBeforeChange the full document before change
Expand All @@ -85,9 +91,10 @@ public final class ChangeStreamDocument<TDocument> {
*/
@BsonCreator
public ChangeStreamDocument(
@Nullable @BsonProperty("operationType") final String operationTypeString,
@Nullable @BsonProperty("operationType") final String operationType,
@BsonProperty("resumeToken") final BsonDocument resumeToken,
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument,
@Nullable @BsonProperty("nsType") final String namespaceType,
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument,
@Nullable @BsonProperty("fullDocument") final TDocument fullDocument,
@Nullable @BsonProperty("fullDocumentBeforeChange") final TDocument fullDocumentBeforeChange,
Expand All @@ -101,12 +108,14 @@ public ChangeStreamDocument(
@Nullable @BsonProperty final BsonDocument extraElements) {
this.resumeToken = resumeToken;
this.namespaceDocument = namespaceDocument;
this.namespaceTypeString = namespaceType;
this.namespaceType = namespaceTypeString == null ? null : NamespaceType.fromString(namespaceType);
this.destinationNamespaceDocument = destinationNamespaceDocument;
this.fullDocumentBeforeChange = fullDocumentBeforeChange;
this.documentKey = documentKey;
this.fullDocument = fullDocument;
this.clusterTime = clusterTime;
this.operationTypeString = operationTypeString;
this.operationTypeString = operationType;
this.operationType = operationTypeString == null ? null : OperationType.fromString(operationTypeString);
this.updateDescription = updateDescription;
this.txnNumber = txnNumber;
Expand Down Expand Up @@ -134,6 +143,8 @@ public BsonDocument getResumeToken() {
*
* @return the namespace. If the namespaceDocument is null or if it is missing either the 'db' or 'coll' keys,
* then this will return null.
* @see #getNamespaceType()
* @see #getNamespaceTypeString()
*/
@BsonIgnore
@Nullable
Expand All @@ -156,13 +167,49 @@ public MongoNamespace getNamespace() {
*
* @return the namespaceDocument
* @since 3.8
* @see #getNamespaceType()
* @see #getNamespaceTypeString()
*/
@BsonProperty("ns")
@Nullable
public BsonDocument getNamespaceDocument() {
return namespaceDocument;
}

/**
* Returns the type of the newly created namespace object as a String, derived from the "nsType" field in a change stream document.
* <p>
* This method is useful when using a driver release that has not yet been updated to include a newer namespace type in the
* {@link NamespaceType} enum. In that case, {@link #getNamespaceType()} will return {@link NamespaceType#OTHER} and this method can
* be used to retrieve the actual namespace type as a string value.
* <p>
* May return null only if <code>$changeStreamSplitLargeEvent</code> is used.
*
* @return the namespace type as a string
* @since 5.6
* @mongodb.server.release 8.1
* @see #getNamespaceType()
* @see #getNamespaceDocument()
*/
@Nullable
public String getNamespaceTypeString() {
return namespaceTypeString;
}

/**
* Returns the type of the newly created namespace object, derived from the "nsType" field in a change stream document.
*
* @return the namespace type.
* @since 5.6
* @mongodb.server.release 8.1
* @see #getNamespaceTypeString()
* @see #getNamespaceDocument()
*/
@Nullable
public NamespaceType getNamespaceType() {
return namespaceType;
}

/**
* Returns the destination namespace, derived from the "to" field in a change stream document.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.client.model.changestream;

import com.mongodb.lang.Nullable;

/**
* Represents the type of the newly created namespace object in change stream events.
* <p>
* Only present for operations of type {@code create} and when the {@code showExpandedEvents}
* change stream option is enabled.
* </p>
*
* @since 5.6
* @mongodb.server.release 8.1
*/
public enum NamespaceType {
COLLECTION("collection"),
TIMESERIES("timeseries"),
VIEW("view"),
/**
* The other namespace type.
*
* <p>A placeholder for newer namespace types issued by the server.
* Users encountering OTHER namespace types are advised to update the driver to get the actual namespace type.</p>
*
* @since 5.6
*/
OTHER("other");

private final String value;
NamespaceType(final String namespaceTypeName) {
this.value = namespaceTypeName;
}

/**
* @return the String representation of the namespace type
*/
public String getValue() {
return value;
}

/**
* Returns the ChangeStreamNamespaceType from the string value.
*
* @param namespaceTypeName the string value.
* @return the namespace type.
*/
public static NamespaceType fromString(@Nullable final String namespaceTypeName) {
if (namespaceTypeName != null) {
for (NamespaceType namespaceType : NamespaceType.values()) {
if (namespaceTypeName.equals(namespaceType.value)) {
return namespaceType;
}
}
}
return OTHER;
}

@Override
public String toString() {
return "NamespaceType{"
+ "value='" + value + "'"
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.mongodb.client.model.changestream;

import com.mongodb.lang.Nullable;

/**
* The {@code $changeStream} operation type.
*
Expand Down Expand Up @@ -95,9 +97,9 @@ public String getValue() {
* Returns the ChangeStreamOperationType from the string value.
*
* @param operationTypeName the string value.
* @return the read concern
* @return the operation type.
*/
public static OperationType fromString(final String operationTypeName) {
public static OperationType fromString(@Nullable final String operationTypeName) {
if (operationTypeName != null) {
for (OperationType operationType : OperationType.values()) {
if (operationTypeName.equals(operationType.value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
new ChangeStreamDocument<Document>(OperationType.INSERT.value,
BsonDocument.parse('{token: true}'),
BsonDocument.parse('{db: "engineering", coll: "users"}'),
NamespaceType.COLLECTION.value,
null,
Document.parse('{_id: 1, userName: "alice123", name: "Alice"}'),
null,
Expand All @@ -73,6 +74,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
new ChangeStreamDocument<Document>(OperationType.UPDATE.value,
BsonDocument.parse('{token: true}'),
BsonDocument.parse('{db: "engineering", coll: "users"}'),
NamespaceType.COLLECTION.value,
null,
null,
null,
Expand All @@ -84,6 +86,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
new ChangeStreamDocument<Document>(OperationType.UPDATE.value,
BsonDocument.parse('{token: true}'),
BsonDocument.parse('{db: "engineering", coll: "users"}'),
NamespaceType.COLLECTION.value,
null,
Document.parse('{_id: 1, userName: "alice123", name: "Alice"}'),
Document.parse('{_id: 1, userName: "alice1234", name: "Alice"}'),
Expand All @@ -96,6 +99,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
new ChangeStreamDocument<Document>(OperationType.REPLACE.value,
BsonDocument.parse('{token: true}'),
BsonDocument.parse('{db: "engineering", coll: "users"}'),
NamespaceType.COLLECTION.value,
null,
Document.parse('{_id: 1, userName: "alice123", name: "Alice"}'),
Document.parse('{_id: 1, userName: "alice1234", name: "Alice"}'),
Expand All @@ -106,6 +110,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
new ChangeStreamDocument<Document>(OperationType.DELETE.value,
BsonDocument.parse('{token: true}'),
BsonDocument.parse('{db: "engineering", coll: "users"}'),
NamespaceType.COLLECTION.value,
null,
null,
Document.parse('{_id: 1, userName: "alice123", name: "Alice"}'),
Expand All @@ -116,6 +121,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
new ChangeStreamDocument<Document>(OperationType.DROP.value,
BsonDocument.parse('{token: true}'),
BsonDocument.parse('{db: "engineering", coll: "users"}'),
NamespaceType.COLLECTION.value,
null,
null,
null,
Expand All @@ -126,6 +132,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
new ChangeStreamDocument<Document>(OperationType.RENAME.value,
BsonDocument.parse('{token: true}'),
BsonDocument.parse('{db: "engineering", coll: "users"}'),
NamespaceType.COLLECTION.value,
BsonDocument.parse('{db: "engineering", coll: "people"}'),
null,
null,
Expand All @@ -140,6 +147,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
null,
null,
null,
null,
new BsonTimestamp(1234, 2),
null, null, null, null, null, null
),
Expand All @@ -150,12 +158,14 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
null,
null,
null,
null,
new BsonTimestamp(1234, 2),
null, null, null, null, null, null
),
new ChangeStreamDocument<Document>(OperationType.INSERT.value,
BsonDocument.parse('{token: true}'),
BsonDocument.parse('{db: "engineering", coll: "users"}'),
NamespaceType.COLLECTION.value,
null,
Document.parse('{_id: 1, userName: "alice123", name: "Alice"}'),
null,
Expand All @@ -180,6 +190,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
db: 'engineering',
coll: 'users'
},
nsType: 'collection',
documentKey: {
userName: 'alice123',
_id: 1
Expand All @@ -204,6 +215,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
db: 'engineering',
coll: 'users'
},
nsType: 'collection',
documentKey: {
_id: 1
},
Expand All @@ -225,6 +237,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
db: 'engineering',
coll: 'users'
},
nsType: 'collection',
documentKey: {
_id: 1
},
Expand Down Expand Up @@ -261,6 +274,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
db: 'engineering',
coll: 'users'
},
nsType: 'collection',
documentKey: {
_id: 1
},
Expand All @@ -285,6 +299,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
db: 'engineering',
coll: 'users'
},
nsType: 'collection',
documentKey: {
_id: 1
},
Expand All @@ -304,6 +319,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
db: 'engineering',
coll: 'users'
}
nsType: 'collection',
}
''',
'''
Expand All @@ -315,6 +331,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
db: 'engineering',
coll: 'users'
},
nsType: 'collection',
to: {
db: 'engineering',
coll: 'people'
Expand Down Expand Up @@ -347,6 +364,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
db: 'engineering',
coll: 'users'
},
nsType: 'collection',
documentKey: {
userName: 'alice123',
_id: 1
Expand Down
Loading