Skip to content

Remove some unnecessary CoWs #906

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 63 additions & 6 deletions Sources/GRPC/GRPCClientStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,20 @@ struct GRPCClientStateMachine {

/// The RPC has terminated. There are no valid transitions from this state.
case clientClosedServerClosed

/// This isn't a real state. See `withStateAvoidingCoWs`.
case modifying
}

/// The current state of the state machine.
internal private(set) var state: State {
didSet {
switch (oldValue, self.state) {
// Any modifying transitions are fine.
case (.modifying, _),
(_, .modifying):
break

// All valid transitions:
case (.clientIdleServerIdle, .clientActiveServerIdle),
(.clientIdleServerIdle, .clientClosedServerClosed),
Expand Down Expand Up @@ -190,7 +198,9 @@ struct GRPCClientStateMachine {
mutating func sendRequestHeaders(
requestHead: _GRPCRequestHead
) -> Result<HPACKHeaders, SendRequestHeadersError> {
return self.state.sendRequestHeaders(requestHead: requestHead)
return self.withStateAvoidingCoWs { state in
return state.sendRequestHeaders(requestHead: requestHead)
}
}

/// Formats a request to send to the server.
Expand Down Expand Up @@ -219,7 +229,9 @@ struct GRPCClientStateMachine {
compressed: Bool,
allocator: ByteBufferAllocator
) -> Result<ByteBuffer, MessageWriteError> {
return self.state.sendRequest(message, compressed: compressed, allocator: allocator)
return self.withStateAvoidingCoWs { state in
return state.sendRequest(message, compressed: compressed, allocator: allocator)
}
}

/// Closes the request stream.
Expand All @@ -239,7 +251,9 @@ struct GRPCClientStateMachine {
/// Closing the request stream when both peers are idle (in the `.clientIdleServerIdle` state)
/// will result in a `.invalidState` error.
mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
return self.state.sendEndOfRequestStream()
return self.withStateAvoidingCoWs { state in
return state.sendEndOfRequestStream()
}
}

/// Receive an acknowledgement of the RPC from the server. This **must not** be a "Trailers-Only"
Expand All @@ -266,7 +280,9 @@ struct GRPCClientStateMachine {
mutating func receiveResponseHeaders(
_ headers: HPACKHeaders
) -> Result<Void, ReceiveResponseHeadError> {
return self.state.receiveResponseHeaders(headers)
return self.withStateAvoidingCoWs { state in
return state.receiveResponseHeaders(headers)
}
}

/// Read a response buffer from the server and return any decoded messages.
Expand Down Expand Up @@ -297,7 +313,9 @@ struct GRPCClientStateMachine {
mutating func receiveResponseBuffer(
_ buffer: inout ByteBuffer
) -> Result<[ByteBuffer], MessageReadError> {
return self.state.receiveResponseBuffer(&buffer)
return self.withStateAvoidingCoWs { state in
state.receiveResponseBuffer(&buffer)
}
}

/// Receive the end of the response stream from the server and parse the results into
Expand All @@ -320,7 +338,28 @@ struct GRPCClientStateMachine {
mutating func receiveEndOfResponseStream(
_ trailers: HPACKHeaders
) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
return self.state.receiveEndOfResponseStream(trailers)
return self.withStateAvoidingCoWs { state in
return state.receiveEndOfResponseStream(trailers)
}
}

/// Temporarily sets `self.state` to `.modifying` before calling the provided block and setting
/// `self.state` to the `State` modified by the block.
///
/// Since we hold state as associated data on our `State` enum, any modification to that state
/// will trigger a copy on write for its heap allocated data. Temporarily setting the `self.state`
/// to `.modifying` allows us to avoid an extra reference to any heap allocated data and therefore
/// avoid a copy on write.
@inline(__always)
private mutating func withStateAvoidingCoWs<ResultType>(
_ body: (inout State) -> ResultType
) -> ResultType {
var state = State.modifying
swap(&self.state, &state)
defer {
swap(&self.state, &state)
}
return body(&state)
}
}

Expand Down Expand Up @@ -355,6 +394,9 @@ extension GRPCClientStateMachine.State {
.clientActiveServerActive,
.clientClosedServerClosed:
result = .failure(.invalidState)

case .modifying:
preconditionFailure("State left as 'modifying'")
}

return result
Expand Down Expand Up @@ -384,6 +426,9 @@ extension GRPCClientStateMachine.State {

case .clientIdleServerIdle:
result = .failure(.invalidState)

case .modifying:
preconditionFailure("State left as 'modifying'")
}

return result
Expand All @@ -409,6 +454,9 @@ extension GRPCClientStateMachine.State {

case .clientIdleServerIdle:
result = .failure(.invalidState)

case .modifying:
preconditionFailure("State left as 'modifying'")
}

return result
Expand Down Expand Up @@ -436,6 +484,9 @@ extension GRPCClientStateMachine.State {
.clientActiveServerActive,
.clientClosedServerClosed:
result = .failure(.invalidState)

case .modifying:
preconditionFailure("State left as 'modifying'")
}

return result
Expand All @@ -461,6 +512,9 @@ extension GRPCClientStateMachine.State {
.clientClosedServerIdle,
.clientClosedServerClosed:
result = .failure(.invalidState)

case .modifying:
preconditionFailure("State left as 'modifying'")
}

return result
Expand Down Expand Up @@ -488,6 +542,9 @@ extension GRPCClientStateMachine.State {
case .clientIdleServerIdle,
.clientClosedServerClosed:
result = .failure(.invalidState)

case .modifying:
preconditionFailure("State left as 'modifying'")
}

return result
Expand Down
13 changes: 13 additions & 0 deletions Sources/GRPC/LengthPrefixedMessageReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ internal struct LengthPrefixedMessageReader {
// mark the bytes as "read"
buffer.moveReaderIndex(forwardBy: buffer.readableBytes)
} else {
switch self.state {
case .expectingMessage(let length, _):
// We need to reserve enough space for the message or the incoming buffer, whichever
// is larger.
let remainingMessageBytes = Int(length) - self.buffer.readableBytes
self.buffer.reserveCapacity(minimumWritableBytes: max(remainingMessageBytes, buffer.readableBytes))

case .expectingCompressedFlag,
.expectingMessageLength:
// Just append the buffer; these parts are too small to make a meaningful difference.
()
}

self.buffer.writeBuffer(&buffer)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import struct Foundation.Data
import NIO
import NIOHTTP2
import NIOHPACK
Expand All @@ -27,14 +28,17 @@ import Logging
class EmbeddedClientThroughput: Benchmark {
private let requestCount: Int
private let requestText: String
private let maximumResponseFrameSize: Int

private var logger: Logger!
private var requestHead: _GRPCRequestHead!
private var request: Echo_EchoRequest!
private var responseDataChunks: [ByteBuffer]!

init(requests: Int, text: String) {
init(requests: Int, text: String, maxResponseFrameSize: Int = .max) {
self.requestCount = requests
self.requestText = text
self.maximumResponseFrameSize = maxResponseFrameSize
}

func setUp() throws {
Expand All @@ -53,6 +57,21 @@ class EmbeddedClientThroughput: Benchmark {
self.request = .with {
$0.text = self.requestText
}

let response = Echo_EchoResponse.with {
$0.text = self.requestText
}

let serializedResponse = try response.serializedData()
var buffer = ByteBufferAllocator().buffer(capacity: serializedResponse.count + 5)
buffer.writeInteger(UInt8(0)) // compression byte
buffer.writeInteger(UInt32(serializedResponse.count))
buffer.writeBytes(serializedResponse)

self.responseDataChunks = []
while buffer.readableBytes > 0, let slice = buffer.readSlice(length: min(maximumResponseFrameSize, buffer.readableBytes)) {
self.responseDataChunks.append(slice)
}
}

func tearDown() throws {
Expand Down Expand Up @@ -82,7 +101,7 @@ class EmbeddedClientThroughput: Benchmark {
while let _ = try channel.readOutbound(as: HTTP2Frame.self) {
requestFrames += 1
}
assert(requestFrames == 3) // headers, data, empty data (end-stream)
precondition(requestFrames == 3) // headers, data, empty data (end-stream)

// Okay, let's build a response.

Expand All @@ -92,34 +111,29 @@ class EmbeddedClientThroughput: Benchmark {
"content-type": "application/grpc+proto"
]
let headerFrame = HTTP2Frame(streamID: .init(1), payload: .headers(.init(headers: responseHeaders)))
try channel.writeInbound(headerFrame)

// Some data.
let response = try Echo_EchoResponse.with { $0.text = self.requestText }.serializedData()
var buffer = channel.allocator.buffer(capacity: response.count + 5)
buffer.writeInteger(UInt8(0)) // compression byte
buffer.writeInteger(UInt32(response.count))
buffer.writeBytes(response)
let dataFrame = HTTP2Frame(streamID: .init(1), payload: .data(.init(data: .byteBuffer(buffer))))
// The response data.
for chunk in self.responseDataChunks {
let frame = HTTP2Frame(streamID: 1, payload: .data(.init(data: .byteBuffer(chunk))))
try channel.writeInbound(frame)
}

// Required trailers.
let responseTrailers: HPACKHeaders = [
"grpc-status": "0",
"grpc-message": "ok"
]
let trailersFrame = HTTP2Frame(streamID: .init(1), payload: .headers(.init(headers: responseTrailers)))

// Now write the response frames back into the channel.
try channel.writeInbound(headerFrame)
try channel.writeInbound(dataFrame)
try channel.writeInbound(trailersFrame)

// And read them back out.
var responseParts = 0
while let _ = try channel.readOutbound(as: _GRPCClientResponsePart<Echo_EchoResponse>.self) {
while let _ = try channel.readInbound(as: _GRPCClientResponsePart<Echo_EchoResponse>.self) {
responseParts += 1
}

assert(responseParts == 4, "received \(responseParts) response parts")
precondition(responseParts == 4, "received \(responseParts) response parts")
}
}
}
16 changes: 16 additions & 0 deletions Sources/GRPCPerformanceTests/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ func runBenchmarks(spec: TestSpec) {
spec: spec
)

measureAndPrint(
description: "embedded_client_unary_10k_large_requests",
benchmark: EmbeddedClientThroughput(requests: 10_00, text: largeRequest),
spec: spec
)

measureAndPrint(
description: "embedded_client_unary_10k_large_requests_1k_frames",
benchmark: EmbeddedClientThroughput(
requests: 10_00,
text: largeRequest,
maxResponseFrameSize: 1024
),
spec: spec
)

measureAndPrint(
description: "percent_encode_decode_10k_status_messages",
benchmark: PercentEncoding(iterations: 10_000, requiresEncoding: true),
Expand Down