Skip to content

Commit e3de6ae

Browse files
authored
SWIFT-957 Fill in namespace on ChangeStream invalidate events (#519)
1 parent dc980c9 commit e3de6ae

File tree

6 files changed

+120
-2
lines changed

6 files changed

+120
-2
lines changed

Sources/MongoSwift/ChangeStream.swift

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ public struct ResumeToken: Codable, Equatable {
5050
}
5151
}
5252

53+
// TODO: SWIFT-981: Remove this.
54+
/// The key we use for storing a change stream's namespace in it's `userInfo`. This allows types
55+
/// using the decoder e.g. `ChangeStreamEvent` to access the namespace even if it is not present in the raw
56+
/// document the server returns. Ok to force unwrap as initialization never fails.
57+
// swiftlint:disable:next force_unwrapping
58+
internal let changeStreamNamespaceKey = CodingUserInfoKey(rawValue: "namespace")!
59+
5360
// sourcery: skipSyncExport
5461
/// A MongoDB change stream.
5562
/// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/
@@ -87,6 +94,7 @@ public class ChangeStream<T: Codable>: CursorProtocol {
8794
stealing changeStreamPtr: OpaquePointer,
8895
connection: Connection,
8996
client: MongoClient,
97+
namespace: MongoNamespace,
9098
session: ClientSession?,
9199
decoder: BSONDecoder,
92100
options: ChangeStreamOptions?
@@ -99,7 +107,8 @@ public class ChangeStream<T: Codable>: CursorProtocol {
99107
type: .tailableAwait
100108
)
101109
self.client = client
102-
self.decoder = decoder
110+
self.decoder = BSONDecoder(copies: decoder, options: nil)
111+
self.decoder.userInfo[changeStreamNamespaceKey] = namespace
103112

104113
// TODO: SWIFT-519 - Starting 4.2, update resumeToken to startAfter (if set).
105114
// startAfter takes precedence over resumeAfter.

Sources/MongoSwift/ChangeStreamDocument.swift renamed to Sources/MongoSwift/ChangeStreamEvent.swift

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public struct ChangeStreamEvent<T: Codable>: Codable {
4646
/// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/#resume-a-change-stream
4747
public let _id: ResumeToken
4848

49+
// TODO: SWIFT-981: Make this field optional.
4950
/// A document containing the database and collection names in which this change happened.
5051
public let ns: MongoNamespace
5152

@@ -73,4 +74,29 @@ public struct ChangeStreamEvent<T: Codable>: Codable {
7374
* point after the update occurred. If the document was deleted since the updated happened, it will be nil.
7475
*/
7576
public let fullDocument: T?
77+
78+
private enum CodingKeys: String, CodingKey {
79+
case operationType, _id, ns, documentKey, updateDescription, fullDocument
80+
}
81+
82+
// Custom decode method to work around the fact that `invalidate` events do not have an `ns` field in the raw
83+
// document. TODO SWIFT-981: Remove this.
84+
public init(from decoder: Decoder) throws {
85+
let container = try decoder.container(keyedBy: CodingKeys.self)
86+
self.operationType = try container.decode(OperationType.self, forKey: .operationType)
87+
self._id = try container.decode(ResumeToken.self, forKey: ._id)
88+
89+
do {
90+
self.ns = try container.decode(MongoNamespace.self, forKey: .ns)
91+
} catch {
92+
guard let ns = decoder.userInfo[changeStreamNamespaceKey] as? MongoNamespace else {
93+
throw error
94+
}
95+
self.ns = ns
96+
}
97+
98+
self.documentKey = try container.decodeIfPresent(BSONDocument.self, forKey: .documentKey)
99+
self.updateDescription = try container.decodeIfPresent(UpdateDescription.self, forKey: .updateDescription)
100+
self.fullDocument = try container.decodeIfPresent(T.self, forKey: .fullDocument)
101+
}
76102
}

Sources/MongoSwift/MongoDatabase.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public struct MongoDatabase {
6363
internal let _client: MongoClient
6464

6565
/// The namespace for this database.
66-
private let namespace: MongoNamespace
66+
internal let namespace: MongoNamespace
6767

6868
/// Encoder used by this database for BSON conversions. This encoder's options are inherited by collections derived
6969
/// from this database.

Sources/MongoSwift/Operations/WatchOperation.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,24 +40,29 @@ internal struct WatchOperation<CollectionType: Codable, ChangeStreamType: Codabl
4040
let changeStreamPtr: OpaquePointer
4141
let client: MongoClient
4242
let decoder: BSONDecoder
43+
let namespace: MongoNamespace
4344

4445
switch self.target {
4546
case let .client(c):
4647
client = c
4748
decoder = c.decoder
49+
// workaround for the need for a namespace as described in SWIFT-981.
50+
namespace = MongoNamespace(db: "", collection: nil)
4851
changeStreamPtr = connection.withMongocConnection { connPtr in
4952
mongoc_client_watch(connPtr, pipelinePtr, optsPtr)
5053
}
5154

5255
case let .database(db):
5356
client = db._client
5457
decoder = db.decoder
58+
namespace = db.namespace
5559
changeStreamPtr = db.withMongocDatabase(from: connection) { dbPtr in
5660
mongoc_database_watch(dbPtr, pipelinePtr, optsPtr)
5761
}
5862
case let .collection(coll):
5963
client = coll._client
6064
decoder = coll.decoder
65+
namespace = coll.namespace
6166
changeStreamPtr = coll.withMongocCollection(from: connection) { collPtr in
6267
mongoc_collection_watch(collPtr, pipelinePtr, optsPtr)
6368
}
@@ -67,6 +72,7 @@ internal struct WatchOperation<CollectionType: Codable, ChangeStreamType: Codabl
6772
stealing: changeStreamPtr,
6873
connection: connection,
6974
client: client,
75+
namespace: namespace,
7076
session: session,
7177
decoder: decoder,
7278
options: self.options

Tests/LinuxMain.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,8 @@ extension SyncChangeStreamTests {
368368
("testChangeStreamWithFullDocumentType", testChangeStreamWithFullDocumentType),
369369
("testChangeStreamOnACollectionWithCodableType", testChangeStreamOnACollectionWithCodableType),
370370
("testChangeStreamLazySequence", testChangeStreamLazySequence),
371+
("testDecodingInvalidateEventsOnCollection", testDecodingInvalidateEventsOnCollection),
372+
("testDecodingInvalidateEventsOnDatabase", testDecodingInvalidateEventsOnDatabase),
371373
]
372374
}
373375

Tests/MongoSwiftSyncTests/SyncChangeStreamTests.swift

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,4 +1140,79 @@ final class SyncChangeStreamTests: MongoSwiftTestCase {
11401140
expect(stream.isAlive()).to(beFalse())
11411141
}
11421142
}
1143+
1144+
/// Test that we properly manually populate the `ns` field of `ChangeStreamEvent`s for invalidate events on
1145+
/// collections.
1146+
func testDecodingInvalidateEventsOnCollection() throws {
1147+
// invalidated change stream on a collection
1148+
try self.withTestNamespace { client, _, collection in
1149+
let unmetRequirement = try MongoClient.makeTestClient().getUnmetRequirement(
1150+
TestRequirement(acceptableTopologies: [.replicaSet, .sharded])
1151+
)
1152+
guard unmetRequirement == nil else {
1153+
printSkipMessage(testName: self.name, unmetRequirement: unmetRequirement!)
1154+
return
1155+
}
1156+
1157+
let stream = try collection.watch()
1158+
1159+
// insert to create the collection
1160+
try collection.insertOne(["x": 1])
1161+
let insertEvent = try stream.next()?.get()
1162+
expect(insertEvent?.operationType).to(equal(.insert))
1163+
1164+
// drop the collection to generate an invalidate event
1165+
try collection.drop()
1166+
1167+
// as of 4.0.1, the server first emits a drop event and then an invalidate event.
1168+
// on 3.6, there is only an invalidate event.
1169+
if try client.serverVersion() >= ServerVersion(major: 4, minor: 0, patch: 1) {
1170+
let drop = try stream.next()?.get()
1171+
expect(drop?.operationType).to(equal(.drop))
1172+
}
1173+
1174+
let invalidate = try stream.next()?.get()
1175+
expect(invalidate?.operationType).to(equal(.invalidate))
1176+
expect(invalidate?.ns).to(equal(collection.namespace))
1177+
}
1178+
}
1179+
1180+
/// Test that we properly manually populate the `ns` field of `ChangeStreamEvent`s for invalidate events on
1181+
/// databases.
1182+
func testDecodingInvalidateEventsOnDatabase() throws {
1183+
// invalidated change stream on a DB
1184+
try self.withTestNamespace { client, db, collection in
1185+
// DB change streams are supported as of 4.0
1186+
let unmetRequirement = try client.getUnmetRequirement(
1187+
TestRequirement(
1188+
minServerVersion: ServerVersion(major: 4, minor: 0),
1189+
acceptableTopologies: [.replicaSet, .sharded]
1190+
)
1191+
)
1192+
guard unmetRequirement == nil else {
1193+
printSkipMessage(testName: self.name, unmetRequirement: unmetRequirement!)
1194+
return
1195+
}
1196+
1197+
let stream = try db.watch()
1198+
1199+
// insert to collection to create the db
1200+
try collection.insertOne(["x": 1])
1201+
let insertEvent = try stream.next()?.get()
1202+
expect(insertEvent?.operationType).to(equal(.insert))
1203+
1204+
// drop the db to generate an invalidate event
1205+
try db.drop()
1206+
// first we see collection drop, then db drop
1207+
let dropColl = try stream.next()?.get()
1208+
expect(dropColl?.operationType).to(equal(.drop))
1209+
let dropDB = try stream.next()?.get()
1210+
expect(dropDB?.operationType).to(equal(.dropDatabase))
1211+
1212+
let invalidate = try stream.next()?.get()
1213+
expect(invalidate?.operationType).to(equal(.invalidate))
1214+
expect(invalidate?.ns.db).to(equal(db.name))
1215+
expect(invalidate?.ns.collection).to(beNil())
1216+
}
1217+
}
11431218
}

0 commit comments

Comments
 (0)