Skip to content

Commit c7b67e0

Browse files
committed
Prevent TaskHandler state change after .endOrError
Motivation: Right now if task handler encounters an error, it changes state to `.endOrError`. We gate on that state to make sure that we do not process errors in the pipeline twice. Unfortunately, that state can be reset when we upload body or receive response parts. Modifications: Adds state validation before state is updated to a new value Adds a test Result: Fixes #297
1 parent 8c99c41 commit c7b67e0

File tree

3 files changed

+71
-22
lines changed

3 files changed

+71
-22
lines changed

Sources/AsyncHTTPClient/HTTPHandler.swift

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -840,15 +840,22 @@ extension TaskHandler: ChannelDuplexHandler {
840840
self.writeBody(request: request, context: context)
841841
}.flatMap {
842842
context.eventLoop.assertInEventLoop()
843+
if case .endOrError = self.state {
844+
return context.eventLoop.makeSucceededFuture(())
845+
}
846+
843847
self.state = .bodySent
844848
if let expectedBodyLength = self.expectedBodyLength, expectedBodyLength != self.actualBodyLength {
845849
let error = HTTPClientError.bodyLengthMismatch
846-
self.errorCaught(context: context, error: error)
847850
return context.eventLoop.makeFailedFuture(error)
848851
}
849852
return context.writeAndFlush(self.wrapOutboundOut(.end(nil)))
850853
}.map {
851854
context.eventLoop.assertInEventLoop()
855+
if case .endOrError = self.state {
856+
return
857+
}
858+
852859
self.state = .sent
853860
self.callOutToDelegateFireAndForget(self.delegate.didSendRequest)
854861
}.flatMapErrorThrowing { error in
@@ -924,31 +931,28 @@ extension TaskHandler: ChannelDuplexHandler {
924931
let response = self.unwrapInboundIn(data)
925932
switch response {
926933
case .head(let head):
927-
switch self.state {
928-
case .endOrError:
929-
preconditionFailure("unexpected state on .head")
930-
default:
931-
if !head.isKeepAlive {
932-
self.closing = true
933-
}
934+
if case .endOrError = self.state {
935+
return
936+
}
934937

935-
if let redirectURL = self.redirectHandler?.redirectTarget(status: head.status, headers: head.headers) {
936-
self.state = .redirected(head, redirectURL)
937-
} else {
938-
self.state = .head
939-
self.mayRead = false
940-
self.callOutToDelegate(value: head, channelEventLoop: context.eventLoop, self.delegate.didReceiveHead)
941-
.whenComplete { result in
942-
self.handleBackpressureResult(context: context, result: result)
943-
}
944-
}
938+
if !head.isKeepAlive {
939+
self.closing = true
940+
}
941+
942+
if let redirectURL = self.redirectHandler?.redirectTarget(status: head.status, headers: head.headers) {
943+
self.state = .redirected(head, redirectURL)
944+
} else {
945+
self.state = .head
946+
self.mayRead = false
947+
self.callOutToDelegate(value: head, channelEventLoop: context.eventLoop, self.delegate.didReceiveHead)
948+
.whenComplete { result in
949+
self.handleBackpressureResult(context: context, result: result)
950+
}
945951
}
946952
case .body(let body):
947953
switch self.state {
948-
case .redirected:
954+
case .redirected, .endOrError:
949955
break
950-
case .endOrError:
951-
preconditionFailure("unexpected state on .body")
952956
default:
953957
self.state = .body
954958
self.mayRead = false
@@ -960,7 +964,7 @@ extension TaskHandler: ChannelDuplexHandler {
960964
case .end:
961965
switch self.state {
962966
case .endOrError:
963-
preconditionFailure("unexpected state on .end")
967+
break
964968
case .redirected(let head, let redirectURL):
965969
self.state = .endOrError
966970
self.task.releaseAssociatedConnection(delegateType: Delegate.self, closing: self.closing).whenSuccess {

Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ extension HTTPClientInternalTests {
4747
("testInternalRequestURI", testInternalRequestURI),
4848
("testBodyPartStreamStateChangedBeforeNotification", testBodyPartStreamStateChangedBeforeNotification),
4949
("testHandlerDoubleError", testHandlerDoubleError),
50+
("testTaskHandlerStateChangeAfterError", testTaskHandlerStateChangeAfterError),
5051
]
5152
}
5253
}

Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,4 +1119,48 @@ class HTTPClientInternalTests: XCTestCase {
11191119

11201120
XCTAssertEqual(delegate.count, 1)
11211121
}
1122+
1123+
func testTaskHandlerStateChangeAfterError() throws {
1124+
let channel = EmbeddedChannel()
1125+
let task = Task<Void>(eventLoop: channel.eventLoop, logger: HTTPClient.loggingDisabled)
1126+
1127+
let handler = TaskHandler(task: task,
1128+
kind: .host,
1129+
delegate: TestHTTPDelegate(),
1130+
redirectHandler: nil,
1131+
ignoreUncleanSSLShutdown: false,
1132+
logger: HTTPClient.loggingDisabled)
1133+
1134+
try channel.pipeline.addHandler(handler).wait()
1135+
1136+
var request = try Request(url: "http://localhost:8080/get")
1137+
request.headers.add(name: "X-Test-Header", value: "X-Test-Value")
1138+
request.body = .stream(length: 4) { writer in
1139+
writer.write(.byteBuffer(channel.allocator.buffer(string: "1234"))).map {
1140+
handler.state = .endOrError
1141+
}
1142+
}
1143+
1144+
XCTAssertNoThrow(try channel.writeOutbound(request))
1145+
1146+
try channel.writeInbound(HTTPClientResponsePart.head(.init(version: .init(major: 1, minor: 1), status: .ok)))
1147+
XCTAssertTrue(handler.state.isEndOrError)
1148+
1149+
try channel.writeInbound(HTTPClientResponsePart.body(channel.allocator.buffer(string: "1234")))
1150+
XCTAssertTrue(handler.state.isEndOrError)
1151+
1152+
try channel.writeInbound(HTTPClientResponsePart.end(nil))
1153+
XCTAssertTrue(handler.state.isEndOrError)
1154+
}
1155+
}
1156+
1157+
extension TaskHandler.State {
1158+
var isEndOrError: Bool {
1159+
switch self {
1160+
case .endOrError:
1161+
return true
1162+
default:
1163+
return false
1164+
}
1165+
}
11221166
}

0 commit comments

Comments
 (0)