-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(tonic): compression support #692
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
b4fb623
Initial compression support
davidpdrsn 9ffb8d5
Support configuring compression on `Server`
davidpdrsn a807ddc
Minor clean up
davidpdrsn 46a12f6
Test that compression is actually happening
davidpdrsn 3ba2858
Clean up some todos
davidpdrsn aae6015
channels compressing requests
davidpdrsn bd3ab36
Move compression to be on the codecs
davidpdrsn 50aa2e2
Test sending compressed request to server that doesn't support it
davidpdrsn 6cb6fe2
Clean up a bit
davidpdrsn 6b1946d
Compress server streams
davidpdrsn 82c2297
Compress client streams
davidpdrsn ab10667
Bidirectional streaming compression
davidpdrsn 91ff0f7
Handle receiving unsupported encoding
davidpdrsn 1d23673
Clean up
davidpdrsn cc6b91f
Add note to future self
davidpdrsn cf81479
Support disabling compression for individual responses
davidpdrsn 4276697
Add docs
davidpdrsn 5db1d6a
Add compression examples
davidpdrsn 755967d
Disable compression behind feature flag
davidpdrsn 9c230ba
Add some docs
davidpdrsn f048f87
Make flate2 optional dependency
davidpdrsn ab1e953
Fix docs wording
davidpdrsn ed62228
Format
davidpdrsn f726acb
Reply with which encodings are supported
davidpdrsn 00f6989
Convert tests to use mocked io
davidpdrsn 1a78480
Fix lints
davidpdrsn 9eaffb8
Use separate counters
davidpdrsn e1e13a1
Don't make a long stream
davidpdrsn 5458db1
Address review feedback
davidpdrsn File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
use hello_world::greeter_client::GreeterClient; | ||
use hello_world::HelloRequest; | ||
use tonic::transport::Channel; | ||
|
||
pub mod hello_world { | ||
tonic::include_proto!("helloworld"); | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
let channel = Channel::builder("http://[::1]:50051".parse().unwrap()) | ||
.connect() | ||
.await | ||
.unwrap(); | ||
|
||
let mut client = GreeterClient::new(channel).send_gzip().accept_gzip(); | ||
|
||
let request = tonic::Request::new(HelloRequest { | ||
name: "Tonic".into(), | ||
}); | ||
|
||
let response = client.say_hello(request).await?; | ||
|
||
dbg!(response); | ||
|
||
Ok(()) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
use tonic::{transport::Server, Request, Response, Status}; | ||
|
||
use hello_world::greeter_server::{Greeter, GreeterServer}; | ||
use hello_world::{HelloReply, HelloRequest}; | ||
|
||
pub mod hello_world { | ||
tonic::include_proto!("helloworld"); | ||
} | ||
|
||
#[derive(Default)] | ||
pub struct MyGreeter {} | ||
|
||
#[tonic::async_trait] | ||
impl Greeter for MyGreeter { | ||
async fn say_hello( | ||
&self, | ||
request: Request<HelloRequest>, | ||
) -> Result<Response<HelloReply>, Status> { | ||
println!("Got a request from {:?}", request.remote_addr()); | ||
|
||
let reply = hello_world::HelloReply { | ||
message: format!("Hello {}!", request.into_inner().name), | ||
}; | ||
Ok(Response::new(reply)) | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
let addr = "[::1]:50051".parse().unwrap(); | ||
let greeter = MyGreeter::default(); | ||
|
||
println!("GreeterServer listening on {}", addr); | ||
|
||
let service = GreeterServer::new(greeter).send_gzip().accept_gzip(); | ||
|
||
Server::builder().add_service(service).serve(addr).await?; | ||
|
||
Ok(()) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
[package] | ||
name = "compression" | ||
version = "0.1.0" | ||
authors = ["Lucio Franco <luciofranco14@gmail.com>"] | ||
edition = "2018" | ||
publish = false | ||
license = "MIT" | ||
|
||
[dependencies] | ||
tonic = { path = "../../tonic", features = ["compression"] } | ||
prost = "0.7" | ||
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "net"] } | ||
tower = { version = "0.4", features = [] } | ||
http-body = "0.4" | ||
http = "0.2" | ||
tokio-stream = { version = "0.1.5", features = ["net"] } | ||
tower-http = { version = "0.1", features = ["map-response-body", "map-request-body"] } | ||
bytes = "1" | ||
futures = "0.3" | ||
pin-project = "1.0" | ||
hyper = "0.14" | ||
|
||
[build-dependencies] | ||
tonic-build = { path = "../../tonic-build", features = ["compression"] } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
fn main() { | ||
tonic_build::compile_protos("proto/test.proto").unwrap(); | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
syntax = "proto3"; | ||
|
||
package test; | ||
|
||
import "google/protobuf/empty.proto"; | ||
|
||
service Test { | ||
rpc CompressOutputUnary(google.protobuf.Empty) returns (SomeData); | ||
rpc CompressInputUnary(SomeData) returns (google.protobuf.Empty); | ||
rpc CompressOutputServerStream(google.protobuf.Empty) returns (stream SomeData); | ||
rpc CompressInputClientStream(stream SomeData) returns (google.protobuf.Empty); | ||
rpc CompressOutputClientStream(stream SomeData) returns (SomeData); | ||
rpc CompressInputOutputBidirectionalStream(stream SomeData) returns (stream SomeData); | ||
} | ||
|
||
message SomeData { | ||
// include a bunch of data so there actually is something to compress | ||
bytes data = 1; | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
use super::*; | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn client_enabled_server_enabled() { | ||
let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); | ||
|
||
let svc = test_server::TestServer::new(Svc::default()) | ||
.accept_gzip() | ||
.send_gzip(); | ||
|
||
let request_bytes_counter = Arc::new(AtomicUsize::new(0)); | ||
let response_bytes_counter = Arc::new(AtomicUsize::new(0)); | ||
|
||
fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> { | ||
assert_eq!(req.headers().get("grpc-encoding").unwrap(), "gzip"); | ||
req | ||
} | ||
|
||
tokio::spawn({ | ||
let request_bytes_counter = request_bytes_counter.clone(); | ||
let response_bytes_counter = response_bytes_counter.clone(); | ||
async move { | ||
Server::builder() | ||
.layer( | ||
ServiceBuilder::new() | ||
.map_request(assert_right_encoding) | ||
.layer(measure_request_body_size_layer( | ||
request_bytes_counter.clone(), | ||
)) | ||
.layer(MapResponseBodyLayer::new(move |body| { | ||
util::CountBytesBody { | ||
inner: body, | ||
counter: response_bytes_counter.clone(), | ||
} | ||
})) | ||
.into_inner(), | ||
) | ||
.add_service(svc) | ||
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( | ||
MockStream(server), | ||
)])) | ||
.await | ||
.unwrap(); | ||
} | ||
}); | ||
|
||
let mut client = test_client::TestClient::new(mock_io_channel(client).await) | ||
.send_gzip() | ||
.accept_gzip(); | ||
|
||
let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); | ||
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); | ||
let req = Request::new(stream); | ||
|
||
let res = client | ||
.compress_input_output_bidirectional_stream(req) | ||
.await | ||
.unwrap(); | ||
|
||
assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip"); | ||
|
||
let mut stream: Streaming<SomeData> = res.into_inner(); | ||
|
||
stream | ||
.next() | ||
.await | ||
.expect("stream empty") | ||
.expect("item was error"); | ||
|
||
stream | ||
.next() | ||
.await | ||
.expect("stream empty") | ||
.expect("item was error"); | ||
|
||
assert!(request_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE); | ||
assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE); | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
use super::*; | ||
use http_body::Body as _; | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn client_enabled_server_enabled() { | ||
let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); | ||
|
||
let svc = test_server::TestServer::new(Svc::default()).accept_gzip(); | ||
|
||
let request_bytes_counter = Arc::new(AtomicUsize::new(0)); | ||
|
||
fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> { | ||
assert_eq!(req.headers().get("grpc-encoding").unwrap(), "gzip"); | ||
req | ||
} | ||
|
||
tokio::spawn({ | ||
let request_bytes_counter = request_bytes_counter.clone(); | ||
async move { | ||
Server::builder() | ||
.layer( | ||
ServiceBuilder::new() | ||
.map_request(assert_right_encoding) | ||
.layer(measure_request_body_size_layer( | ||
request_bytes_counter.clone(), | ||
)) | ||
.into_inner(), | ||
) | ||
.add_service(svc) | ||
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( | ||
MockStream(server), | ||
)])) | ||
.await | ||
.unwrap(); | ||
} | ||
}); | ||
|
||
let mut client = test_client::TestClient::new(mock_io_channel(client).await).send_gzip(); | ||
|
||
let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); | ||
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); | ||
let req = Request::new(Box::pin(stream)); | ||
|
||
client.compress_input_client_stream(req).await.unwrap(); | ||
|
||
let bytes_sent = request_bytes_counter.load(SeqCst); | ||
assert!(bytes_sent < UNCOMPRESSED_MIN_BODY_SIZE); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn client_disabled_server_enabled() { | ||
let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); | ||
|
||
let svc = test_server::TestServer::new(Svc::default()).accept_gzip(); | ||
|
||
let request_bytes_counter = Arc::new(AtomicUsize::new(0)); | ||
|
||
fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> { | ||
assert!(req.headers().get("grpc-encoding").is_none()); | ||
req | ||
} | ||
|
||
tokio::spawn({ | ||
let request_bytes_counter = request_bytes_counter.clone(); | ||
async move { | ||
Server::builder() | ||
.layer( | ||
ServiceBuilder::new() | ||
.map_request(assert_right_encoding) | ||
.layer(measure_request_body_size_layer( | ||
request_bytes_counter.clone(), | ||
)) | ||
.into_inner(), | ||
) | ||
.add_service(svc) | ||
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( | ||
MockStream(server), | ||
)])) | ||
.await | ||
.unwrap(); | ||
} | ||
}); | ||
|
||
let mut client = test_client::TestClient::new(mock_io_channel(client).await); | ||
|
||
let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); | ||
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); | ||
let req = Request::new(Box::pin(stream)); | ||
|
||
client.compress_input_client_stream(req).await.unwrap(); | ||
|
||
let bytes_sent = request_bytes_counter.load(SeqCst); | ||
assert!(bytes_sent > UNCOMPRESSED_MIN_BODY_SIZE); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn client_enabled_server_disabled() { | ||
let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); | ||
|
||
let svc = test_server::TestServer::new(Svc::default()); | ||
|
||
tokio::spawn(async move { | ||
Server::builder() | ||
.add_service(svc) | ||
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( | ||
MockStream(server), | ||
)])) | ||
.await | ||
.unwrap(); | ||
}); | ||
|
||
let mut client = test_client::TestClient::new(mock_io_channel(client).await).send_gzip(); | ||
|
||
let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); | ||
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); | ||
let req = Request::new(Box::pin(stream)); | ||
|
||
let status = client.compress_input_client_stream(req).await.unwrap_err(); | ||
|
||
assert_eq!(status.code(), tonic::Code::Unimplemented); | ||
assert_eq!( | ||
status.message(), | ||
"Content is compressed with `gzip` which isn't supported" | ||
); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn compressing_response_from_client_stream() { | ||
let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); | ||
|
||
let svc = test_server::TestServer::new(Svc::default()).send_gzip(); | ||
|
||
let response_bytes_counter = Arc::new(AtomicUsize::new(0)); | ||
|
||
tokio::spawn({ | ||
let response_bytes_counter = response_bytes_counter.clone(); | ||
async move { | ||
Server::builder() | ||
.layer( | ||
ServiceBuilder::new() | ||
.layer(MapResponseBodyLayer::new(move |body| { | ||
util::CountBytesBody { | ||
inner: body, | ||
counter: response_bytes_counter.clone(), | ||
} | ||
})) | ||
.into_inner(), | ||
) | ||
.add_service(svc) | ||
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( | ||
MockStream(server), | ||
)])) | ||
.await | ||
.unwrap(); | ||
} | ||
}); | ||
|
||
let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); | ||
|
||
let stream = futures::stream::iter(vec![]); | ||
let req = Request::new(Box::pin(stream)); | ||
|
||
let res = client.compress_output_client_stream(req).await.unwrap(); | ||
assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip"); | ||
let bytes_sent = response_bytes_counter.load(SeqCst); | ||
assert!(bytes_sent < UNCOMPRESSED_MIN_BODY_SIZE); | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.