@@ -17,6 +17,7 @@ import OpenAPIRuntime
17
17
import HTTPTypes
18
18
import Vapor
19
19
import NIOFoundationCompat
20
+ import Atomics
20
21
21
22
public final class VaporTransport {
22
23
@@ -58,6 +59,7 @@ enum VaporTransportError: Error {
58
59
case unsupportedHTTPMethod( String )
59
60
case duplicatePathParameter( [ String ] )
60
61
case missingRequiredPathParameter( String )
62
+ case multipleBodyIteration
61
63
}
62
64
63
65
extension [ Vapor . PathComponent ] {
@@ -143,21 +145,24 @@ extension Vapor.Response.Body {
143
145
self = . empty
144
146
return
145
147
}
148
+ let iterated = ManagedAtomic ( false )
146
149
let stream : @Sendable ( any Vapor . BodyStreamWriter ) -> ( ) = { writer in
150
+ guard iterated. compareExchange (
151
+ expected: false ,
152
+ desired: true ,
153
+ ordering: . relaxed
154
+ ) . exchanged else {
155
+ _ = writer. write ( . error( VaporTransportError . multipleBodyIteration) )
156
+ return
157
+ }
147
158
_ = writer. eventLoop. makeFutureWithTask {
148
159
do {
149
160
for try await chunk in body {
150
- try await writer. eventLoop. flatSubmit {
151
- writer. write ( . buffer( ByteBuffer ( bytes: chunk) ) )
152
- } . get ( )
161
+ try await writer. write ( . buffer( ByteBuffer ( bytes: chunk) ) ) . get ( )
153
162
}
154
- try await writer. eventLoop. flatSubmit {
155
- writer. write ( . end)
156
- } . get ( )
163
+ try await writer. write ( . end) . get ( )
157
164
} catch {
158
- try await writer. eventLoop. flatSubmit {
159
- writer. write ( . error( error) )
160
- } . get ( )
165
+ try await writer. write ( . error( error) ) . get ( )
161
166
}
162
167
}
163
168
}
0 commit comments