Skip to content

Commit cb827bf

Browse files
committed
PR and format changes
1 parent b40fddb commit cb827bf

File tree

3 files changed

+81
-57
lines changed

3 files changed

+81
-57
lines changed

Sources/GRPCCore/Transport/InProcessServerTransport.swift

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,42 +14,49 @@
1414
* limitations under the License.
1515
*/
1616

17-
1817
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
1918
/// An in-process implementation of a ``ServerTransport``.
2019
public struct InProcessServerTransport: ServerTransport {
21-
public typealias Inbound = RPCAsyncSequence<RPCRequestPart>
22-
public typealias Outbound = RPCWriter<RPCResponsePart>.Closable
23-
24-
private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
25-
private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation
26-
27-
/// Creates a new instance of ``InProcessServerTransport``.
28-
public init() {
29-
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
30-
}
31-
32-
/// Publish a new ``RPCStream``, which will be returned by the transport's ``RPCAsyncSequence``,
33-
/// returned when calling ``listen()``.
34-
///
35-
/// - Parameter stream: The new ``RPCStream`` to publish.
36-
public func acceptStream(_ stream: RPCStream<Inbound, Outbound>) {
37-
self.newStreamsContinuation.yield(stream)
38-
}
39-
40-
/// Return a new ``RPCAsyncSequence`` that will contain all published ``RPCStream``s published
41-
/// to this transport using the ``acceptStream(_:)`` method.
42-
///
43-
/// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s.
44-
public func listen() -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
45-
RPCAsyncSequence(wrapping: self.newStreams)
46-
}
47-
48-
/// Stop listening to any new ``RPCStream`` publications.
49-
///
50-
/// All further calls to ``acceptStream(_:)`` will not produce any new elements on the
51-
/// ``RPCAsyncSequence`` returned by ``listen()``.
52-
public func stopListening() {
53-
self.newStreamsContinuation.finish()
20+
public typealias Inbound = RPCAsyncSequence<RPCRequestPart>
21+
public typealias Outbound = RPCWriter<RPCResponsePart>.Closable
22+
23+
private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
24+
private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation
25+
26+
/// Creates a new instance of ``InProcessServerTransport``.
27+
public init() {
28+
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
29+
}
30+
31+
/// Publish a new ``RPCStream``, which will be returned by the transport's ``RPCAsyncSequence``,
32+
/// returned when calling ``listen()``.
33+
///
34+
/// - Parameter stream: The new ``RPCStream`` to publish.
35+
/// - Throws: ``RPCError`` with code ``RPCError/Code-swift.struct/failedPrecondition``
36+
/// if the server transport stopped listening to new streams (i.e., if ``stopListening()`` has been called).
37+
internal func acceptStream(_ stream: RPCStream<Inbound, Outbound>) throws {
38+
let yieldResult = self.newStreamsContinuation.yield(stream)
39+
if case .terminated = yieldResult {
40+
throw RPCError(
41+
code: .failedPrecondition,
42+
message: "The server transport is closed."
43+
)
5444
}
45+
}
46+
47+
/// Return a new ``RPCAsyncSequence`` that will contain all published ``RPCStream``s published
48+
/// to this transport using the ``acceptStream(_:)`` method.
49+
///
50+
/// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s.
51+
public func listen() -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
52+
RPCAsyncSequence(wrapping: self.newStreams)
53+
}
54+
55+
/// Stop listening to any new ``RPCStream`` publications.
56+
///
57+
/// All further calls to ``acceptStream(_:)`` will not produce any new elements on the
58+
/// ``RPCAsyncSequence`` returned by ``listen()``.
59+
public func stopListening() {
60+
self.newStreamsContinuation.finish()
61+
}
5562
}

Tests/GRPCCoreTests/Transport/InProcessServerTransportTest.swift

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,47 +24,64 @@ final class InProcessServerTransportTest: XCTestCase {
2424
let stream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
2525
descriptor: .init(service: "testService", method: "testMethod"),
2626
inbound: .elements([.message([42])]),
27-
outbound: .init(wrapping: BufferedStream.Source(storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))))
27+
outbound: .init(
28+
wrapping: BufferedStream.Source(
29+
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
30+
)
31+
)
2832
)
29-
33+
3034
let streamSequence = transport.listen()
3135
var streamSequenceInterator = streamSequence.makeAsyncIterator()
32-
33-
transport.acceptStream(stream)
34-
36+
37+
try transport.acceptStream(stream)
38+
3539
let testStream = try await streamSequenceInterator.next()
36-
var inboundIterator = testStream?.inbound.makeAsyncIterator()
37-
let rpcRequestPart = try await inboundIterator?.next()
38-
XCTAssertEqual(rpcRequestPart, .message([42]))
40+
let messages = try await testStream?.inbound.collect()
41+
XCTAssertEqual(messages, [.message([42])])
3942
}
40-
43+
4144
func testStopListening() async throws {
4245
let transport = InProcessServerTransport()
43-
let firstStream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
46+
let firstStream = RPCStream<
47+
RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
48+
>(
4449
descriptor: .init(service: "testService1", method: "testMethod1"),
4550
inbound: .elements([.message([42])]),
46-
outbound: .init(wrapping: BufferedStream.Source(storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))))
51+
outbound: .init(
52+
wrapping: BufferedStream.Source(
53+
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
54+
)
55+
)
4756
)
48-
57+
4958
let streamSequence = transport.listen()
5059
var streamSequenceInterator = streamSequence.makeAsyncIterator()
51-
52-
transport.acceptStream(firstStream)
53-
60+
61+
try transport.acceptStream(firstStream)
62+
5463
let firstTestStream = try await streamSequenceInterator.next()
55-
var inboundIterator = firstTestStream?.inbound.makeAsyncIterator()
56-
let rpcRequestPart = try await inboundIterator?.next()
57-
XCTAssertEqual(rpcRequestPart, .message([42]))
58-
64+
let firstStreamMessages = try await firstTestStream?.inbound.collect()
65+
XCTAssertEqual(firstStreamMessages, [.message([42])])
66+
5967
transport.stopListening()
60-
61-
let secondStream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
68+
69+
let secondStream = RPCStream<
70+
RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
71+
>(
6272
descriptor: .init(service: "testService1", method: "testMethod1"),
6373
inbound: .elements([.message([42])]),
64-
outbound: .init(wrapping: BufferedStream.Source(storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))))
74+
outbound: .init(
75+
wrapping: BufferedStream.Source(
76+
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
77+
)
78+
)
6579
)
80+
81+
XCTAssertThrowsRPCError(try transport.acceptStream(secondStream)) { error in
82+
XCTAssertEqual(error.code, .failedPrecondition)
83+
}
6684

67-
transport.acceptStream(secondStream)
6885
let secondTestStream = try await streamSequenceInterator.next()
6986
XCTAssertNil(secondTestStream)
7087
}

0 commit comments

Comments
 (0)