Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Sources/MongoSwift/ChangeStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ public struct ResumeToken: Codable, Equatable {
}
}

// TODO: SWIFT-981: Remove this.
/// The key we use for storing a change stream's namespace in it's `userInfo`. This allows types
/// using the decoder e.g. `ChangeStreamEvent` to access the namespace even if it is not present in the raw
/// document the server returns. Ok to force unwrap as initialization never fails.
// swiftlint:disable:next force_unwrapping
internal let changeStreamNamespaceKey = CodingUserInfoKey(rawValue: "namespace")!
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally this would have lived on ChangeStream, but you can't have stored static properties on generic types 🤔


// sourcery: skipSyncExport
/// A MongoDB change stream.
/// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/
Expand Down Expand Up @@ -87,6 +94,7 @@ public class ChangeStream<T: Codable>: CursorProtocol {
stealing changeStreamPtr: OpaquePointer,
connection: Connection,
client: MongoClient,
namespace: MongoNamespace,
session: ClientSession?,
decoder: BSONDecoder,
options: ChangeStreamOptions?
Expand All @@ -99,7 +107,8 @@ public class ChangeStream<T: Codable>: CursorProtocol {
type: .tailableAwait
)
self.client = client
self.decoder = decoder
self.decoder = BSONDecoder(copies: decoder, options: nil)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we are updating the userInfo I figured it was a bit safer to make a copy.

this did make me think about the fact that right now users can't give us userInfo that we should use when decoding their custom types, even though they can configure all of the other encoder/decoder options via the strategies. I don't think that's too urgent though, we can wait until if/when someone asks for this capability.

self.decoder.userInfo[changeStreamNamespaceKey] = namespace

// TODO: SWIFT-519 - Starting 4.2, update resumeToken to startAfter (if set).
// startAfter takes precedence over resumeAfter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public struct ChangeStreamEvent<T: Codable>: Codable {
/// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/#resume-a-change-stream
public let _id: ResumeToken

// TODO: SWIFT-981: Make this field optional.
/// A document containing the database and collection names in which this change happened.
public let ns: MongoNamespace

Expand Down Expand Up @@ -73,4 +74,29 @@ public struct ChangeStreamEvent<T: Codable>: Codable {
* point after the update occurred. If the document was deleted since the updated happened, it will be nil.
*/
public let fullDocument: T?

private enum CodingKeys: String, CodingKey {
case operationType, _id, ns, documentKey, updateDescription, fullDocument
}

// Custom decode method to work around the fact that `invalidate` events do not have an `ns` field in the raw
// document. TODO SWIFT-981: Remove this.
public init(from decoder: Decoder) throws {
let container = try decoder.container(keyedBy: CodingKeys.self)
self.operationType = try container.decode(OperationType.self, forKey: .operationType)
self._id = try container.decode(ResumeToken.self, forKey: ._id)

do {
self.ns = try container.decode(MongoNamespace.self, forKey: .ns)
} catch {
guard let ns = decoder.userInfo[changeStreamNamespaceKey] as? MongoNamespace else {
throw error
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered just filling in with an empty namespace or something here, but that seemed like it would hide errors from the server in cases where a namespace is unexpectedly missing.

}
self.ns = ns
}

self.documentKey = try container.decodeIfPresent(BSONDocument.self, forKey: .documentKey)
self.updateDescription = try container.decodeIfPresent(UpdateDescription.self, forKey: .updateDescription)
self.fullDocument = try container.decodeIfPresent(T.self, forKey: .fullDocument)
}
}
2 changes: 1 addition & 1 deletion Sources/MongoSwift/MongoDatabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public struct MongoDatabase {
internal let _client: MongoClient

/// The namespace for this database.
private let namespace: MongoNamespace
internal let namespace: MongoNamespace

/// Encoder used by this database for BSON conversions. This encoder's options are inherited by collections derived
/// from this database.
Expand Down
6 changes: 6 additions & 0 deletions Sources/MongoSwift/Operations/WatchOperation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,29 @@ internal struct WatchOperation<CollectionType: Codable, ChangeStreamType: Codabl
let changeStreamPtr: OpaquePointer
let client: MongoClient
let decoder: BSONDecoder
let namespace: MongoNamespace

switch self.target {
case let .client(c):
client = c
decoder = c.decoder
// workaround for the need for a namespace as described in SWIFT-981.
namespace = MongoNamespace(db: "", collection: nil)
changeStreamPtr = connection.withMongocConnection { connPtr in
mongoc_client_watch(connPtr, pipelinePtr, optsPtr)
}

case let .database(db):
client = db._client
decoder = db.decoder
namespace = db.namespace
changeStreamPtr = db.withMongocDatabase(from: connection) { dbPtr in
mongoc_database_watch(dbPtr, pipelinePtr, optsPtr)
}
case let .collection(coll):
client = coll._client
decoder = coll.decoder
namespace = coll.namespace
changeStreamPtr = coll.withMongocCollection(from: connection) { collPtr in
mongoc_collection_watch(collPtr, pipelinePtr, optsPtr)
}
Expand All @@ -67,6 +72,7 @@ internal struct WatchOperation<CollectionType: Codable, ChangeStreamType: Codabl
stealing: changeStreamPtr,
connection: connection,
client: client,
namespace: namespace,
session: session,
decoder: decoder,
options: self.options
Expand Down
2 changes: 2 additions & 0 deletions Tests/LinuxMain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ extension SyncChangeStreamTests {
("testChangeStreamWithFullDocumentType", testChangeStreamWithFullDocumentType),
("testChangeStreamOnACollectionWithCodableType", testChangeStreamOnACollectionWithCodableType),
("testChangeStreamLazySequence", testChangeStreamLazySequence),
("testDecodingInvalidateEventsOnCollection", testDecodingInvalidateEventsOnCollection),
("testDecodingInvalidateEventsOnDatabase", testDecodingInvalidateEventsOnDatabase),
]
}

Expand Down
75 changes: 75 additions & 0 deletions Tests/MongoSwiftSyncTests/SyncChangeStreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1140,4 +1140,79 @@ final class SyncChangeStreamTests: MongoSwiftTestCase {
expect(stream.isAlive()).to(beFalse())
}
}

/// Test that we properly manually populate the `ns` field of `ChangeStreamEvent`s for invalidate events on
/// collections.
func testDecodingInvalidateEventsOnCollection() throws {
// invalidated change stream on a collection
try self.withTestNamespace { client, _, collection in
let unmetRequirement = try MongoClient.makeTestClient().getUnmetRequirement(
TestRequirement(acceptableTopologies: [.replicaSet, .sharded])
)
guard unmetRequirement == nil else {
printSkipMessage(testName: self.name, unmetRequirement: unmetRequirement!)
return
}

let stream = try collection.watch()

// insert to create the collection
try collection.insertOne(["x": 1])
let insertEvent = try stream.next()?.get()
expect(insertEvent?.operationType).to(equal(.insert))

// drop the collection to generate an invalidate event
try collection.drop()

// as of 4.0.1, the server first emits a drop event and then an invalidate event.
// on 3.6, there is only an invalidate event.
if try client.serverVersion() >= ServerVersion(major: 4, minor: 0, patch: 1) {
let drop = try stream.next()?.get()
expect(drop?.operationType).to(equal(.drop))
}

let invalidate = try stream.next()?.get()
expect(invalidate?.operationType).to(equal(.invalidate))
expect(invalidate?.ns).to(equal(collection.namespace))
}
}

/// Test that we properly manually populate the `ns` field of `ChangeStreamEvent`s for invalidate events on
/// databases.
func testDecodingInvalidateEventsOnDatabase() throws {
// invalidated change stream on a DB
try self.withTestNamespace { client, db, collection in
// DB change streams are supported as of 4.0
let unmetRequirement = try client.getUnmetRequirement(
TestRequirement(
minServerVersion: ServerVersion(major: 4, minor: 0),
acceptableTopologies: [.replicaSet, .sharded]
)
)
guard unmetRequirement == nil else {
printSkipMessage(testName: self.name, unmetRequirement: unmetRequirement!)
return
}

let stream = try db.watch()

// insert to collection to create the db
try collection.insertOne(["x": 1])
let insertEvent = try stream.next()?.get()
expect(insertEvent?.operationType).to(equal(.insert))

// drop the db to generate an invalidate event
try db.drop()
// first we see collection drop, then db drop
let dropColl = try stream.next()?.get()
expect(dropColl?.operationType).to(equal(.drop))
let dropDB = try stream.next()?.get()
expect(dropDB?.operationType).to(equal(.dropDatabase))

let invalidate = try stream.next()?.get()
expect(invalidate?.operationType).to(equal(.invalidate))
expect(invalidate?.ns.db).to(equal(db.name))
expect(invalidate?.ns.collection).to(beNil())
}
}
}