-
Notifications
You must be signed in to change notification settings - Fork 26
/
AsyncHandleEventsSequence.swift
160 lines (144 loc) · 4.69 KB
/
AsyncHandleEventsSequence.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
//
// AsyncSequence+HandleEvents.swift
//
//
// Created by Thibault Wittemberg on 31/12/2021.
//
public extension AsyncSequence {
/// Performs the specified closures when async sequences events occur.
///
/// ```
/// let sourceSequence = AsyncLazySequence([1, 2, 3, 4, 5])
/// let handledSequence = sourceSequence.handleEvents {
/// print("Begin looping")
/// } onElement: { element in
/// print("Element is \(element)")
/// } onCancel: {
/// print("Cancelled")
/// } onFinish: { termination in
/// print(termination)
/// }
///
/// for try await element in handledSequence {}
///
/// // will print:
/// // Begin looping
/// // Element is 1
/// // Element is 2
/// // Element is 3
/// // Element is 4
/// // Element is 5
/// // finished
/// ```
///
/// - Parameters:
/// - onStart: The operation to execute when the async sequence is first iterated.
/// - onElement: The operation to execute on each element.
/// - onCancel: The operation to execute when the task suppoerting the async sequence looping is cancelled.
/// - onFinish: The operation to execute when the async sequence looping is finished,
/// whether it is due to an error or a normal termination.
/// - Returns: The AsyncSequence that executes the `receiveElement` operation for each element of the source sequence.
func handleEvents(
onStart: (@Sendable () async -> Void)? = nil,
onElement: (@Sendable (Element) async -> Void)? = nil,
onCancel: (@Sendable () async -> Void)? = nil,
onFinish: (@Sendable (Termination<Error>) async -> Void)? = nil
) -> AsyncHandleEventsSequence<Self> {
AsyncHandleEventsSequence(
self,
onStart: onStart,
onElement: onElement,
onCancel: onCancel,
onFinish: onFinish
)
}
}
public struct AsyncHandleEventsSequence<Base: AsyncSequence>: AsyncSequence {
public typealias Element = Base.Element
public typealias AsyncIterator = Iterator
var base: Base
let onStart: (@Sendable () async -> Void)?
let onElement: (@Sendable (Base.Element) async -> Void)?
let onCancel: (@Sendable () async -> Void)?
let onFinish: (@Sendable (Termination<Error>) async -> Void)?
public init(
_ base: Base,
onStart: (@Sendable () async -> Void)?,
onElement: (@Sendable (Base.Element) async -> Void)?,
onCancel: (@Sendable () async -> Void)?,
onFinish: (@Sendable (Termination<Error>) async -> Void)?
) {
self.base = base
self.onStart = onStart
self.onElement = onElement
self.onCancel = onCancel
self.onFinish = onFinish
}
public func makeAsyncIterator() -> AsyncIterator {
Iterator(
base: self.base.makeAsyncIterator(),
onStart: self.onStart,
onElement: self.onElement,
onCancel: self.onCancel,
onFinish: self.onFinish
)
}
public struct Iterator: AsyncIteratorProtocol {
var base: Base.AsyncIterator
let onStart: (@Sendable () async -> Void)?
let onElement: (@Sendable (Base.Element) async -> Void)?
let onCancel: (@Sendable () async -> Void)?
let onFinish: (@Sendable (Termination<Error>) async -> Void)?
let onStartExecuted = ManagedCriticalState<Bool>(false)
public init(
base: Base.AsyncIterator,
onStart: (@Sendable () async -> Void)?,
onElement: (@Sendable (Base.Element) async -> Void)?,
onCancel: (@Sendable () async -> Void)?,
onFinish: (@Sendable (Termination<Error>) async -> Void)?
) {
self.base = base
self.onStart = onStart
self.onElement = onElement
self.onCancel = onCancel
self.onFinish = onFinish
}
public mutating func next() async rethrows -> Element? {
guard !Task.isCancelled else {
await self.onCancel?()
return nil
}
let shouldCallOnStart = self.onStartExecuted.withCriticalRegion { onStartExecuted -> Bool in
if !onStartExecuted {
onStartExecuted = true
return true
}
return false
}
if shouldCallOnStart {
await self.onStart?()
}
do {
let nextElement = try await self.base.next()
if let element = nextElement {
await self.onElement?(element)
} else {
if Task.isCancelled {
await self.onCancel?()
} else {
await self.onFinish?(.finished)
}
}
return nextElement
} catch let error as CancellationError {
await self.onCancel?()
throw error
} catch {
await self.onFinish?(.failure(error))
throw error
}
}
}
}
extension AsyncHandleEventsSequence: Sendable where Base: Sendable {}
extension AsyncHandleEventsSequence.Iterator: Sendable where Base.AsyncIterator: Sendable {}