Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions client/grpc/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package grpc

import (
b "bytes"
"encoding/json"
"fmt"
"strings"

"go-micro.dev/v5/codec"
"go-micro.dev/v5/codec/bytes"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/runtime/protoiface"
"google.golang.org/protobuf/runtime/protoimpl"
)

type jsonCodec struct{}
type protoCodec struct{}
type bytesCodec struct{}
type wrapCodec struct{ encoding.Codec }

var useNumber bool

var (
defaultGRPCCodecs = map[string]encoding.Codec{
"application/json": jsonCodec{},
"application/proto": protoCodec{},
"application/protobuf": protoCodec{},
"application/octet-stream": protoCodec{},
"application/grpc": protoCodec{},
"application/grpc+json": jsonCodec{},
"application/grpc+proto": protoCodec{},
"application/grpc+bytes": bytesCodec{},
}
)

// UseNumber fix unmarshal Number(8234567890123456789) to interface(8.234567890123457e+18).
func UseNumber() {
useNumber = true
}

func (w wrapCodec) String() string {
return w.Codec.Name()
}

func (w wrapCodec) Marshal(v interface{}) ([]byte, error) {
b, ok := v.(*bytes.Frame)
if ok {
return b.Data, nil
}
return w.Codec.Marshal(v)
}

func (w wrapCodec) Unmarshal(data []byte, v interface{}) error {
b, ok := v.(*bytes.Frame)
if ok {
b.Data = data
return nil
}
return w.Codec.Unmarshal(data, v)
}

func (protoCodec) Marshal(v interface{}) ([]byte, error) {
switch m := v.(type) {
case *bytes.Frame:
return m.Data, nil
case proto.Message:
return proto.Marshal(m)
case protoiface.MessageV1:
// #2333 compatible with etcd legacy proto.Message
m2 := protoimpl.X.ProtoMessageV2Of(m)
return proto.Marshal(m2)
}
return nil, fmt.Errorf("failed to marshal: %v is not type of *bytes.Frame or proto.Message", v)
}

func (protoCodec) Unmarshal(data []byte, v interface{}) error {
switch m := v.(type) {
case proto.Message:
return proto.Unmarshal(data, m)
case protoiface.MessageV1:
// #2333 compatible with etcd legacy proto.Message
m2 := protoimpl.X.ProtoMessageV2Of(m)
return proto.Unmarshal(data, m2)
}
return fmt.Errorf("failed to unmarshal: %v is not type of proto.Message", v)
}

func (protoCodec) Name() string {
return "proto"
}

func (bytesCodec) Marshal(v interface{}) ([]byte, error) {
b, ok := v.(*[]byte)
if !ok {
return nil, fmt.Errorf("failed to marshal: %v is not type of *[]byte", v)
}
return *b, nil
}

func (bytesCodec) Unmarshal(data []byte, v interface{}) error {
b, ok := v.(*[]byte)
if !ok {
return fmt.Errorf("failed to unmarshal: %v is not type of *[]byte", v)
}
*b = data
return nil
}

func (bytesCodec) Name() string {
return "bytes"
}

func (jsonCodec) Marshal(v interface{}) ([]byte, error) {
if b, ok := v.(*bytes.Frame); ok {
return b.Data, nil
}

if pb, ok := v.(proto.Message); ok {
bytes, err := protojson.Marshal(pb)
if err != nil {
return nil, err
}
return bytes, nil
}

return json.Marshal(v)
}

func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
if len(data) == 0 {
return nil
}
if b, ok := v.(*bytes.Frame); ok {
b.Data = data
return nil
}
if pb, ok := v.(proto.Message); ok {
return protojson.Unmarshal(data, pb)
}

dec := json.NewDecoder(b.NewReader(data))
if useNumber {
dec.UseNumber()
}
return dec.Decode(v)
}

func (jsonCodec) Name() string {
return "json"
}

type grpcCodec struct {
// headers
id string
target string
method string
endpoint string

s grpc.ClientStream
c encoding.Codec
}

func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
md, err := g.s.Header()
if err != nil {
return err
}
if m == nil {
m = new(codec.Message)
}
if m.Header == nil {
m.Header = make(map[string]string, len(md))
}
for k, v := range md {
m.Header[k] = strings.Join(v, ",")
}
m.Id = g.id
m.Target = g.target
m.Method = g.method
m.Endpoint = g.endpoint
return nil
}

func (g *grpcCodec) ReadBody(v interface{}) error {
if f, ok := v.(*bytes.Frame); ok {
return g.s.RecvMsg(f)
}
return g.s.RecvMsg(v)
}

func (g *grpcCodec) Write(m *codec.Message, v interface{}) error {
// if we don't have a body
if v != nil {
return g.s.SendMsg(v)
}
// write the body using the framing codec
return g.s.SendMsg(&bytes.Frame{Data: m.Body})
}

func (g *grpcCodec) Close() error {
return g.s.CloseSend()
}

func (g *grpcCodec) String() string {
return g.c.Name()
}
69 changes: 69 additions & 0 deletions client/grpc/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package grpc

import (
"net/http"

"go-micro.dev/v5/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func microError(err error) error {
// no error
switch err {
case nil:
return nil
}

if verr, ok := err.(*errors.Error); ok {
return verr
}

// grpc error
s, ok := status.FromError(err)
if !ok {
return err
}

// return first error from details
if details := s.Details(); len(details) > 0 {
return microError(details[0].(error))
}

// try to decode micro *errors.Error
if e := errors.Parse(s.Message()); e.Code > 0 {
return e // actually a micro error
}

// fallback
return errors.New("go.micro.client", s.Message(), microStatusFromGrpcCode(s.Code()))
}

func microStatusFromGrpcCode(code codes.Code) int32 {
switch code {
case codes.OK:
return http.StatusOK
case codes.InvalidArgument:
return http.StatusBadRequest
case codes.DeadlineExceeded:
return http.StatusRequestTimeout
case codes.NotFound:
return http.StatusNotFound
case codes.AlreadyExists:
return http.StatusConflict
case codes.PermissionDenied:
return http.StatusForbidden
case codes.Unauthenticated:
return http.StatusUnauthorized
case codes.FailedPrecondition:
return http.StatusPreconditionFailed
case codes.Unimplemented:
return http.StatusNotImplemented
case codes.Internal:
return http.StatusInternalServerError
case codes.Unavailable:
return http.StatusServiceUnavailable
}

return http.StatusInternalServerError
}
35 changes: 35 additions & 0 deletions client/grpc/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
module go-micro.dev/v5/client/grpc

go 1.24

require (
go-micro.dev/v5 v5.5.1-0.20250507183911-01b8394c8119
google.golang.org/grpc v1.53.0
google.golang.org/grpc/examples v0.0.0-20211102180624-670c133e568e
google.golang.org/protobuf v1.33.0
)

require (
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/urfave/cli/v2 v2.25.7 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.etcd.io/bbolt v1.4.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
)
Loading
Loading