Skip to content

Commit dffcd65

Browse files
committed
[ConnectionPool] Split up locked and unlocked actions
[HTTPConnectionPool] Add HTTP/1.1 stress test
1 parent d45fa9a commit dffcd65

File tree

3 files changed

+196
-73
lines changed

3 files changed

+196
-73
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

Lines changed: 132 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@ protocol HTTPConnectionPoolDelegate {
2424
final class HTTPConnectionPool {
2525
private let stateLock = Lock()
2626
private var _state: StateMachine
27+
/// The connection idle timeout timers. Protected by the stateLock
28+
private var _idleTimer = [Connection.ID: Scheduled<Void>]()
29+
/// The connection backoff timeout timers. Protected by the stateLock
30+
private var _backoffTimer = [Connection.ID: Scheduled<Void>]()
2731

2832
private static let fallbackConnectTimeout: TimeAmount = .seconds(30)
2933

3034
let key: ConnectionPool.Key
3135

3236
private let timerLock = Lock()
3337
private var _requestTimer = [Request.ID: Scheduled<Void>]()
34-
private var _idleTimer = [Connection.ID: Scheduled<Void>]()
35-
private var _backoffTimer = [Connection.ID: Scheduled<Void>]()
3638

3739
private var logger: Logger
3840

@@ -75,32 +77,89 @@ final class HTTPConnectionPool {
7577
}
7678

7779
func executeRequest(_ request: HTTPSchedulableRequest) {
78-
let action = self.stateLock.withLock { () -> StateMachine.Action in
79-
self._state.executeRequest(.init(request))
80-
}
81-
self.run(action: action)
80+
self.modifyStateAndRunActions { $0.executeRequest(.init(request)) }
8281
}
8382

8483
func shutdown() {
85-
let action = self.stateLock.withLock { () -> StateMachine.Action in
86-
self._state.shutdown()
84+
self.modifyStateAndRunActions { $0.shutdown() }
85+
}
86+
87+
// MARK: - Private Methods -
88+
89+
// MARK: Actions
90+
91+
///
92+
private struct Actions {
93+
enum ConnectionAction {
94+
enum Unlocked {
95+
case createConnection(Connection.ID, on: EventLoop)
96+
case closeConnection(Connection, isShutdown: StateMachine.ConnectionAction.IsShutdown)
97+
case cleanupConnections(CleanupContext, isShutdown: StateMachine.ConnectionAction.IsShutdown)
98+
case none
99+
}
100+
101+
enum Locked {
102+
case scheduleBackoffTimer(Connection.ID, backoff: TimeAmount, on: EventLoop)
103+
case cancelBackoffTimers([Connection.ID])
104+
case scheduleTimeoutTimer(Connection.ID, on: EventLoop)
105+
case cancelTimeoutTimer(Connection.ID)
106+
case none
107+
}
108+
}
109+
110+
struct Locked {
111+
var connection: ConnectionAction.Locked
112+
}
113+
114+
struct Unlocked {
115+
var connection: ConnectionAction.Unlocked
116+
var request: StateMachine.RequestAction
117+
}
118+
119+
var locked: Locked
120+
var unlocked: Unlocked
121+
122+
init(from stateMachineAction: StateMachine.Action) {
123+
self.locked = Locked(connection: .none)
124+
self.unlocked = Unlocked(connection: .none, request: stateMachineAction.request)
125+
126+
switch stateMachineAction.connection {
127+
case .createConnection(let connectionID, on: let eventLoop):
128+
self.unlocked.connection = .createConnection(connectionID, on: eventLoop)
129+
case .scheduleBackoffTimer(let connectionID, backoff: let backoff, on: let eventLoop):
130+
self.locked.connection = .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop)
131+
case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
132+
self.locked.connection = .scheduleTimeoutTimer(connectionID, on: eventLoop)
133+
case .cancelTimeoutTimer(let connectionID):
134+
self.locked.connection = .cancelTimeoutTimer(connectionID)
135+
case .closeConnection(let connection, isShutdown: let isShutdown):
136+
self.unlocked.connection = .closeConnection(connection, isShutdown: isShutdown)
137+
case .cleanupConnections(var cleanupContext, isShutdown: let isShutdown):
138+
//
139+
self.locked.connection = .cancelBackoffTimers(cleanupContext.connectBackoff)
140+
cleanupContext.connectBackoff = []
141+
self.unlocked.connection = .cleanupConnections(cleanupContext, isShutdown: isShutdown)
142+
case .none:
143+
break
144+
}
87145
}
88-
self.run(action: action)
89146
}
90147

91148
// MARK: Run actions
92149

93-
private func run(action: StateMachine.Action) {
94-
self.runConnectionAction(action.connection)
95-
self.runRequestAction(action.request)
150+
private func modifyStateAndRunActions(_ closure: (inout StateMachine) -> StateMachine.Action) {
151+
let unlockedActions = self.stateLock.withLock { () -> Actions.Unlocked in
152+
let stateMachineAction = closure(&self._state)
153+
let poolAction = Actions(from: stateMachineAction)
154+
self.runLockedActions(poolAction.locked)
155+
return poolAction.unlocked
156+
}
157+
self.runUnlockedActions(unlockedActions)
96158
}
97159

98-
private func runConnectionAction(_ action: StateMachine.ConnectionAction) {
99-
switch action {
100-
case .createConnection(let connectionID, let eventLoop):
101-
self.createConnection(connectionID, on: eventLoop)
102-
103-
case .scheduleBackoffTimer(let connectionID, let backoff, on: let eventLoop):
160+
private func runLockedActions(_ actions: Actions.Locked) {
161+
switch actions.connection {
162+
case .scheduleBackoffTimer(let connectionID, backoff: let backoff, on: let eventLoop):
104163
self.scheduleConnectionStartBackoffTimer(connectionID, backoff, on: eventLoop)
105164

106165
case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
@@ -109,6 +168,26 @@ final class HTTPConnectionPool {
109168
case .cancelTimeoutTimer(let connectionID):
110169
self.cancelIdleTimerForConnection(connectionID)
111170

171+
case .cancelBackoffTimers(let connectionIDs):
172+
for connectionID in connectionIDs {
173+
self.cancelConnectionStartBackoffTimer(connectionID)
174+
}
175+
176+
case .none:
177+
break
178+
}
179+
}
180+
181+
private func runUnlockedActions(_ actions: Actions.Unlocked) {
182+
self.runUnlockedConnectionAction(actions.connection)
183+
self.runUnlockedRequestAction(actions.request)
184+
}
185+
186+
private func runUnlockedConnectionAction(_ action: Actions.ConnectionAction.Unlocked) {
187+
switch action {
188+
case .createConnection(let connectionID, let eventLoop):
189+
self.createConnection(connectionID, on: eventLoop)
190+
112191
case .closeConnection(let connection, isShutdown: let isShutdown):
113192
self.logger.trace("close connection", metadata: [
114193
"ahc-connection-id": "\(connection.id)",
@@ -143,7 +222,7 @@ final class HTTPConnectionPool {
143222
}
144223
}
145224

146-
private func runRequestAction(_ action: StateMachine.RequestAction) {
225+
private func runUnlockedRequestAction(_ action: StateMachine.RequestAction) {
147226
// The order of execution fail/execute request vs cancelling the request timeout timer does
148227
// not matter in the actions here. The actions don't cause any side effects that will be
149228
// reported back to the state machine and are not dependent on each other.
@@ -215,11 +294,9 @@ final class HTTPConnectionPool {
215294
guard timeoutFired else { return }
216295

217296
// 3. Tell the state machine about the timeout
218-
let action = self.stateLock.withLock {
219-
self._state.timeoutRequest(requestID)
297+
self.modifyStateAndRunActions {
298+
$0.timeoutRequest(requestID)
220299
}
221-
222-
self.run(action: action)
223300
}
224301

225302
self.timerLock.withLockVoid {
@@ -254,34 +331,27 @@ final class HTTPConnectionPool {
254331
let scheduled = eventLoop.scheduleTask(in: self.idleConnectionTimeout) {
255332
// there might be a race between a cancelTimer call and the triggering
256333
// of this scheduled task. both want to acquire the lock
257-
let timerExisted = self.timerLock.withLock {
258-
self._idleTimer.removeValue(forKey: connectionID) != nil
334+
self.modifyStateAndRunActions { stateMachine in
335+
if self._idleTimer.removeValue(forKey: connectionID) != nil {
336+
// The timer still exists. State Machines assumes it is alive
337+
return stateMachine.connectionIdleTimeout(connectionID)
338+
}
339+
return .none
259340
}
260-
261-
guard timerExisted else { return }
262-
263-
let action = self.stateLock.withLock {
264-
self._state.connectionIdleTimeout(connectionID)
265-
}
266-
self.run(action: action)
267341
}
268342

269-
self.timerLock.withLock {
270-
assert(self._idleTimer[connectionID] == nil)
271-
self._idleTimer[connectionID] = scheduled
272-
}
343+
assert(self._idleTimer[connectionID] == nil)
344+
self._idleTimer[connectionID] = scheduled
273345
}
274346

275347
private func cancelIdleTimerForConnection(_ connectionID: Connection.ID) {
276348
self.logger.trace("Cancel idle connection timeout timer", metadata: [
277349
"ahc-connection-id": "\(connectionID)",
278350
])
279-
280-
let cancelTimer = self.timerLock.withLock {
281-
self._idleTimer.removeValue(forKey: connectionID)
351+
guard let cancelTimer = self._idleTimer.removeValue(forKey: connectionID) else {
352+
preconditionFailure("Expected to have an idle timer for connection \(connectionID) at this point.")
282353
}
283-
284-
cancelTimer?.cancel()
354+
cancelTimer.cancel()
285355
}
286356

287357
private func scheduleConnectionStartBackoffTimer(
@@ -295,30 +365,24 @@ final class HTTPConnectionPool {
295365

296366
let scheduled = eventLoop.scheduleTask(in: timeAmount) {
297367
// there might be a race between a backoffTimer and the pool shutting down.
298-
let timerExisted = self.timerLock.withLock {
299-
self._backoffTimer.removeValue(forKey: connectionID) != nil
300-
}
301-
302-
guard timerExisted else { return }
303-
304-
let action = self.stateLock.withLock {
305-
self._state.connectionCreationBackoffDone(connectionID)
368+
self.modifyStateAndRunActions { stateMachine in
369+
if self._backoffTimer.removeValue(forKey: connectionID) != nil {
370+
// The timer still exists. State Machines assumes it is alive
371+
return stateMachine.connectionCreationBackoffDone(connectionID)
372+
}
373+
return .none
306374
}
307-
self.run(action: action)
308375
}
309376

310-
self.timerLock.withLock {
311-
assert(self._backoffTimer[connectionID] == nil)
312-
self._backoffTimer[connectionID] = scheduled
313-
}
377+
assert(self._backoffTimer[connectionID] == nil)
378+
self._backoffTimer[connectionID] = scheduled
314379
}
315380

316381
private func cancelConnectionStartBackoffTimer(_ connectionID: Connection.ID) {
317-
let backoffTimer = self.timerLock.withLock {
318-
self._backoffTimer[connectionID]
382+
guard let backoffTimer = self._backoffTimer.removeValue(forKey: connectionID) else {
383+
preconditionFailure("Expected to have a backoff timer for connection \(connectionID) at this point.")
319384
}
320-
321-
backoffTimer?.cancel()
385+
backoffTimer.cancel()
322386
}
323387
}
324388

@@ -330,10 +394,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
330394
"ahc-connection-id": "\(connection.id)",
331395
"ahc-http-version": "http/1.1",
332396
])
333-
let action = self.stateLock.withLock {
334-
self._state.newHTTP1ConnectionCreated(.http1_1(connection))
397+
self.modifyStateAndRunActions {
398+
$0.newHTTP1ConnectionCreated(.http1_1(connection))
335399
}
336-
self.run(action: action)
337400
}
338401

339402
func http2ConnectionCreated(_ connection: HTTP2Connection, maximumStreams: Int) {
@@ -356,10 +419,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
356419
"ahc-error": "\(error)",
357420
"ahc-connection-id": "\(connectionID)",
358421
])
359-
let action = self.stateLock.withLock {
360-
self._state.failedToCreateNewConnection(error, connectionID: connectionID)
422+
self.modifyStateAndRunActions {
423+
$0.failedToCreateNewConnection(error, connectionID: connectionID)
361424
}
362-
self.run(action: action)
363425
}
364426
}
365427

@@ -369,21 +431,19 @@ extension HTTPConnectionPool: HTTP1ConnectionDelegate {
369431
"ahc-connection-id": "\(connection.id)",
370432
"ahc-http-version": "http/1.1",
371433
])
372-
let action = self.stateLock.withLock {
373-
self._state.connectionClosed(connection.id)
434+
self.modifyStateAndRunActions {
435+
$0.connectionClosed(connection.id)
374436
}
375-
self.run(action: action)
376437
}
377438

378439
func http1ConnectionReleased(_ connection: HTTP1Connection) {
379440
self.logger.trace("releasing connection", metadata: [
380441
"ahc-connection-id": "\(connection.id)",
381442
"ahc-http-version": "http/1.1",
382443
])
383-
let action = self.stateLock.withLock {
384-
self._state.http1ConnectionReleased(connection.id)
444+
self.modifyStateAndRunActions {
445+
$0.http1ConnectionReleased(connection.id)
385446
}
386-
self.run(action: action)
387447
}
388448
}
389449

@@ -416,10 +476,9 @@ extension HTTPConnectionPool: HTTP2ConnectionDelegate {
416476
extension HTTPConnectionPool: HTTPRequestScheduler {
417477
func cancelRequest(_ request: HTTPSchedulableRequest) {
418478
let requestID = Request(request).id
419-
let action = self.stateLock.withLock {
420-
self._state.cancelRequest(requestID)
479+
self.modifyStateAndRunActions {
480+
$0.cancelRequest(requestID)
421481
}
422-
self.run(action: action)
423482
}
424483
}
425484

Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests+XCTest.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ extension HTTPConnectionPoolTests {
3232
("testConnectionCreationIsRetriedUntilPoolIsShutdown", testConnectionCreationIsRetriedUntilPoolIsShutdown),
3333
("testConnectionCreationIsRetriedUntilRequestIsCancelled", testConnectionCreationIsRetriedUntilRequestIsCancelled),
3434
("testConnectionShutdownIsCalledOnActiveConnections", testConnectionShutdownIsCalledOnActiveConnections),
35+
("testConnectionPoolStressResistanceHTTP1", testConnectionPoolStressResistanceHTTP1),
3536
]
3637
}
3738
}

0 commit comments

Comments
 (0)