Skip to content

[ConnectionPool] HTTP1StateMachine #416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,11 @@ extension HTTPConnectionPool {
self.connections[index].lease()
}

func parkConnection(at index: Int) -> (Connection.ID, EventLoop) {
precondition(self.connections[index].isIdle)
return (self.connections[index].connectionID, self.connections[index].eventLoop)
}

/// A new HTTP/1.1 connection was released.
///
/// This will put the position into the idle state.
Expand Down Expand Up @@ -446,12 +451,13 @@ extension HTTPConnectionPool {
/// This will put the position into the closed state.
///
/// - Parameter connectionID: The failed connection's id.
/// - Returns: An index and an IdleConnectionContext to determine the next action for the now closed connection.
/// - Returns: An optional index and an IdleConnectionContext to determine the next action for the closed connection.
/// You must call ``removeConnection(at:)`` or ``replaceConnection(at:)`` with the
/// supplied index after this.
mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext) {
/// supplied index after this. If nil is returned the connection was closed by the state machine and was
/// therefore already removed.
mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("We tried to fail a new connection that we know nothing about?")
return nil
}

let use: ConnectionUse
Expand Down Expand Up @@ -607,22 +613,4 @@ extension HTTPConnectionPool {
var connecting: Int = 0
var backingOff: Int = 0
}

/// The pool cleanup todo list.
struct CleanupContext: Equatable {
/// the connections to close right away. These are idle.
var close: [Connection]

/// the connections that currently run a request that needs to be cancelled to close the connections
var cancel: [Connection]

/// the connections that are backing off from connection creation
var connectBackoff: [Connection.ID]

init(close: [Connection] = [], cancel: [Connection] = [], connectBackoff: [Connection.ID] = []) {
self.close = close
self.cancel = cancel
self.connectBackoff = connectBackoff
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ extension HTTPConnectionPool {
self.count == 0
}

func count(for eventLoop: EventLoop?) -> Int {
if let eventLoop = eventLoop {
return self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0
}
return self.generalPurposeQueue.count
var generalPurposeCount: Int {
self.generalPurposeQueue.count
}

func count(for eventLoop: EventLoop) -> Int {
self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0
}

func isEmpty(for eventLoop: EventLoop?) -> Bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO
import NIOHTTP1

extension HTTPConnectionPool {
struct StateMachine {
struct Action {
let request: RequestAction
let connection: ConnectionAction

init(request: RequestAction, connection: ConnectionAction) {
self.request = request
self.connection = connection
}

static let none: Action = Action(request: .none, connection: .none)
}

enum ConnectionAction {
enum IsShutdown: Equatable {
case yes(unclean: Bool)
case no
}

case createConnection(Connection.ID, on: EventLoop)
case scheduleBackoffTimer(Connection.ID, backoff: TimeAmount, on: EventLoop)

case scheduleTimeoutTimer(Connection.ID, on: EventLoop)
case cancelTimeoutTimer(Connection.ID)

case closeConnection(Connection, isShutdown: IsShutdown)
case cleanupConnections(CleanupContext, isShutdown: IsShutdown)

case none
}

enum RequestAction {
case executeRequest(Request, Connection, cancelTimeout: Bool)
case executeRequestsAndCancelTimeouts([Request], Connection)

case failRequest(Request, Error, cancelTimeout: Bool)
case failRequestsAndCancelTimeouts([Request], Error)

case scheduleRequestTimeout(for: Request, on: EventLoop)
case cancelRequestTimeout(Request.ID)

case none
}

enum HTTPVersionState {
case http1(HTTP1StateMachine)
}

var state: HTTPVersionState
var isShuttingDown: Bool = false

let eventLoopGroup: EventLoopGroup
let maximumConcurrentHTTP1Connections: Int

init(eventLoopGroup: EventLoopGroup, idGenerator: Connection.ID.Generator, maximumConcurrentHTTP1Connections: Int) {
self.maximumConcurrentHTTP1Connections = maximumConcurrentHTTP1Connections
let http1State = HTTP1StateMachine(
idGenerator: idGenerator,
maximumConcurrentConnections: maximumConcurrentHTTP1Connections
)
self.state = .http1(http1State)
self.eventLoopGroup = eventLoopGroup
}

mutating func executeRequest(_ request: Request) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.executeRequest(request)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func newHTTP1ConnectionCreated(_ connection: Connection) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.newHTTP1ConnectionEstablished(connection)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.failedToCreateNewConnection(
error,
connectionID: connectionID
)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.connectionCreationBackoffDone(connectionID)
self.state = .http1(http1StateMachine)
return action
}
}

/// A request has timed out.
///
/// This is different to a request being cancelled. If a request times out, we need to fail the
/// request, but don't need to cancel the timer (it already triggered). If a request is cancelled
/// we don't need to fail it but we need to cancel its timeout timer.
mutating func timeoutRequest(_ requestID: Request.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.timeoutRequest(requestID)
self.state = .http1(http1StateMachine)
return action
}
}

/// A request was cancelled.
///
/// This is different to a request timing out. If a request is cancelled we don't need to fail it but we
/// need to cancel its timeout timer. If a request times out, we need to fail the request, but don't
/// need to cancel the timer (it already triggered).
mutating func cancelRequest(_ requestID: Request.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.cancelRequest(requestID)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.connectionIdleTimeout(connectionID)
self.state = .http1(http1StateMachine)
return action
}
}

/// A connection has been closed
mutating func connectionClosed(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.connectionClosed(connectionID)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.http1ConnectionReleased(connectionID)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func shutdown() -> Action {
precondition(!self.isShuttingDown, "Shutdown must only be called once")

self.isShuttingDown = true

switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.shutdown()
self.state = .http1(http1StateMachine)
return action
}
}
}
}

extension HTTPConnectionPool {
/// The pool cleanup todo list.
struct CleanupContext: Equatable {
/// the connections to close right away. These are idle.
var close: [Connection]

/// the connections that currently run a request that needs to be cancelled to close the connections
var cancel: [Connection]

/// the connections that are backing off from connection creation
var connectBackoff: [Connection.ID]

init(close: [Connection] = [], cancel: [Connection] = [], connectBackoff: [Connection.ID] = []) {
self.close = close
self.cancel = cancel
self.connectBackoff = connectBackoff
}
}
}

extension HTTPConnectionPool.StateMachine: CustomStringConvertible {
var description: String {
switch self.state {
case .http1(let http1):
return ".http1(\(http1))"
}
}
}
8 changes: 8 additions & 0 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
case tlsHandshakeTimeout
case serverOfferedUnsupportedApplicationProtocol(String)
case requestStreamCancelled
case getConnectionFromPoolTimeout
}

private var code: Code
Expand Down Expand Up @@ -997,4 +998,11 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
/// The remote server responded with a status code >= 300, before the full request was sent. The request stream
/// was therefore cancelled
public static let requestStreamCancelled = HTTPClientError(code: .requestStreamCancelled)

/// Aquiring a HTTP connection from the connection pool timed out.
///
/// This can have multiple reasons:
/// - A connection could not be created within the timout period.
/// - Tasks are not processed fast enough on the existing connections, to process all waiters in time
public static let getConnectionFromPoolTimeout = HTTPClientError(code: .getConnectionFromPoolTimeout)
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
let backoff1EL = connections.backoffNextConnectionAttempt(conn1ID)
XCTAssert(backoff1EL === el1)
// backoff done. 2. decide what's next
let (conn1FailIndex, conn1FailContext) = connections.failConnection(conn1ID)
guard let (conn1FailIndex, conn1FailContext) = connections.failConnection(conn1ID) else {
return XCTFail("Expected that the connection is remembered")
}
XCTAssert(conn1FailContext.eventLoop === el1)
XCTAssertEqual(conn1FailContext.use, .generalPurpose)
XCTAssertEqual(conn1FailContext.connectionsStartingForUseCase, 0)
Expand All @@ -83,7 +85,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
XCTAssertEqual(connections.startingEventLoopConnections(on: el2), 1)
let backoff2EL = connections.backoffNextConnectionAttempt(conn2ID)
XCTAssert(backoff2EL === el2)
let (conn2FailIndex, conn2FailContext) = connections.failConnection(conn2ID)
guard let (conn2FailIndex, conn2FailContext) = connections.failConnection(conn2ID) else {
return XCTFail("Expected that the connection is remembered")
}
XCTAssert(conn2FailContext.eventLoop === el2)
XCTAssertEqual(conn2FailContext.use, .eventLoop(el2))
XCTAssertEqual(conn2FailContext.connectionsStartingForUseCase, 0)
Expand Down Expand Up @@ -329,7 +333,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
XCTAssertEqual(connections.closeConnection(at: releaseIndex), lease)
XCTAssertFalse(connections.isEmpty)

let (failIndex, _) = connections.failConnection(startingID)
guard let (failIndex, _) = connections.failConnection(startingID) else {
return XCTFail("Expected that the connection is remembered")
}
connections.removeConnection(at: failIndex)
XCTAssertTrue(connections.isEmpty)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// HTTPConnectionPool+HTTP1StateTests+XCTest.swift
//
import XCTest

///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///

extension HTTPConnectionPool_HTTP1StateMachineTests {
static var allTests: [(String, (HTTPConnectionPool_HTTP1StateMachineTests) -> () throws -> Void)] {
return [
("testCreatingAndFailingConnections", testCreatingAndFailingConnections),
("testConnectionFailureBackoff", testConnectionFailureBackoff),
("testCancelRequestWorks", testCancelRequestWorks),
("testExecuteOnShuttingDownPool", testExecuteOnShuttingDownPool),
("testRequestsAreQueuedIfAllConnectionsAreInUseAndRequestsAreDequeuedInOrder", testRequestsAreQueuedIfAllConnectionsAreInUseAndRequestsAreDequeuedInOrder),
("testBestConnectionIsPicked", testBestConnectionIsPicked),
("testConnectionAbortIsIgnoredIfThereAreNoQueuedRequests", testConnectionAbortIsIgnoredIfThereAreNoQueuedRequests),
("testConnectionCloseLeadsToTumbleWeedIfThereNoQueuedRequests", testConnectionCloseLeadsToTumbleWeedIfThereNoQueuedRequests),
("testConnectionAbortLeadsToNewConnectionsIfThereAreQueuedRequests", testConnectionAbortLeadsToNewConnectionsIfThereAreQueuedRequests),
("testParkedConnectionTimesOut", testParkedConnectionTimesOut),
("testConnectionPoolFullOfParkedConnectionsIsShutdownImmediately", testConnectionPoolFullOfParkedConnectionsIsShutdownImmediately),
("testParkedConnectionTimesOutButIsAlsoClosedByRemote", testParkedConnectionTimesOutButIsAlsoClosedByRemote),
]
}
}
Loading