Skip to content

Commit 02d3b9d

Browse files
committed
Create AsyncQueue
1 parent 10aad91 commit 02d3b9d

File tree

2 files changed

+207
-3
lines changed

2 files changed

+207
-3
lines changed

Sources/AsyncQueue/AsyncQueue.swift

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,67 @@
2020
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121
// SOFTWARE.
2222

23-
final class AsyncQueue {}
23+
/// A queue that enables ordered sending of events from synchronous to asynchronous code
24+
public final class AsyncQueue: Sendable {
25+
26+
// MARK: Initialization
27+
28+
/// Instantiates an asynchronous queue.
29+
/// - Parameter priority: The priority of the tasks added to the asynchronous queue.
30+
public init(priority: TaskPriority = .medium) {
31+
var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil
32+
let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in
33+
capturedTaskStreamContinuation = continuation
34+
}
35+
taskStreamContinuation = capturedTaskStreamContinuation
36+
37+
streamTask = Task.detached(priority: priority) {
38+
for await task in taskStream {
39+
await task()
40+
}
41+
}
42+
}
43+
44+
deinit {
45+
taskStreamContinuation.finish()
46+
}
47+
48+
// MARK: Public
49+
50+
/// Schedules an asynchronous task for execution that will not execute until all prior tasks have completed and immediately returns.
51+
/// - Parameter task: The task to enqueue.
52+
public func async(_ task: @escaping @Sendable () async -> Void) {
53+
taskStreamContinuation.yield(task)
54+
}
55+
56+
/// Appends an asynchronous throwing task to the queue that will not execute until all prior tasks have completed and returns after the task is complete.
57+
/// - Parameter task: The task to enqueue.
58+
/// - Returns: The value returned from the enqueued task.
59+
public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T {
60+
await withUnsafeContinuation { continuation in
61+
taskStreamContinuation.yield {
62+
continuation.resume(returning: await task())
63+
}
64+
}
65+
}
66+
67+
/// Appends an asynchronous throwing task to the queue that will not execute until all prior tasks have completed and returns after the task is complete.
68+
/// - Parameter task: The task to enqueue.
69+
/// - Returns: The value returned from the enqueued task.
70+
public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
71+
try await withUnsafeThrowingContinuation { continuation in
72+
taskStreamContinuation.yield {
73+
do {
74+
continuation.resume(returning: try await task())
75+
} catch {
76+
continuation.resume(throwing: error)
77+
}
78+
}
79+
}
80+
}
81+
82+
// MARK: Private
83+
84+
private let streamTask: Task<Void, Never>
85+
private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation!
86+
}

Tests/AsyncQueueTests/AsyncQueueTests.swift

Lines changed: 143 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,149 @@ import XCTest
2626

2727
final class AsyncQueueTests: XCTestCase {
2828

29-
func test_example() {
30-
_ = AsyncQueue()
29+
// MARK: XCTestCase
30+
31+
override func setUp() async throws {
32+
try await super.setUp()
33+
34+
systemUnderTest = AsyncQueue()
35+
}
36+
37+
override func tearDown() async throws {
38+
try await super.tearDown()
39+
40+
await systemUnderTest.await { /* Drain the queue */ }
41+
}
42+
43+
// MARK: Behavior Tests
44+
45+
func test_async_sendsEventsInOrder() {
46+
let counter = Counter()
47+
for iteration in 1...1_000 {
48+
systemUnderTest.async {
49+
await counter.incrementAndExpectCount(equals: iteration)
50+
}
51+
}
52+
}
53+
54+
func test_async_executesAsyncBlocksSeriallyAndAtomically() {
55+
let unsafeCounter = UnsafeCounter()
56+
for iteration in 1...1_000 {
57+
systemUnderTest.async {
58+
unsafeCounter.incrementAndExpectCount(equals: iteration)
59+
}
60+
}
61+
}
62+
63+
func test_async_isNotReentrant() async {
64+
let counter = Counter()
65+
await systemUnderTest.await { [systemUnderTest] in
66+
systemUnderTest.async {
67+
await counter.incrementAndExpectCount(equals: 2)
68+
}
69+
await counter.incrementAndExpectCount(equals: 1)
70+
systemUnderTest.async {
71+
await counter.incrementAndExpectCount(equals: 3)
72+
}
73+
}
74+
}
75+
76+
func test_async_retainsReceiverUntilFlushed() async {
77+
var systemUnderTest: AsyncQueue? = AsyncQueue()
78+
let counter = Counter()
79+
let expectation = self.expectation(description: #function)
80+
await withThrowingTaskGroup(of: Void.self) { taskGroup in
81+
let foreverSleep = Task {
82+
try await Task.sleep(nanoseconds: UInt64.max)
83+
}
84+
taskGroup.addTask {
85+
try await foreverSleep.value
86+
}
87+
systemUnderTest?.async {
88+
// Make the queue wait.
89+
try? await foreverSleep.value
90+
await counter.incrementAndExpectCount(equals: 1)
91+
}
92+
systemUnderTest?.async {
93+
// This async task should not execute until the sleep is cancelled.
94+
await counter.incrementAndExpectCount(equals: 2)
95+
expectation.fulfill()
96+
}
97+
// Nil out our reference to the queue to show that the enqueued tasks will still complete
98+
systemUnderTest = nil
99+
// Cancel the sleep timer to unlock the remaining enqueued tasks.
100+
foreverSleep.cancel()
101+
102+
await waitForExpectations(timeout: 1.0)
103+
}
104+
}
105+
106+
func test_await_sendsEventsInOrder() async {
107+
let counter = Counter()
108+
for iteration in 1...1_000 {
109+
systemUnderTest.async {
110+
await counter.incrementAndExpectCount(equals: iteration)
111+
}
112+
113+
guard iteration % 25 == 0 else {
114+
// Keep sending async events to the queue.
115+
continue
116+
}
117+
118+
await systemUnderTest.await {
119+
let count = await counter.count
120+
XCTAssertEqual(count, iteration)
121+
}
122+
}
123+
}
124+
125+
func test_await_canReturn() async {
126+
let expectedValue = UUID()
127+
let returnedValue = await systemUnderTest.await { expectedValue }
128+
XCTAssertEqual(expectedValue, returnedValue)
129+
}
130+
131+
func test_await_canThrow() async {
132+
struct TestError: Error, Equatable {
133+
private let identifier = UUID()
134+
}
135+
let expectedError = TestError()
136+
do {
137+
try await systemUnderTest.await { throw expectedError }
138+
} catch {
139+
XCTAssertEqual(error as? TestError, expectedError)
140+
}
141+
}
142+
143+
// MARK: Private
144+
145+
private var systemUnderTest = AsyncQueue()
146+
147+
// MARK: - Counter
148+
149+
private actor Counter {
150+
func incrementAndExpectCount(equals expectedCount: Int) {
151+
increment()
152+
XCTAssertEqual(expectedCount, count)
153+
}
154+
155+
func increment() {
156+
count += 1
157+
}
158+
159+
var count = 0
160+
}
161+
162+
// MARK: - UnsafeCounter
163+
164+
/// A counter that is explicitly not safe to utilize concurrently.
165+
private final class UnsafeCounter: @unchecked Sendable {
166+
func incrementAndExpectCount(equals expectedCount: Int) {
167+
count += 1
168+
XCTAssertEqual(expectedCount, count)
169+
}
170+
171+
var count = 0
31172
}
32173

33174
}

0 commit comments

Comments
 (0)