Skip to content

Commit f162f96

Browse files
committed
PR and format changes
1 parent b40fddb commit f162f96

File tree

3 files changed

+69
-56
lines changed

3 files changed

+69
-56
lines changed

Sources/GRPCCore/Transport/InProcessServerTransport.swift

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -14,42 +14,41 @@
1414
* limitations under the License.
1515
*/
1616

17-
1817
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
1918
/// An in-process implementation of a ``ServerTransport``.
2019
public struct InProcessServerTransport: ServerTransport {
21-
public typealias Inbound = RPCAsyncSequence<RPCRequestPart>
22-
public typealias Outbound = RPCWriter<RPCResponsePart>.Closable
23-
24-
private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
25-
private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation
26-
27-
/// Creates a new instance of ``InProcessServerTransport``.
28-
public init() {
29-
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
30-
}
31-
32-
/// Publish a new ``RPCStream``, which will be returned by the transport's ``RPCAsyncSequence``,
33-
/// returned when calling ``listen()``.
34-
///
35-
/// - Parameter stream: The new ``RPCStream`` to publish.
36-
public func acceptStream(_ stream: RPCStream<Inbound, Outbound>) {
37-
self.newStreamsContinuation.yield(stream)
38-
}
39-
40-
/// Return a new ``RPCAsyncSequence`` that will contain all published ``RPCStream``s published
41-
/// to this transport using the ``acceptStream(_:)`` method.
42-
///
43-
/// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s.
44-
public func listen() -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
45-
RPCAsyncSequence(wrapping: self.newStreams)
46-
}
47-
48-
/// Stop listening to any new ``RPCStream`` publications.
49-
///
50-
/// All further calls to ``acceptStream(_:)`` will not produce any new elements on the
51-
/// ``RPCAsyncSequence`` returned by ``listen()``.
52-
public func stopListening() {
53-
self.newStreamsContinuation.finish()
54-
}
20+
public typealias Inbound = RPCAsyncSequence<RPCRequestPart>
21+
public typealias Outbound = RPCWriter<RPCResponsePart>.Closable
22+
23+
private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
24+
private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation
25+
26+
/// Creates a new instance of ``InProcessServerTransport``.
27+
public init() {
28+
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
29+
}
30+
31+
/// Publish a new ``RPCStream``, which will be returned by the transport's ``RPCAsyncSequence``,
32+
/// returned when calling ``listen()``.
33+
///
34+
/// - Parameter stream: The new ``RPCStream`` to publish.
35+
internal func acceptStream(_ stream: RPCStream<Inbound, Outbound>) {
36+
self.newStreamsContinuation.yield(stream)
37+
}
38+
39+
/// Return a new ``RPCAsyncSequence`` that will contain all published ``RPCStream``s published
40+
/// to this transport using the ``acceptStream(_:)`` method.
41+
///
42+
/// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s.
43+
public func listen() -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
44+
RPCAsyncSequence(wrapping: self.newStreams)
45+
}
46+
47+
/// Stop listening to any new ``RPCStream`` publications.
48+
///
49+
/// All further calls to ``acceptStream(_:)`` will not produce any new elements on the
50+
/// ``RPCAsyncSequence`` returned by ``listen()``.
51+
public func stopListening() {
52+
self.newStreamsContinuation.finish()
53+
}
5554
}

Tests/GRPCCoreTests/Transport/InProcessServerTransportTest.swift

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,46 +24,60 @@ final class InProcessServerTransportTest: XCTestCase {
2424
let stream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
2525
descriptor: .init(service: "testService", method: "testMethod"),
2626
inbound: .elements([.message([42])]),
27-
outbound: .init(wrapping: BufferedStream.Source(storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))))
27+
outbound: .init(
28+
wrapping: BufferedStream.Source(
29+
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
30+
)
31+
)
2832
)
29-
33+
3034
let streamSequence = transport.listen()
3135
var streamSequenceInterator = streamSequence.makeAsyncIterator()
32-
36+
3337
transport.acceptStream(stream)
34-
38+
3539
let testStream = try await streamSequenceInterator.next()
36-
var inboundIterator = testStream?.inbound.makeAsyncIterator()
37-
let rpcRequestPart = try await inboundIterator?.next()
38-
XCTAssertEqual(rpcRequestPart, .message([42]))
40+
let messages = try await testStream?.inbound.collect()
41+
XCTAssertEqual(messages, [.message([42])])
3942
}
40-
43+
4144
func testStopListening() async throws {
4245
let transport = InProcessServerTransport()
43-
let firstStream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
46+
let firstStream = RPCStream<
47+
RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
48+
>(
4449
descriptor: .init(service: "testService1", method: "testMethod1"),
4550
inbound: .elements([.message([42])]),
46-
outbound: .init(wrapping: BufferedStream.Source(storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))))
51+
outbound: .init(
52+
wrapping: BufferedStream.Source(
53+
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
54+
)
55+
)
4756
)
48-
57+
4958
let streamSequence = transport.listen()
5059
var streamSequenceInterator = streamSequence.makeAsyncIterator()
51-
60+
5261
transport.acceptStream(firstStream)
53-
62+
5463
let firstTestStream = try await streamSequenceInterator.next()
55-
var inboundIterator = firstTestStream?.inbound.makeAsyncIterator()
56-
let rpcRequestPart = try await inboundIterator?.next()
57-
XCTAssertEqual(rpcRequestPart, .message([42]))
58-
64+
let firstStreamMessages = try await firstTestStream?.inbound.collect()
65+
XCTAssertEqual(firstStreamMessages, [.message([42])])
66+
5967
transport.stopListening()
60-
61-
let secondStream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
68+
69+
let secondStream = RPCStream<
70+
RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
71+
>(
6272
descriptor: .init(service: "testService1", method: "testMethod1"),
6373
inbound: .elements([.message([42])]),
64-
outbound: .init(wrapping: BufferedStream.Source(storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))))
74+
outbound: .init(
75+
wrapping: BufferedStream.Source(
76+
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
77+
)
78+
)
6579
)
66-
80+
6781
transport.acceptStream(secondStream)
6882
let secondTestStream = try await streamSequenceInterator.next()
6983
XCTAssertNil(secondTestStream)

0 commit comments

Comments
 (0)