Skip to content

[HTTPConnectionPool] Fix timer action races #434

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 134 additions & 73 deletions Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ protocol HTTPConnectionPoolDelegate {
final class HTTPConnectionPool {
private let stateLock = Lock()
private var _state: StateMachine
/// The connection idle timeout timers. Protected by the stateLock
private var _idleTimer = [Connection.ID: Scheduled<Void>]()
/// The connection backoff timeout timers. Protected by the stateLock
private var _backoffTimer = [Connection.ID: Scheduled<Void>]()

private static let fallbackConnectTimeout: TimeAmount = .seconds(30)

let key: ConnectionPool.Key

private let timerLock = Lock()
private var _requestTimer = [Request.ID: Scheduled<Void>]()
private var _idleTimer = [Connection.ID: Scheduled<Void>]()
private var _backoffTimer = [Connection.ID: Scheduled<Void>]()

private var logger: Logger

Expand Down Expand Up @@ -75,32 +77,91 @@ final class HTTPConnectionPool {
}

func executeRequest(_ request: HTTPSchedulableRequest) {
let action = self.stateLock.withLock { () -> StateMachine.Action in
self._state.executeRequest(.init(request))
}
self.run(action: action)
self.modifyStateAndRunActions { $0.executeRequest(.init(request)) }
}

func shutdown() {
let action = self.stateLock.withLock { () -> StateMachine.Action in
self._state.shutdown()
self.modifyStateAndRunActions { $0.shutdown() }
}

// MARK: - Private Methods -

// MARK: Actions

/// An `HTTPConnectionPool` internal action type that matches the `StateMachine`'s action.
/// However it splits up the actions into actions that need to be executed inside the `stateLock`
/// and outside the `stateLock`.
private struct Actions {
enum ConnectionAction {
enum Unlocked {
case createConnection(Connection.ID, on: EventLoop)
case closeConnection(Connection, isShutdown: StateMachine.ConnectionAction.IsShutdown)
case cleanupConnections(CleanupContext, isShutdown: StateMachine.ConnectionAction.IsShutdown)
case none
}

enum Locked {
case scheduleBackoffTimer(Connection.ID, backoff: TimeAmount, on: EventLoop)
case cancelBackoffTimers([Connection.ID])
case scheduleTimeoutTimer(Connection.ID, on: EventLoop)
case cancelTimeoutTimer(Connection.ID)
case none
}
}

struct Locked {
var connection: ConnectionAction.Locked
}

struct Unlocked {
var connection: ConnectionAction.Unlocked
var request: StateMachine.RequestAction
}

var locked: Locked
var unlocked: Unlocked

init(from stateMachineAction: StateMachine.Action) {
self.locked = Locked(connection: .none)
self.unlocked = Unlocked(connection: .none, request: stateMachineAction.request)

switch stateMachineAction.connection {
case .createConnection(let connectionID, on: let eventLoop):
self.unlocked.connection = .createConnection(connectionID, on: eventLoop)
case .scheduleBackoffTimer(let connectionID, backoff: let backoff, on: let eventLoop):
self.locked.connection = .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop)
case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
self.locked.connection = .scheduleTimeoutTimer(connectionID, on: eventLoop)
case .cancelTimeoutTimer(let connectionID):
self.locked.connection = .cancelTimeoutTimer(connectionID)
case .closeConnection(let connection, isShutdown: let isShutdown):
self.unlocked.connection = .closeConnection(connection, isShutdown: isShutdown)
case .cleanupConnections(var cleanupContext, isShutdown: let isShutdown):
//
self.locked.connection = .cancelBackoffTimers(cleanupContext.connectBackoff)
cleanupContext.connectBackoff = []
self.unlocked.connection = .cleanupConnections(cleanupContext, isShutdown: isShutdown)
case .none:
break
}
}
self.run(action: action)
}

// MARK: Run actions

private func run(action: StateMachine.Action) {
self.runConnectionAction(action.connection)
self.runRequestAction(action.request)
private func modifyStateAndRunActions(_ closure: (inout StateMachine) -> StateMachine.Action) {
let unlockedActions = self.stateLock.withLock { () -> Actions.Unlocked in
let stateMachineAction = closure(&self._state)
let poolAction = Actions(from: stateMachineAction)
self.runLockedActions(poolAction.locked)
return poolAction.unlocked
}
self.runUnlockedActions(unlockedActions)
}

private func runConnectionAction(_ action: StateMachine.ConnectionAction) {
switch action {
case .createConnection(let connectionID, let eventLoop):
self.createConnection(connectionID, on: eventLoop)

case .scheduleBackoffTimer(let connectionID, let backoff, on: let eventLoop):
private func runLockedActions(_ actions: Actions.Locked) {
switch actions.connection {
case .scheduleBackoffTimer(let connectionID, backoff: let backoff, on: let eventLoop):
self.scheduleConnectionStartBackoffTimer(connectionID, backoff, on: eventLoop)

case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
Expand All @@ -109,6 +170,26 @@ final class HTTPConnectionPool {
case .cancelTimeoutTimer(let connectionID):
self.cancelIdleTimerForConnection(connectionID)

case .cancelBackoffTimers(let connectionIDs):
for connectionID in connectionIDs {
self.cancelConnectionStartBackoffTimer(connectionID)
}

case .none:
break
}
}

private func runUnlockedActions(_ actions: Actions.Unlocked) {
self.runUnlockedConnectionAction(actions.connection)
self.runUnlockedRequestAction(actions.request)
}

private func runUnlockedConnectionAction(_ action: Actions.ConnectionAction.Unlocked) {
switch action {
case .createConnection(let connectionID, let eventLoop):
self.createConnection(connectionID, on: eventLoop)

case .closeConnection(let connection, isShutdown: let isShutdown):
self.logger.trace("close connection", metadata: [
"ahc-connection-id": "\(connection.id)",
Expand Down Expand Up @@ -143,7 +224,7 @@ final class HTTPConnectionPool {
}
}

private func runRequestAction(_ action: StateMachine.RequestAction) {
private func runUnlockedRequestAction(_ action: StateMachine.RequestAction) {
// The order of execution fail/execute request vs cancelling the request timeout timer does
// not matter in the actions here. The actions don't cause any side effects that will be
// reported back to the state machine and are not dependent on each other.
Expand Down Expand Up @@ -215,11 +296,9 @@ final class HTTPConnectionPool {
guard timeoutFired else { return }

// 3. Tell the state machine about the timeout
let action = self.stateLock.withLock {
self._state.timeoutRequest(requestID)
self.modifyStateAndRunActions {
$0.timeoutRequest(requestID)
}

self.run(action: action)
}

self.timerLock.withLockVoid {
Expand Down Expand Up @@ -254,34 +333,27 @@ final class HTTPConnectionPool {
let scheduled = eventLoop.scheduleTask(in: self.idleConnectionTimeout) {
// there might be a race between a cancelTimer call and the triggering
// of this scheduled task. both want to acquire the lock
let timerExisted = self.timerLock.withLock {
self._idleTimer.removeValue(forKey: connectionID) != nil
self.modifyStateAndRunActions { stateMachine in
if self._idleTimer.removeValue(forKey: connectionID) != nil {
// The timer still exists. State Machines assumes it is alive
return stateMachine.connectionIdleTimeout(connectionID)
}
return .none
}

guard timerExisted else { return }

let action = self.stateLock.withLock {
self._state.connectionIdleTimeout(connectionID)
}
self.run(action: action)
}

self.timerLock.withLock {
assert(self._idleTimer[connectionID] == nil)
self._idleTimer[connectionID] = scheduled
}
assert(self._idleTimer[connectionID] == nil)
self._idleTimer[connectionID] = scheduled
}

private func cancelIdleTimerForConnection(_ connectionID: Connection.ID) {
self.logger.trace("Cancel idle connection timeout timer", metadata: [
"ahc-connection-id": "\(connectionID)",
])

let cancelTimer = self.timerLock.withLock {
self._idleTimer.removeValue(forKey: connectionID)
guard let cancelTimer = self._idleTimer.removeValue(forKey: connectionID) else {
preconditionFailure("Expected to have an idle timer for connection \(connectionID) at this point.")
}

cancelTimer?.cancel()
cancelTimer.cancel()
}

private func scheduleConnectionStartBackoffTimer(
Expand All @@ -295,30 +367,24 @@ final class HTTPConnectionPool {

let scheduled = eventLoop.scheduleTask(in: timeAmount) {
// there might be a race between a backoffTimer and the pool shutting down.
let timerExisted = self.timerLock.withLock {
self._backoffTimer.removeValue(forKey: connectionID) != nil
}

guard timerExisted else { return }

let action = self.stateLock.withLock {
self._state.connectionCreationBackoffDone(connectionID)
self.modifyStateAndRunActions { stateMachine in
if self._backoffTimer.removeValue(forKey: connectionID) != nil {
// The timer still exists. State Machines assumes it is alive
return stateMachine.connectionCreationBackoffDone(connectionID)
}
return .none
}
self.run(action: action)
}

self.timerLock.withLock {
assert(self._backoffTimer[connectionID] == nil)
self._backoffTimer[connectionID] = scheduled
}
assert(self._backoffTimer[connectionID] == nil)
self._backoffTimer[connectionID] = scheduled
}

private func cancelConnectionStartBackoffTimer(_ connectionID: Connection.ID) {
let backoffTimer = self.timerLock.withLock {
self._backoffTimer[connectionID]
guard let backoffTimer = self._backoffTimer.removeValue(forKey: connectionID) else {
preconditionFailure("Expected to have a backoff timer for connection \(connectionID) at this point.")
}

backoffTimer?.cancel()
backoffTimer.cancel()
}
}

Expand All @@ -330,10 +396,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
"ahc-connection-id": "\(connection.id)",
"ahc-http-version": "http/1.1",
])
let action = self.stateLock.withLock {
self._state.newHTTP1ConnectionCreated(.http1_1(connection))
self.modifyStateAndRunActions {
$0.newHTTP1ConnectionCreated(.http1_1(connection))
}
self.run(action: action)
}

func http2ConnectionCreated(_ connection: HTTP2Connection, maximumStreams: Int) {
Expand All @@ -356,10 +421,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
"ahc-error": "\(error)",
"ahc-connection-id": "\(connectionID)",
])
let action = self.stateLock.withLock {
self._state.failedToCreateNewConnection(error, connectionID: connectionID)
self.modifyStateAndRunActions {
$0.failedToCreateNewConnection(error, connectionID: connectionID)
}
self.run(action: action)
}
}

Expand All @@ -369,21 +433,19 @@ extension HTTPConnectionPool: HTTP1ConnectionDelegate {
"ahc-connection-id": "\(connection.id)",
"ahc-http-version": "http/1.1",
])
let action = self.stateLock.withLock {
self._state.connectionClosed(connection.id)
self.modifyStateAndRunActions {
$0.connectionClosed(connection.id)
}
self.run(action: action)
}

func http1ConnectionReleased(_ connection: HTTP1Connection) {
self.logger.trace("releasing connection", metadata: [
"ahc-connection-id": "\(connection.id)",
"ahc-http-version": "http/1.1",
])
let action = self.stateLock.withLock {
self._state.http1ConnectionReleased(connection.id)
self.modifyStateAndRunActions {
$0.http1ConnectionReleased(connection.id)
}
self.run(action: action)
}
}

Expand Down Expand Up @@ -416,10 +478,9 @@ extension HTTPConnectionPool: HTTP2ConnectionDelegate {
extension HTTPConnectionPool: HTTPRequestScheduler {
func cancelRequest(_ request: HTTPSchedulableRequest) {
let requestID = Request(request).id
let action = self.stateLock.withLock {
self._state.cancelRequest(requestID)
self.modifyStateAndRunActions {
$0.cancelRequest(requestID)
}
self.run(action: action)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extension HTTPConnectionPoolTests {
("testConnectionCreationIsRetriedUntilPoolIsShutdown", testConnectionCreationIsRetriedUntilPoolIsShutdown),
("testConnectionCreationIsRetriedUntilRequestIsCancelled", testConnectionCreationIsRetriedUntilRequestIsCancelled),
("testConnectionShutdownIsCalledOnActiveConnections", testConnectionShutdownIsCalledOnActiveConnections),
("testConnectionPoolStressResistanceHTTP1", testConnectionPoolStressResistanceHTTP1),
]
}
}
Loading