Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subpool stats #1852

Merged
merged 3 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Sources/GRPC/ConnectionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,12 @@ internal final class ConnectionManager: @unchecked Sendable {
/// A logger.
internal var logger: Logger

private let connectionID: String
internal let id: ConnectionManagerID
private var channelNumber: UInt64
private var channelNumberLock = NIOLock()

private var _connectionIDAndNumber: String {
return "\(self.connectionID)/\(self.channelNumber)"
return "\(self.id)/\(self.channelNumber)"
}

private var connectionIDAndNumber: String {
Expand Down Expand Up @@ -394,7 +394,7 @@ internal final class ConnectionManager: @unchecked Sendable {
) {
// Setup the logger.
var logger = logger
let connectionID = UUID().uuidString
let connectionID = ConnectionManagerID()
let channelNumber: UInt64 = 0
logger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(channelNumber)"

Expand All @@ -408,7 +408,7 @@ internal final class ConnectionManager: @unchecked Sendable {
self.http2Delegate = http2Delegate
self.idleBehavior = idleBehavior

self.connectionID = connectionID
self.id = connectionID
self.channelNumber = channelNumber
self.logger = logger
}
Expand Down
17 changes: 6 additions & 11 deletions Sources/GRPC/ConnectionPool/ConnectionManagerID.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,20 @@
* limitations under the License.
*/

import struct Foundation.UUID

@usableFromInline
internal struct ConnectionManagerID: Hashable, CustomStringConvertible, Sendable {
@usableFromInline
internal let _id: ObjectIdentifier
internal let id: String

@usableFromInline
internal init(_ manager: ConnectionManager) {
self._id = ObjectIdentifier(manager)
internal init() {
self.id = UUID().uuidString
}

@usableFromInline
internal var description: String {
return String(describing: self._id)
}
}

extension ConnectionManager {
@usableFromInline
internal var id: ConnectionManagerID {
return ConnectionManagerID(self)
return String(describing: self.id)
}
}
57 changes: 54 additions & 3 deletions Sources/GRPC/ConnectionPool/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Atomics
import Logging
import NIOConcurrencyHelpers
import NIOCore
Expand Down Expand Up @@ -108,12 +110,16 @@ internal final class ConnectionPool {

/// A logger which always sets "GRPC" as its source.
@usableFromInline
private(set) var logger: GRPCLogger
internal let logger: GRPCLogger

/// Returns `NIODeadline` representing 'now'. This is useful for testing.
@usableFromInline
internal let now: () -> NIODeadline

/// The ID of this sub-pool.
@usableFromInline
internal let id: GRPCSubPoolID

/// Logging metadata keys.
@usableFromInline
internal enum Metadata {
Expand Down Expand Up @@ -190,16 +196,21 @@ internal final class ConnectionPool {
self.channelProvider = channelProvider
self.streamLender = streamLender
self.delegate = delegate
self.logger = logger
self.now = now

let id = GRPCSubPoolID.next()
var logger = logger
logger[metadataKey: Metadata.id] = "\(id)"

self.id = id
self.logger = logger
}

/// Initialize the connection pool.
///
/// - Parameter connections: The number of connections to add to the pool.
internal func initialize(connections: Int) {
assert(self._connections.isEmpty)
self.logger.logger[metadataKey: Metadata.id] = "\(ObjectIdentifier(self))"
self.logger.debug(
"initializing new sub-pool",
metadata: [
Expand Down Expand Up @@ -628,6 +639,46 @@ internal final class ConnectionPool {
promise.succeed(())
}
}

internal func stats() -> EventLoopFuture<GRPCSubPoolStats> {
let promise = self.eventLoop.makePromise(of: GRPCSubPoolStats.self)

if self.eventLoop.inEventLoop {
self._stats(promise: promise)
} else {
self.eventLoop.execute {
self._stats(promise: promise)
}
}

return promise.futureResult
}

private func _stats(promise: EventLoopPromise<GRPCSubPoolStats>) {
self.eventLoop.assertInEventLoop()

var stats = GRPCSubPoolStats(id: self.id)

for connection in self._connections.values {
let sync = connection.manager.sync
if sync.isIdle {
stats.connectionStates.idle += 1
} else if sync.isConnecting {
stats.connectionStates.connecting += 1
} else if sync.isReady {
stats.connectionStates.ready += 1
} else if sync.isTransientFailure {
stats.connectionStates.transientFailure += 1
}

stats.streamsInUse += connection.reservedStreams
stats.streamsFreeToUse += connection.availableStreams
}

stats.rpcsWaiting += self.waiters.count

promise.succeed(stats)
}
}

extension ConnectionPool: ConnectionManagerConnectivityDelegate {
Expand Down
59 changes: 59 additions & 0 deletions Sources/GRPC/ConnectionPool/ConnectionPoolIDs.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2024, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Atomics

enum RawID {
private static let source = ManagedAtomic(0)

static func next() -> Int {
self.source.loadThenWrappingIncrement(ordering: .relaxed)
}
}

/// The ID of a connection pool.
public struct GRPCConnectionPoolID: Hashable, Sendable, CustomStringConvertible {
private var rawValue: Int

private init(rawValue: Int) {
self.rawValue = rawValue
}

public static func next() -> Self {
return Self(rawValue: RawID.next())
}

public var description: String {
"ConnectionPool(\(self.rawValue))"
}
}

/// The ID of a sub-pool in a connection pool.
public struct GRPCSubPoolID: Hashable, Sendable, CustomStringConvertible {
private var rawValue: Int

private init(rawValue: Int) {
self.rawValue = rawValue
}

public static func next() -> Self {
return Self(rawValue: RawID.next())
}

public var description: String {
"SubPool(\(self.rawValue))"
}
}
69 changes: 68 additions & 1 deletion Sources/GRPC/ConnectionPool/GRPCChannelPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ extension GRPCChannelPool {
/// pool.
public var delegate: GRPCConnectionPoolDelegate?

/// The period at which connection pool stats are published to the ``delegate``.
///
/// Ignored if either this value or ``delegate`` are `nil`.
public var statsPeriod: TimeAmount?

/// A logger used for background activity, such as connection state changes.
public var backgroundActivityLogger = Logger(
label: "io.grpc",
Expand Down Expand Up @@ -354,7 +359,7 @@ public protocol GRPCConnectionPoolDelegate: Sendable {
/// time and is reported via ``connectionUtilizationChanged(id:streamsUsed:streamCapacity:)``. The
func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int)

/// The utlization of the connection changed; a stream may have been used, returned or the
/// The utilization of the connection changed; a stream may have been used, returned or the
/// maximum number of concurrent streams available on the connection changed.
func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int)

Expand All @@ -365,4 +370,66 @@ public protocol GRPCConnectionPoolDelegate: Sendable {
/// The connection was closed. The connection may be established again in the future (notified
/// via ``startedConnecting(id:)``).
func connectionClosed(id: GRPCConnectionID, error: Error?)

/// Stats about the current state of the connection pool.
///
/// Each ``GRPCConnectionPoolStats`` includes the stats for a sub-pool. Each sub-pool is tied
/// to an `EventLoop`.
///
/// Unlike the other delegate methods, this is called periodically based on the value
/// of ``GRPCChannelPool/Configuration/statsPeriod``.
func connectionPoolStats(_ stats: [GRPCSubPoolStats], id: GRPCConnectionPoolID)
}

extension GRPCConnectionPoolDelegate {
public func connectionPoolStats(_ stats: [GRPCSubPoolStats], id: GRPCConnectionPoolID) {
// Default conformance to avoid breaking changes.
}
}

public struct GRPCSubPoolStats: Sendable, Hashable {
public struct ConnectionStates: Sendable, Hashable {
/// The number of idle connections.
public var idle: Int
/// The number of connections trying to establish a connection.
public var connecting: Int
/// The number of connections which are ready to use.
public var ready: Int
/// The number of connections which are backing off waiting to attempt to connect.
public var transientFailure: Int

public init() {
self.idle = 0
self.connecting = 0
self.ready = 0
self.transientFailure = 0
}
}

/// The ID of the subpool.
public var id: GRPCSubPoolID

/// Counts of connection states.
public var connectionStates: ConnectionStates

/// The number of streams currently being used.
public var streamsInUse: Int

/// The number of streams which are currently free to use.
///
/// The sum of this value and `streamsInUse` gives the capacity of the pool.
public var streamsFreeToUse: Int

/// The number of RPCs currently waiting for a stream.
///
/// RPCs waiting for a stream are also known as 'waiters'.
public var rpcsWaiting: Int

public init(id: GRPCSubPoolID) {
self.id = id
self.connectionStates = ConnectionStates()
self.streamsInUse = 0
self.streamsFreeToUse = 0
self.rpcsWaiting = 0
}
}
Loading
Loading