Skip to content

Commit d5ad41e

Browse files
authored
Add in-process server transport (grpc#1704)
Motivation: We want to have a basic in-process transport implementation, to be used for example for testing purposes. Modification: Added a new `InProcessServerTransport`. Result: We now have an in-process implementation of `ServerTransport`.
1 parent b28658f commit d5ad41e

File tree

3 files changed

+154
-5
lines changed

3 files changed

+154
-5
lines changed

Sources/GRPCCore/Streaming/Internal/RPCWriter+Closable.swift renamed to Sources/GRPCCore/Streaming/RPCWriter+Closable.swift

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
1818
extension RPCWriter {
19-
@usableFromInline
20-
struct Closable: ClosableRPCWriterProtocol {
19+
public struct Closable: ClosableRPCWriterProtocol {
2120
@usableFromInline
2221
let writer: any ClosableRPCWriterProtocol<Element>
2322

@@ -36,7 +35,7 @@ extension RPCWriter {
3635
///
3736
/// - Parameter elements: The elements to write.
3837
@inlinable
39-
func write(contentsOf elements: some Sequence<Element>) async throws {
38+
public func write(contentsOf elements: some Sequence<Element>) async throws {
4039
try await self.writer.write(contentsOf: elements)
4140
}
4241

@@ -45,7 +44,7 @@ extension RPCWriter {
4544
/// All writes after ``finish()`` has been called should result in an error
4645
/// being thrown.
4746
@inlinable
48-
func finish() {
47+
public func finish() {
4948
self.writer.finish()
5049
}
5150

@@ -54,7 +53,7 @@ extension RPCWriter {
5453
/// All writes after ``finish(throwing:)`` has been called should result in an error
5554
/// being thrown.
5655
@inlinable
57-
func finish(throwing error: Error) {
56+
public func finish(throwing error: Error) {
5857
self.writer.finish(throwing: error)
5958
}
6059
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2023, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
18+
/// An in-process implementation of a ``ServerTransport``.
19+
public struct InProcessServerTransport: ServerTransport {
20+
public typealias Inbound = RPCAsyncSequence<RPCRequestPart>
21+
public typealias Outbound = RPCWriter<RPCResponsePart>.Closable
22+
23+
private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
24+
private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation
25+
26+
/// Creates a new instance of ``InProcessServerTransport``.
27+
public init() {
28+
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
29+
}
30+
31+
/// Publish a new ``RPCStream``, which will be returned by the transport's ``RPCAsyncSequence``,
32+
/// returned when calling ``listen()``.
33+
///
34+
/// - Parameter stream: The new ``RPCStream`` to publish.
35+
/// - Throws: ``RPCError`` with code ``RPCError/Code-swift.struct/failedPrecondition``
36+
/// if the server transport stopped listening to new streams (i.e., if ``stopListening()`` has been called).
37+
internal func acceptStream(_ stream: RPCStream<Inbound, Outbound>) throws {
38+
let yieldResult = self.newStreamsContinuation.yield(stream)
39+
if case .terminated = yieldResult {
40+
throw RPCError(
41+
code: .failedPrecondition,
42+
message: "The server transport is closed."
43+
)
44+
}
45+
}
46+
47+
/// Return a new ``RPCAsyncSequence`` that will contain all published ``RPCStream``s published
48+
/// to this transport using the ``acceptStream(_:)`` method.
49+
///
50+
/// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s.
51+
public func listen() -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
52+
RPCAsyncSequence(wrapping: self.newStreams)
53+
}
54+
55+
/// Stop listening to any new ``RPCStream`` publications.
56+
///
57+
/// All further calls to ``acceptStream(_:)`` will not produce any new elements on the
58+
/// ``RPCAsyncSequence`` returned by ``listen()``.
59+
public func stopListening() {
60+
self.newStreamsContinuation.finish()
61+
}
62+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2023, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import XCTest
18+
19+
@testable import GRPCCore
20+
21+
final class InProcessServerTransportTest: XCTestCase {
22+
func testStartListening() async throws {
23+
let transport = InProcessServerTransport()
24+
let stream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
25+
descriptor: .init(service: "testService", method: "testMethod"),
26+
inbound: .elements([.message([42])]),
27+
outbound: .init(
28+
wrapping: BufferedStream.Source(
29+
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
30+
)
31+
)
32+
)
33+
34+
let streamSequence = transport.listen()
35+
var streamSequenceInterator = streamSequence.makeAsyncIterator()
36+
37+
try transport.acceptStream(stream)
38+
39+
let testStream = try await streamSequenceInterator.next()
40+
let messages = try await testStream?.inbound.collect()
41+
XCTAssertEqual(messages, [.message([42])])
42+
}
43+
44+
func testStopListening() async throws {
45+
let transport = InProcessServerTransport()
46+
let firstStream = RPCStream<
47+
RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
48+
>(
49+
descriptor: .init(service: "testService1", method: "testMethod1"),
50+
inbound: .elements([.message([42])]),
51+
outbound: .init(
52+
wrapping: BufferedStream.Source(
53+
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
54+
)
55+
)
56+
)
57+
58+
let streamSequence = transport.listen()
59+
var streamSequenceInterator = streamSequence.makeAsyncIterator()
60+
61+
try transport.acceptStream(firstStream)
62+
63+
let firstTestStream = try await streamSequenceInterator.next()
64+
let firstStreamMessages = try await firstTestStream?.inbound.collect()
65+
XCTAssertEqual(firstStreamMessages, [.message([42])])
66+
67+
transport.stopListening()
68+
69+
let secondStream = RPCStream<
70+
RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
71+
>(
72+
descriptor: .init(service: "testService1", method: "testMethod1"),
73+
inbound: .elements([.message([42])]),
74+
outbound: .init(
75+
wrapping: BufferedStream.Source(
76+
storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
77+
)
78+
)
79+
)
80+
81+
XCTAssertThrowsRPCError(try transport.acceptStream(secondStream)) { error in
82+
XCTAssertEqual(error.code, .failedPrecondition)
83+
}
84+
85+
let secondTestStream = try await streamSequenceInterator.next()
86+
XCTAssertNil(secondTestStream)
87+
}
88+
}

0 commit comments

Comments
 (0)