-
Notifications
You must be signed in to change notification settings - Fork 67
SWIFT-1469 Provide AsyncSequence APIs for monitoring events #764
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
kmahar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good start! some minor comments as well as a few design-related questions
| .commandStartedEvent, | ||
| .commandSucceededEvent | ||
| ] | ||
| Task { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't await the result or completion of this Task so I don't think we have any guarantee it has finished executing and making all of its assertions by the time the test finishes.
one suggestion is to have the task return i and await and assert on that value below. something like:
let eventsTask = Task {
// ...
return i
}
try await client.db("admin").runCommand(["ping": 1])
let taskResult = try await eventTask.result
expect(taskResult).to(equal(4))and assert it's the right value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
loop never exits so we never return. All relates to the fact that we dont save the continuation, which not sure how to do given the current initialization. Preliminarily pushing all minor commits and will look at this
| try await self.withTestClient { client in | ||
| Task { | ||
| var i = 0 | ||
| var eventHandler: [EventType] = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we give this a more descriptive name like eventTypes?
I think the thread sanitizer is likely to complain about this because the callback appending to this array isn't always necessarily going to execute on the same thread and so there might be data races as a result. one way to fix this would be using a Lock that synchronizes access to the array. you could look at TestCommandMonitor in APMUtils.swift for an example of that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Named it eventTypes. Looked into NioConcurrencyHelpers.Lock that APMUtils.swift uses and I locked the appending to the array (since you cant pass in an async function into the .withLock{ ... }) and left associated comments.
|
|
||
| func testSDAMEventStreamClient() async throws { | ||
| try await self.withTestClient { client in | ||
| Task { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the same concern as above applies where we are not awaiting completion of the Task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep will address
| ] | ||
| Task { | ||
| var i = 0 | ||
| for try await event in client.commandEvents { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given my comment above about never "finishing" the continuation, do we ever break out of this loop / does the task ever actually finish?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do not, need to cache as commented above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After caching the continuation, I am able to break the loop and end the task by doing something like
let output = client.commandEventStream() ... output.finish()
Although I can only close the loop at a specific time if I call .finish() inside the loop. For example, if I want to check that 4 events happened, I'll call .finish() via if i == 4. Feels a little bit circular but I can confirm that we break out of the loop at expected time. Not sure if you had a different idea about how to test for ending the task
kmahar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
few comments now but given the discussed design change to allow multiple streams I'll hold off on re-reviewing the rest as I think a lot of the logic is likely to change
Sources/MongoSwift/MongoClient.swift
Outdated
| if self._commandEvents == nil { | ||
| self._commandEvents = CommandEventStream(stream: | ||
| AsyncStream { con in | ||
| self.addCommandEventHandler { event in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the client currently owns the handler, users will just get the same one next time they invoke the computed property, even if they drop it.
ah right, yeah I think I wrote that comment before realizing in my next comment below that there was only ever one of each stream.
yeah, I think overall as long as there aren't any technical limitations to it, the on-demand/factory approach would be a bit nicer so we can allow multiple streams, and can support stopping/restarting monitoring at will.
patrickfreed
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! I mostly have little testing nits here and there
Sources/MongoSwift/APM.swift
Outdated
| public typealias AsyncIterator = EventStreamIterator<T> | ||
|
|
||
| /// Creates the asynchronous iterator that produces elements of this `EventStream`. | ||
| public func makeAsyncIterator() -> EventStreamIterator<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with using AsyncStream<T>.AsyncIterator directly is that if we ever want to change the internals of how CommandEventStream or SDAMEventStream are implemented, we'll still need to somehow create an AsyncStream iterator. If we have a wrapper type, we're free to change the internals of our event stream types as needed.
We also looked into having the event stream types be their own iterators, but that would prevent users from being able to iterate over the same stream from different AsyncIterators for not much benefit, so we decided to just keep it as is.
Tests/MongoSwiftTests/APMTests.swift
Outdated
| try await coll.insertOne(["hello": "world"]) | ||
| } | ||
| // Tasks start and finish | ||
| try await assertIsEventuallyTrue(description: "each task is started") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this statement doesn't verify the tasks have actually started, since the condition will always be true once this part is hit. To verify they've started, you'll need an atomic counter or something that each task can increment once they start executing.
Also, it's important that this assertion happens before we start generating the events (i.e. before the ping command) so as to ensure each task receives all the events.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see - since the update happens in the loop but outside the task it basically runs synchronously. I think we have to return to using (a much simpler) actor for updating things async. Since the tasks dont kick off in the same order, using atomic counters often ends up updating an outdated version itself (ex. on the 4th iter of the loop, the load() function sees the 3rd iter-version of the counter meaning the counter never sees a value of 5). Using
actor TaskCounter {
var taskCounter = 0
func incr() {
taskCounter += 1
}
}
seems to work best. Also moved commands to below the check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the neat things about atomics is that you can also increment them atomically. So instead of doing a load + store, you can do atomic.add(1), and it'll avoid the problems you were seeing.
That being said, this actor approach also works, so we can just leave it as-is.
Sources/MongoSwift/MongoClient.swift
Outdated
| } | ||
|
|
||
| /// Provides an `AsyncSequence` API for consuming command monitoring events. | ||
| /// Example: printing the command events out would be written as |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a block comment for the example code would be a bit more readable than just inline.
e.g.
for await event in client.commandEventStream() {
print(event)
}Ditto for the SDAM event example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took this opportunity to beef up the docstring and make sure all the elements are in the right sections of the documentation (summary/discussion/etc.). Docstring is now
/**
* Provides an `AsyncSequence` API for consuming SDAM monitoring events.
*
* Example: printing the SDAM events out would be written as
for try await event in client.sdamEventStream() {
print(event)
}
* Wrapping in a `Task { ... }` may be desired for asynchronicity.
* - Returns: An `SDAMEventStream` that implements `AsyncSequence`.
* - Note: Only the most recent 100 events are stored in the stream.
*/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So in order the code to be formatted as code, you need the triple backticks (```) like in markdown. I think you also need to keep prepending the line with *. For an existing example, check out ClientSession.swift.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per discussion, this method works but used the backticks method for codebase consistency
|
Failed bc waiting on SWIFT-1604 to be merged in |
patrickfreed
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good! I just have one minor docstring comment
Tests/MongoSwiftTests/APMTests.swift
Outdated
| try await coll.insertOne(["hello": "world"]) | ||
| } | ||
| // Tasks start and finish | ||
| try await assertIsEventuallyTrue(description: "each task is started") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the neat things about atomics is that you can also increment them atomically. So instead of doing a load + store, you can do atomic.add(1), and it'll avoid the problems you were seeing.
That being said, this actor approach also works, so we can just leave it as-is.
Sources/MongoSwift/MongoClient.swift
Outdated
| } | ||
|
|
||
| /// Provides an `AsyncSequence` API for consuming command monitoring events. | ||
| /// Example: printing the command events out would be written as |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So in order the code to be formatted as code, you need the triple backticks (```) like in markdown. I think you also need to keep prepending the line with *. For an existing example, check out ClientSession.swift.
kmahar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is looking really close!! I just have a few minor comments and questions.
| */ | ||
| public func addCommandEventHandler<T: CommandEventHandler>(_ handler: T) { | ||
| self.commandEventHandlers.append(WeakEventHandler<T>(referencing: handler)) | ||
| self.eventHandlerLock.withLock { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
related to my comment above re the lock (not something we need to fix today, but for the future): with the number of places in the code we now guard some value behind a lock and just have to remember that it should only be accessed via the lock, it would be a nice improvement to introduce some generic wrapper type like Locked<T> that stores a value of type T plus its corresponding Lock and has a method that lets you do something like
myLockedThing.withLockedValue { value in
// ... do something with value
}The method would acquire the lock, execute your provided closure with the value it guards, then releases the lock. the locked value could be private to the Locked type so it's impossible to access it other than when you hold the lock, which would make it harder for us to forget to acquire the lock.
I filed SWIFT-1612 about this. it could be a nice little ticket to pick up if you end up having any free time waiting for reviews in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a cool idea and increasingly relevant with more async usage in the driver. Def a cool idea to pick up in some downtime.
| let workTask = Task { () -> Bool in | ||
| while !Task.isCancelled { | ||
| guard try await block() else { | ||
| try? await Task.sleep(seconds: sleepInterval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to try? here instead of just try?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per patricks prev comment,
I think we'll want to use a
try?here so that aCancellationErrorisn't throw, otherwise ourXCTFailbelow won't be executed.
Basically if the task gets cancelled, we dont want to throw a cancellation error, we want to hit the continue, then break out of the while !Task.iscancelled() and return false to hit the XCT condition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oooo ok. can we add a comment saying that this is to ignore CancellationErrors then?
kmahar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a couple formatting suggestions and one very minor comment! LGTM otherwise
| let workTask = Task { () -> Bool in | ||
| while !Task.isCancelled { | ||
| guard try await block() else { | ||
| try? await Task.sleep(seconds: sleepInterval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oooo ok. can we add a comment saying that this is to ignore CancellationErrors then?
Co-authored-by: Kaitlin Mahar <kaitlinmahar@gmail.com>
kmahar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm! 🎉
patrickfreed
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!!
isabelatkinson
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good! just one question and a small typo fix
| * If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`. | ||
| * The stream will be ended automatically if the `Task` it is running in is cancelled. | ||
| * - Returns: A `CommandEventStream` that implements `AsyncSequence`. | ||
| * - Note: Only the most recent 100 events are stored in the stream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reasoning behind the choice of 100 here? Would there be any benefit in allowing users to set a custom value?
isabelatkinson
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm after our discussion in standup (mod the one typo fix)!
Co-authored-by: Isabel Atkinson <isabelatkinson@gmail.com>
Need to write local tests