Skip to content

Commit

Permalink
grpc-register&openServer code style
Browse files Browse the repository at this point in the history
  • Loading branch information
ztelur committed Mar 27, 2021
1 parent d47777f commit 98a8afa
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 44 deletions.
5 changes: 5 additions & 0 deletions config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func GetProviderService(name string) common.RPCService {
return proServices[name]
}

// GetAllProviderService gets all ProviderService
func GetAllProviderService() map[string]common.RPCService {
return proServices
}

// GetCallback gets CallbackResponse by @name
func GetCallback(name string) func(response common.CallbackResponse) {
service := GetConsumerService(name)
Expand Down
32 changes: 15 additions & 17 deletions protocol/grpc/grpc_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,22 @@ func (gp *GrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
}

func (gp *GrpcProtocol) openServer(url *common.URL) {
_, ok := gp.serverMap[url.Location]
if !ok {
_, ok := gp.ExporterMap().Load(url.ServiceKey())
if !ok {
panic("[GrpcProtocol]" + url.Key() + "is not existing")
}

gp.serverLock.Lock()
_, ok = gp.serverMap[url.Location]
if !ok {
grpcMessageSize, _ := strconv.Atoi(url.GetParam(constant.MESSAGE_SIZE_KEY, "4"))
srv := NewServer()
srv.SetBufferSize(grpcMessageSize)
gp.serverMap[url.Location] = srv
srv.Start(url)
}
gp.serverLock.Unlock()
gp.serverLock.Lock()
defer gp.serverLock.Unlock()

if _, ok := gp.serverMap[url.Location]; ok {
return
}

if _, ok := gp.ExporterMap().Load(url.ServiceKey()); !ok {
panic("[GrpcProtocol]" + url.Key() + "is not existing")
}

grpcMessageSize, _ := strconv.Atoi(url.GetParam(constant.MESSAGE_SIZE_KEY, "4"))
srv := NewServer()
srv.SetBufferSize(grpcMessageSize)
gp.serverMap[url.Location] = srv
srv.Start(url)
}

// Refer a remote gRPC service
Expand Down
101 changes: 74 additions & 27 deletions protocol/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"net"
"reflect"
"sync"
"time"
)

import (
Expand All @@ -31,7 +33,6 @@ import (

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/protocol"
Expand Down Expand Up @@ -80,42 +81,88 @@ func (s *Server) Start(url *common.URL) {
grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
grpc.MaxRecvMsgSize(1024*1024*s.bufferSize),
grpc.MaxSendMsgSize(1024*1024*s.bufferSize))
s.grpcServer = server

key := url.GetParam(constant.BEAN_NAME_KEY, "")
service := config.GetProviderService(key)

ds, ok := service.(DubboGrpcService)
if !ok {
panic("illegal service type registered")
}

m, ok := reflect.TypeOf(service).MethodByName("SetProxyImpl")
if !ok {
panic("method SetProxyImpl is necessary for grpc service")
}
go func() {
providerServices := config.GetProviderConfig().Services

exporter, _ := grpcProtocol.ExporterMap().Load(url.ServiceKey())
if exporter == nil {
panic(fmt.Sprintf("no exporter found for servicekey: %v", url.ServiceKey()))
}
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker == nil {
panic(fmt.Sprintf("no invoker found for servicekey: %v", url.ServiceKey()))
}
in := []reflect.Value{reflect.ValueOf(service)}
in = append(in, reflect.ValueOf(invoker))
m.Func.Call(in)
if len(providerServices) == 0 {
panic("provider service map is null")
}

server.RegisterService(ds.ServiceDesc(), service)
waitGrpcExporter(providerServices)
registerService(providerServices, server)

s.grpcServer = server
go func() {
if err = server.Serve(lis); err != nil {
logger.Errorf("server serve failed with err: %v", err)
}
}()
}

// getSyncMapLen get sync map len
func getSyncMapLen(m *sync.Map) int {
length := 0

m.Range(func(_, _ interface{}) bool {
length++
return true
})
return length
}

// waitGrpcExporter wait until len(providerServices) = len(ExporterMap)
func waitGrpcExporter(providerServices map[string]*config.ServiceConfig) {
t := time.NewTicker(50 * time.Millisecond)
defer t.Stop()
pLen := len(providerServices)
ta := time.After(10 * time.Second)

for {
select {
case <-t.C:
mLen := getSyncMapLen(grpcProtocol.ExporterMap())
if pLen == mLen {
return
}
case <-ta:
panic("wait grpc exporter timeout when start grpc server")
}
}
}

// registerService SetProxyImpl invoker and grpc service
func registerService(providerServices map[string]*config.ServiceConfig, server *grpc.Server) {
for key, providerService := range providerServices {
service := config.GetProviderService(key)

ds, ok := service.(DubboGrpcService)
if !ok {
panic("illegal service type registered")
}

m, ok := reflect.TypeOf(service).MethodByName("SetProxyImpl")
if !ok {
panic("method SetProxyImpl is necessary for grpc service")
}
serviceKey := common.ServiceKey(providerService.InterfaceName, providerService.Group, providerService.Version)

exporter, _ := grpcProtocol.ExporterMap().Load(serviceKey)
if exporter == nil {
panic(fmt.Sprintf("no exporter found for servicekey: %v", serviceKey))
}
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker == nil {
panic(fmt.Sprintf("no invoker found for servicekey: %v", serviceKey))
}
in := []reflect.Value{reflect.ValueOf(service)}
in = append(in, reflect.ValueOf(invoker))
m.Func.Call(in)

server.RegisterService(ds.ServiceDesc(), service)

}
}

// Stop gRPC server
func (s *Server) Stop() {
s.grpcServer.Stop()
Expand Down

0 comments on commit 98a8afa

Please sign in to comment.