Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tonic): compression support #692

Merged
merged 29 commits into from
Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b4fb623
Initial compression support
davidpdrsn Jun 25, 2021
9ffb8d5
Support configuring compression on `Server`
davidpdrsn Jun 25, 2021
a807ddc
Minor clean up
davidpdrsn Jun 25, 2021
46a12f6
Test that compression is actually happening
davidpdrsn Jun 26, 2021
3ba2858
Clean up some todos
davidpdrsn Jun 26, 2021
aae6015
channels compressing requests
davidpdrsn Jun 27, 2021
bd3ab36
Move compression to be on the codecs
davidpdrsn Jun 28, 2021
50aa2e2
Test sending compressed request to server that doesn't support it
davidpdrsn Jun 28, 2021
6cb6fe2
Clean up a bit
davidpdrsn Jun 28, 2021
6b1946d
Compress server streams
davidpdrsn Jun 28, 2021
82c2297
Compress client streams
davidpdrsn Jun 28, 2021
ab10667
Bidirectional streaming compression
davidpdrsn Jun 28, 2021
91ff0f7
Handle receiving unsupported encoding
davidpdrsn Jun 28, 2021
1d23673
Clean up
davidpdrsn Jun 28, 2021
cc6b91f
Add note to future self
davidpdrsn Jun 28, 2021
cf81479
Support disabling compression for individual responses
davidpdrsn Jun 29, 2021
4276697
Add docs
davidpdrsn Jun 29, 2021
5db1d6a
Add compression examples
davidpdrsn Jun 29, 2021
755967d
Disable compression behind feature flag
davidpdrsn Jun 29, 2021
9c230ba
Add some docs
davidpdrsn Jun 29, 2021
f048f87
Make flate2 optional dependency
davidpdrsn Jun 29, 2021
ab1e953
Fix docs wording
davidpdrsn Jun 29, 2021
ed62228
Format
davidpdrsn Jun 29, 2021
f726acb
Reply with which encodings are supported
davidpdrsn Jun 29, 2021
00f6989
Convert tests to use mocked io
davidpdrsn Jul 1, 2021
1a78480
Fix lints
davidpdrsn Jul 1, 2021
9eaffb8
Use separate counters
davidpdrsn Jul 1, 2021
e1e13a1
Don't make a long stream
davidpdrsn Jul 1, 2021
5458db1
Address review feedback
davidpdrsn Jul 1, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"tests/integration_tests",
"tests/stream_conflict",
"tests/root-crate-path",
"tests/compression",
"tonic-web/tests/integration"
]

24 changes: 24 additions & 0 deletions tests/compression/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "compression"
version = "0.1.0"
authors = ["Lucio Franco <luciofranco14@gmail.com>"]
edition = "2018"
publish = false
license = "MIT"

[dependencies]
tonic = { path = "../../tonic" }
prost = "0.7"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "net"] }
tower = { version = "0.4", features = [] }
http-body = "0.4"
http = "0.2"
tokio-stream = { version = "0.1.5", features = ["net"] }
tower-http = { version = "0.1", features = ["map-response-body", "map-request-body"] }
bytes = "1"
futures = "0.3"
pin-project = "1.0"
hyper = "0.14"

[build-dependencies]
tonic-build = { path = "../../tonic-build" }
3 changes: 3 additions & 0 deletions tests/compression/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fn main() {
tonic_build::compile_protos("proto/test.proto").unwrap();
}
18 changes: 18 additions & 0 deletions tests/compression/proto/test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package test;

import "google/protobuf/empty.proto";

service Test {
rpc CompressOutputUnary(google.protobuf.Empty) returns (SomeData);
rpc CompressInputUnary(SomeData) returns (google.protobuf.Empty);
rpc CompressOutputServerStream(google.protobuf.Empty) returns (stream SomeData);
rpc CompressInputClientStream(stream SomeData) returns (google.protobuf.Empty);
rpc CompressInputOutputBidirectionalStream(stream SomeData) returns (stream SomeData);
}

message SomeData {
// include a bunch of data so there actually is something to compress
bytes data = 1;
}
76 changes: 76 additions & 0 deletions tests/compression/src/bidirectional_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use super::*;

#[tokio::test(flavor = "multi_thread")]
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
async fn client_enabled_server_enabled() {
let svc = test_server::TestServer::new(Svc).accept_gzip().send_gzip();

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

let bytes_sent_counter = Arc::new(AtomicUsize::new(0));

fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> {
assert_eq!(req.headers().get("grpc-encoding").unwrap(), "gzip");
req
}

tokio::spawn({
let bytes_sent_counter = bytes_sent_counter.clone();
async move {
Server::builder()
.layer(
ServiceBuilder::new()
.map_request(assert_right_encoding)
.layer(measure_request_body_size_layer(bytes_sent_counter.clone()))
.layer(MapResponseBodyLayer::new(move |body| {
util::CountBytesBody {
inner: body,
counter: bytes_sent_counter.clone(),
}
}))
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.unwrap();
}
});

let channel = Channel::builder(format!("http://{}", addr).parse().unwrap())
.connect()
.await
.unwrap();

let mut client = test_client::TestClient::new(channel)
.send_gzip()
.accept_gzip();

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(Box::pin(stream));
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved

let res = client
.compress_input_output_bidirectional_stream(req)
.await
.unwrap();

assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip");

let mut stream: Streaming<SomeData> = res.into_inner();

stream
.next()
.await
.expect("stream empty")
.expect("item was error");

stream
.next()
.await
.expect("stream empty")
.expect("item was error");

let bytes_sent = bytes_sent_counter.load(Relaxed);
assert!(bytes_sent < UNCOMPRESSED_MIN_BODY_SIZE);
}
133 changes: 133 additions & 0 deletions tests/compression/src/client_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use super::*;
use http_body::Body as _;

#[tokio::test(flavor = "multi_thread")]
async fn client_enabled_server_enabled() {
let svc = test_server::TestServer::new(Svc).accept_gzip();

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

let bytes_sent_counter = Arc::new(AtomicUsize::new(0));

fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> {
assert_eq!(req.headers().get("grpc-encoding").unwrap(), "gzip");
req
}

tokio::spawn({
let bytes_sent_counter = bytes_sent_counter.clone();
async move {
Server::builder()
.layer(
ServiceBuilder::new()
.map_request(assert_right_encoding)
.layer(measure_request_body_size_layer(bytes_sent_counter.clone()))
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.unwrap();
}
});

let channel = Channel::builder(format!("http://{}", addr).parse().unwrap())
.connect()
.await
.unwrap();

let mut client = test_client::TestClient::new(channel).send_gzip();

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(Box::pin(stream));

client.compress_input_client_stream(req).await.unwrap();

let bytes_sent = bytes_sent_counter.load(Relaxed);
assert!(dbg!(bytes_sent) < UNCOMPRESSED_MIN_BODY_SIZE);
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
}

#[tokio::test(flavor = "multi_thread")]
async fn client_disabled_server_enabled() {
let svc = test_server::TestServer::new(Svc).accept_gzip();

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

let bytes_sent_counter = Arc::new(AtomicUsize::new(0));

fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> {
assert!(req.headers().get("grpc-encoding").is_none());
req
}

tokio::spawn({
let bytes_sent_counter = bytes_sent_counter.clone();
async move {
Server::builder()
.layer(
ServiceBuilder::new()
.map_request(assert_right_encoding)
.layer(measure_request_body_size_layer(bytes_sent_counter.clone()))
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.unwrap();
}
});

let channel = Channel::builder(format!("http://{}", addr).parse().unwrap())
.connect()
.await
.unwrap();

let mut client = test_client::TestClient::new(channel);

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(Box::pin(stream));

client.compress_input_client_stream(req).await.unwrap();

let bytes_sent = bytes_sent_counter.load(Relaxed);
assert!(dbg!(bytes_sent) > UNCOMPRESSED_MIN_BODY_SIZE);
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
}

#[tokio::test(flavor = "multi_thread")]
async fn client_enabled_server_disabled() {
let svc = test_server::TestServer::new(Svc);

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.unwrap();
});

let channel = Channel::builder(format!("http://{}", addr).parse().unwrap())
.connect()
.await
.unwrap();

let mut client = test_client::TestClient::new(channel).send_gzip();

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(Box::pin(stream));

let status = client.compress_input_client_stream(req).await.unwrap_err();

assert_eq!(status.code(), tonic::Code::Unimplemented);
assert_eq!(
status.message(),
"Content is compressed with `gzip` which isn't supported"
);
}
97 changes: 97 additions & 0 deletions tests/compression/src/compressing_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use super::*;
use http_body::Body as _;

#[tokio::test(flavor = "multi_thread")]
async fn client_enabled_server_enabled() {
let svc = test_server::TestServer::new(Svc).accept_gzip();

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

let bytes_sent_counter = Arc::new(AtomicUsize::new(0));

fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> {
assert_eq!(req.headers().get("grpc-encoding").unwrap(), "gzip");
req
}

tokio::spawn({
let bytes_sent_counter = bytes_sent_counter.clone();
async move {
Server::builder()
.layer(
ServiceBuilder::new()
.layer(
ServiceBuilder::new()
.map_request(assert_right_encoding)
.layer(measure_request_body_size_layer(bytes_sent_counter))
.into_inner(),
)
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.unwrap();
}
});

let channel = Channel::builder(format!("http://{}", addr).parse().unwrap())
.connect()
.await
.unwrap();

let mut client = test_client::TestClient::new(channel).send_gzip();

for _ in 0..3 {
client
.compress_input_unary(SomeData {
data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(),
})
.await
.unwrap();
let bytes_sent = bytes_sent_counter.load(Relaxed);
assert!(dbg!(bytes_sent) < UNCOMPRESSED_MIN_BODY_SIZE);
}
}

#[tokio::test(flavor = "multi_thread")]
async fn client_enabled_server_disabled() {
let svc = test_server::TestServer::new(Svc);

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.unwrap();
});

let channel = Channel::builder(format!("http://{}", addr).parse().unwrap())
.connect()
.await
.unwrap();

let mut client = test_client::TestClient::new(channel).send_gzip();

let status = client
.compress_input_unary(SomeData {
data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(),
})
.await
.unwrap_err();

assert_eq!(status.code(), tonic::Code::Unimplemented);
assert_eq!(
status.message(),
"Content is compressed with `gzip` which isn't supported"
);

// TODO(david): include header with which encodings are supported as per the spec:
//
// > The server will then include a grpc-accept-encoding response header which specifies the
// algorithms that the server accepts.
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
}
Loading