Skip to content

Commit 54f491c

Browse files
authored
Add support for multiple streams (vapor#442)
1 parent dc94503 commit 54f491c

File tree

5 files changed

+280
-9
lines changed

5 files changed

+280
-9
lines changed

Sources/ConnectionPoolModule/ConnectionPool.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,10 @@ public final class ConnectionPool<
265265

266266
}
267267

268-
public func connection(_ connection: Connection, didReceiveNewMaxStreamSetting: UInt16) {
269-
268+
public func connectionReceivedNewMaxStreamSetting(_ connection: Connection, newMaxStreamSetting maxStreams: UInt16) {
269+
self.modifyStateAndRunActions { state in
270+
state.stateMachine.connectionReceivedNewMaxStreamSetting(connection.id, newMaxStreamSetting: maxStreams)
271+
}
270272
}
271273

272274
public func run() async {

Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,50 @@ extension PoolStateMachine {
256256
return self.connections[index].timerScheduled(timer, cancelContinuation: cancelContinuation)
257257
}
258258

259+
// MARK: Changes at runtime
260+
261+
@usableFromInline
262+
struct NewMaxStreamInfo {
263+
264+
@usableFromInline
265+
var index: Int
266+
267+
@usableFromInline
268+
var newMaxStreams: UInt16
269+
270+
@usableFromInline
271+
var oldMaxStreams: UInt16
272+
273+
@usableFromInline
274+
var usedStreams: UInt16
275+
276+
@inlinable
277+
init(index: Int, info: ConnectionState.NewMaxStreamInfo) {
278+
self.index = index
279+
self.newMaxStreams = info.newMaxStreams
280+
self.oldMaxStreams = info.oldMaxStreams
281+
self.usedStreams = info.usedStreams
282+
}
283+
}
284+
285+
@inlinable
286+
mutating func connectionReceivedNewMaxStreamSetting(
287+
_ connectionID: ConnectionID,
288+
newMaxStreamSetting maxStreams: UInt16
289+
) -> NewMaxStreamInfo? {
290+
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
291+
return nil
292+
}
293+
294+
guard let info = self.connections[index].newMaxStreamSetting(maxStreams) else {
295+
return nil
296+
}
297+
298+
self.stats.availableStreams += maxStreams - info.oldMaxStreams
299+
300+
return NewMaxStreamInfo(index: index, info: info)
301+
}
302+
259303
// MARK: Leasing and releasing
260304

261305
/// Lease a connection, if an idle connection is available.
@@ -424,9 +468,9 @@ extension PoolStateMachine {
424468

425469
/// Closes the connection at the given index.
426470
@inlinable
427-
mutating func closeConnectionIfIdle(at index: Int) -> CloseAction {
471+
mutating func closeConnectionIfIdle(at index: Int) -> CloseAction? {
428472
guard let closeAction = self.connections[index].closeIfIdle() else {
429-
preconditionFailure("Invalid state: \(self)")
473+
return nil // apparently the connection isn't idle
430474
}
431475

432476
self.stats.idle -= 1

Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,53 @@ extension PoolStateMachine {
195195
}
196196
}
197197

198+
@usableFromInline
199+
struct NewMaxStreamInfo {
200+
@usableFromInline
201+
var newMaxStreams: UInt16
202+
203+
@usableFromInline
204+
var oldMaxStreams: UInt16
205+
206+
@usableFromInline
207+
var usedStreams: UInt16
208+
209+
@inlinable
210+
init(newMaxStreams: UInt16, oldMaxStreams: UInt16, usedStreams: UInt16) {
211+
self.newMaxStreams = newMaxStreams
212+
self.oldMaxStreams = oldMaxStreams
213+
self.usedStreams = usedStreams
214+
}
215+
}
216+
217+
@inlinable
218+
mutating func newMaxStreamSetting(_ newMaxStreams: UInt16) -> NewMaxStreamInfo? {
219+
switch self.state {
220+
case .starting, .backingOff:
221+
preconditionFailure("Invalid state: \(self.state)")
222+
223+
case .idle(let connection, let oldMaxStreams, let keepAlive, idleTimer: let idleTimer):
224+
self.state = .idle(connection, maxStreams: newMaxStreams, keepAlive: keepAlive, idleTimer: idleTimer)
225+
return NewMaxStreamInfo(
226+
newMaxStreams: newMaxStreams,
227+
oldMaxStreams: oldMaxStreams,
228+
usedStreams: keepAlive.usedStreams
229+
)
230+
231+
case .leased(let connection, let usedStreams, let oldMaxStreams, let keepAlive):
232+
self.state = .leased(connection, usedStreams: usedStreams, maxStreams: newMaxStreams, keepAlive: keepAlive)
233+
return NewMaxStreamInfo(
234+
newMaxStreams: newMaxStreams,
235+
oldMaxStreams: oldMaxStreams,
236+
usedStreams: usedStreams + keepAlive.usedStreams
237+
)
238+
239+
case .closing, .closed:
240+
return nil
241+
}
242+
}
243+
244+
198245
@inlinable
199246
mutating func parkConnection(scheduleKeepAliveTimer: Bool, scheduleIdleTimeoutTimer: Bool) -> Max2Sequence<ConnectionTimer> {
200247
var keepAliveTimer: ConnectionTimer?

Sources/ConnectionPoolModule/PoolStateMachine.swift

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,39 @@ struct PoolStateMachine<
262262
}
263263
}
264264

265+
@inlinable
266+
mutating func connectionReceivedNewMaxStreamSetting(
267+
_ connection: ConnectionID,
268+
newMaxStreamSetting maxStreams: UInt16
269+
) -> Action {
270+
guard let info = self.connections.connectionReceivedNewMaxStreamSetting(connection, newMaxStreamSetting: maxStreams) else {
271+
return .none()
272+
}
273+
274+
let waitingRequests = self.requestQueue.count
275+
276+
guard waitingRequests > 0 else {
277+
return .none()
278+
}
279+
280+
// the only thing we can do if we receive a new max stream setting is check if the new stream
281+
// setting is higher and then dequeue some waiting requests
282+
283+
guard info.newMaxStreams > info.oldMaxStreams && info.newMaxStreams > info.usedStreams else {
284+
return .none()
285+
}
286+
287+
let leaseStreams = min(info.newMaxStreams - info.oldMaxStreams, info.newMaxStreams - info.usedStreams, UInt16(clamping: waitingRequests))
288+
let requests = self.requestQueue.pop(max: leaseStreams)
289+
precondition(Int(leaseStreams) == requests.count)
290+
let leaseResult = self.connections.leaseConnection(at: info.index, streams: leaseStreams)
291+
292+
return .init(
293+
request: .leaseConnection(requests, leaseResult.connection),
294+
connection: .cancelTimers(.init(leaseResult.timersToCancel))
295+
)
296+
}
297+
265298
@inlinable
266299
mutating func timerScheduled(_ timer: Timer, cancelContinuation: TimerCancellationToken) -> TimerCancellationToken? {
267300
self.connections.timerScheduled(timer.underlying, cancelContinuation: cancelContinuation)
@@ -445,11 +478,14 @@ struct PoolStateMachine<
445478
}
446479

447480
case .overflow:
448-
let closeAction = self.connections.closeConnectionIfIdle(at: index)
449-
return .init(
450-
request: .none,
451-
connection: .closeConnection(closeAction.connection, closeAction.timersToCancel)
452-
)
481+
if let closeAction = self.connections.closeConnectionIfIdle(at: index) {
482+
return .init(
483+
request: .none,
484+
connection: .closeConnection(closeAction.connection, closeAction.timersToCancel)
485+
)
486+
} else {
487+
return .none()
488+
}
453489
}
454490

455491
}

Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,148 @@ final class ConnectionPoolTests: XCTestCase {
607607
}
608608
}
609609
}
610+
611+
func testLeasingMultipleStreamsFromOneConnectionWorks() async throws {
612+
let clock = MockClock()
613+
let factory = MockConnectionFactory<MockClock>()
614+
let keepAliveDuration = Duration.seconds(30)
615+
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)
616+
617+
var mutableConfig = ConnectionPoolConfiguration()
618+
mutableConfig.minimumConnectionCount = 0
619+
mutableConfig.maximumConnectionSoftLimit = 1
620+
mutableConfig.maximumConnectionHardLimit = 10
621+
mutableConfig.idleTimeout = .seconds(10)
622+
let config = mutableConfig
623+
624+
let pool = ConnectionPool(
625+
configuration: config,
626+
idGenerator: ConnectionIDGenerator(),
627+
requestType: ConnectionFuture.self,
628+
keepAliveBehavior: keepAlive,
629+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
630+
clock: clock
631+
) {
632+
try await factory.makeConnection(id: $0, for: $1)
633+
}
634+
635+
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
636+
taskGroup.addTask {
637+
await pool.run()
638+
}
639+
640+
// create 4 connection requests
641+
let requests = (0..<10).map { ConnectionFuture(id: $0) }
642+
pool.leaseConnections(requests)
643+
var connections = [MockConnection]()
644+
645+
await factory.nextConnectAttempt { connectionID in
646+
return 10
647+
}
648+
649+
for request in requests {
650+
let connection = try await request.future.success
651+
connections.append(connection)
652+
}
653+
654+
// Ensure that all requests got the same connection
655+
XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1)
656+
657+
// release all 10 leased streams
658+
for connection in connections {
659+
pool.releaseConnection(connection)
660+
}
661+
662+
for _ in 0..<9 {
663+
_ = try? await factory.nextConnectAttempt { connectionID in
664+
throw CancellationError()
665+
}
666+
}
667+
668+
// shutdown
669+
taskGroup.cancelAll()
670+
for connection in factory.runningConnections {
671+
connection.closeIfClosing()
672+
}
673+
}
674+
}
675+
676+
func testIncreasingAvailableStreamsWorks() async throws {
677+
let clock = MockClock()
678+
let factory = MockConnectionFactory<MockClock>()
679+
let keepAliveDuration = Duration.seconds(30)
680+
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)
681+
682+
var mutableConfig = ConnectionPoolConfiguration()
683+
mutableConfig.minimumConnectionCount = 0
684+
mutableConfig.maximumConnectionSoftLimit = 1
685+
mutableConfig.maximumConnectionHardLimit = 1
686+
mutableConfig.idleTimeout = .seconds(10)
687+
let config = mutableConfig
688+
689+
let pool = ConnectionPool(
690+
configuration: config,
691+
idGenerator: ConnectionIDGenerator(),
692+
requestType: ConnectionFuture.self,
693+
keepAliveBehavior: keepAlive,
694+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
695+
clock: clock
696+
) {
697+
try await factory.makeConnection(id: $0, for: $1)
698+
}
699+
700+
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
701+
taskGroup.addTask {
702+
await pool.run()
703+
}
704+
705+
// create 4 connection requests
706+
var requests = (0..<21).map { ConnectionFuture(id: $0) }
707+
pool.leaseConnections(requests)
708+
var connections = [MockConnection]()
709+
710+
await factory.nextConnectAttempt { connectionID in
711+
return 1
712+
}
713+
714+
let connection = try await requests.first!.future.success
715+
connections.append(connection)
716+
requests.removeFirst()
717+
718+
pool.connectionReceivedNewMaxStreamSetting(connection, newMaxStreamSetting: 21)
719+
720+
for (index, request) in requests.enumerated() {
721+
let connection = try await request.future.success
722+
connections.append(connection)
723+
}
724+
725+
// Ensure that all requests got the same connection
726+
XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1)
727+
728+
requests = (22..<42).map { ConnectionFuture(id: $0) }
729+
pool.leaseConnections(requests)
730+
731+
// release all 21 leased streams in a single call
732+
pool.releaseConnection(connection, streams: 21)
733+
734+
// ensure all 20 new requests got fulfilled
735+
for request in requests {
736+
let connection = try await request.future.success
737+
connections.append(connection)
738+
}
739+
740+
// release all 20 leased streams one by one
741+
for _ in requests {
742+
pool.releaseConnection(connection, streams: 1)
743+
}
744+
745+
// shutdown
746+
taskGroup.cancelAll()
747+
for connection in factory.runningConnections {
748+
connection.closeIfClosing()
749+
}
750+
}
751+
}
610752
}
611753

612754
struct ConnectionFuture: ConnectionRequestProtocol {

0 commit comments

Comments
 (0)