diff --git a/go.mod b/go.mod index b6864e1..6c3b495 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,8 @@ go 1.13 require ( github.com/golang/protobuf v1.3.2 + github.com/hashicorp/go-multierror v1.0.0 github.com/stretchr/testify v1.4.0 - golang.org/x/net v0.0.0-20191116160921-f9c825593386 + golang.org/x/net v0.0.0-20191116160921-f9c825593386 // indirect google.golang.org/grpc v1.25.1 ) diff --git a/go.sum b/go.sum index d0cfc61..6ebfe7b 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,10 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= +github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/proxy/handler.go b/proxy/handler.go index 0a6fabc..4317170 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -1,4 +1,5 @@ // Copyright 2017 Michal Witkowski. All Rights Reserved. +// Copyright 2019 Andrey Smirnov. All Rights Reserved. // See LICENSE for licensing terms. package proxy @@ -111,7 +112,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error } if len(backendConnections) != 1 { - return status.Error(codes.Unimplemented, "proxying to multiple backends not implemented yet") + return s.handlerMulti(serverStream, backendConnections) } // case of proxying one to one: diff --git a/proxy/handler_multi.go b/proxy/handler_multi.go new file mode 100644 index 0000000..f4c6c78 --- /dev/null +++ b/proxy/handler_multi.go @@ -0,0 +1,276 @@ +// Copyright 2019 Andrey Smirnov. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy + +import ( + "errors" + "fmt" + "io" + + "github.com/hashicorp/go-multierror" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *handler) handlerMulti(serverStream grpc.ServerStream, backendConnections []backendConnection) error { + // wrap the stream for safe concurrent access + serverStream = &ServerStreamWrapper{ServerStream: serverStream} + + s2cErrChan := s.forwardServerToClientsMulti(serverStream, backendConnections) + var c2sErrChan chan error + + if true { // TODO: if unary + c2sErrChan = s.forwardClientsToServerMultiUnary(backendConnections, serverStream) + } else { + c2sErrChan = s.forwardClientsToServerMultiStreaming(backendConnections, serverStream) + } + + for i := 0; i < 2; i++ { + select { + case s2cErr := <-s2cErrChan: + if s2cErr == io.EOF { + // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./ + // the clientStream>serverStream may continue pumping though. + for i := range backendConnections { + if backendConnections[i].clientStream != nil { + backendConnections[i].clientStream.CloseSend() //nolint: errcheck + } + } + break + } else { + // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need + // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and + // exit with an error to the stack + return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) + } + case c2sErr := <-c2sErrChan: + // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error. + if c2sErr != io.EOF { + return c2sErr + } + return nil + } + } + return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") +} + +// formatError tries to format error from upstream as message to the client +func (s *handler) formatError(src *backendConnection, backendErr error) ([]byte, error) { + payload, err := src.backend.BuildError(backendErr) + if err != nil { + return nil, fmt.Errorf("error building error for %s: %w", src.backend, err) + } + + if payload == nil { + err = backendErr + } + + return payload, err +} + +// sendError tries to deliver error back to the client via dst +// +// if sendError fails to deliver the error, error is returned +// if sendError successfully delivers the error, nil is returned +func (s *handler) sendError(src *backendConnection, dst grpc.ServerStream, backendErr error) error { + payload, err := s.formatError(src, backendErr) + if err != nil { + return err + } + + f := &frame{payload: payload} + if err = dst.SendMsg(f); err != nil { + return fmt.Errorf("error sending error back: %w", err) + } + + return nil +} + +// one:many proxying, unary call version (merging results) +func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection, dst grpc.ServerStream) chan error { + ret := make(chan error, 1) + + payloadCh := make(chan []byte, len(sources)) + errCh := make(chan error, len(sources)) + + for i := 0; i < len(sources); i++ { + go func(src *backendConnection) { + errCh <- func() error { + f := &frame{} + for j := 0; ; j++ { + if err := src.clientStream.RecvMsg(f); err != nil { + if err == io.EOF { + // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two + // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers + // will be nil. + dst.SetTrailer(src.clientStream.Trailer()) + return nil + } + + payload, err := s.formatError(src, err) + if err != nil { + return err + } + + payloadCh <- payload + return nil + } + if j == 0 { + // This is a bit of a hack, but client to server headers are only readable after first client msg is + // received but must be written to server stream before the first msg is flushed. + // This is the only place to do it nicely. + md, err := src.clientStream.Header() + if err != nil { + payload, err := s.formatError(src, err) + if err != nil { + return err + } + + payloadCh <- payload + return nil + } + + if err := dst.SetHeader(md); err != nil { + return fmt.Errorf("error setting headers from client %s: %w", src.backend, err) + } + } + + var err error + f.payload, err = src.backend.AppendInfo(f.payload) + if err != nil { + return fmt.Errorf("error appending info for %s: %w", src.backend, err) + } + + payloadCh <- f.payload + } + }() + }(&sources[i]) + } + + go func() { + var multiErr *multierror.Error + + for range sources { + multiErr = multierror.Append(multiErr, <-errCh) + } + + if multiErr.ErrorOrNil() != nil { + ret <- multiErr.ErrorOrNil() + return + } + + close(payloadCh) + + var merged []byte + for b := range payloadCh { + merged = append(merged, b...) + } + + ret <- dst.SendMsg(&frame{payload: merged}) + }() + + return ret +} + +// one:many proxying, streaming version (no merge) +func (s *handler) forwardClientsToServerMultiStreaming(sources []backendConnection, dst grpc.ServerStream) chan error { + ret := make(chan error, 1) + + errCh := make(chan error, len(sources)) + + for i := range sources { + go func(src *backendConnection) { + errCh <- func() error { + if src.connError != nil { + return s.sendError(src, dst, src.connError) + } + + f := &frame{} + for j := 0; ; j++ { + if err := src.clientStream.RecvMsg(f); err != nil { + if err == io.EOF { + // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two + // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers + // will be nil. + dst.SetTrailer(src.clientStream.Trailer()) + return nil + } + return s.sendError(src, dst, fmt.Errorf("error reading from client stream %s: %w", src.backend, err)) + } + if j == 0 { + // This is a bit of a hack, but client to server headers are only readable after first client msg is + // received but must be written to server stream before the first msg is flushed. + // This is the only place to do it nicely. + md, err := src.clientStream.Header() + if err != nil { + return s.sendError(src, dst, fmt.Errorf("error getting headers from client stream %s: %w", src.backend, err)) + } + if err := dst.SetHeader(md); err != nil { + return fmt.Errorf("error setting headers from client %s: %w", src.backend, err) + } + } + + var err error + f.payload, err = src.backend.AppendInfo(f.payload) + if err != nil { + return fmt.Errorf("error appending info for %s: %w", src.backend, err) + } + + if err = dst.SendMsg(f); err != nil { + return fmt.Errorf("error sending back to server from %s: %w", src.backend, err) + } + } + + }() + }(&sources[i]) + } + + go func() { + var multiErr *multierror.Error + + for range sources { + multiErr = multierror.Append(multiErr, <-errCh) + } + + ret <- multiErr.ErrorOrNil() + }() + + return ret +} + +func (s *handler) forwardServerToClientsMulti(src grpc.ServerStream, destinations []backendConnection) chan error { + ret := make(chan error, 1) + go func() { + f := &frame{} + for { + if err := src.RecvMsg(f); err != nil { + ret <- err + return + } + + liveDestinations := 0 + for i := range destinations { + if destinations[i].clientStream == nil || destinations[i].connError != nil { + continue + } + + if err := destinations[i].clientStream.SendMsg(f); err != nil { + // TODO: race with reading connError (?) + // skip this or keep using? + destinations[i].connError = err + } else { + liveDestinations++ + } + } + + if liveDestinations == 0 { + ret <- errors.New("no backend connections to forward to are available") + return + } + } + }() + return ret +} diff --git a/proxy/handler_multi_test.go b/proxy/handler_multi_test.go new file mode 100644 index 0000000..0168222 --- /dev/null +++ b/proxy/handler_multi_test.go @@ -0,0 +1,358 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// Copyright 2019 Andrey Smirnov. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy_test + +import ( + "context" + "fmt" + "io" + "log" + "net" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/smira/grpc-proxy/proxy" + pb "github.com/smira/grpc-proxy/testservice" +) + +const ( + numUpstreams = 3 +) + +// asserting service is implemented on the server side and serves as a handler for stuff +type assertingMultiService struct { + t *testing.T + server string +} + +func (s *assertingMultiService) PingEmpty(ctx context.Context, _ *pb.Empty) (*pb.MultiPingReply, error) { + // Check that this call has client's metadata. + md, ok := metadata.FromIncomingContext(ctx) + assert.True(s.t, ok, "PingEmpty call must have metadata in context") + _, ok = md[clientMdKey] + assert.True(s.t, ok, "PingEmpty call must have clients's custom headers in metadata") + return &pb.MultiPingReply{ + Response: []*pb.MultiPingResponse{ + { + Value: pingDefaultValue, + Counter: 42, + Server: s.server, + }, + }, + }, nil +} + +func (s *assertingMultiService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.MultiPingReply, error) { + // Send user trailers and headers. + grpc.SendHeader(ctx, metadata.Pairs(serverHeaderMdKey, "I like turtles.")) //nolint: errcheck + grpc.SetTrailer(ctx, metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) //nolint: errcheck + return &pb.MultiPingReply{ + Response: []*pb.MultiPingResponse{ + { + Value: ping.Value, + Counter: 42, + Server: s.server, + }, + }, + }, nil +} + +func (s *assertingMultiService) PingError(ctx context.Context, ping *pb.PingRequest) (*pb.Empty, error) { + return nil, status.Errorf(codes.FailedPrecondition, "Userspace error.") +} + +func (s *assertingMultiService) PingList(ping *pb.PingRequest, stream pb.MultiService_PingListServer) error { + // Send user trailers and headers. + stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")) //nolint: errcheck + for i := 0; i < countListResponses; i++ { + stream.Send(&pb.MultiPingReply{ //nolint: errcheck + Response: []*pb.MultiPingResponse{ + { + Value: ping.Value, + Counter: int32(i), + Server: s.server, + }, + }, + }) + } + stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) //nolint: errcheck + return nil +} + +func (s *assertingMultiService) PingStream(stream pb.MultiService_PingStreamServer) error { + stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")) // nolint: errcheck + counter := int32(0) + for { + ping, err := stream.Recv() + if err == io.EOF { + break + } else if err != nil { + require.NoError(s.t, err, "can't fail reading stream") + return err + } + pong := &pb.MultiPingReply{ + Response: []*pb.MultiPingResponse{ + { + Value: ping.Value, + Counter: counter, + Server: s.server, + }, + }, + } + if err := stream.Send(pong); err != nil { + require.NoError(s.t, err, "can't fail sending back a pong") + } + counter += 1 + } + stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) + return nil +} + +type assertingBackend struct { + addr string + i int +} + +func (b *assertingBackend) String() string { + return fmt.Sprintf("backend%d", b.i) +} + +func (b *assertingBackend) GetConnection(ctx context.Context) (context.Context, *grpc.ClientConn, error) { + md, _ := metadata.FromIncomingContext(ctx) + // Explicitly copy the metadata, otherwise the tests will fail. + outCtx := metadata.NewOutgoingContext(ctx, md.Copy()) + + conn, err := grpc.DialContext(ctx, b.addr, grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) // nolint: staticcheck + + return outCtx, conn, err +} + +func (b *assertingBackend) AppendInfo(resp []byte) ([]byte, error) { + // decode protobuf embedded header + typ, n1 := proto.DecodeVarint(resp) + _, n2 := proto.DecodeVarint(resp[n1:]) // length + + if typ != (1<<3)|2 { // type: 2, field_number: 1 + return nil, fmt.Errorf("unexpected message format: %d", typ) + } + + payload, err := proto.Marshal(&pb.ResponseMetadataPrepender{ + Metadata: &pb.ResponseMetadata{ + Hostname: fmt.Sprintf("server%d", b.i), + }, + }) + + // cut off embedded message header + resp = resp[n1+n2:] + // build new embedded message header + prefix := append(proto.EncodeVarint((1<<3)|2), proto.EncodeVarint(uint64(len(resp)+len(payload)))...) + resp = append(prefix, resp...) + + return append(resp, payload...), err +} + +func (b *assertingBackend) BuildError(err error) ([]byte, error) { + return proto.Marshal(&pb.ResponseMetadataPrepender{ + Metadata: &pb.ResponseMetadata{ + Hostname: fmt.Sprintf("server%d", b.i), + UpstreamError: err.Error(), + }, + }) +} + +type MultiServiceSuite struct { + suite.Suite + + serverListeners []net.Listener + servers []*grpc.Server + proxyListener net.Listener + proxy *grpc.Server + serverClientConn *grpc.ClientConn + + client *grpc.ClientConn + testClient pb.MultiServiceClient + + ctx context.Context + ctxCancel context.CancelFunc +} + +func (s *MultiServiceSuite) TestPingEmptyCarriesClientMetadata() { + ctx := metadata.NewOutgoingContext(s.ctx, metadata.Pairs(clientMdKey, "true")) + out, err := s.testClient.PingEmpty(ctx, &pb.Empty{}) + require.NoError(s.T(), err, "PingEmpty should succeed without errors") + + expectedUpstreams := map[string]struct{}{} + for i := 0; i < numUpstreams; i++ { + expectedUpstreams[fmt.Sprintf("server%d", i)] = struct{}{} + } + + s.Require().Len(out.Response, numUpstreams) + for _, resp := range out.Response { + s.Require().Equal(pingDefaultValue, resp.Value) + s.Require().EqualValues(42, resp.Counter) + + // equal metadata set by proxy and server + s.Require().Equal(resp.Metadata.Hostname, resp.Server) + + delete(expectedUpstreams, resp.Metadata.Hostname) + } + + s.Require().Empty(expectedUpstreams) +} + +func (s *MultiServiceSuite) SetupTest() { + s.ctx, s.ctxCancel = context.WithTimeout(context.TODO(), 120*time.Second) +} + +func (s *MultiServiceSuite) TearDownTest() { + s.ctxCancel() +} + +func (s *MultiServiceSuite) SetupSuite() { + var err error + + s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.T(), err, "must be able to allocate a port for proxyListener") + + s.serverListeners = make([]net.Listener, numUpstreams) + + for i := range s.serverListeners { + s.serverListeners[i], err = net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.T(), err, "must be able to allocate a port for serverListener") + } + + s.servers = make([]*grpc.Server, numUpstreams) + + for i := range s.servers { + s.servers[i] = grpc.NewServer() + pb.RegisterMultiServiceServer(s.servers[i], + &assertingMultiService{ + t: s.T(), + server: fmt.Sprintf("server%d", i), + }) + } + + backends := make([]*assertingBackend, numUpstreams) + + for i := range backends { + backends[i] = &assertingBackend{ + i: i, + addr: s.serverListeners[i].Addr().String(), + } + } + + // Setup of the proxy's Director. + director := func(ctx context.Context, fullName string) ([]proxy.Backend, error) { + var targets []int + + md, ok := metadata.FromIncomingContext(ctx) + if ok { + if _, exists := md[rejectingMdKey]; exists { + return nil, status.Errorf(codes.PermissionDenied, "testing rejection") + } + + if mdTargets, exists := md["targets"]; exists { + for _, strTarget := range mdTargets { + t, err := strconv.Atoi(strTarget) + if err != nil { + return nil, err + } + + targets = append(targets, t) + } + } + } + + var result []proxy.Backend + + if targets == nil { + for i := range backends { + targets = append(targets, i) + } + } + + for _, t := range targets { + result = append(result, backends[t]) + } + + return result, nil + } + + s.proxy = grpc.NewServer( + grpc.CustomCodec(proxy.Codec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + ) + // Ping handler is handled as an explicit registration and not as a TransparentHandler. + proxy.RegisterService(s.proxy, director, + "smira.testproto.MultiService", + "Ping") + + // Start the serving loops. + for i := range s.servers { + s.T().Logf("starting grpc.Server at: %v", s.serverListeners[i].Addr().String()) + go func(i int) { + s.servers[i].Serve(s.serverListeners[i]) // nolint: errcheck + }(i) + } + s.T().Logf("starting grpc.Proxy at: %v", s.proxyListener.Addr().String()) + go func() { + s.proxy.Serve(s.proxyListener) // nolint: errcheck + }() + + ctx, ctxCancel := context.WithTimeout(context.Background(), 1*time.Second) + defer ctxCancel() + clientConn, err := grpc.DialContext(ctx, strings.Replace(s.proxyListener.Addr().String(), "127.0.0.1", "localhost", 1), grpc.WithInsecure()) + require.NoError(s.T(), err, "must not error on deferred client Dial") + s.testClient = pb.NewMultiServiceClient(clientConn) +} + +func (s *MultiServiceSuite) TearDownSuite() { + if s.client != nil { + s.client.Close() + } + if s.serverClientConn != nil { + s.serverClientConn.Close() + } + // Close all transports so the logs don't get spammy. + time.Sleep(10 * time.Millisecond) + + if s.proxy != nil { + s.proxy.Stop() + s.proxyListener.Close() + } + + for _, server := range s.servers { + if server != nil { + server.Stop() + } + } + + for _, serverListener := range s.serverListeners { + if serverListener != nil { + serverListener.Close() + } + } +} +func TestMultiServiceSuite(t *testing.T) { + suite.Run(t, &MultiServiceSuite{}) +} + +func init() { + grpclog.SetLogger(log.New(os.Stderr, "grpc: ", log.LstdFlags)) // nolint: staticcheck +} diff --git a/proxy/handler_test.go b/proxy/handler_test.go index d8f844a..4c06dd8 100644 --- a/proxy/handler_test.go +++ b/proxy/handler_test.go @@ -6,9 +6,7 @@ package proxy_test import ( "context" "io" - "log" "net" - "os" "strings" "testing" "time" @@ -18,7 +16,6 @@ import ( "github.com/stretchr/testify/suite" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -201,8 +198,6 @@ func (s *ProxyHappySuite) SetupSuite() { s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") require.NoError(s.T(), err, "must be able to allocate a port for serverListener") - grpclog.SetLogger(log.New(os.Stderr, "grpc: ", log.LstdFlags)) // nolint: staticcheck - s.server = grpc.NewServer() pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) diff --git a/proxy/serverstream.go b/proxy/serverstream.go new file mode 100644 index 0000000..84dc88a --- /dev/null +++ b/proxy/serverstream.go @@ -0,0 +1,81 @@ +// Copyright 2019 Andrey Smirnov. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy + +import ( + "sync" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// ServerStreamWrapper wraps grpc.ServerStream and adds locking to send path +type ServerStreamWrapper struct { + grpc.ServerStream + + sendMu sync.Mutex +} + +// SetHeader sets the header metadata. It may be called multiple times. +// When call multiple times, all the provided metadata will be merged. +// All the metadata will be sent out when one of the following happens: +// - ServerStream.SendHeader() is called; +// - The first response is sent out; +// - An RPC status is sent out (error or success). +func (wrapper *ServerStreamWrapper) SetHeader(md metadata.MD) error { + wrapper.sendMu.Lock() + defer wrapper.sendMu.Unlock() + + err := wrapper.ServerStream.SetHeader(md) + if err != nil && err.Error() == "transport: the stream is done or WriteHeader was already called" { + // hack: swallow grpc.internal.transport.ErrIllegalHeaderWrite + err = nil + } + return err +} + +// SendHeader sends the header metadata. +// The provided md and headers set by SetHeader() will be sent. +// It fails if called multiple times. +func (wrapper *ServerStreamWrapper) SendHeader(md metadata.MD) error { + wrapper.sendMu.Lock() + defer wrapper.sendMu.Unlock() + + err := wrapper.ServerStream.SendHeader(md) + if err.Error() == "transport: the stream is done or WriteHeader was already called" { + // hack: swallow grpc.internal.transport.ErrIllegalHeaderWrite + err = nil + } + return err +} + +// SetTrailer sets the trailer metadata which will be sent with the RPC status. +// When called more than once, all the provided metadata will be merged. +func (wrapper *ServerStreamWrapper) SetTrailer(md metadata.MD) { + wrapper.sendMu.Lock() + defer wrapper.sendMu.Unlock() + + wrapper.ServerStream.SetTrailer(md) +} + +// SendMsg sends a message. On error, SendMsg aborts the stream and the +// error is returned directly. +// +// SendMsg blocks until: +// - There is sufficient flow control to schedule m with the transport, or +// - The stream is done, or +// - The stream breaks. +// +// SendMsg does not wait until the message is received by the client. An +// untimely stream closure may result in lost messages. +// +// It is safe to have a goroutine calling SendMsg and another goroutine +// calling RecvMsg on the same stream at the same time, but it is not safe +// to call SendMsg on the same stream in different goroutines. +func (wrapper *ServerStreamWrapper) SendMsg(m interface{}) error { + wrapper.sendMu.Lock() + defer wrapper.sendMu.Unlock() + + return wrapper.ServerStream.SendMsg(m) +} diff --git a/testservice/test.pb.go b/testservice/test.pb.go index 0b144ff..34ffba3 100644 --- a/testservice/test.pb.go +++ b/testservice/test.pb.go @@ -141,31 +141,234 @@ func (m *PingResponse) GetCounter() int32 { return 0 } +type ResponseMetadata struct { + Hostname string `protobuf:"bytes,99,opt,name=hostname,proto3" json:"hostname,omitempty"` + UpstreamError string `protobuf:"bytes,100,opt,name=upstream_error,json=upstreamError,proto3" json:"upstream_error,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ResponseMetadata) Reset() { *m = ResponseMetadata{} } +func (m *ResponseMetadata) String() string { return proto.CompactTextString(m) } +func (*ResponseMetadata) ProtoMessage() {} +func (*ResponseMetadata) Descriptor() ([]byte, []int) { + return fileDescriptor_c161fcfdc0c3ff1e, []int{3} +} + +func (m *ResponseMetadata) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ResponseMetadata.Unmarshal(m, b) +} +func (m *ResponseMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ResponseMetadata.Marshal(b, m, deterministic) +} +func (m *ResponseMetadata) XXX_Merge(src proto.Message) { + xxx_messageInfo_ResponseMetadata.Merge(m, src) +} +func (m *ResponseMetadata) XXX_Size() int { + return xxx_messageInfo_ResponseMetadata.Size(m) +} +func (m *ResponseMetadata) XXX_DiscardUnknown() { + xxx_messageInfo_ResponseMetadata.DiscardUnknown(m) +} + +var xxx_messageInfo_ResponseMetadata proto.InternalMessageInfo + +func (m *ResponseMetadata) GetHostname() string { + if m != nil { + return m.Hostname + } + return "" +} + +func (m *ResponseMetadata) GetUpstreamError() string { + if m != nil { + return m.UpstreamError + } + return "" +} + +type ResponseMetadataPrepender struct { + Metadata *ResponseMetadata `protobuf:"bytes,99,opt,name=metadata,proto3" json:"metadata,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ResponseMetadataPrepender) Reset() { *m = ResponseMetadataPrepender{} } +func (m *ResponseMetadataPrepender) String() string { return proto.CompactTextString(m) } +func (*ResponseMetadataPrepender) ProtoMessage() {} +func (*ResponseMetadataPrepender) Descriptor() ([]byte, []int) { + return fileDescriptor_c161fcfdc0c3ff1e, []int{4} +} + +func (m *ResponseMetadataPrepender) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ResponseMetadataPrepender.Unmarshal(m, b) +} +func (m *ResponseMetadataPrepender) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ResponseMetadataPrepender.Marshal(b, m, deterministic) +} +func (m *ResponseMetadataPrepender) XXX_Merge(src proto.Message) { + xxx_messageInfo_ResponseMetadataPrepender.Merge(m, src) +} +func (m *ResponseMetadataPrepender) XXX_Size() int { + return xxx_messageInfo_ResponseMetadataPrepender.Size(m) +} +func (m *ResponseMetadataPrepender) XXX_DiscardUnknown() { + xxx_messageInfo_ResponseMetadataPrepender.DiscardUnknown(m) +} + +var xxx_messageInfo_ResponseMetadataPrepender proto.InternalMessageInfo + +func (m *ResponseMetadataPrepender) GetMetadata() *ResponseMetadata { + if m != nil { + return m.Metadata + } + return nil +} + +type MultiPingResponse struct { + Metadata *ResponseMetadata `protobuf:"bytes,99,opt,name=metadata,proto3" json:"metadata,omitempty"` + Value string `protobuf:"bytes,1,opt,name=Value,proto3" json:"Value,omitempty"` + Counter int32 `protobuf:"varint,2,opt,name=counter,proto3" json:"counter,omitempty"` + Server string `protobuf:"bytes,3,opt,name=server,proto3" json:"server,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MultiPingResponse) Reset() { *m = MultiPingResponse{} } +func (m *MultiPingResponse) String() string { return proto.CompactTextString(m) } +func (*MultiPingResponse) ProtoMessage() {} +func (*MultiPingResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_c161fcfdc0c3ff1e, []int{5} +} + +func (m *MultiPingResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_MultiPingResponse.Unmarshal(m, b) +} +func (m *MultiPingResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_MultiPingResponse.Marshal(b, m, deterministic) +} +func (m *MultiPingResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_MultiPingResponse.Merge(m, src) +} +func (m *MultiPingResponse) XXX_Size() int { + return xxx_messageInfo_MultiPingResponse.Size(m) +} +func (m *MultiPingResponse) XXX_DiscardUnknown() { + xxx_messageInfo_MultiPingResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_MultiPingResponse proto.InternalMessageInfo + +func (m *MultiPingResponse) GetMetadata() *ResponseMetadata { + if m != nil { + return m.Metadata + } + return nil +} + +func (m *MultiPingResponse) GetValue() string { + if m != nil { + return m.Value + } + return "" +} + +func (m *MultiPingResponse) GetCounter() int32 { + if m != nil { + return m.Counter + } + return 0 +} + +func (m *MultiPingResponse) GetServer() string { + if m != nil { + return m.Server + } + return "" +} + +type MultiPingReply struct { + Response []*MultiPingResponse `protobuf:"bytes,1,rep,name=response,proto3" json:"response,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MultiPingReply) Reset() { *m = MultiPingReply{} } +func (m *MultiPingReply) String() string { return proto.CompactTextString(m) } +func (*MultiPingReply) ProtoMessage() {} +func (*MultiPingReply) Descriptor() ([]byte, []int) { + return fileDescriptor_c161fcfdc0c3ff1e, []int{6} +} + +func (m *MultiPingReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_MultiPingReply.Unmarshal(m, b) +} +func (m *MultiPingReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_MultiPingReply.Marshal(b, m, deterministic) +} +func (m *MultiPingReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_MultiPingReply.Merge(m, src) +} +func (m *MultiPingReply) XXX_Size() int { + return xxx_messageInfo_MultiPingReply.Size(m) +} +func (m *MultiPingReply) XXX_DiscardUnknown() { + xxx_messageInfo_MultiPingReply.DiscardUnknown(m) +} + +var xxx_messageInfo_MultiPingReply proto.InternalMessageInfo + +func (m *MultiPingReply) GetResponse() []*MultiPingResponse { + if m != nil { + return m.Response + } + return nil +} + func init() { proto.RegisterType((*Empty)(nil), "smira.testproto.Empty") proto.RegisterType((*PingRequest)(nil), "smira.testproto.PingRequest") proto.RegisterType((*PingResponse)(nil), "smira.testproto.PingResponse") + proto.RegisterType((*ResponseMetadata)(nil), "smira.testproto.ResponseMetadata") + proto.RegisterType((*ResponseMetadataPrepender)(nil), "smira.testproto.ResponseMetadataPrepender") + proto.RegisterType((*MultiPingResponse)(nil), "smira.testproto.MultiPingResponse") + proto.RegisterType((*MultiPingReply)(nil), "smira.testproto.MultiPingReply") } func init() { proto.RegisterFile("test.proto", fileDescriptor_c161fcfdc0c3ff1e) } var fileDescriptor_c161fcfdc0c3ff1e = []byte{ - // 232 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e, - 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x2f, 0xce, 0xcd, 0x2c, 0x4a, 0xd4, 0x03, 0x89, - 0x80, 0x05, 0x94, 0xd8, 0xb9, 0x58, 0x5d, 0x73, 0x0b, 0x4a, 0x2a, 0x95, 0x94, 0xb9, 0xb8, 0x03, - 0x32, 0xf3, 0xd2, 0x83, 0x52, 0x0b, 0x4b, 0x81, 0x92, 0x42, 0x22, 0x5c, 0xac, 0x65, 0x89, 0x39, - 0xa5, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x10, 0x8e, 0x92, 0x1d, 0x17, 0x0f, 0x44, - 0x51, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0x2a, 0x48, 0x55, 0x18, 0xb2, 0x2a, 0x30, 0x47, 0x48, 0x82, - 0x8b, 0x3d, 0x39, 0xbf, 0x34, 0xaf, 0x24, 0xb5, 0x48, 0x82, 0x09, 0x28, 0xce, 0x1a, 0x04, 0xe3, - 0x1a, 0xfd, 0x65, 0xe2, 0xe2, 0x0e, 0x01, 0x1a, 0x1f, 0x9c, 0x5a, 0x54, 0x96, 0x99, 0x9c, 0x2a, - 0xe4, 0xc2, 0xc5, 0x09, 0x32, 0x0f, 0xec, 0x02, 0x21, 0x31, 0x3d, 0x34, 0xc7, 0xe9, 0x81, 0xc5, - 0xa5, 0x64, 0x31, 0xc4, 0x91, 0xdd, 0xa0, 0xc4, 0x20, 0xe4, 0xca, 0xc5, 0x02, 0x12, 0x11, 0x92, - 0xc1, 0xa1, 0x10, 0xec, 0x23, 0xc2, 0xc6, 0x38, 0x43, 0x1d, 0x53, 0x54, 0x94, 0x5f, 0x44, 0xc0, - 0x2c, 0x1c, 0x4e, 0x05, 0x1a, 0xe2, 0xcd, 0xc5, 0x01, 0x52, 0xe8, 0x93, 0x09, 0x0c, 0x43, 0xca, - 0xdc, 0x63, 0xc0, 0x28, 0xe4, 0xcf, 0xc5, 0x05, 0x12, 0x0b, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0xa5, - 0xd0, 0x38, 0x0d, 0x46, 0x03, 0xc6, 0x24, 0x36, 0xb0, 0x8c, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, - 0x0a, 0x9f, 0x1b, 0xb7, 0x13, 0x02, 0x00, 0x00, + // 416 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xac, 0x92, 0xcf, 0x4e, 0xea, 0x40, + 0x14, 0xc6, 0x6f, 0xe1, 0x02, 0xe5, 0xc0, 0xe5, 0xde, 0x3b, 0x31, 0xa4, 0x12, 0x8d, 0x3a, 0xc6, + 0x84, 0x55, 0x43, 0x70, 0x2d, 0x1b, 0x45, 0x17, 0x4a, 0xc4, 0xa2, 0x2e, 0xdc, 0x98, 0x0a, 0x13, + 0x6d, 0x42, 0xff, 0x38, 0x33, 0x25, 0xe1, 0x05, 0x7c, 0x08, 0xdf, 0xd5, 0xc4, 0x99, 0xa1, 0x25, + 0x48, 0x83, 0x2d, 0xc1, 0xe5, 0xf9, 0xe6, 0x3b, 0xbf, 0x39, 0x33, 0xdf, 0x01, 0xe0, 0x84, 0x71, + 0x33, 0xa0, 0x3e, 0xf7, 0xd1, 0x5f, 0xe6, 0x3a, 0xd4, 0x36, 0xa5, 0xa2, 0x04, 0x5c, 0x82, 0x42, + 0xd7, 0x0d, 0xf8, 0x14, 0x1f, 0x42, 0xa5, 0xef, 0x78, 0xcf, 0x16, 0x79, 0x0d, 0xc5, 0x21, 0xda, + 0x82, 0xc2, 0xc4, 0x1e, 0x87, 0xc4, 0xd0, 0xf6, 0xb5, 0x66, 0xd9, 0x9a, 0x15, 0xb8, 0x03, 0xd5, + 0x99, 0x89, 0x05, 0xbe, 0xc7, 0x88, 0x74, 0xdd, 0x2f, 0xba, 0x54, 0x81, 0x0c, 0x28, 0x0d, 0xfd, + 0xd0, 0xe3, 0x84, 0x1a, 0x39, 0xa1, 0x17, 0xac, 0xb8, 0xc4, 0x77, 0xf0, 0x2f, 0xee, 0xed, 0x11, + 0x6e, 0x8f, 0x6c, 0x6e, 0xa3, 0x06, 0xe8, 0x2f, 0x3e, 0xe3, 0x9e, 0xed, 0x12, 0x63, 0xa8, 0x30, + 0xf3, 0x1a, 0x1d, 0x41, 0x2d, 0x0c, 0x18, 0xa7, 0xc4, 0x76, 0x1f, 0x09, 0xa5, 0x3e, 0x35, 0x46, + 0xca, 0xf1, 0x27, 0x56, 0xbb, 0x52, 0xc4, 0x0f, 0xb0, 0xbd, 0x8c, 0xed, 0x53, 0x12, 0x10, 0x6f, + 0x44, 0x28, 0x3a, 0x01, 0xdd, 0x8d, 0x44, 0xc5, 0xaf, 0xb4, 0x0f, 0xcc, 0xa5, 0x5f, 0x30, 0x97, + 0xbb, 0xad, 0x79, 0x0b, 0x7e, 0xd7, 0xe0, 0x7f, 0x2f, 0x1c, 0x73, 0xe7, 0xcb, 0xc3, 0x37, 0x83, + 0xae, 0xfb, 0x6f, 0xa8, 0x0e, 0x45, 0x46, 0xe8, 0x44, 0x1c, 0xe4, 0x55, 0x43, 0x54, 0xe1, 0x3e, + 0xd4, 0x16, 0x66, 0x0b, 0xc6, 0x53, 0xd4, 0x01, 0x9d, 0x46, 0xf7, 0x0a, 0x78, 0x5e, 0x0c, 0x86, + 0x13, 0x83, 0x25, 0x9e, 0x63, 0xcd, 0x7b, 0xda, 0x1f, 0x39, 0xa8, 0xdc, 0x0a, 0xe7, 0x40, 0x5c, + 0xe0, 0x0c, 0x09, 0x3a, 0x83, 0xb2, 0x74, 0xaa, 0x1d, 0x41, 0xf5, 0x04, 0x4a, 0xe9, 0x8d, 0xdd, + 0x84, 0xbe, 0x48, 0xc7, 0xbf, 0x50, 0x17, 0x7e, 0x4b, 0x05, 0xed, 0xac, 0x30, 0xaa, 0x9d, 0x4b, + 0xc7, 0x9c, 0x46, 0xc3, 0xc8, 0xd0, 0x53, 0x58, 0x2b, 0x46, 0x15, 0x90, 0x4b, 0xd0, 0xa5, 0xf1, + 0xca, 0x11, 0x5b, 0xbe, 0xd9, 0x3c, 0x2d, 0x0d, 0x5d, 0x03, 0x48, 0x6d, 0xa0, 0x96, 0x71, 0x43, + 0x5c, 0x53, 0x6b, 0x69, 0xed, 0xb7, 0x3c, 0x54, 0x55, 0x3e, 0x71, 0x00, 0xe7, 0x59, 0x02, 0xd8, + 0xfb, 0x2e, 0x63, 0xb1, 0x16, 0xe2, 0xd9, 0x17, 0x99, 0x22, 0xc8, 0x00, 0xfa, 0x91, 0x10, 0x7a, + 0x99, 0x43, 0x48, 0x9f, 0x48, 0xc4, 0x70, 0xb3, 0x46, 0x0c, 0xe9, 0x40, 0x19, 0xc4, 0x53, 0x51, + 0x9d, 0x1d, 0x7f, 0x06, 0x00, 0x00, 0xff, 0xff, 0x6a, 0xdd, 0xde, 0x4d, 0x3e, 0x05, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -451,3 +654,279 @@ var _TestService_serviceDesc = grpc.ServiceDesc{ }, Metadata: "test.proto", } + +// MultiServiceClient is the client API for MultiService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type MultiServiceClient interface { + PingEmpty(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*MultiPingReply, error) + Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*MultiPingReply, error) + PingError(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*Empty, error) + PingList(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (MultiService_PingListClient, error) + PingStream(ctx context.Context, opts ...grpc.CallOption) (MultiService_PingStreamClient, error) +} + +type multiServiceClient struct { + cc *grpc.ClientConn +} + +func NewMultiServiceClient(cc *grpc.ClientConn) MultiServiceClient { + return &multiServiceClient{cc} +} + +func (c *multiServiceClient) PingEmpty(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*MultiPingReply, error) { + out := new(MultiPingReply) + err := c.cc.Invoke(ctx, "/smira.testproto.MultiService/PingEmpty", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *multiServiceClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*MultiPingReply, error) { + out := new(MultiPingReply) + err := c.cc.Invoke(ctx, "/smira.testproto.MultiService/Ping", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *multiServiceClient) PingError(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*Empty, error) { + out := new(Empty) + err := c.cc.Invoke(ctx, "/smira.testproto.MultiService/PingError", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *multiServiceClient) PingList(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (MultiService_PingListClient, error) { + stream, err := c.cc.NewStream(ctx, &_MultiService_serviceDesc.Streams[0], "/smira.testproto.MultiService/PingList", opts...) + if err != nil { + return nil, err + } + x := &multiServicePingListClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type MultiService_PingListClient interface { + Recv() (*MultiPingReply, error) + grpc.ClientStream +} + +type multiServicePingListClient struct { + grpc.ClientStream +} + +func (x *multiServicePingListClient) Recv() (*MultiPingReply, error) { + m := new(MultiPingReply) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *multiServiceClient) PingStream(ctx context.Context, opts ...grpc.CallOption) (MultiService_PingStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &_MultiService_serviceDesc.Streams[1], "/smira.testproto.MultiService/PingStream", opts...) + if err != nil { + return nil, err + } + x := &multiServicePingStreamClient{stream} + return x, nil +} + +type MultiService_PingStreamClient interface { + Send(*PingRequest) error + Recv() (*MultiPingReply, error) + grpc.ClientStream +} + +type multiServicePingStreamClient struct { + grpc.ClientStream +} + +func (x *multiServicePingStreamClient) Send(m *PingRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *multiServicePingStreamClient) Recv() (*MultiPingReply, error) { + m := new(MultiPingReply) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// MultiServiceServer is the server API for MultiService service. +type MultiServiceServer interface { + PingEmpty(context.Context, *Empty) (*MultiPingReply, error) + Ping(context.Context, *PingRequest) (*MultiPingReply, error) + PingError(context.Context, *PingRequest) (*Empty, error) + PingList(*PingRequest, MultiService_PingListServer) error + PingStream(MultiService_PingStreamServer) error +} + +// UnimplementedMultiServiceServer can be embedded to have forward compatible implementations. +type UnimplementedMultiServiceServer struct { +} + +func (*UnimplementedMultiServiceServer) PingEmpty(ctx context.Context, req *Empty) (*MultiPingReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method PingEmpty not implemented") +} +func (*UnimplementedMultiServiceServer) Ping(ctx context.Context, req *PingRequest) (*MultiPingReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} +func (*UnimplementedMultiServiceServer) PingError(ctx context.Context, req *PingRequest) (*Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method PingError not implemented") +} +func (*UnimplementedMultiServiceServer) PingList(req *PingRequest, srv MultiService_PingListServer) error { + return status.Errorf(codes.Unimplemented, "method PingList not implemented") +} +func (*UnimplementedMultiServiceServer) PingStream(srv MultiService_PingStreamServer) error { + return status.Errorf(codes.Unimplemented, "method PingStream not implemented") +} + +func RegisterMultiServiceServer(s *grpc.Server, srv MultiServiceServer) { + s.RegisterService(&_MultiService_serviceDesc, srv) +} + +func _MultiService_PingEmpty_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MultiServiceServer).PingEmpty(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/smira.testproto.MultiService/PingEmpty", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MultiServiceServer).PingEmpty(ctx, req.(*Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _MultiService_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PingRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MultiServiceServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/smira.testproto.MultiService/Ping", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MultiServiceServer).Ping(ctx, req.(*PingRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _MultiService_PingError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PingRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MultiServiceServer).PingError(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/smira.testproto.MultiService/PingError", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MultiServiceServer).PingError(ctx, req.(*PingRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _MultiService_PingList_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(PingRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(MultiServiceServer).PingList(m, &multiServicePingListServer{stream}) +} + +type MultiService_PingListServer interface { + Send(*MultiPingReply) error + grpc.ServerStream +} + +type multiServicePingListServer struct { + grpc.ServerStream +} + +func (x *multiServicePingListServer) Send(m *MultiPingReply) error { + return x.ServerStream.SendMsg(m) +} + +func _MultiService_PingStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MultiServiceServer).PingStream(&multiServicePingStreamServer{stream}) +} + +type MultiService_PingStreamServer interface { + Send(*MultiPingReply) error + Recv() (*PingRequest, error) + grpc.ServerStream +} + +type multiServicePingStreamServer struct { + grpc.ServerStream +} + +func (x *multiServicePingStreamServer) Send(m *MultiPingReply) error { + return x.ServerStream.SendMsg(m) +} + +func (x *multiServicePingStreamServer) Recv() (*PingRequest, error) { + m := new(PingRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _MultiService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "smira.testproto.MultiService", + HandlerType: (*MultiServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "PingEmpty", + Handler: _MultiService_PingEmpty_Handler, + }, + { + MethodName: "Ping", + Handler: _MultiService_Ping_Handler, + }, + { + MethodName: "PingError", + Handler: _MultiService_PingError_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "PingList", + Handler: _MultiService_PingList_Handler, + ServerStreams: true, + }, + { + StreamName: "PingStream", + Handler: _MultiService_PingStream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "test.proto", +} diff --git a/testservice/test.proto b/testservice/test.proto index b441246..89bf33c 100644 --- a/testservice/test.proto +++ b/testservice/test.proto @@ -27,3 +27,35 @@ service TestService { } +message ResponseMetadata { + string hostname = 99; + string upstream_error = 100; +} + +message ResponseMetadataPrepender { + ResponseMetadata metadata = 99; +} + +message MultiPingResponse { + ResponseMetadata metadata = 99; + string Value = 1; + int32 counter = 2; + string server = 3; +} + +message MultiPingReply { + repeated MultiPingResponse response = 1; +} + +service MultiService { + rpc PingEmpty(Empty) returns (MultiPingReply) {} + + rpc Ping(PingRequest) returns (MultiPingReply) {} + + rpc PingError(PingRequest) returns (Empty) {} + + rpc PingList(PingRequest) returns (stream MultiPingReply) {} + + rpc PingStream(stream PingRequest) returns (stream MultiPingReply) {} + +} \ No newline at end of file