@@ -29,9 +29,10 @@ import (
2929 "net"
3030
3131 "google.golang.org/grpc"
32- testpb "google.golang.org/grpc/benchmark/grpc_testing"
3332 "google.golang.org/grpc/codes"
3433 "google.golang.org/grpc/grpclog"
34+ testpb "google.golang.org/grpc/interop/grpc_testing"
35+ "google.golang.org/grpc/metadata"
3536 "google.golang.org/grpc/status"
3637)
3738
@@ -45,8 +46,6 @@ func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
4546 body := make ([]byte , size )
4647 switch t {
4748 case testpb .PayloadType_COMPRESSABLE :
48- case testpb .PayloadType_UNCOMPRESSABLE :
49- logger .Fatalf ("PayloadType UNCOMPRESSABLE is not supported" )
5049 default :
5150 logger .Fatalf ("Unsupported payload type: %d" , t )
5251 }
@@ -71,7 +70,15 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*
7170 }, nil
7271}
7372
73+ // UnconstrainedStreamingHeader indicates to the StreamingCall handler that its
74+ // behavior should be unconstrained (constant send/receive in parallel) instead
75+ // of ping-pong.
76+ const UnconstrainedStreamingHeader = "unconstrained-streaming"
77+
7478func (s * testServer ) StreamingCall (stream testpb.BenchmarkService_StreamingCallServer ) error {
79+ if md , ok := metadata .FromIncomingContext (stream .Context ()); ok && len (md [UnconstrainedStreamingHeader ]) != 0 {
80+ return s .UnconstrainedStreamingCall (stream )
81+ }
7582 response := & testpb.SimpleResponse {
7683 Payload : new (testpb.Payload ),
7784 }
@@ -93,7 +100,7 @@ func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallS
93100 }
94101}
95102
96- func (s * testServer ) UnconstrainedStreamingCall (stream testpb.BenchmarkService_UnconstrainedStreamingCallServer ) error {
103+ func (s * testServer ) UnconstrainedStreamingCall (stream testpb.BenchmarkService_StreamingCallServer ) error {
97104 in := new (testpb.SimpleRequest )
98105 // Receive a message to learn response type and size.
99106 err := stream .RecvMsg (in )
@@ -171,23 +178,6 @@ func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCa
171178 }
172179}
173180
174- func (s * byteBufServer ) UnconstrainedStreamingCall (stream testpb.BenchmarkService_UnconstrainedStreamingCallServer ) error {
175- for {
176- var in []byte
177- err := stream .(grpc.ServerStream ).RecvMsg (& in )
178- if err == io .EOF {
179- return nil
180- }
181- if err != nil {
182- return err
183- }
184- out := make ([]byte , s .respSize )
185- if err := stream .(grpc.ServerStream ).SendMsg (& out ); err != nil {
186- return err
187- }
188- }
189- }
190-
191181// ServerInfo contains the information to create a gRPC benchmark server.
192182type ServerInfo struct {
193183 // Type is the type of the server.
0 commit comments