Skip to content

Commit b40fddb

Browse files
committed
Add in-process server transport
1 parent b28658f commit b40fddb

File tree

3 files changed

+130
-5
lines changed

3 files changed

+130
-5
lines changed

Sources/GRPCCore/Streaming/Internal/RPCWriter+Closable.swift

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
1818
extension RPCWriter {
19-
@usableFromInline
20-
struct Closable: ClosableRPCWriterProtocol {
19+
public struct Closable: ClosableRPCWriterProtocol {
2120
@usableFromInline
2221
let writer: any ClosableRPCWriterProtocol<Element>
2322

@@ -36,7 +35,7 @@ extension RPCWriter {
3635
///
3736
/// - Parameter elements: The elements to write.
3837
@inlinable
39-
func write(contentsOf elements: some Sequence<Element>) async throws {
38+
public func write(contentsOf elements: some Sequence<Element>) async throws {
4039
try await self.writer.write(contentsOf: elements)
4140
}
4241

@@ -45,7 +44,7 @@ extension RPCWriter {
4544
/// All writes after ``finish()`` has been called should result in an error
4645
/// being thrown.
4746
@inlinable
48-
func finish() {
47+
public func finish() {
4948
self.writer.finish()
5049
}
5150

@@ -54,7 +53,7 @@ extension RPCWriter {
5453
/// All writes after ``finish(throwing:)`` has been called should result in an error
5554
/// being thrown.
5655
@inlinable
57-
func finish(throwing error: Error) {
56+
public func finish(throwing error: Error) {
5857
self.writer.finish(throwing: error)
5958
}
6059
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2023, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
18+
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
19+
/// An in-process implementation of a ``ServerTransport``.
20+
public struct InProcessServerTransport: ServerTransport {
21+
public typealias Inbound = RPCAsyncSequence<RPCRequestPart>
22+
public typealias Outbound = RPCWriter<RPCResponsePart>.Closable
23+
24+
private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
25+
private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation
26+
27+
/// Creates a new instance of ``InProcessServerTransport``.
28+
public init() {
29+
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
30+
}
31+
32+
/// Publish a new ``RPCStream``, which will be returned by the transport's ``RPCAsyncSequence``,
33+
/// returned when calling ``listen()``.
34+
///
35+
/// - Parameter stream: The new ``RPCStream`` to publish.
36+
public func acceptStream(_ stream: RPCStream<Inbound, Outbound>) {
37+
self.newStreamsContinuation.yield(stream)
38+
}
39+
40+
/// Return a new ``RPCAsyncSequence`` that will contain all published ``RPCStream``s published
41+
/// to this transport using the ``acceptStream(_:)`` method.
42+
///
43+
/// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s.
44+
public func listen() -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
45+
RPCAsyncSequence(wrapping: self.newStreams)
46+
}
47+
48+
/// Stop listening to any new ``RPCStream`` publications.
49+
///
50+
/// All further calls to ``acceptStream(_:)`` will not produce any new elements on the
51+
/// ``RPCAsyncSequence`` returned by ``listen()``.
52+
public func stopListening() {
53+
self.newStreamsContinuation.finish()
54+
}
55+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2023, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import XCTest
18+
19+
@testable import GRPCCore
20+
21+
final class InProcessServerTransportTest: XCTestCase {
22+
func testStartListening() async throws {
23+
let transport = InProcessServerTransport()
24+
let stream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
25+
descriptor: .init(service: "testService", method: "testMethod"),
26+
inbound: .elements([.message([42])]),
27+
outbound: .init(wrapping: BufferedStream.Source(storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))))
28+
)
29+
30+
let streamSequence = transport.listen()
31+
var streamSequenceInterator = streamSequence.makeAsyncIterator()
32+
33+
transport.acceptStream(stream)
34+
35+
let testStream = try await streamSequenceInterator.next()
36+
var inboundIterator = testStream?.inbound.makeAsyncIterator()
37+
let rpcRequestPart = try await inboundIterator?.next()
38+
XCTAssertEqual(rpcRequestPart, .message([42]))
39+
}
40+
41+
func testStopListening() async throws {
42+
let transport = InProcessServerTransport()
43+
let firstStream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
44+
descriptor: .init(service: "testService1", method: "testMethod1"),
45+
inbound: .elements([.message([42])]),
46+
outbound: .init(wrapping: BufferedStream.Source(storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))))
47+
)
48+
49+
let streamSequence = transport.listen()
50+
var streamSequenceInterator = streamSequence.makeAsyncIterator()
51+
52+
transport.acceptStream(firstStream)
53+
54+
let firstTestStream = try await streamSequenceInterator.next()
55+
var inboundIterator = firstTestStream?.inbound.makeAsyncIterator()
56+
let rpcRequestPart = try await inboundIterator?.next()
57+
XCTAssertEqual(rpcRequestPart, .message([42]))
58+
59+
transport.stopListening()
60+
61+
let secondStream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
62+
descriptor: .init(service: "testService1", method: "testMethod1"),
63+
inbound: .elements([.message([42])]),
64+
outbound: .init(wrapping: BufferedStream.Source(storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))))
65+
)
66+
67+
transport.acceptStream(secondStream)
68+
let secondTestStream = try await streamSequenceInterator.next()
69+
XCTAssertNil(secondTestStream)
70+
}
71+
}

0 commit comments

Comments
 (0)