Skip to content

[HTTP2] Create new connections during migration if needed #459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ extension HTTPConnectionPool {
}
}

var canOrWillBeAbleToExecuteRequests: Bool {
switch self.state {
case .leased, .backingOff, .idle, .starting:
return true
case .closed:
return false
}
}

var isLeased: Bool {
switch self.state {
case .leased:
Expand Down Expand Up @@ -281,6 +290,10 @@ extension HTTPConnectionPool {
return connecting
}

private var maximumAdditionalGeneralPurposeConnections: Int {
self.maximumConcurrentConnections - (self.overflowIndex - 1)
}

/// Is there at least one connection that is able to run requests
var hasActiveConnections: Bool {
self.connections.contains(where: { $0.isIdle || $0.isLeased })
Expand Down Expand Up @@ -530,8 +543,8 @@ extension HTTPConnectionPool {
return migrationContext
}

/// we only handle starting and backing off connection here.
/// All running connections must be handled by the enclosing state machine
/// We only handle starting and backing off connection here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
Expand All @@ -541,17 +554,96 @@ extension HTTPConnectionPool {
) {
for (connectionID, eventLoop) in starting {
let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
self.connections.append(newConnection)
self.connections.insert(newConnection, at: self.overflowIndex)
/// If we can grow, we mark the connection as a general purpose connection.
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
if self.canGrow {
self.overflowIndex = self.connections.index(after: self.overflowIndex)
}
}

for (connectionID, eventLoop) in backingOff {
var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
// TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
backingOffConnection.failedToConnect()
self.connections.append(backingOffConnection)
self.connections.insert(backingOffConnection, at: self.overflowIndex)
/// If we can grow, we mark the connection as a general purpose connection.
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
if self.canGrow {
self.overflowIndex = self.connections.index(after: self.overflowIndex)
}
}
}

/// We will create new connections for each `requiredEventLoopOfPendingRequests`
/// In addition, we also create more general purpose connections if we do not have enough to execute
/// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
/// until we reach `maximumConcurrentConnections`
/// - Parameters:
/// - requiredEventLoopsForPendingRequests:
/// event loops for which we have requests with a required event loop.
/// Duplicates are not allowed.
/// - generalPurposeRequestCountPerPreferredEventLoop:
/// request count with no required event loop,
/// grouped by preferred event loop and ordered descending by number of requests
/// - Returns: new connections that must be created
mutating func createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: [(EventLoop, Int)],
generalPurposeRequestCountGroupedByPreferredEventLoop: [(EventLoop, Int)]
) -> [(Connection.ID, EventLoop)] {
// create new connections for requests with a required event loop

// we may already start connections for those requests and do not want to start to many
let startingRequiredEventLoopConnectionCount = Dictionary(
self.connections[self.overflowIndex..<self.connections.endIndex].lazy.map {
($0.eventLoop.id, 1)
},
uniquingKeysWith: +
)
var connectionToCreate = requiredEventLoopOfPendingRequests
.flatMap { (eventLoop, requestCount) -> [(Connection.ID, EventLoop)] in
// We need a connection for each queued request with a required event loop.
// Therefore, we look how many request we have queued for a given `eventLoop` and
// how many connections we are already starting on the given `eventLoop`.
// If we have not enough, we will create additional connections to have at least
// on connection per request.
let connectionsToStart = requestCount - startingRequiredEventLoopConnectionCount[eventLoop.id, default: 0]
return stride(from: 0, to: connectionsToStart, by: 1).lazy.map { _ in
(self.createNewOverflowConnection(on: eventLoop), eventLoop)
}
}

// create new connections for requests without a required event loop

// TODO: improve algorithm to create connections uniformly across all preferred event loops
// while paying attention to the number of queued request per event loop
// Currently we start by creating new connections on the event loop with the most queued
// requests. If we have create a enough connections to cover all requests for the given
// event loop we will continue with the event loop with the second most queued requests
// and so on and so forth. We do not need to sort the array because
let newGeneralPurposeConnections: [(Connection.ID, EventLoop)] = generalPurposeRequestCountGroupedByPreferredEventLoop
// we do not want to allocated intermediate arrays.
.lazy
// we flatten the grouped list of event loops by lazily repeating the event loop
// for each request.
// As a result we get one event loop per request (`[EventLoop]`).
.flatMap { eventLoop, requestCount in
repeatElement(eventLoop, count: requestCount)
}
// we may already start connections and do not want to start too many
.dropLast(self.startingGeneralPurposeConnections)
// we need to respect the used defined `maximumConcurrentConnections`
.prefix(self.maximumAdditionalGeneralPurposeConnections)
// we now create a connection for each remaining event loop
.map { eventLoop in
(self.createNewConnection(on: eventLoop), eventLoop)
}

connectionToCreate.append(contentsOf: newGeneralPurposeConnections)

return connectionToCreate
}

// MARK: Shutdown

mutating func shutdown() -> CleanupContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,32 +84,40 @@ extension HTTPConnectionPool {
http2Connections: HTTP2Connections,
requests: RequestQueue
) -> ConnectionMigrationAction {
precondition(self.connections.isEmpty)
precondition(self.http2Connections == nil)
precondition(self.requests.isEmpty)
precondition(self.connections.isEmpty, "expected an empty state machine but connections are not empty")
precondition(self.http2Connections == nil, "expected an empty state machine but http2Connections are not nil")
precondition(self.requests.isEmpty, "expected an empty state machine but requests are not empty")

self.requests = requests

// we may have remaining open http1 connections from a pervious migration to http2
if let http1Connections = http1Connections {
self.connections = http1Connections
}

var http2Connections = http2Connections
let migration = http2Connections.migrateToHTTP1()

self.connections.migrateFromHTTP2(
starting: migration.starting,
backingOff: migration.backingOff
)

let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: requests.requestCountGroupedByRequiredEventLoop(),
generalPurposeRequestCountGroupedByPreferredEventLoop: requests.generalPurposeRequestCountGroupedByPreferredEventLoop()
)

if !http2Connections.isEmpty {
self.http2Connections = http2Connections
}

// TODO: Close all idle connections from context.close
// TODO: Start new http1 connections for pending requests
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

self.requests = requests

return .init(closeConnections: [], createConnections: [])
return .init(
closeConnections: migration.close,
createConnections: createConnections
)
}

// MARK: - Events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ extension HTTPConnectionPool {

// MARK: Migration

/// we only handle starting and backing off connection here.
/// All running connections must be handled by the enclosing state machine
/// We only handle starting and backing off connection here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
Expand All @@ -368,6 +368,31 @@ extension HTTPConnectionPool {
}
}

/// We will create new connections for `requiredEventLoopsOfPendingRequests`
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
/// - Parameters:
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
/// - Returns: new connections that need to be created
mutating func createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: [EventLoop]
) -> [(Connection.ID, EventLoop)] {
// create new connections for requests with a required event loop
let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set(
self.connections.lazy
.filter {
$0.canOrWillBeAbleToExecuteRequests
}.map {
$0.eventLoop.id
}
)
return requiredEventLoopsOfPendingRequests.compactMap { eventLoop -> (Connection.ID, EventLoop)? in
guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests.contains(eventLoop.id)
else { return nil }
let connectionID = self.createNewConnection(on: eventLoop)
return (connectionID, eventLoop)
}
}

struct HTTP2ToHTTP1MigrationContext {
var backingOff: [(Connection.ID, EventLoop)] = []
var starting: [(Connection.ID, EventLoop)] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,13 @@ extension HTTPConnectionPool {
http2Connections: HTTP2Connections?,
requests: RequestQueue
) -> ConnectionMigrationAction {
precondition(self.http1Connections == nil)
precondition(self.connections.isEmpty)
precondition(self.requests.isEmpty)
precondition(self.connections.isEmpty, "expected an empty state machine but connections are not empty")
precondition(self.http1Connections == nil, "expected an empty state machine but http1Connections are not nil")
precondition(self.requests.isEmpty, "expected an empty state machine but requests are not empty")

self.requests = requests

// we may have remaining open http2 connections from a pervious migration to http1
if let http2Connections = http2Connections {
self.connections = http2Connections
}
Expand All @@ -107,17 +110,19 @@ extension HTTPConnectionPool {
backingOff: context.backingOff
)

let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: requests.eventLoopsWithPendingRequests()
)

if !http1Connections.isEmpty {
self.http1Connections = http1Connections
}

self.requests = requests

// TODO: Close all idle connections from context.close
// TODO: Start new http2 connections for pending requests
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

return .init(closeConnections: [], createConnections: [])
return .init(
closeConnections: context.close,
createConnections: createConnections
)
}

mutating func executeRequest(_ request: Request) -> Action {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@

import NIOCore

private struct HashableEventLoop: Hashable {
static func == (lhs: HashableEventLoop, rhs: HashableEventLoop) -> Bool {
lhs.eventLoop === rhs.eventLoop
}

init(_ eventLoop: EventLoop) {
self.eventLoop = eventLoop
}

let eventLoop: EventLoop
func hash(into hasher: inout Hasher) {
self.eventLoop.id.hash(into: &hasher)
}
}

extension HTTPConnectionPool {
/// A struct to store all queued requests.
struct RequestQueue {
Expand Down Expand Up @@ -131,6 +146,42 @@ extension HTTPConnectionPool {
}
return nil
}

/// - Returns: event loops with at least one request with a required event loop
func eventLoopsWithPendingRequests() -> [EventLoop] {
self.eventLoopQueues.compactMap {
/// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`
/// however, a queue can be empty
$0.value.first?.requiredEventLoop!
}
}

/// - Returns: request count for requests with required event loop, grouped by required event loop without any particular order
func requestCountGroupedByRequiredEventLoop() -> [(EventLoop, Int)] {
self.eventLoopQueues.values.compactMap { requests -> (EventLoop, Int)? in
/// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`,
/// however, a queue can be empty
guard let requiredEventLoop = requests.first?.requiredEventLoop! else {
return nil
}
return (requiredEventLoop, requests.count)
}
}

/// - Returns: request count with **no** required event loop, grouped by preferred event loop and ordered descending by number of requests
func generalPurposeRequestCountGroupedByPreferredEventLoop() -> [(EventLoop, Int)] {
let requestCountPerEventLoop = Dictionary(
self.generalPurposeQueue.lazy.map { request in
(HashableEventLoop(request.preferredEventLoop), 1)
},
uniquingKeysWith: +
)
return requestCountPerEventLoop.lazy
.map { ($0.key.eventLoop, $0.value) }
.sorted { lhs, rhs in
lhs.1 > rhs.1
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ extension HTTPConnectionPool_HTTP1ConnectionsTests {
("testCloseConnectionIfIdleButLeasedRaceCondition", testCloseConnectionIfIdleButLeasedRaceCondition),
("testCloseConnectionIfIdleButClosedRaceCondition", testCloseConnectionIfIdleButClosedRaceCondition),
("testShutdown", testShutdown),
("testMigrationFromHTTP2", testMigrationFromHTTP2),
("testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop),
("testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop),
("testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection", testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection),
("testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections", testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections),
("testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests", testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests),
]
}
}
Loading