Skip to content

Add support for compression on the client #707

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ let package = Package(
"NIOHTTP1",
"NIOHTTP2",
"NIOSSL",
"CGRPCZlib",
"SwiftProtobuf",
"Logging"
]
Expand All @@ -70,6 +71,13 @@ let package = Package(
]
),

.target(
name: "CGRPCZlib",
linkerSettings: [
.linkedLibrary("z")
]
),

// The `protoc` plugin.
.target(
name: "protoc-gen-grpc-swift",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,3 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/// Supported message compression algorithms.
///
/// These algorithms are indicated in the "grpc-encoding" header. As such, a lack of "grpc-encoding"
/// header indicates that there is no message compression.
enum CompressionAlgorithm: String {
/// Identity compression; "no" compression but indicated via the "grpc-encoding" header.
case identity
}
62 changes: 62 additions & 0 deletions Sources/CGRPCZlib/include/CGRPCZlib.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef C_GRPC_ZLIB_H_
#define C_GRPC_ZLIB_H_

#include <zlib.h>

static inline int CGRPCZlib_deflateInit2(z_streamp stream, int level, int method, int windowBits,
int memLevel, int strategy) {
return deflateInit2(stream, level, method, windowBits, memLevel, strategy);
}

static inline unsigned long CGRPCZlib_deflateBound(z_streamp strm, unsigned long sourceLen) {
return deflateBound(strm, sourceLen);
}

static inline int CGRPCZlib_deflate(z_streamp strm, int flush) {
return deflate(strm, flush);
}

static inline int CGRPCZlib_deflateReset(z_streamp strm) {
return deflateReset(strm);
}

static inline int CGRPCZlib_deflateEnd(z_streamp strm) {
return deflateEnd(strm);
}

static inline int CGRPCZlib_inflateInit2(z_streamp stream, int windowBits) {
return inflateInit2(stream, windowBits);
}

static inline int CGRPCZlib_inflate(z_streamp strm, int flush) {
return inflate(strm, flush);
}

static inline int CGRPCZlib_inflateReset(z_streamp strm) {
return inflateReset(strm);
}

static inline int CGRPCZlib_inflateEnd(z_streamp strm) {
return inflateEnd(strm);
}

static inline Bytef *CGRPCZlib_castVoidToBytefPointer(void *in) {
return (Bytef *) in;
}

#endif // C_GRPC_ZLIB_H_
11 changes: 10 additions & 1 deletion Sources/GRPC/ClientCalls/BaseClientCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,29 @@ extension _GRPCRequestHead {
path: String,
host: String,
requestID: String,
encoding: ClientConnection.Configuration.MessageEncoding,
options: CallOptions
) {
var customMetadata = options.customMetadata
if let requestIDHeader = options.requestIDHeader {
customMetadata.add(name: requestIDHeader, value: requestID)
}

var encoding = encoding
// Compression is disabled at the RPC level; remove outbound (request) encoding. This will stop
// any 'grpc-encoding' header being sent to the peer.
if options.disableCompression {
encoding.outbound = nil
}

self = _GRPCRequestHead(
method: options.cacheable ? "GET" : "POST",
scheme: scheme,
path: path,
host: host,
timeout: options.timeout,
customMetadata: customMetadata
customMetadata: customMetadata,
encoding: encoding
)
}
}
1 change: 1 addition & 0 deletions Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public final class BidirectionalStreamingCall<RequestMessage: Message, ResponseM
path: path,
host: connection.configuration.target.host,
requestID: requestID,
encoding: connection.configuration.messageEncoding,
options: callOptions
)

Expand Down
50 changes: 36 additions & 14 deletions Sources/GRPC/ClientCalls/ClientCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,35 +61,43 @@ public protocol StreamingRequestClientCall: ClientCall {
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`.
///
/// - Parameters:
/// - message: The message to
/// - message: The message to send.
/// - disableCompression: Whether compression should be disabled for this message. Ignored if
/// compression was not enabled for the connection or RPC.
/// - Returns: A future which will be fullfilled when the message has been sent.
func sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void>
func sendMessage(_ message: RequestMessage, disableCompression: Bool) -> EventLoopFuture<Void>

/// Sends a message to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`.
///
/// - Parameters:
/// - message: The message to send.
/// - disableCompression: Whether compression should be disabled for this message. Ignored if
/// compression was not enabled for the connection or RPC.
/// - promise: A promise to be fulfilled when the message has been sent.
func sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?)
func sendMessage(_ message: RequestMessage, disableCompression: Bool, promise: EventLoopPromise<Void>?)

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
func sendMessages<S: Sequence>(_ messages: S) -> EventLoopFuture<Void> where S.Element == RequestMessage
/// - disableCompression: Whether compression should be disabled for these messages. Ignored if
/// compression was not enabled for the connection or RPC.
func sendMessages<S: Sequence>(_ messages: S, disableCompression: Bool) -> EventLoopFuture<Void> where S.Element == RequestMessage

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
/// - disableCompression: Whether compression should be disabled for these messages. Ignored if
/// compression was not enabled for the connection or RPC.
/// - promise: A promise to be fulfilled when all messages have been sent successfully.
func sendMessages<S: Sequence>(_ messages: S, promise: EventLoopPromise<Void>?) where S.Element == RequestMessage
func sendMessages<S: Sequence>(_ messages: S, disableCompression: Bool, promise: EventLoopPromise<Void>?) where S.Element == RequestMessage

/// Returns a future which can be used as a message queue.
///
Expand All @@ -107,7 +115,7 @@ public protocol StreamingRequestClientCall: ClientCall {
/// Terminates a stream of messages sent to the service.
///
/// - Important: This should only ever be called once.
/// - Returns: A future which will be fullfilled when the end has been sent.
/// - Returns: A future which will be fulfilled when the end has been sent.
func sendEnd() -> EventLoopFuture<Void>

/// Terminates a stream of messages sent to the service.
Expand All @@ -127,35 +135,49 @@ public protocol UnaryResponseClientCall: ClientCall {
}

extension StreamingRequestClientCall {
public func sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void> {
public func sendMessage(
_ message: RequestMessage,
disableCompression: Bool = false
) -> EventLoopFuture<Void> {
return self.subchannel.flatMap { channel in
return channel.writeAndFlush(_GRPCClientRequestPart.message(.init(message)))
return channel.writeAndFlush(_GRPCClientRequestPart.message(.init(message, disableCompression: disableCompression)))
}
}

public func sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?) {
public func sendMessage(
_ message: RequestMessage,
disableCompression: Bool = false,
promise: EventLoopPromise<Void>?
) {
self.subchannel.whenSuccess { channel in
channel.writeAndFlush(_GRPCClientRequestPart.message(.init(message)), promise: promise)
channel.writeAndFlush(_GRPCClientRequestPart.message(.init(message, disableCompression: disableCompression)), promise: promise)
}
}

public func sendMessages<S: Sequence>(_ messages: S) -> EventLoopFuture<Void> where S.Element == RequestMessage {
public func sendMessages<S: Sequence>(
_ messages: S,
disableCompression: Bool = false
) -> EventLoopFuture<Void> where S.Element == RequestMessage {
return self.subchannel.flatMap { channel -> EventLoopFuture<Void> in
let writeFutures = messages.map { message in
channel.write(_GRPCClientRequestPart.message(.init(message)))
channel.write(_GRPCClientRequestPart.message(.init(message, disableCompression: disableCompression)))
}
channel.flush()
return EventLoopFuture.andAllSucceed(writeFutures, on: channel.eventLoop)
}
}

public func sendMessages<S: Sequence>(_ messages: S, promise: EventLoopPromise<Void>?) where S.Element == RequestMessage {
public func sendMessages<S: Sequence>(
_ messages: S,
disableCompression: Bool = false,
promise: EventLoopPromise<Void>?
) where S.Element == RequestMessage {
if let promise = promise {
self.sendMessages(messages).cascade(to: promise)
} else {
self.subchannel.whenSuccess { channel in
for message in messages {
channel.write(_GRPCClientRequestPart.message(.init(message)), promise: nil)
channel.write(_GRPCClientRequestPart.message(.init(message, disableCompression: disableCompression)), promise: nil)
}
channel.flush()
}
Expand Down
1 change: 1 addition & 0 deletions Sources/GRPC/ClientCalls/ClientStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public final class ClientStreamingCall<RequestMessage: Message, ResponseMessage:
path: path,
host: connection.configuration.target.host,
requestID: requestID,
encoding: connection.configuration.messageEncoding,
options: callOptions
)

Expand Down
1 change: 1 addition & 0 deletions Sources/GRPC/ClientCalls/ServerStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public final class ServerStreamingCall<RequestMessage: Message, ResponseMessage:
path: path,
host: connection.configuration.target.host,
requestID: requestID,
encoding: connection.configuration.messageEncoding,
options: callOptions
)

Expand Down
1 change: 1 addition & 0 deletions Sources/GRPC/ClientCalls/UnaryCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public final class UnaryCall<RequestMessage: Message, ResponseMessage: Message>
path: path,
host: connection.configuration.target.host,
requestID: requestID,
encoding: connection.configuration.messageEncoding,
options: callOptions
)

Expand Down
19 changes: 18 additions & 1 deletion Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,20 @@ extension ClientConnection {
/// be `nil`.
public var connectionBackoff: ConnectionBackoff?

/// The compression used for requests, and the compression algorithms to advertise as acceptable
/// for the remote peer to use for encoding responses.
///
/// If compression is enabled for a connection it may be disabled for requests on any RPC by
/// setting `CallOptions.disableCompression` to `true`.
///
/// Compression may also be disabled at the message-level for streaming requests (i.e. client
/// streaming and bidirectional streaming RPCs) by setting `disableCompression` to `true` in
/// `sendMessage(_:disableCompression)`, `sendMessage(_:disableCompression:promise)`,
/// `sendMessages(_:disableCompression)` or `sendMessages(_:disableCompression:promise)`.
///
/// Note that disabling compression has no effect if compression is disabled on the connection.
public var messageEncoding: MessageEncoding

/// The HTTP protocol used for this connection.
public var httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol {
return self.tls == nil ? .http : .https
Expand All @@ -490,20 +504,23 @@ extension ClientConnection {
/// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
/// - Parameter tlsConfiguration: TLS configuration, defaulting to `nil`.
/// - Parameter connectionBackoff: The connection backoff configuration to use.
/// - Parameter messageEncoding: Message compression configuration, defaults to no compression.
public init(
target: ConnectionTarget,
eventLoopGroup: EventLoopGroup,
errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate(),
connectivityStateDelegate: ConnectivityStateDelegate? = nil,
tls: Configuration.TLS? = nil,
connectionBackoff: ConnectionBackoff? = ConnectionBackoff()
connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
messageEncoding: MessageEncoding = .none
) {
self.target = target
self.eventLoopGroup = eventLoopGroup
self.errorDelegate = errorDelegate
self.connectivityStateDelegate = connectivityStateDelegate
self.tls = tls
self.connectionBackoff = connectionBackoff
self.messageEncoding = messageEncoding
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion Sources/GRPC/ClientOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,24 @@ public struct CallOptions {
/// messages associated with the call.
public var requestIDHeader: String?

/// Disables request compression on this call. Ignored if compression is disabled at the
/// connection level.
public var disableCompression: Bool

public init(
customMetadata: HPACKHeaders = HPACKHeaders(),
timeout: GRPCTimeout = GRPCTimeout.infinite,
cacheable: Bool = false,
requestIDProvider: RequestIDProvider = .autogenerated,
requestIDHeader: String? = nil
requestIDHeader: String? = nil,
disableCompression: Bool = false
) {
self.customMetadata = customMetadata
self.timeout = timeout
self.cacheable = false
self.requestIDProvider = requestIDProvider
self.requestIDHeader = requestIDHeader
self.disableCompression = disableCompression
}

/// How Request IDs should be provided.
Expand Down
Loading