From 954fe2732443c3d6a3d74c760a2e561b2e1bc680 Mon Sep 17 00:00:00 2001 From: Can Guler Date: Fri, 11 Jan 2019 17:37:31 -0800 Subject: [PATCH] benchmark: Unconstrained streaming benchmark (#2512) * Adds unconstrained streaming benchmarks. * Adds throughput to all scenarios. * Adds comment to exported function. * Adds comment to the new rpc. * Adds a new run mode for unconstrained benchmarks. * Converts counters to uint64s. * Decreases default warm up time. * Addresses PR comments. * Deletes an unnecessary select/case * Explains the use of RecvMsg rather than Recv. --- benchmark/benchmain/main.go | 192 +++++++++++++++++--------- benchmark/benchmark.go | 75 +++++++++- benchmark/grpc_testing/services.pb.go | 110 ++++++++++++--- benchmark/grpc_testing/services.proto | 3 + 4 files changed, 290 insertions(+), 90 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 9cb329f0be28..dcc0bef4a854 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -80,15 +80,16 @@ var allCompressionModes = []string{modeOn, modeOff, modeBoth} var allTraceModes = []string{modeOn, modeOff, modeBoth} const ( - workloadsUnary = "unary" - workloadsStreaming = "streaming" - workloadsAll = "all" + workloadsUnary = "unary" + workloadsStreaming = "streaming" + workloadsUnconstrained = "unconstrained" + workloadsAll = "all" ) -var allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsAll} +var allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll} var ( - runMode = []bool{true, true} // {runUnary, runStream} + runMode = []bool{true, true, true} // {runUnary, runStream, runUnconstrained} // When set the latency to 0 (no delay), the result is slower than the real result with no delay // because latency simulation section has extra operations ltc = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. @@ -113,19 +114,66 @@ var ( } ) -func unaryBenchmark(startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) { +func unaryBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) uint64 { caller, cleanup := makeFuncUnary(benchFeatures) defer cleanup() - runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s) + return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s) } -func streamBenchmark(startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) { +func streamBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) uint64 { caller, cleanup := makeFuncStream(benchFeatures) defer cleanup() - runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s) + return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s) } -func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) { +func unconstrainedStreamBenchmark(benchFeatures stats.Features, warmuptime, benchtime time.Duration) (uint64, uint64) { + sender, recver, cleanup := makeFuncUnconstrainedStream(benchFeatures) + defer cleanup() + + var ( + wg sync.WaitGroup + requestCount uint64 + responseCount uint64 + ) + wg.Add(2 * benchFeatures.MaxConcurrentCalls) + + // Resets the counters once warmed up + go func() { + <-time.NewTimer(warmuptime).C + atomic.StoreUint64(&requestCount, 0) + atomic.StoreUint64(&responseCount, 0) + }() + + bmEnd := time.Now().Add(benchtime + warmuptime) + for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { + go func(pos int) { + for { + t := time.Now() + if t.After(bmEnd) { + break + } + sender(pos) + atomic.AddUint64(&requestCount, 1) + } + wg.Done() + }(i) + go func(pos int) { + for { + t := time.Now() + if t.After(bmEnd) { + break + } + recver(pos) + atomic.AddUint64(&responseCount, 1) + } + wg.Done() + }(i) + } + wg.Wait() + return requestCount, responseCount +} + +func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, func()) { nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} opts := []grpc.DialOption{} sopts := []grpc.ServerOption{} @@ -165,57 +213,22 @@ func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) { lis = nw.Listener(lis) stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...) conn := bm.NewClientConn("" /* target not used */, opts...) - tc := testpb.NewBenchmarkServiceClient(conn) + return testpb.NewBenchmarkServiceClient(conn), func() { + conn.Close() + stopper() + } +} + +func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) { + tc, cleanup := makeClient(benchFeatures) return func(int) { - unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) - }, func() { - conn.Close() - stopper() - } + unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) + }, cleanup } func makeFuncStream(benchFeatures stats.Features) (func(int), func()) { - // TODO: Refactor to remove duplication with makeFuncUnary. - nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} - opts := []grpc.DialOption{} - sopts := []grpc.ServerOption{} - if benchFeatures.EnableCompressor { - sopts = append(sopts, - grpc.RPCCompressor(grpc.NewGZIPCompressor()), - grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), - ) - opts = append(opts, - grpc.WithCompressor(grpc.NewGZIPCompressor()), - grpc.WithDecompressor(grpc.NewGZIPDecompressor()), - ) - } - sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) - opts = append(opts, grpc.WithInsecure()) + tc, cleanup := makeClient(benchFeatures) - var lis net.Listener - if *useBufconn { - bcLis := bufconn.Listen(256 * 1024) - lis = bcLis - opts = append(opts, grpc.WithDialer(func(string, time.Duration) (net.Conn, error) { - return nw.TimeoutDialer( - func(string, string, time.Duration) (net.Conn, error) { - return bcLis.Dial() - })("", "", 0) - })) - } else { - var err error - lis, err = net.Listen("tcp", "localhost:0") - if err != nil { - grpclog.Fatalf("Failed to listen: %v", err) - } - opts = append(opts, grpc.WithDialer(func(_ string, timeout time.Duration) (net.Conn, error) { - return nw.TimeoutDialer(net.DialTimeout)("tcp", lis.Addr().String(), timeout) - })) - } - lis = nw.Listener(lis) - stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...) - conn := bm.NewClientConn("" /* target not used */, opts...) - tc := testpb.NewBenchmarkServiceClient(conn) streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls) for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { stream, err := tc.StreamingCall(context.Background()) @@ -224,12 +237,36 @@ func makeFuncStream(benchFeatures stats.Features) (func(int), func()) { } streams[i] = stream } + return func(pos int) { - streamCaller(streams[pos], benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) - }, func() { - conn.Close() - stopper() + streamCaller(streams[pos], benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) + }, cleanup +} + +func makeFuncUnconstrainedStream(benchFeatures stats.Features) (func(int), func(int), func()) { + tc, cleanup := makeClient(benchFeatures) + + streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls) + for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { + stream, err := tc.UnconstrainedStreamingCall(context.Background()) + if err != nil { + grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err) } + streams[i] = stream + } + + pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, benchFeatures.ReqSizeBytes) + req := &testpb.SimpleRequest{ + ResponseType: pl.Type, + ResponseSize: int32(benchFeatures.RespSizeBytes), + Payload: pl, + } + + return func(pos int) { + streams[pos].Send(req) + }, func(pos int) { + streams[pos].Recv() + }, cleanup } func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) { @@ -244,7 +281,7 @@ func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, r } } -func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) { +func runBenchmark(caller func(int), startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) uint64 { // Warm up connection. for i := 0; i < 10; i++ { caller(0) @@ -257,7 +294,7 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), be ) wg.Add(benchFeatures.MaxConcurrentCalls) bmEnd := time.Now().Add(benchtime) - var count int32 + var count uint64 for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { go func(pos int) { for { @@ -268,7 +305,7 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), be start := time.Now() caller(pos) elapse := time.Since(start) - atomic.AddInt32(&count, 1) + atomic.AddUint64(&count, 1) mu.Lock() s.Add(elapse) mu.Unlock() @@ -278,6 +315,7 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), be } wg.Wait() stopTimer(count) + return count } var useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O") @@ -318,12 +356,19 @@ func init() { case workloadsUnary: runMode[0] = true runMode[1] = false + runMode[2] = false case workloadsStreaming: runMode[0] = false runMode[1] = true + runMode[2] = false + case workloadsUnconstrained: + runMode[0] = false + runMode[1] = false + runMode[2] = true case workloadsAll: runMode[0] = true runMode[1] = true + runMode[2] = true default: log.Fatalf("Unknown workloads setting: %v (want one of: %v)", workloads, strings.Join(allWorkloads, ", ")) @@ -402,6 +447,14 @@ func readTimeFromInput(values *[]time.Duration, replace string) { } } +func printThroughput(requestCount uint64, requestSize int, responseCount uint64, responseSize int) { + requestThroughput := float64(requestCount) * float64(requestSize) * 8 / benchtime.Seconds() + responseThroughput := float64(responseCount) * float64(responseSize) * 8 / benchtime.Seconds() + fmt.Printf("Number of requests: %v\tRequest throughput: %v bit/s\n", requestCount, requestThroughput) + fmt.Printf("Number of responses: %v\tResponse throughput: %v bit/s\n", responseCount, responseThroughput) + fmt.Println() +} + func main() { before() featuresPos := make([]int, 9) @@ -422,7 +475,7 @@ func main() { startBytes = memStats.TotalAlloc startTime = time.Now() } - var stopTimer = func(count int32) { + var stopTimer = func(count uint64) { runtime.ReadMemStats(&memStats) results = testing.BenchmarkResult{N: int(count), T: time.Since(startTime), Bytes: 0, MemAllocs: memStats.Mallocs - startAllocs, MemBytes: memStats.TotalAlloc - startBytes} @@ -456,23 +509,30 @@ func main() { channelz.TurnOn() } if runMode[0] { - unaryBenchmark(startTimer, stopTimer, benchFeature, benchtime, s) + count := unaryBenchmark(startTimer, stopTimer, benchFeature, benchtime, s) s.SetBenchmarkResult("Unary", benchFeature, results.N, results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos) fmt.Println(s.BenchString()) fmt.Println(s.String()) + printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes) resultSlice = append(resultSlice, s.GetBenchmarkResults()) s.Clear() } if runMode[1] { - streamBenchmark(startTimer, stopTimer, benchFeature, benchtime, s) + count := streamBenchmark(startTimer, stopTimer, benchFeature, benchtime, s) s.SetBenchmarkResult("Stream", benchFeature, results.N, results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos) fmt.Println(s.BenchString()) fmt.Println(s.String()) + printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes) resultSlice = append(resultSlice, s.GetBenchmarkResults()) s.Clear() } + if runMode[2] { + requestCount, responseCount := unconstrainedStreamBenchmark(benchFeature, time.Second, benchtime) + fmt.Printf("Unconstrained Stream-%v\n", benchFeature) + printThroughput(requestCount, benchFeature.ReqSizeBytes, responseCount, benchFeature.RespSizeBytes) + } bm.AddOne(featuresPos, featuresNum) } after(resultSlice) diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index e9a2723495eb..1feaf9e5ed3a 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -27,6 +27,7 @@ import ( "context" "fmt" "io" + "log" "net" "sync" "testing" @@ -36,7 +37,9 @@ import ( testpb "google.golang.org/grpc/benchmark/grpc_testing" "google.golang.org/grpc/benchmark/latency" "google.golang.org/grpc/benchmark/stats" + "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/status" ) // AddOne add 1 to the features slice @@ -67,7 +70,8 @@ func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) { p.Body = body } -func newPayload(t testpb.PayloadType, size int) *testpb.Payload { +// NewPayload creates a payload with the given type and size. +func NewPayload(t testpb.PayloadType, size int) *testpb.Payload { p := new(testpb.Payload) setPayload(p, t, size) return p @@ -78,7 +82,7 @@ type testServer struct { func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{ - Payload: newPayload(in.ResponseType, int(in.ResponseSize)), + Payload: NewPayload(in.ResponseType, int(in.ResponseSize)), }, nil } @@ -104,6 +108,52 @@ func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallS } } +func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error { + in := new(testpb.SimpleRequest) + // Receive a message to learn response type and size. + err := stream.RecvMsg(in) + if err == io.EOF { + // read done. + return nil + } + if err != nil { + return err + } + + response := &testpb.SimpleResponse{ + Payload: new(testpb.Payload), + } + setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) + + go func() { + for { + // Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest. + err := stream.RecvMsg(in) + switch status.Code(err) { + case codes.Canceled: + case codes.OK: + default: + log.Fatalf("server recv error: %v", err) + } + } + }() + + go func() { + for { + err := stream.Send(response) + switch status.Code(err) { + case codes.Unavailable: + case codes.OK: + default: + log.Fatalf("server send error: %v", err) + } + } + }() + + <-stream.Context().Done() + return stream.Context().Err() +} + // byteBufServer is a gRPC server that sends and receives byte buffer. // The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead. type byteBufServer struct { @@ -133,6 +183,23 @@ func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCa } } +func (s *byteBufServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error { + for { + var in []byte + err := stream.(grpc.ServerStream).RecvMsg(&in) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + out := make([]byte, s.respSize) + if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil { + return err + } + } +} + // ServerInfo contains the information to create a gRPC benchmark server. type ServerInfo struct { // Type is the type of the server. @@ -174,7 +241,7 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() { // DoUnaryCall performs an unary RPC with given stub and request and response sizes. func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error { - pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize) + pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) req := &testpb.SimpleRequest{ ResponseType: pl.Type, ResponseSize: int32(respSize), @@ -188,7 +255,7 @@ func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error // DoStreamingRoundTrip performs a round trip for a single streaming rpc. func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { - pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize) + pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) req := &testpb.SimpleRequest{ ResponseType: pl.Type, ResponseSize: int32(respSize), diff --git a/benchmark/grpc_testing/services.pb.go b/benchmark/grpc_testing/services.pb.go index 5156ae7334e3..30f8fb65a0ec 100644 --- a/benchmark/grpc_testing/services.pb.go +++ b/benchmark/grpc_testing/services.pb.go @@ -41,6 +41,9 @@ type BenchmarkServiceClient interface { // One request followed by one response. // The server returns the client payload as-is. StreamingCall(ctx context.Context, opts ...grpc.CallOption) (BenchmarkService_StreamingCallClient, error) + // Unconstrainted streaming. + // Both server and client keep sending & receiving simultaneously. + UnconstrainedStreamingCall(ctx context.Context, opts ...grpc.CallOption) (BenchmarkService_UnconstrainedStreamingCallClient, error) } type benchmarkServiceClient struct { @@ -91,6 +94,37 @@ func (x *benchmarkServiceStreamingCallClient) Recv() (*SimpleResponse, error) { return m, nil } +func (c *benchmarkServiceClient) UnconstrainedStreamingCall(ctx context.Context, opts ...grpc.CallOption) (BenchmarkService_UnconstrainedStreamingCallClient, error) { + stream, err := c.cc.NewStream(ctx, &_BenchmarkService_serviceDesc.Streams[1], "/grpc.testing.BenchmarkService/UnconstrainedStreamingCall", opts...) + if err != nil { + return nil, err + } + x := &benchmarkServiceUnconstrainedStreamingCallClient{stream} + return x, nil +} + +type BenchmarkService_UnconstrainedStreamingCallClient interface { + Send(*SimpleRequest) error + Recv() (*SimpleResponse, error) + grpc.ClientStream +} + +type benchmarkServiceUnconstrainedStreamingCallClient struct { + grpc.ClientStream +} + +func (x *benchmarkServiceUnconstrainedStreamingCallClient) Send(m *SimpleRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *benchmarkServiceUnconstrainedStreamingCallClient) Recv() (*SimpleResponse, error) { + m := new(SimpleResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // BenchmarkServiceServer is the server API for BenchmarkService service. type BenchmarkServiceServer interface { // One request followed by one response. @@ -99,6 +133,9 @@ type BenchmarkServiceServer interface { // One request followed by one response. // The server returns the client payload as-is. StreamingCall(BenchmarkService_StreamingCallServer) error + // Unconstrainted streaming. + // Both server and client keep sending & receiving simultaneously. + UnconstrainedStreamingCall(BenchmarkService_UnconstrainedStreamingCallServer) error } func RegisterBenchmarkServiceServer(s *grpc.Server, srv BenchmarkServiceServer) { @@ -149,6 +186,32 @@ func (x *benchmarkServiceStreamingCallServer) Recv() (*SimpleRequest, error) { return m, nil } +func _BenchmarkService_UnconstrainedStreamingCall_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(BenchmarkServiceServer).UnconstrainedStreamingCall(&benchmarkServiceUnconstrainedStreamingCallServer{stream}) +} + +type BenchmarkService_UnconstrainedStreamingCallServer interface { + Send(*SimpleResponse) error + Recv() (*SimpleRequest, error) + grpc.ServerStream +} + +type benchmarkServiceUnconstrainedStreamingCallServer struct { + grpc.ServerStream +} + +func (x *benchmarkServiceUnconstrainedStreamingCallServer) Send(m *SimpleResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *benchmarkServiceUnconstrainedStreamingCallServer) Recv() (*SimpleRequest, error) { + m := new(SimpleRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + var _BenchmarkService_serviceDesc = grpc.ServiceDesc{ ServiceName: "grpc.testing.BenchmarkService", HandlerType: (*BenchmarkServiceServer)(nil), @@ -165,6 +228,12 @@ var _BenchmarkService_serviceDesc = grpc.ServiceDesc{ ServerStreams: true, ClientStreams: true, }, + { + StreamName: "UnconstrainedStreamingCall", + Handler: _BenchmarkService_UnconstrainedStreamingCall_Handler, + ServerStreams: true, + ClientStreams: true, + }, }, Metadata: "services.proto", } @@ -425,24 +494,25 @@ var _WorkerService_serviceDesc = grpc.ServiceDesc{ Metadata: "services.proto", } -func init() { proto.RegisterFile("services.proto", fileDescriptor_services_bf68f4d7cbd0e0a1) } - -var fileDescriptor_services_bf68f4d7cbd0e0a1 = []byte{ - // 255 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x91, 0xc1, 0x4a, 0xc4, 0x30, - 0x10, 0x86, 0xa9, 0x07, 0xa1, 0xc1, 0x2e, 0x92, 0x93, 0x46, 0x1f, 0xc0, 0x53, 0x91, 0xd5, 0x17, - 0x70, 0x8b, 0x1e, 0x05, 0xb7, 0xa8, 0xe7, 0x58, 0x87, 0x1a, 0x36, 0xcd, 0xd4, 0x99, 0x89, 0xe0, - 0x93, 0xf8, 0x0e, 0x3e, 0xa5, 0xec, 0x66, 0x57, 0xd6, 0x92, 0x9b, 0xc7, 0xf9, 0xbf, 0xe1, 0x23, - 0x7f, 0x46, 0xcd, 0x18, 0xe8, 0xc3, 0x75, 0xc0, 0xf5, 0x48, 0x28, 0xa8, 0x8f, 0x7a, 0x1a, 0xbb, - 0x5a, 0x80, 0xc5, 0x85, 0xde, 0xcc, 0x06, 0x60, 0xb6, 0xfd, 0x8e, 0x9a, 0xaa, 0xc3, 0x20, 0x84, - 0x3e, 0x8d, 0xf3, 0xef, 0x42, 0x1d, 0x2f, 0x20, 0x74, 0x6f, 0x83, 0xa5, 0x55, 0x9b, 0x44, 0xfa, - 0x4e, 0x95, 0x8f, 0xc1, 0xd2, 0x67, 0x63, 0xbd, 0xd7, 0x67, 0xf5, 0xbe, 0xaf, 0x6e, 0xdd, 0x30, - 0x7a, 0x58, 0xc2, 0x7b, 0x04, 0x16, 0x73, 0x9e, 0x87, 0x3c, 0x62, 0x60, 0xd0, 0xf7, 0xaa, 0x6a, - 0x85, 0xc0, 0x0e, 0x2e, 0xf4, 0xff, 0x74, 0x5d, 0x14, 0x97, 0xc5, 0xfc, 0xeb, 0x40, 0x55, 0xcf, - 0x48, 0x2b, 0xa0, 0xdd, 0x4b, 0x6f, 0x55, 0xb9, 0x8c, 0x61, 0x3d, 0x01, 0xe9, 0x93, 0x89, 0x60, - 0x93, 0xde, 0x50, 0xcf, 0xc6, 0xe4, 0x48, 0x2b, 0x56, 0x22, 0xaf, 0xc5, 0x5b, 0x4d, 0xe3, 0x1d, - 0x04, 0x99, 0x6a, 0x52, 0x9a, 0xd3, 0x24, 0xb2, 0xa7, 0x59, 0xa8, 0xb2, 0x41, 0x82, 0x06, 0x63, - 0x10, 0x7d, 0x3a, 0x59, 0x46, 0xfa, 0x6d, 0x6a, 0x72, 0x68, 0xfb, 0x67, 0xd7, 0x4a, 0x3d, 0x44, - 0x27, 0xa9, 0xa6, 0xd6, 0x7f, 0x37, 0x9f, 0xd0, 0xbd, 0x9a, 0x4c, 0xf6, 0x72, 0xb8, 0xb9, 0xe6, - 0xd5, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x3b, 0x84, 0x02, 0xe3, 0x0c, 0x02, 0x00, 0x00, +func init() { proto.RegisterFile("services.proto", fileDescriptor_services_e4655369b5d7f4d0) } + +var fileDescriptor_services_e4655369b5d7f4d0 = []byte{ + // 271 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x92, 0xc1, 0x4a, 0xc3, 0x40, + 0x10, 0x86, 0x69, 0x0f, 0x42, 0x16, 0x53, 0x64, 0x4f, 0xba, 0xfa, 0x00, 0x9e, 0x82, 0x54, 0x5f, + 0xc0, 0x06, 0x3d, 0x0a, 0x36, 0x54, 0x0f, 0x9e, 0xd6, 0x74, 0x88, 0x4b, 0x93, 0x99, 0x38, 0x33, + 0x11, 0x7c, 0x02, 0x1f, 0xc1, 0xd7, 0x15, 0xb3, 0x56, 0x6a, 0xc8, 0xcd, 0x1e, 0xe7, 0xff, 0x86, + 0x8f, 0xfd, 0x77, 0xd7, 0xcc, 0x04, 0xf8, 0x2d, 0x94, 0x20, 0x59, 0xcb, 0xa4, 0x64, 0x0f, 0x2b, + 0x6e, 0xcb, 0x4c, 0x41, 0x34, 0x60, 0xe5, 0x66, 0x0d, 0x88, 0xf8, 0x6a, 0x4b, 0x5d, 0x5a, 0x12, + 0x2a, 0x53, 0x1d, 0xc7, 0xf9, 0xc7, 0xd4, 0x1c, 0x2d, 0x00, 0xcb, 0x97, 0xc6, 0xf3, 0xa6, 0x88, + 0x22, 0x7b, 0x6b, 0x92, 0x15, 0x7a, 0x7e, 0xcf, 0x7d, 0x5d, 0xdb, 0xd3, 0x6c, 0xd7, 0x97, 0x15, + 0xa1, 0x69, 0x6b, 0x58, 0xc2, 0x6b, 0x07, 0xa2, 0xee, 0x6c, 0x1c, 0x4a, 0x4b, 0x28, 0x60, 0xef, + 0x4c, 0x5a, 0x28, 0x83, 0x6f, 0x02, 0x56, 0xff, 0x74, 0x9d, 0x4f, 0x2e, 0x26, 0xf6, 0xc9, 0xb8, + 0x15, 0x96, 0x84, 0xa2, 0xec, 0x03, 0xc2, 0x7a, 0x9f, 0xf2, 0xf9, 0xe7, 0xd4, 0xa4, 0x8f, 0xc4, + 0x1b, 0xe0, 0xed, 0x35, 0xdc, 0x98, 0x64, 0xd9, 0xe1, 0xf7, 0x04, 0x6c, 0x8f, 0x07, 0x82, 0x3e, + 0xbd, 0xe6, 0x4a, 0x9c, 0x1b, 0x23, 0x85, 0x7a, 0xed, 0xa4, 0x3f, 0x75, 0xd4, 0xe4, 0x75, 0x00, + 0xd4, 0xa1, 0x26, 0xa6, 0x63, 0x9a, 0x48, 0x76, 0x34, 0x0b, 0x93, 0xe4, 0xc4, 0x90, 0x53, 0x87, + 0x6a, 0x4f, 0x06, 0xcb, 0xc4, 0xbf, 0x4d, 0xdd, 0x18, 0xfa, 0x79, 0x90, 0x2b, 0x63, 0xee, 0xbb, + 0xa0, 0xb1, 0xa6, 0xb5, 0x7f, 0x37, 0x1f, 0x28, 0xac, 0xdd, 0x48, 0xf6, 0x7c, 0xd0, 0x7f, 0x95, + 0xcb, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x9a, 0xb4, 0x19, 0x36, 0x69, 0x02, 0x00, 0x00, } diff --git a/benchmark/grpc_testing/services.proto b/benchmark/grpc_testing/services.proto index f4e7907824b0..6ae2f9d1118a 100644 --- a/benchmark/grpc_testing/services.proto +++ b/benchmark/grpc_testing/services.proto @@ -29,6 +29,9 @@ service BenchmarkService { // One request followed by one response. // The server returns the client payload as-is. rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse); + // Unconstrainted streaming. + // Both server and client keep sending & receiving simultaneously. + rpc UnconstrainedStreamingCall(stream SimpleRequest) returns (stream SimpleResponse); } service WorkerService {