Skip to content

Commit

Permalink
HTTP/2 negotiated connection pipeline config (#408)
Browse files Browse the repository at this point in the history
Motivation:

This continues the work to expose functionality which allows users to interact
with HTTP/2 connections via async abstractions and using structured
concurrency.

This work enables protocol negotiation between HTTP/1.1 and HTTP/2 in its most generic form.

Modifications:

Enable protocol negotiation between HTTP/1.1 and HTTP/2 using typed
negotiation results (`NIOProtocolNegotiationResult`) from the
`NIOTypedApplicationProtocolNegotiationHandler`.
In the HTTP/2 case the negotiation result will return the HTTP/2 connection
channel and the `NIOHTTP2Handler.AsyncStreamMultiplexer` which exposes
inbound streams as an iterable async stream.

Result:

Users will be able to set up negotiated HTTP/1.1 / HTTP/2 connections and
iterate over inbound streams.
  • Loading branch information
rnro authored Jul 12, 2023
1 parent 188fa9b commit 044339d
Show file tree
Hide file tree
Showing 8 changed files with 561 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ extension InlineStreamMultiplexer {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer)
}

internal func createStreamChannel<Output>(_ initializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
internal func createStreamChannel<Output>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), initializer)
}
}
Expand Down Expand Up @@ -239,7 +239,7 @@ extension NIOHTTP2Handler {
}

/// Create a stream channel initialized with the provided closure
public func createStreamChannel<OutboundStreamOutput>(_ initializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput {
public func createStreamChannel<OutboundStreamOutput>(_ initializer: @escaping NIOChannelInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput {
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}

Expand All @@ -253,8 +253,8 @@ extension NIOHTTP2Handler {
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func createStreamChannel<Inbound, Outbound>(
configuration: NIOAsyncChannel<Inbound, Outbound>.Configuration,
initializer: @escaping NIOHTTP2Handler.StreamInitializer
configuration: NIOAsyncChannel<Inbound, Outbound>.Configuration = .init(),
initializer: @escaping NIOChannelInitializer
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
return try await self.createStreamChannel { channel in
initializer(channel).flatMapThrowing { _ in
Expand Down
41 changes: 28 additions & 13 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public let nioDefaultSettings = [
HTTP2Setting(parameter: .maxHeaderListSize, value: HPACKDecoder.defaultMaxHeaderListSize),
]


/// ``NIOHTTP2Handler`` implements the HTTP/2 protocol for a single connection.
///
/// This `ChannelHandler` takes a series of bytes and turns them into a sequence of ``HTTP2Frame`` objects.
Expand Down Expand Up @@ -988,19 +987,11 @@ extension NIOHTTP2Handler {

extension NIOHTTP2Handler {
#if swift(>=5.7)
/// The type of all `inboundStreamInitializer` callbacks.
public typealias StreamInitializer = @Sendable (Channel) -> EventLoopFuture<Void>
/// The type of all `connectionInitializer` callbacks.
public typealias ConnectionInitializer = @Sendable (Channel) -> EventLoopFuture<Void>
/// The type of `inboundStreamInitializer` callbacks which return non-void results.
public typealias StreamInitializerWithOutput<Output> = @Sendable (Channel) -> EventLoopFuture<Output>
/// The type of all `inboundStreamInitializer` callbacks which do not need to return data.
public typealias StreamInitializer = NIOChannelInitializer
#else
/// The type of all `inboundStreamInitializer` callbacks.
public typealias StreamInitializer = (Channel) -> EventLoopFuture<Void>
/// The type of all `connectionInitializer` callbacks.
public typealias ConnectionInitializer = (Channel) -> EventLoopFuture<Void>
/// The type of `inboundStreamInitializer` callbacks which return non-void results.
public typealias StreamInitializerWithOutput<Output> = (Channel) -> EventLoopFuture<Output>
/// The type of all `inboundStreamInitializer` callbacks which need to return data.
public typealias StreamInitializer = NIOChannelInitializer
#endif

/// Creates a new ``NIOHTTP2Handler`` with a local multiplexer. (i.e. using
Expand Down Expand Up @@ -1035,6 +1026,9 @@ extension NIOHTTP2Handler {
}

/// Connection-level configuration.
///
/// The settings that will be used when establishing the connection. These will be sent to the peer as part of the
/// handshake.
public struct ConnectionConfiguration: Hashable, Sendable {
public var initialSettings: HTTP2Settings = nioDefaultSettings
public var headerBlockValidation: ValidationState = .enabled
Expand All @@ -1045,13 +1039,34 @@ extension NIOHTTP2Handler {
}

/// Stream-level configuration.
///
/// The settings that will be used when establishing new streams. These mainly pertain to flow control.
public struct StreamConfiguration: Hashable, Sendable {
public var targetWindowSize: Int = 65535
public var outboundBufferSizeHighWatermark: Int = 8196
public var outboundBufferSizeLowWatermark: Int = 4092
public init() {}
}

/// Overall connection and stream-level configuration.
public struct Configuration: Hashable, Sendable {
/// The settings that will be used when establishing the connection. These will be sent to the peer as part of the
/// handshake.
public var connection: ConnectionConfiguration
/// The settings that will be used when establishing new streams. These mainly pertain to flow control.
public var stream: StreamConfiguration

public init() {
self.connection = .init()
self.stream = .init()
}

public init(connection: ConnectionConfiguration, stream: StreamConfiguration) {
self.connection = connection
self.stream = stream
}
}

/// An `EventLoopFuture` which returns a ``StreamMultiplexer`` which can be used to create new outbound HTTP/2 streams.
///
/// > Note: This is only safe to get if the ``NIOHTTP2Handler`` uses a local multiplexer,
Expand Down
12 changes: 6 additions & 6 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ extension HTTP2CommonInboundStreamMultiplexer {
internal func _createStreamChannel<Output>(
_ multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ promise: EventLoopPromise<Output>?,
_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
_ streamStateInitializer: @escaping NIOChannelInitializerWithOutput<Output>
) {
self.channel.eventLoop.assertInEventLoop()

Expand All @@ -307,7 +307,7 @@ extension HTTP2CommonInboundStreamMultiplexer {
internal func createStreamChannel<Output>(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
promise: EventLoopPromise<Output>?,
_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
_ streamStateInitializer: @escaping NIOChannelInitializerWithOutput<Output>
) {
if self.channel.eventLoop.inEventLoop {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
Expand All @@ -320,7 +320,7 @@ extension HTTP2CommonInboundStreamMultiplexer {

internal func createStreamChannel<Output>(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
_ streamStateInitializer: @escaping NIOChannelInitializerWithOutput<Output>
) -> EventLoopFuture<Output> {
let promise = self.channel.eventLoop.makePromise(of: Output.self)
self.createStreamChannel(multiplexer: multiplexer, promise: promise, streamStateInitializer)
Expand Down Expand Up @@ -426,11 +426,11 @@ internal protocol ChannelContinuation {
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct StreamChannelContinuation<Output>: ChannelContinuation {
private var continuation: AsyncThrowingStream<Output, Error>.Continuation
private let inboundStreamInititializer: NIOHTTP2Handler.StreamInitializerWithOutput<Output>
private let inboundStreamInititializer: NIOChannelInitializerWithOutput<Output>

private init(
continuation: AsyncThrowingStream<Output, Error>.Continuation,
inboundStreamInititializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
inboundStreamInititializer: @escaping NIOChannelInitializerWithOutput<Output>
) {
self.continuation = continuation
self.inboundStreamInititializer = inboundStreamInititializer
Expand All @@ -446,7 +446,7 @@ struct StreamChannelContinuation<Output>: ChannelContinuation {
/// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is
/// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`.
static func initialize(
with inboundStreamInititializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
with inboundStreamInititializer: @escaping NIOChannelInitializerWithOutput<Output>
) -> (StreamChannelContinuation<Output>, NIOHTTP2InboundStreamChannels<Output>) {
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self)
return (StreamChannelContinuation(continuation: continuation, inboundStreamInititializer: inboundStreamInititializer), NIOHTTP2InboundStreamChannels(stream))
Expand Down
Loading

0 comments on commit 044339d

Please sign in to comment.