@@ -18,34 +18,47 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
1818
1919 func next( ) async -> Result < Base . Element , Error > ? {
2020 return await withTaskCancellationHandler {
21- let ( shouldSuspend , result ) = self . stateMachine. withCriticalRegion { stateMachine -> ( Bool , Result < Base . Element , Error > ? ) in
21+ let action : BoundedBufferStateMachine < Base > . NextAction ? = self . stateMachine. withCriticalRegion { stateMachine in
2222 let action = stateMachine. next ( )
2323 switch action {
2424 case . startTask( let base) :
2525 self . startTask ( stateMachine: & stateMachine, base: base)
26- return ( true , nil )
26+ return nil
27+
2728 case . suspend:
28- return ( true , nil )
29- case . returnResult( let producerContinuation, let result) :
30- producerContinuation? . resume ( )
31- return ( false , result)
29+ return action
30+ case . returnResult:
31+ return action
3232 }
3333 }
3434
35- if !shouldSuspend {
36- return result
35+ switch action {
36+ case . startTask:
37+ // We are handling the startTask in the lock already because we want to avoid
38+ // other inputs interleaving while starting the task
39+ fatalError ( " Internal inconsistency " )
40+
41+ case . suspend:
42+ break
43+
44+ case . returnResult( let producerContinuation, let result) :
45+ producerContinuation? . resume ( )
46+ return result
47+
48+ case . none:
49+ break
3750 }
3851
3952 return await withUnsafeContinuation { ( continuation: UnsafeContinuation < Result < Base . Element , Error > ? , Never > ) in
40- self . stateMachine. withCriticalRegion { stateMachine in
41- let action = stateMachine. nextSuspended ( continuation: continuation)
42- switch action {
43- case . none :
44- break
45- case . returnResult ( let producerContinuation , let result ) :
46- producerContinuation ? . resume ( )
47- continuation . resume ( returning : result )
48- }
53+ let action = self . stateMachine. withCriticalRegion { stateMachine in
54+ stateMachine. nextSuspended ( continuation: continuation)
55+ }
56+ switch action {
57+ case . none :
58+ break
59+ case . returnResult ( let producerContinuation , let result ) :
60+ producerContinuation ? . resume ( )
61+ continuation . resume ( returning : result )
4962 }
5063 }
5164 } onCancel: {
@@ -68,15 +81,15 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
6881
6982 if shouldSuspend {
7083 await withUnsafeContinuation { ( continuation: UnsafeContinuation < Void , Never > ) in
71- self . stateMachine. withCriticalRegion { stateMachine in
72- let action = stateMachine. producerSuspended ( continuation: continuation)
73-
74- switch action {
75- case . none :
76- break
77- case . resumeProducer :
78- continuation . resume ( )
79- }
84+ let action = self . stateMachine. withCriticalRegion { stateMachine in
85+ stateMachine. producerSuspended ( continuation: continuation)
86+ }
87+
88+ switch action {
89+ case . none :
90+ break
91+ case . resumeProducer :
92+ continuation . resume ( )
8093 }
8194 }
8295 }
@@ -86,35 +99,35 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
8699 break loop
87100 }
88101
89- self . stateMachine. withCriticalRegion { stateMachine in
90- let action = stateMachine. elementProduced ( element: element)
91- switch action {
92- case . none:
93- break
94- case . resumeConsumer( let continuation, let result) :
95- continuation. resume ( returning: result)
96- }
102+ let action = self . stateMachine. withCriticalRegion { stateMachine in
103+ stateMachine. elementProduced ( element: element)
97104 }
98- }
99-
100- self . stateMachine. withCriticalRegion { stateMachine in
101- let action = stateMachine. finish ( error: nil )
102105 switch action {
103106 case . none:
104107 break
105- case . resumeConsumer( let continuation) :
106- continuation? . resume ( returning: nil )
108+ case . resumeConsumer( let continuation, let result ) :
109+ continuation. resume ( returning: result )
107110 }
108111 }
112+
113+ let action = self . stateMachine. withCriticalRegion { stateMachine in
114+ stateMachine. finish ( error: nil )
115+ }
116+ switch action {
117+ case . none:
118+ break
119+ case . resumeConsumer( let continuation) :
120+ continuation? . resume ( returning: nil )
121+ }
109122 } catch {
110- self . stateMachine. withCriticalRegion { stateMachine in
111- let action = stateMachine. finish ( error: error)
112- switch action {
113- case . none :
114- break
115- case . resumeConsumer ( let continuation ) :
116- continuation ? . resume ( returning : . failure ( error ) )
117- }
123+ let action = self . stateMachine. withCriticalRegion { stateMachine in
124+ stateMachine. finish ( error: error)
125+ }
126+ switch action {
127+ case . none :
128+ break
129+ case . resumeConsumer ( let continuation ) :
130+ continuation ? . resume ( returning : . failure ( error ) )
118131 }
119132 }
120133 }
@@ -123,16 +136,16 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
123136 }
124137
125138 func interrupted( ) {
126- self . stateMachine. withCriticalRegion { stateMachine in
127- let action = stateMachine. interrupted ( )
128- switch action {
129- case . none :
130- break
131- case . resumeProducerAndConsumer ( let task , let producerContinuation , let consumerContinuation ) :
132- task . cancel ( )
133- producerContinuation ? . resume ( )
134- consumerContinuation ? . resume ( returning : nil )
135- }
139+ let action = self . stateMachine. withCriticalRegion { stateMachine in
140+ stateMachine. interrupted ( )
141+ }
142+ switch action {
143+ case . none :
144+ break
145+ case . resumeProducerAndConsumer ( let task , let producerContinuation , let consumerContinuation ) :
146+ task . cancel ( )
147+ producerContinuation ? . resume ( )
148+ consumerContinuation ? . resume ( returning : nil )
136149 }
137150 }
138151
0 commit comments