-
Notifications
You must be signed in to change notification settings - Fork 26
/
AsyncPassthroughSubject.swift
126 lines (109 loc) · 3.33 KB
/
AsyncPassthroughSubject.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//
// AsyncPassthroughSubject.swift
//
//
// Created by Thibault Wittemberg on 07/01/2022.
//
/// An `AsyncPassthroughSubject` is an async sequence in which one can send values over time.
/// When the `AsyncPassthroughSubject` is terminated, new consumers will
/// immediately resume with this termination.
///
/// ```
/// let passthrough = AsyncPassthroughSubject<Int>()
///
/// Task {
/// for await element in passthrough {
/// print(element) // will print 1 2
/// }
/// }
///
/// Task {
/// for await element in passthrough {
/// print(element) // will print 1 2
/// }
/// }
///
/// ... later in the application flow
///
/// passthrough.send(1)
/// passthrough.send(2)
/// passthrough.send(.finished)
/// ```
public final class AsyncPassthroughSubject<Element>: AsyncSubject {
public typealias Element = Element
public typealias Failure = Never
public typealias AsyncIterator = Iterator
struct State {
var terminalState: Termination<Never>?
var channels: [Int: AsyncBufferedChannel<Element>]
var ids: Int
}
let state: ManagedCriticalState<State>
public init() {
self.state = ManagedCriticalState(
State(terminalState: nil, channels: [:], ids: 0)
)
}
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
self.state.withCriticalRegion { state in
for channel in state.channels.values {
channel.send(element)
}
}
}
/// Finishes the subject with a normal ending.
/// - Parameter termination: The termination to finish the subject
public func send(_ termination: Termination<Failure>) {
self.state.withCriticalRegion { state in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
for channel in channels {
channel.finish()
}
}
}
func handleNewConsumer() -> (iterator: AsyncBufferedChannel<Element>.Iterator, unregister: @Sendable () -> Void) {
let asyncBufferedChannel = AsyncBufferedChannel<Element>()
let terminalState = self.state.withCriticalRegion { state in
state.terminalState
}
if let terminalState = terminalState, terminalState.isFinished {
asyncBufferedChannel.finish()
return (asyncBufferedChannel.makeAsyncIterator(), {})
}
let consumerId = self.state.withCriticalRegion { state -> Int in
state.ids += 1
state.channels[state.ids] = asyncBufferedChannel
return state.ids
}
let unregister = { @Sendable [state] in
state.withCriticalRegion { state in
state.channels[consumerId] = nil
}
}
return (asyncBufferedChannel.makeAsyncIterator(), unregister)
}
public func makeAsyncIterator() -> AsyncIterator {
Iterator(asyncSubject: self)
}
public struct Iterator: AsyncSubjectIterator {
var iterator: AsyncBufferedChannel<Element>.Iterator
let unregister: @Sendable () -> Void
init(asyncSubject: AsyncPassthroughSubject) {
(self.iterator, self.unregister) = asyncSubject.handleNewConsumer()
}
public var hasBufferedElements: Bool {
self.iterator.hasBufferedElements
}
public mutating func next() async -> Element? {
await withTaskCancellationHandler {
await self.iterator.next()
} onCancel: { [unregister] in
unregister()
}
}
}
}