Skip to content

Commit 472ff4a

Browse files
authored
Add PoolStateMachine (vapor#427)
1 parent 17d3c80 commit 472ff4a

File tree

7 files changed

+814
-56
lines changed

7 files changed

+814
-56
lines changed

Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -134,19 +134,6 @@ extension PoolStateMachine {
134134
var info: ConnectionAvailableInfo
135135
}
136136

137-
/// Information around the failed/closed connection.
138-
@usableFromInline
139-
struct FailedConnectionContext {
140-
/// Connections that are currently starting
141-
@usableFromInline
142-
var connectionsStarting: Int
143-
144-
@inlinable
145-
init(connectionsStarting: Int) {
146-
self.connectionsStarting = connectionsStarting
147-
}
148-
}
149-
150137
mutating func refillConnections() -> [ConnectionRequest] {
151138
let existingConnections = self.stats.active
152139
let missingConnection = self.minimumConcurrentConnections - Int(existingConnections)
@@ -477,6 +464,31 @@ extension PoolStateMachine {
477464
return self.closeConnectionIfIdle(at: index)
478465
}
479466

467+
/// Information around the failed/closed connection.
468+
@usableFromInline
469+
struct ClosedAction {
470+
/// Connections that are currently starting
471+
@usableFromInline
472+
var connectionsStarting: Int
473+
474+
@usableFromInline
475+
var timersToCancel: TinyFastSequence<TimerCancellationToken>
476+
477+
@usableFromInline
478+
var newConnectionRequest: ConnectionRequest?
479+
480+
@inlinable
481+
init(
482+
connectionsStarting: Int,
483+
timersToCancel: TinyFastSequence<TimerCancellationToken>,
484+
newConnectionRequest: ConnectionRequest? = nil
485+
) {
486+
self.connectionsStarting = connectionsStarting
487+
self.timersToCancel = timersToCancel
488+
self.newConnectionRequest = newConnectionRequest
489+
}
490+
}
491+
480492
/// Connection closed. Call this method, if a connection is closed.
481493
///
482494
/// This will put the position into the closed state.
@@ -487,12 +499,13 @@ extension PoolStateMachine {
487499
/// supplied index after this. If nil is returned the connection was closed by the state machine and was
488500
/// therefore already removed.
489501
@inlinable
490-
mutating func connectionClosed(_ connectionID: Connection.ID) -> FailedConnectionContext? {
502+
mutating func connectionClosed(_ connectionID: Connection.ID) -> ClosedAction {
491503
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
492-
return nil
504+
preconditionFailure("All connections that have been created should say goodbye exactly once!")
493505
}
494506

495507
let closedAction = self.connections[index].closed()
508+
var timersToCancel = TinyFastSequence(closedAction.cancelTimers)
496509

497510
if closedAction.wasRunningKeepAlive {
498511
self.stats.runningKeepAlive -= 1
@@ -511,16 +524,22 @@ extension PoolStateMachine {
511524
self.stats.closing -= 1
512525
}
513526

514-
let lastIndex = self.connections.index(before: self.connections.endIndex)
527+
if let cancellationTimer = self.swapForDeletion(index: index) {
528+
timersToCancel.append(cancellationTimer)
529+
}
515530

516-
if index == lastIndex {
517-
self.connections.remove(at: index)
531+
let newConnectionRequest: ConnectionRequest?
532+
if self.connections.count < self.minimumConcurrentConnections {
533+
newConnectionRequest = .init(connectionID: self.generator.next())
518534
} else {
519-
self.connections.swapAt(index, lastIndex)
520-
self.connections.remove(at: lastIndex)
535+
newConnectionRequest = .none
521536
}
522537

523-
return FailedConnectionContext(connectionsStarting: 0)
538+
return ClosedAction(
539+
connectionsStarting: 0,
540+
timersToCancel: timersToCancel,
541+
newConnectionRequest: newConnectionRequest
542+
)
524543
}
525544

526545
// MARK: Shutdown

Sources/ConnectionPoolModule/PoolStateMachine+RequestQueue.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ extension PoolStateMachine {
1010
/// request from the dictionary and keep it inside the queue. Whenever we pop a request from the deque, we validate
1111
/// that it hasn't been cancelled in the meantime by checking if the popped request is still in the `requests` dictionary.
1212
@usableFromInline
13-
struct RequestQueue {
13+
struct RequestQueue: Sendable {
1414
@usableFromInline
1515
private(set) var queue: Deque<RequestID>
1616

@@ -40,8 +40,8 @@ extension PoolStateMachine {
4040
}
4141

4242
@inlinable
43-
mutating func pop(max: UInt16) -> OneElementFastSequence<Request> {
44-
var result = OneElementFastSequence<Request>()
43+
mutating func pop(max: UInt16) -> TinyFastSequence<Request> {
44+
var result = TinyFastSequence<Request>()
4545
result.reserveCapacity(Int(max))
4646
var popped = 0
4747
while let requestID = self.queue.popFirst(), popped < max {
@@ -61,8 +61,8 @@ extension PoolStateMachine {
6161
}
6262

6363
@inlinable
64-
mutating func removeAll() -> OneElementFastSequence<Request> {
65-
let result = OneElementFastSequence(self.requests.values)
64+
mutating func removeAll() -> TinyFastSequence<Request> {
65+
let result = TinyFastSequence(self.requests.values)
6666
self.requests.removeAll()
6767
self.queue.removeAll()
6868
return result

0 commit comments

Comments
 (0)