Skip to content

Commit 8b6a898

Browse files
dfedbachand
andauthored
Make ActorQueue tasks execute on the target actor's execution context (#9)
* Make ActorQueue tasks execute on the target actor's execution context * Documentation and naming pass * Add comments to aid a consumer if they hit the unowned crash * Review feedback Co-authored-by: Michael Bachand <bachand.michael@gmail.com>
1 parent b073271 commit 8b6a898

File tree

5 files changed

+225
-166
lines changed

5 files changed

+225
-166
lines changed

AsyncQueue.podspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Pod::Spec.new do |s|
22
s.name = 'AsyncQueue'
3-
s.version = '0.1.0'
3+
s.version = '0.2.0'
44
s.license = 'MIT'
55
s.summary = 'A queue that enables ordered sending of events from synchronous to asynchronous code.'
66
s.homepage = 'https://github.com/dfed/swift-async-queue'

README.md

Lines changed: 61 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,23 @@ Tasks sent from a synchronous context to an asynchronous context in Swift Concur
1616
@MainActor
1717
func testMainActorTaskOrdering() async {
1818
actor Counter {
19-
func increment() -> Int {
19+
func incrementAndAssertCountEquals(_ expectedCount: Int) {
2020
count += 1
21-
return count
21+
let incrementedCount = count
22+
XCTAssertEqual(incrementedCount, expectedCount) // often fails
2223
}
23-
var count = 0
24+
25+
private var count = 0
2426
}
2527

2628
let counter = Counter()
2729
var tasks = [Task<Void, Never>]()
2830
for iteration in 1...100 {
2931
tasks.append(Task {
30-
let incrementedCount = await counter.increment()
31-
XCTAssertEqual(incrementedCount, iteration) // often fails
32+
await counter.incrementAndAssertCountEquals(iteration)
3233
})
3334
}
35+
// Wait for all enqueued tasks to finish.
3436
for task in tasks {
3537
_ = await task.value
3638
}
@@ -43,101 +45,87 @@ While [actors](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#
4345

4446
### Executing asynchronous tasks in FIFO order
4547

46-
Use a `FIFOQueue` to execute asynchronous tasks enqueued from a nonisolated context in FIFO order. Tasks sent to one of these queues are guaranteed to begin _and end_ executing in the order in which they are enqueued.
47-
48-
```swift
49-
let queue = FIFOQueue()
50-
queue.async {
51-
/*
52-
`async` context that executes after all other enqueued work is completed.
53-
Work enqueued after this task will wait for this task to complete.
54-
*/
55-
try? await Task.sleep(nanoseconds: 1_000_000)
56-
}
57-
queue.async {
58-
/*
59-
This task begins execution once the above one-second sleep completes.
60-
*/
61-
}
62-
await queue.await {
63-
/*
64-
`async` context that can return a value or throw an error.
65-
Executes after all other enqueued work is completed.
66-
Work enqueued after this task will wait for this task to complete.
67-
*/
68-
}
69-
```
48+
Use a `FIFOQueue` to execute asynchronous tasks enqueued from a nonisolated context in FIFO order. Tasks sent to one of these queues are guaranteed to begin _and end_ executing in the order in which they are enqueued. A `FIFOQueue` executes tasks in a similar manner to a `DispatchQueue`: enqueued tasks executes atomically, and the program will deadlock if a task executing on a `FIFOQueue` awaits results from the queue on which it is executing.
7049

71-
With a `FIFOQueue` you can easily execute asynchronous tasks from a nonisolated context in FIFO order:
50+
A `FIFOQueue` can easily execute asynchronous tasks from a nonisolated context in FIFO order:
7251
```swift
7352
func testFIFOQueueOrdering() async {
7453
actor Counter {
75-
func increment() -> Int {
54+
nonisolated
55+
func incrementAndAssertCountEquals(_ expectedCount: Int) {
56+
queue.async {
57+
await self.increment()
58+
let incrementedCount = await self.count
59+
XCTAssertEqual(incrementedCount, expectedCount) // always succeeds
60+
}
61+
}
62+
63+
nonisolated
64+
func flushQueue() async {
65+
await queue.await { }
66+
}
67+
68+
func increment() {
7669
count += 1
77-
return count
7870
}
71+
7972
var count = 0
73+
74+
private let queue = FIFOQueue()
8075
}
8176

8277
let counter = Counter()
83-
let queue = FIFOQueue()
8478
for iteration in 1...100 {
85-
queue.async {
86-
let incrementedCount = await counter.increment()
87-
XCTAssertEqual(incrementedCount, iteration) // always succeeds
88-
}
79+
counter.incrementAndAssertCountEquals(iteration)
8980
}
90-
await queue.await { }
81+
// Wait for all enqueued tasks to finish.
82+
await counter.flushQueue()
9183
}
9284
```
9385

94-
### Sending ordered asynchronous tasks to Actors
86+
FIFO execution has a key downside: the queue must wait for all previously enqueued work – including suspended work – to complete before new work can begin. If you desire new work to start when a prior task suspends, utilize an `ActorQueue`.
9587

96-
Use an `ActorQueue` to send ordered asynchronous tasks from a nonisolated context to an `actor` instance. Tasks sent to one of these queues are guaranteed to begin executing in the order in which they are enqueued. Ordering of execution is guaranteed up until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the called `actor` code.
88+
### Sending ordered asynchronous tasks to Actors from a nonisolated context
9789

98-
```swift
99-
let queue = ActorQueue()
100-
queue.async {
101-
/*
102-
`async` context that executes after all other enqueued work has begun executing.
103-
Work enqueued after this task will wait for this task to complete or suspend.
104-
*/
105-
try? await Task.sleep(nanoseconds: 1_000_000)
106-
}
107-
queue.async {
108-
/*
109-
This task begins execution once the above task suspends due to the one-second sleep.
110-
*/
111-
}
112-
await queue.await {
113-
/*
114-
`async` context that can return a value or throw an error.
115-
Executes after all other enqueued work has begun executing.
116-
Work enqueued after this task will wait for this task to complete or suspend.
117-
*/
118-
}
119-
```
90+
Use an `ActorQueue` to send ordered asynchronous tasks to an `actor`’s isolated context from nonisolated or synchronous contexts. Tasks sent to an actor queue are guaranteed to begin executing in the order in which they are enqueued. However, unlike a `FIFOQueue`, execution order is guaranteed only until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the enqueued task. An `ActorQueue` executes tasks within the its adopted actor’s isolated context, resulting in `ActorQueue` task execution having the same properties as `actor` code execution: code between suspension points is executed atomically, and tasks sent to a single `ActorQueue` can await results from the queue without deadlocking.
91+
92+
An instance of an `ActorQueue` is designed to be utilized by a single `actor` instance: tasks sent to an `ActorQueue` utilize the isolated context of the queue‘s adopted `actor` to serialize tasks. As such, there are a few requirements that must be met when dealing with an `ActorQueue`:
93+
1. The lifecycle of any `ActorQueue` should not exceed the lifecycle of its `actor`. It is strongly recommended that an `ActorQueue` be a `let` constant on the adopted `actor`. Enqueuing a task to an `ActorQueue` isntance after its adopted `actor` has been deallocated will result in a crash.
94+
2. An `actor` utilizing an `ActorQueue` should set the adopted execution context of the queue to `self` within the `actor`’s `init`. Failing to set an adopted execution context prior to enqueuing work on an `ActorQueue` will result in a crash.
12095

121-
With an `ActorQueue` you can easily begin execution of asynchronous tasks from a nonisolated context in order:
96+
An `ActorQueue` can easily enqueue tasks that execute on an actor’s isolated context from a nonisolated context in order:
12297
```swift
12398
func testActorQueueOrdering() async {
12499
actor Counter {
125-
func increment() -> Int {
126-
count += 1
127-
return count
100+
init() {
101+
// Adopting the execution context in `init` satisfies requirement #2 above.
102+
queue.adoptExecutionContext(of: self)
128103
}
129-
var count = 0
104+
105+
nonisolated
106+
func incrementAndAssertCountEquals(_ expectedCount: Int) {
107+
queue.async { myself in
108+
myself.count += 1
109+
XCTAssertEqual(expectedCount, myself.count) // always succeeds
110+
}
111+
}
112+
113+
nonisolated
114+
func flushQueue() async {
115+
await queue.await { _ in }
116+
}
117+
118+
private var count = 0
119+
// Making the queue a private let constant satisfies requirement #1 above.
120+
private let queue = ActorQueue<Counter>()
130121
}
131122

132123
let counter = Counter()
133-
let queue = ActorQueue()
134124
for iteration in 1...100 {
135-
queue.async {
136-
let incrementedCount = await counter.increment()
137-
XCTAssertEqual(incrementedCount, iteration) // always succeeds
138-
}
125+
counter.incrementAndAssertCountEquals(iteration)
139126
}
140-
await queue.await { }
127+
// Wait for all enqueued tasks to finish.
128+
await counter.flushQueue()
141129
}
142130
```
143131

Sources/AsyncQueue/ActorQueue.swift

Lines changed: 79 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -20,56 +20,54 @@
2020
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121
// SOFTWARE.
2222

23-
/// A queue that executes asynchronous tasks enqueued from a nonisolated context.
23+
/// A queue that enables enqueing ordered asynchronous tasks from a nonisolated context onto an adopted actor's isolated context.
2424
/// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow subsequently enqueued tasks to begin executing.
25-
/// Asynchronous tasks sent to this queue execute as they would in an `actor` type, allowing for re-entrancy and non-FIFO behavior when an individual task suspends.
25+
/// This queue exhibits the execution behavior of an actor: tasks sent to this queue can re-enter the queue, and tasks may execute in non-FIFO order when a task suspends.
2626
///
27-
/// An `ActorQueue` is used to ensure tasks sent from a nonisolated context to a single `actor`'s isolated context begin execution in order.
28-
/// Here is an example of how an `ActorQueue` should be utilized within an `actor`:
27+
/// An `ActorQueue` ensures tasks sent from a nonisolated context to a single actor's isolated context begin execution in order.
28+
/// Here is an example of how an `ActorQueue` should be utilized within an actor:
2929
/// ```swift
3030
/// public actor LogStore {
3131
///
32+
/// public init() {
33+
/// queue.adoptExecutionContext(of: self)
34+
/// }
35+
///
3236
/// nonisolated
3337
/// public func log(_ message: String) {
34-
/// queue.async {
35-
/// await self.append(message)
38+
/// queue.async { myself in
39+
/// myself.logs.append(message)
3640
/// }
3741
/// }
3842
///
3943
/// nonisolated
4044
/// public func retrieveLogs() async -> [String] {
41-
/// await queue.await { await self.logs }
42-
/// }
43-
///
44-
/// private func append(_ message: String) {
45-
/// logs.append(message)
45+
/// await queue.await { myself in myself.logs }
4646
/// }
4747
///
48-
/// private let queue = ActorQueue()
48+
/// private let queue = ActorQueue<LogStore>()
4949
/// private var logs = [String]()
5050
/// }
5151
/// ```
5252
///
53-
/// - Warning: Execution order is not guaranteed unless the enqueued tasks interact with a single `actor` instance.
54-
public final class ActorQueue {
53+
/// - Precondition: The lifecycle of an `ActorQueue` must not exceed that of the adopted actor.
54+
public final class ActorQueue<ActorType: Actor> {
5555

5656
// MARK: Initialization
5757

5858
/// Instantiates an actor queue.
59-
/// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue.
60-
public init(priority: TaskPriority? = nil) {
61-
var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil
62-
let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in
59+
public init() {
60+
var capturedTaskStreamContinuation: AsyncStream<ActorTask>.Continuation? = nil
61+
let taskStream = AsyncStream<ActorTask> { continuation in
6362
capturedTaskStreamContinuation = continuation
6463
}
6564
// Continuation will be captured during stream creation, so it is safe to force unwrap here.
6665
// If this force-unwrap fails, something is fundamentally broken in the Swift runtime.
6766
taskStreamContinuation = capturedTaskStreamContinuation!
6867

69-
Task.detached(priority: priority) {
70-
let executor = ActorExecutor()
71-
for await task in taskStream {
72-
await executor.suspendUntilStarted(task)
68+
Task.detached {
69+
for await actorTask in taskStream {
70+
await actorTask.executionContext.suspendUntilStarted(actorTask.task)
7371
}
7472
}
7573
}
@@ -80,65 +78,91 @@ public final class ActorQueue {
8078

8179
// MARK: Public
8280

81+
/// Sets the actor context within which each `async` and `await`ed task will execute.
82+
/// It is recommended that this method be called in the adopted actor’s `init` method.
83+
/// **Must be called prior to enqueuing any work on the receiver.**
84+
///
85+
/// - Parameter actor: The actor on which the queue's task will execute. This parameter is not retained by the receiver.
86+
/// - Warning: Calling this method more than once will result in an assertion failure.
87+
public func adoptExecutionContext(of actor: ActorType) {
88+
assert(weakExecutionContext == nil) // Adopting multiple executionContexts on the same queue is API abuse.
89+
weakExecutionContext = actor
90+
}
91+
8392
/// Schedules an asynchronous task for execution and immediately returns.
8493
/// The scheduled task will not execute until all prior tasks have completed or suspended.
85-
/// - Parameter task: The task to enqueue.
86-
public func async(_ task: @escaping @Sendable () async -> Void) {
87-
taskStreamContinuation.yield(task)
94+
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
95+
public func async(_ task: @escaping @Sendable (isolated ActorType) async -> Void) {
96+
taskStreamContinuation.yield(ActorTask(executionContext: executionContext, task: task))
8897
}
8998

9099
/// Schedules an asynchronous task and returns after the task is complete.
91100
/// The scheduled task will not execute until all prior tasks have completed or suspended.
92-
/// - Parameter task: The task to enqueue.
101+
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
93102
/// - Returns: The value returned from the enqueued task.
94-
public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T {
95-
await withUnsafeContinuation { continuation in
96-
taskStreamContinuation.yield {
97-
continuation.resume(returning: await task())
98-
}
103+
public func await<T>(_ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
104+
let executionContext = self.executionContext // Capture/retain the executionContext before suspending.
105+
return await withUnsafeContinuation { continuation in
106+
taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in
107+
continuation.resume(returning: await task(executionContext))
108+
})
99109
}
100110
}
101111

102112
/// Schedules an asynchronous throwing task and returns after the task is complete.
103113
/// The scheduled task will not execute until all prior tasks have completed or suspended.
104-
/// - Parameter task: The task to enqueue.
114+
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
105115
/// - Returns: The value returned from the enqueued task.
106-
public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
107-
try await withUnsafeThrowingContinuation { continuation in
108-
taskStreamContinuation.yield {
116+
public func await<T>(_ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
117+
let executionContext = self.executionContext // Capture/retain the executionContext before suspending.
118+
return try await withUnsafeThrowingContinuation { continuation in
119+
taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in
109120
do {
110-
continuation.resume(returning: try await task())
121+
continuation.resume(returning: try await task(executionContext))
111122
} catch {
112123
continuation.resume(throwing: error)
113124
}
114-
}
125+
})
115126
}
116127
}
117128

118129
// MARK: Private
119130

120-
private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation
131+
private let taskStreamContinuation: AsyncStream<ActorTask>.Continuation
121132

122-
// MARK: - ActorExecutor
133+
/// The actor on whose isolated context our tasks run, force-unwrapped.
134+
/// Utilize this accessor to retrieve the weak execution context in order to avoid repeating the below comment.
135+
private var executionContext: ActorType {
136+
// Crashing here means that this queue is being sent tasks either before an execution context has been set, or
137+
// after the execution context has deallocated. An ActorQueue's execution context should be set in the adopted
138+
// actor's `init` method, and the ActorQueue should not exceed the lifecycle of the adopted actor.
139+
weakExecutionContext!
140+
}
141+
/// The actor on whose isolated context our tasks run.
142+
/// We must use`weak` here to avoid creating a retain cycle between the adopted actor and this actor queue.
143+
///
144+
/// We will assume this execution context always exists for the lifecycle of the queue because:
145+
/// 1. The lifecycle of any `ActorQueue` must not exceed the lifecycle of its adopted `actor`.
146+
/// 2. The adopted `actor` must set itself as the execution context for this queue within its `init` method.
147+
private weak var weakExecutionContext: ActorType?
148+
149+
private struct ActorTask {
150+
let executionContext: ActorType
151+
let task: @Sendable (isolated ActorType) async -> Void
152+
}
123153

124-
private actor ActorExecutor {
125-
func suspendUntilStarted(_ task: @escaping @Sendable () async -> Void) async {
126-
// Suspend the calling code until our enqueued task starts.
127-
await withUnsafeContinuation { continuation in
128-
// Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete.
129-
Task {
130-
// Force this task to execute within the ActorExecutor's context by accessing an ivar on the instance.
131-
// This works around a bug when compiling with Xcode 14.1: https://github.com/apple/swift/issues/62503
132-
_ = void
154+
}
133155

134-
// Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed.
135-
continuation.resume()
136-
await task()
137-
}
156+
extension Actor {
157+
func suspendUntilStarted(_ task: @escaping @Sendable (isolated Self) async -> Void) async {
158+
// Suspend the calling code until our enqueued task starts.
159+
await withUnsafeContinuation { continuation in
160+
// Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete.
161+
Task {
162+
// Signal that the task has started. Since this `task` is executing on the current actor's execution context, the order of execution is guaranteed.
163+
continuation.resume()
164+
await task(self)
138165
}
139166
}
140-
141-
private let void: Void = ()
142167
}
143-
144168
}

0 commit comments

Comments
 (0)