Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the connection pool error public #1685

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 77 additions & 20 deletions Sources/GRPC/ConnectionPool/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ internal final class ConnectionPool {

guard case .active = self._state else {
// Fail the promise right away if we're shutting down or already shut down.
promise.fail(ConnectionPoolError.shutdown)
promise.fail(GRPCConnectionPoolError.shutdown)
return
}

Expand Down Expand Up @@ -354,7 +354,7 @@ internal final class ConnectionPool {
Metadata.waitersMax: "\(self.maxWaiters)"
]
)
promise.fail(ConnectionPoolError.tooManyWaiters(connectionError: self._mostRecentError))
promise.fail(GRPCConnectionPoolError.tooManyWaiters(connectionError: self._mostRecentError))
return
}

Expand All @@ -364,7 +364,7 @@ internal final class ConnectionPool {
// timeout before appending it to the waiters, it wont run until the next event loop tick at the
// earliest (even if the deadline has already passed).
waiter.scheduleTimeout(on: self.eventLoop) {
waiter.fail(ConnectionPoolError.deadlineExceeded(connectionError: self._mostRecentError))
waiter.fail(GRPCConnectionPoolError.deadlineExceeded(connectionError: self._mostRecentError))

if let index = self.waiters.firstIndex(where: { $0.id == waiter.id }) {
self.waiters.remove(at: index)
Expand Down Expand Up @@ -550,7 +550,7 @@ internal final class ConnectionPool {

// Fail the outstanding waiters.
while let waiter = self.waiters.popFirst() {
waiter.fail(ConnectionPoolError.shutdown)
waiter.fail(GRPCConnectionPoolError.shutdown)
}

// Cascade the result of the shutdown into the promise.
Expand Down Expand Up @@ -864,40 +864,97 @@ extension ConnectionPool {
}
}

@usableFromInline
internal enum ConnectionPoolError: Error {
/// The pool is shutdown or shutting down.
case shutdown
/// An error thrown from the ``GRPCChannelPool``.
public struct GRPCConnectionPoolError: Error, CustomStringConvertible {
public struct Code: Hashable, Sendable, CustomStringConvertible {
enum Code {
case shutdown
case tooManyWaiters
case deadlineExceeded
}

fileprivate var code: Code

private init(_ code: Code) {
self.code = code
}

public var description: String {
String(describing: self.code)
}

/// There are too many waiters in the pool.
case tooManyWaiters(connectionError: Error?)
/// The pool is shutdown or shutting down.
public static var shutdown: Self { Self(.shutdown) }

/// The deadline for creating a stream has passed.
case deadlineExceeded(connectionError: Error?)
/// There are too many waiters in the pool.
public static var tooManyWaiters: Self { Self(.tooManyWaiters) }

/// The deadline for creating a stream has passed.
public static var deadlineExceeded: Self { Self(.deadlineExceeded) }
}

/// The error code.
public var code: Code

/// An underlying error which caused this error to be thrown.
public var underlyingError: Error?

public var description: String {
if let underlyingError = self.underlyingError {
return "\(self.code) (\(underlyingError))"
} else {
return String(describing: self.code)
}
}

/// Create a new connection pool error with the given code and underlying error.
///
/// - Parameters:
/// - code: The error code.
/// - underlyingError: The underlying error which led to this error being thrown.
public init(code: Code, underlyingError: Error? = nil) {
self.code = code
self.underlyingError = underlyingError
}
}

extension ConnectionPoolError: GRPCStatusTransformable {
extension GRPCConnectionPoolError {
@usableFromInline
internal func makeGRPCStatus() -> GRPCStatus {
switch self {
static let shutdown = Self(code: .shutdown)

@inlinable
static func tooManyWaiters(connectionError: Error?) -> Self {
Self(code: .tooManyWaiters, underlyingError: connectionError)
}

@inlinable
static func deadlineExceeded(connectionError: Error?) -> Self {
Self(code: .deadlineExceeded, underlyingError: connectionError)
}
}

extension GRPCConnectionPoolError: GRPCStatusTransformable {
public func makeGRPCStatus() -> GRPCStatus {
switch self.code.code {
case .shutdown:
return GRPCStatus(
code: .unavailable,
message: "The connection pool is shutdown"
message: "The connection pool is shutdown",
cause: self.underlyingError
)

case let .tooManyWaiters(error):
case .tooManyWaiters:
return GRPCStatus(
code: .resourceExhausted,
message: "The connection pool has no capacity for new RPCs or RPC waiters",
cause: error
cause: self.underlyingError
)

case let .deadlineExceeded(error):
case .deadlineExceeded:
return GRPCStatus(
code: .deadlineExceeded,
message: "Timed out waiting for an HTTP/2 stream from the connection pool",
cause: error
cause: self.underlyingError
)
}
}
Expand Down
85 changes: 47 additions & 38 deletions Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ final class ConnectionPoolTests: GRPCTestCase {
}

XCTAssertThrowsError(try stream.wait()) { error in
XCTAssert((error as? ConnectionPoolError).isShutdown)
XCTAssert((error as? GRPCConnectionPoolError).isShutdown)
}
}

Expand All @@ -181,14 +181,14 @@ final class ConnectionPoolTests: GRPCTestCase {
}

XCTAssertThrowsError(try tooManyWaiters.wait()) { error in
XCTAssert((error as? ConnectionPoolError).isTooManyWaiters)
XCTAssert((error as? GRPCConnectionPoolError).isTooManyWaiters)
}

XCTAssertNoThrow(try pool.shutdown().wait())
// All 'waiting' futures will be failed by the shutdown promise.
for waiter in waiting {
XCTAssertThrowsError(try waiter.wait()) { error in
XCTAssert((error as? ConnectionPoolError).isShutdown)
XCTAssert((error as? GRPCConnectionPoolError).isShutdown)
}
}
}
Expand All @@ -205,7 +205,7 @@ final class ConnectionPoolTests: GRPCTestCase {

self.eventLoop.advanceTime(to: .uptimeNanoseconds(10))
XCTAssertThrowsError(try waiter.wait()) { error in
XCTAssert((error as? ConnectionPoolError).isDeadlineExceeded)
XCTAssert((error as? GRPCConnectionPoolError).isDeadlineExceeded)
}

XCTAssertEqual(pool.sync.waiters, 0)
Expand All @@ -225,7 +225,7 @@ final class ConnectionPoolTests: GRPCTestCase {

self.eventLoop.run()
XCTAssertThrowsError(try waiter.wait()) { error in
XCTAssert((error as? ConnectionPoolError).isDeadlineExceeded)
XCTAssert((error as? GRPCConnectionPoolError).isDeadlineExceeded)
}

XCTAssertEqual(pool.sync.waiters, 0)
Expand Down Expand Up @@ -358,7 +358,7 @@ final class ConnectionPoolTests: GRPCTestCase {
XCTAssertNoThrow(try shutdown.wait())
for waiter in others {
XCTAssertThrowsError(try waiter.wait()) { error in
XCTAssert((error as? ConnectionPoolError).isShutdown)
XCTAssert((error as? GRPCConnectionPoolError).isShutdown)
}
}
}
Expand Down Expand Up @@ -503,7 +503,7 @@ final class ConnectionPoolTests: GRPCTestCase {
// We need to advance the time to fire the timeout to fail the waiter.
self.eventLoop.advanceTime(to: .uptimeNanoseconds(10))
XCTAssertThrowsError(try waiter1.wait()) { error in
XCTAssert((error as? ConnectionPoolError).isDeadlineExceeded)
XCTAssert((error as? GRPCConnectionPoolError).isDeadlineExceeded)
}

self.eventLoop.run()
Expand Down Expand Up @@ -758,8 +758,10 @@ final class ConnectionPoolTests: GRPCTestCase {
self.eventLoop.advanceTime(to: .uptimeNanoseconds(10))

XCTAssertThrowsError(try w1.wait()) { error in
switch error as? ConnectionPoolError {
case .some(.deadlineExceeded(.none)):
switch error as? GRPCConnectionPoolError {
case .some(let error):
XCTAssertEqual(error.code, .deadlineExceeded)
XCTAssertNil(error.underlyingError)
// Deadline exceeded but no underlying error, as expected.
()
default:
Expand All @@ -774,10 +776,11 @@ final class ConnectionPoolTests: GRPCTestCase {
self.eventLoop.advanceTime(to: .uptimeNanoseconds(20))

XCTAssertThrowsError(try w2.wait()) { error in
switch error as? ConnectionPoolError {
case let .some(.deadlineExceeded(.some(wrappedError))):
switch error as? GRPCConnectionPoolError {
case let .some(error):
XCTAssertEqual(error.code, .deadlineExceeded)
// Deadline exceeded and we have the underlying error.
XCTAssert(wrappedError is DummyError)
XCTAssert(error.underlyingError is DummyError)
default:
XCTFail("Expected ConnectionPoolError.deadlineExceeded(.some) but got \(error)")
}
Expand Down Expand Up @@ -837,9 +840,10 @@ final class ConnectionPoolTests: GRPCTestCase {
$0.eventLoop.makeSucceededVoidFuture()
}
XCTAssertThrowsError(try tooManyWaiters.wait()) { error in
switch error as? ConnectionPoolError {
case .some(.tooManyWaiters(.none)):
()
switch error as? GRPCConnectionPoolError {
case .some(let error):
XCTAssertEqual(error.code, .tooManyWaiters)
XCTAssertNil(error.underlyingError)
default:
XCTFail("Expected ConnectionPoolError.tooManyWaiters(.none) but got \(error)")
}
Expand All @@ -849,9 +853,10 @@ final class ConnectionPoolTests: GRPCTestCase {
self.eventLoop.advanceTime(by: .seconds(1))
for waiter in waiters {
XCTAssertThrowsError(try waiter.wait()) { error in
switch error as? ConnectionPoolError {
case .some(.deadlineExceeded(.none)):
()
switch error as? GRPCConnectionPoolError {
case .some(let error):
XCTAssertEqual(error.code, .deadlineExceeded)
XCTAssertNil(error.underlyingError)
default:
XCTFail("Expected ConnectionPoolError.deadlineExceeded(.none) but got \(error)")
}
Expand All @@ -869,7 +874,7 @@ final class ConnectionPoolTests: GRPCTestCase {
XCTAssertNil(waiter._scheduledTimeout)

waiter.scheduleTimeout(on: self.eventLoop) {
waiter.fail(ConnectionPoolError.deadlineExceeded(connectionError: nil))
waiter.fail(GRPCConnectionPoolError.deadlineExceeded(connectionError: nil))
}

XCTAssertNotNil(waiter._scheduledTimeout)
Expand Down Expand Up @@ -1045,6 +1050,25 @@ final class ConnectionPoolTests: GRPCTestCase {
}
}
}

func testConnectionPoolErrorDescription() {
var error = GRPCConnectionPoolError(code: .deadlineExceeded)
XCTAssertEqual(String(describing: error), "deadlineExceeded")
error.code = .shutdown
XCTAssertEqual(String(describing: error), "shutdown")
error.code = .tooManyWaiters
XCTAssertEqual(String(describing: error), "tooManyWaiters")

struct DummyError: Error {}
error.underlyingError = DummyError()
XCTAssertEqual(String(describing: error), "tooManyWaiters (DummyError())")
}

func testConnectionPoolErrorCodeEquality() {
let error = GRPCConnectionPoolError(code: .deadlineExceeded)
XCTAssertEqual(error.code, .deadlineExceeded)
XCTAssertNotEqual(error.code, .shutdown)
}
}

extension ConnectionPool {
Expand Down Expand Up @@ -1216,31 +1240,16 @@ internal struct HookedStreamLender: StreamLender {
}
}

extension Optional where Wrapped == ConnectionPoolError {
extension Optional where Wrapped == GRPCConnectionPoolError {
internal var isTooManyWaiters: Bool {
switch self {
case .some(.tooManyWaiters):
return true
case .some(.deadlineExceeded), .some(.shutdown), .none:
return false
}
self?.code == .tooManyWaiters
}

internal var isDeadlineExceeded: Bool {
switch self {
case .some(.deadlineExceeded):
return true
case .some(.tooManyWaiters), .some(.shutdown), .none:
return false
}
self?.code == .deadlineExceeded
}

internal var isShutdown: Bool {
switch self {
case .some(.shutdown):
return true
case .some(.tooManyWaiters), .some(.deadlineExceeded), .none:
return false
}
self?.code == .shutdown
}
}
4 changes: 2 additions & 2 deletions Tests/GRPCTests/GRPCStatusTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ class GRPCStatusTests: GRPCTestCase {
// No message/cause, so uses the nil backing storage.
XCTAssertEqual(status.testingOnly_storageObjectIdentifier, nilStorageID)

status.cause = ConnectionPoolError.tooManyWaiters(connectionError: nil)
status.cause = GRPCConnectionPoolError.tooManyWaiters(connectionError: nil)
let storageID = status.testingOnly_storageObjectIdentifier
XCTAssertNotEqual(storageID, nilStorageID)
XCTAssert(status.cause is ConnectionPoolError)
XCTAssert(status.cause is GRPCConnectionPoolError)

// The storage of status should be uniquely ref'd, so setting cause to nil should not change
// the backing storage (even if the nil storage could now be used).
Expand Down