Skip to content

Update to SwiftNIO HTTP/2 1.13.0 #922

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
"repositoryURL": "https://github.com/apple/swift-nio-http2.git",
"state": {
"branch": null,
"revision": "c5d10f4165128c3d0cc0e3c0f0a8ef55947a73a6",
"version": "1.12.2"
"revision": "e9627350bdb85bde7e0dc69a29799e40961ced72",
"version": "1.13.0"
}
},
{
Expand Down
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ let package = Package(
// Main SwiftNIO package
.package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"),
// HTTP2 via SwiftNIO
.package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.12.1"),
.package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.13.0"),
// TLS via SwiftNIO
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
// Support for Network.framework where possible.
Expand Down
6 changes: 2 additions & 4 deletions Sources/GRPC/ClientCalls/ClientCallTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,11 @@ internal class ChannelTransport<Request, Response> {
multiplexer.whenComplete { result in
switch result {
case .success(let mux):
mux.createStreamChannel(promise: streamPromise) { stream, streamID in
var logger = logger
logger[metadataKey: MetadataKey.streamID] = "\(streamID)"
mux.createStreamChannel(promise: streamPromise) { stream in
logger.trace("created http/2 stream")

return stream.pipeline.addHandlers([
_GRPCClientChannelHandler(streamID: streamID, callType: callType, logger: logger),
_GRPCClientChannelHandler(callType: callType, logger: logger),
GRPCClientCodecHandler(serializer: serializer, deserializer: deserializer),
GRPCClientCallHandler(call: call)
])
Expand Down
6 changes: 3 additions & 3 deletions Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ extension ClientConnection {
public var httpTargetWindowSize: Int

/// The HTTP protocol used for this connection.
public var httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol {
public var httpProtocol: HTTP2FramePayloadToHTTP1ClientCodec.HTTPProtocol {
return self.tls == nil ? .http : .https
}

Expand Down Expand Up @@ -642,7 +642,7 @@ extension Channel {
}

let configuration: EventLoopFuture<Void> = (tlsConfigured ?? self.eventLoop.makeSucceededFuture(())).flatMap {
self.configureHTTP2Pipeline(mode: .client, targetWindowSize: httpTargetWindowSize)
self.configureHTTP2Pipeline(mode: .client, targetWindowSize: httpTargetWindowSize, inboundStreamInitializer: nil)
}.flatMap { _ in
return self.pipeline.handler(type: NIOHTTP2Handler.self).flatMap { http2Handler in
self.pipeline.addHandlers([
Expand Down Expand Up @@ -677,7 +677,7 @@ extension Channel {
errorDelegate: ClientErrorDelegate?,
logger: Logger
) -> EventLoopFuture<Void> {
return self.configureHTTP2Pipeline(mode: .client).flatMap { _ in
return self.configureHTTP2Pipeline(mode: .client, inboundStreamInitializer: nil).flatMap { _ in
self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
}
}
Expand Down
19 changes: 14 additions & 5 deletions Sources/GRPC/HTTPProtocolSwitcher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,22 @@ extension HTTPProtocolSwitcher: ChannelInboundHandler, RemovableChannelHandler {
context.channel.configureHTTP2Pipeline(
mode: .server,
targetWindowSize: httpTargetWindowSize
) { (streamChannel, streamID) in
) { streamChannel in
var logger = self.logger
logger[metadataKey: MetadataKey.streamID] = "\(streamID)"
return streamChannel.pipeline.addHandler(HTTP2ToHTTP1ServerCodec(streamID: streamID, normalizeHTTPHeaders: true)).flatMap {
self.handlersInitializer(streamChannel, logger)

// Grab the streamID from the channel.
return streamChannel.getOption(HTTP2StreamChannelOptions.streamID).map { streamID in
logger[metadataKey: MetadataKey.streamID] = "\(streamID)"
return logger
}.recover { _ in
logger[metadataKey: MetadataKey.streamID] = "<unknown>"
return logger
}.flatMap { logger in
return streamChannel.pipeline.addHandler(HTTP2FramePayloadToHTTP1ServerCodec()).flatMap {
self.handlersInitializer(streamChannel, logger)
}
}
}.flatMap { multiplexer in
}.flatMap { multiplexer -> EventLoopFuture<Void> in
// Add a keepalive and idle handlers between the two HTTP2 handlers.
let keepaliveHandler = GRPCServerKeepaliveHandler(configuration: self.keepAlive)
let idleHandler = GRPCIdleHandler(mode: .server, idleTimeout: self.idleTimeout)
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPC/_EmbeddedThroughput.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ extension EmbeddedChannel {
responseType: Response.Type = Response.self
) -> EventLoopFuture<Void> {
return self.pipeline.addHandlers([
_GRPCClientChannelHandler(streamID: 1, callType: callType, logger: logger),
_GRPCClientChannelHandler(callType: callType, logger: logger),
GRPCClientCodecHandler(
serializer: ProtobufSerializer<Request>(),
deserializer: ProtobufDeserializer<Response>()
Expand Down
32 changes: 11 additions & 21 deletions Sources/GRPC/_GRPCClientChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -268,18 +268,14 @@ public enum GRPCCallType {
/// `public` because it is used within performance tests.
public final class _GRPCClientChannelHandler {
private let logger: Logger
private let streamID: HTTP2StreamID
private var stateMachine: GRPCClientStateMachine

/// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
///
/// - Parameters:
/// - streamID: The ID of the HTTP/2 stream that this handler will read and write HTTP/2
/// frames on.
/// - callType: Type of RPC call being made.
/// - logger: Logger.
public init(streamID: HTTP2StreamID, callType: GRPCCallType, logger: Logger) {
self.streamID = streamID
public init(callType: GRPCCallType, logger: Logger) {
self.logger = logger
switch callType {
case .unary:
Expand All @@ -296,12 +292,12 @@ public final class _GRPCClientChannelHandler {

// MARK: - GRPCClientChannelHandler: Inbound
extension _GRPCClientChannelHandler: ChannelInboundHandler {
public typealias InboundIn = HTTP2Frame
public typealias InboundIn = HTTP2Frame.FramePayload
public typealias InboundOut = _RawGRPCClientResponsePart

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let frame = self.unwrapInboundIn(data)
switch frame.payload {
let payload = self.unwrapInboundIn(data)
switch payload {
case .headers(let content):
self.readHeaders(content: content, context: context)

Expand Down Expand Up @@ -437,7 +433,7 @@ extension _GRPCClientChannelHandler: ChannelInboundHandler {
// MARK: - GRPCClientChannelHandler: Outbound
extension _GRPCClientChannelHandler: ChannelOutboundHandler {
public typealias OutboundIn = _RawGRPCClientRequestPart
public typealias OutboundOut = HTTP2Frame
public typealias OutboundOut = HTTP2Frame.FramePayload

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
switch self.unwrapOutboundIn(data) {
Expand All @@ -446,8 +442,8 @@ extension _GRPCClientChannelHandler: ChannelOutboundHandler {
switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
case .success(let headers):
// We're clear to write some headers. Create an appropriate frame and write it.
let frame = HTTP2Frame(streamID: self.streamID, payload: .headers(.init(headers: headers)))
context.write(self.wrapOutboundOut(frame), promise: promise)
let framePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
context.write(self.wrapOutboundOut(framePayload), promise: promise)

case .failure(let sendRequestHeadersError):
switch sendRequestHeadersError {
Expand All @@ -464,11 +460,8 @@ extension _GRPCClientChannelHandler: ChannelOutboundHandler {
switch result {
case .success(let buffer):
// We're clear to send a message; wrap it up in an HTTP/2 frame.
let frame = HTTP2Frame(
streamID: self.streamID,
payload: .data(.init(data: .byteBuffer(buffer)))
)
context.write(self.wrapOutboundOut(frame), promise: promise)
let framePayload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
context.write(self.wrapOutboundOut(framePayload), promise: promise)

case .failure(let writeError):
switch writeError {
Expand All @@ -493,11 +486,8 @@ extension _GRPCClientChannelHandler: ChannelOutboundHandler {
case .success:
// We can. Send an empty DATA frame with end-stream set.
let empty = context.channel.allocator.buffer(capacity: 0)
let frame = HTTP2Frame(
streamID: self.streamID,
payload: .data(.init(data: .byteBuffer(empty), endStream: true))
)
context.write(self.wrapOutboundOut(frame), promise: promise)
let framePayload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(empty), endStream: true))
context.write(self.wrapOutboundOut(framePayload), promise: promise)

case .failure(let error):
// Why can't we close the request stream?
Expand Down
17 changes: 6 additions & 11 deletions Tests/GRPCTests/GRPCStatusCodeTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,13 @@ class GRPCStatusCodeTests: GRPCTestCase {
override func setUp() {
super.setUp()

let handler = _GRPCClientChannelHandler(
streamID: .init(1),
callType: .unary,
logger: self.logger
)

let handler = _GRPCClientChannelHandler(callType: .unary, logger: self.logger)
self.channel = EmbeddedChannel(handler: handler)
}

func headersFrame(status: HTTPResponseStatus) -> HTTP2Frame {
func headersFramePayload(status: HTTPResponseStatus) -> HTTP2Frame.FramePayload {
let headers: HPACKHeaders = [":status": "\(status.code)"]
return .init(streamID: .init(1), payload: .headers(.init(headers: headers)))
return .headers(.init(headers: headers))
}

func sendRequestHead() {
Expand All @@ -60,7 +55,7 @@ class GRPCStatusCodeTests: GRPCTestCase {
func doTestResponseStatus(_ status: HTTPResponseStatus, expected: GRPCStatus.Code) throws {
// Send the request head so we're in a valid state to receive headers.
self.sendRequestHead()
XCTAssertThrowsError(try self.channel.writeInbound(self.headersFrame(status: status))) { error in
XCTAssertThrowsError(try self.channel.writeInbound(self.headersFramePayload(status: status))) { error in
guard let withContext = error as? GRPCError.WithContext,
let invalidHTTPStatus = withContext.error as? GRPCError.InvalidHTTPStatus else {
XCTFail("Unexpected error: \(error)")
Expand Down Expand Up @@ -113,8 +108,8 @@ class GRPCStatusCodeTests: GRPCTestCase {
]

self.sendRequestHead()
let headerFrame = HTTP2Frame(streamID: .init(1), payload: .headers(.init(headers: headers)))
XCTAssertThrowsError(try self.channel.writeInbound(headerFrame)) { error in
let headerFramePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
XCTAssertThrowsError(try self.channel.writeInbound(headerFramePayload)) { error in
guard let withContext = error as? GRPCError.WithContext,
let invalidHTTPStatus = withContext.error as? GRPCError.InvalidHTTPStatusWithGRPCStatus else {
XCTFail("Unexpected error: \(error)")
Expand Down