Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension RPCWriter {
@usableFromInline
struct Closable: ClosableRPCWriterProtocol {
public struct Closable: ClosableRPCWriterProtocol {
@usableFromInline
let writer: any ClosableRPCWriterProtocol<Element>

Expand All @@ -36,7 +35,7 @@ extension RPCWriter {
///
/// - Parameter elements: The elements to write.
@inlinable
func write(contentsOf elements: some Sequence<Element>) async throws {
public func write(contentsOf elements: some Sequence<Element>) async throws {
try await self.writer.write(contentsOf: elements)
}

Expand All @@ -45,7 +44,7 @@ extension RPCWriter {
/// All writes after ``finish()`` has been called should result in an error
/// being thrown.
@inlinable
func finish() {
public func finish() {
self.writer.finish()
}

Expand All @@ -54,7 +53,7 @@ extension RPCWriter {
/// All writes after ``finish(throwing:)`` has been called should result in an error
/// being thrown.
@inlinable
func finish(throwing error: Error) {
public func finish(throwing error: Error) {
self.writer.finish(throwing: error)
}
}
Expand Down
62 changes: 62 additions & 0 deletions Sources/GRPCCore/Transport/InProcessServerTransport.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2023, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
/// An in-process implementation of a ``ServerTransport``.
public struct InProcessServerTransport: ServerTransport {
public typealias Inbound = RPCAsyncSequence<RPCRequestPart>
public typealias Outbound = RPCWriter<RPCResponsePart>.Closable

private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation

/// Creates a new instance of ``InProcessServerTransport``.
public init() {
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
}

/// Publish a new ``RPCStream``, which will be returned by the transport's ``RPCAsyncSequence``,
/// returned when calling ``listen()``.
///
/// - Parameter stream: The new ``RPCStream`` to publish.
/// - Throws: ``RPCError`` with code ``RPCError/Code-swift.struct/failedPrecondition``
/// if the server transport stopped listening to new streams (i.e., if ``stopListening()`` has been called).
internal func acceptStream(_ stream: RPCStream<Inbound, Outbound>) throws {
let yieldResult = self.newStreamsContinuation.yield(stream)
if case .terminated = yieldResult {
throw RPCError(
code: .failedPrecondition,
message: "The server transport is closed."
)
}
}

/// Return a new ``RPCAsyncSequence`` that will contain all published ``RPCStream``s published
/// to this transport using the ``acceptStream(_:)`` method.
///
/// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s.
public func listen() -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
RPCAsyncSequence(wrapping: self.newStreams)
}

/// Stop listening to any new ``RPCStream`` publications.
///
/// All further calls to ``acceptStream(_:)`` will not produce any new elements on the
/// ``RPCAsyncSequence`` returned by ``listen()``.
public func stopListening() {
self.newStreamsContinuation.finish()
}
}
88 changes: 88 additions & 0 deletions Tests/GRPCCoreTests/Transport/InProcessServerTransportTest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2023, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import XCTest

@testable import GRPCCore

final class InProcessServerTransportTest: XCTestCase {
func testStartListening() async throws {
let transport = InProcessServerTransport()
let stream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
descriptor: .init(service: "testService", method: "testMethod"),
inbound: .elements([.message([42])]),
outbound: .init(
wrapping: BufferedStream.Source(
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
)
)
)

let streamSequence = transport.listen()
var streamSequenceInterator = streamSequence.makeAsyncIterator()

try transport.acceptStream(stream)

let testStream = try await streamSequenceInterator.next()
let messages = try await testStream?.inbound.collect()
XCTAssertEqual(messages, [.message([42])])
}

func testStopListening() async throws {
let transport = InProcessServerTransport()
let firstStream = RPCStream<
RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
>(
descriptor: .init(service: "testService1", method: "testMethod1"),
inbound: .elements([.message([42])]),
outbound: .init(
wrapping: BufferedStream.Source(
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
)
)
)

let streamSequence = transport.listen()
var streamSequenceInterator = streamSequence.makeAsyncIterator()

try transport.acceptStream(firstStream)

let firstTestStream = try await streamSequenceInterator.next()
let firstStreamMessages = try await firstTestStream?.inbound.collect()
XCTAssertEqual(firstStreamMessages, [.message([42])])

transport.stopListening()

let secondStream = RPCStream<
RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
>(
descriptor: .init(service: "testService1", method: "testMethod1"),
inbound: .elements([.message([42])]),
outbound: .init(
wrapping: BufferedStream.Source(
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
)
)
)

XCTAssertThrowsRPCError(try transport.acceptStream(secondStream)) { error in
XCTAssertEqual(error.code, .failedPrecondition)
}

let secondTestStream = try await streamSequenceInterator.next()
XCTAssertNil(secondTestStream)
}
}