Skip to content

Commit

Permalink
Nacos support subscribe to all with '*' (#2374)
Browse files Browse the repository at this point in the history
  • Loading branch information
FinalT authored Aug 28, 2023
1 parent 98188e4 commit 2e4f4b1
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 20 deletions.
31 changes: 31 additions & 0 deletions registry/nacos/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ func NewNacosListener(url, regURL *common.URL, namingClient *nacosClient.NacosNa
return listener, err
}

// NewNacosListener creates a data listener for nacos
func NewNacosListenerWithServiceName(serviceName string, url, regURL *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
listenURL: url,
regURL: regURL,
events: gxchan.NewUnboundedChan(32),
instanceMap: map[string]model.Instance{},
done: make(chan struct{}),
}
err := listener.startListenWithServiceName(serviceName)
return listener, err
}

func generateUrl(instance model.Instance) *common.URL {
if instance.Metadata == nil {
logger.Errorf("nacos instance metadata is empty,instance:%+v", instance)
Expand Down Expand Up @@ -191,6 +205,23 @@ func (nl *nacosListener) startListen() error {
return nil
}

func (nl *nacosListener) startListenWithServiceName(serviceName string) error {
if nl.namingClient == nil {
return perrors.New("nacos naming namingClient stopped")
}
nl.subscribeParam = createSubscribeParamWithServiceName(serviceName, nl.regURL, nl.Callback)
if nl.subscribeParam == nil {
return perrors.New("create nacos subscribeParam failed")
}
go func() {
err := nl.namingClient.Client().Subscribe(nl.subscribeParam)
if err == nil {
listenerCache.Store(nl.subscribeParam.ServiceName+nl.subscribeParam.GroupName, nl)
}
}()
return nil
}

func (nl *nacosListener) stopListen() error {
return nl.namingClient.Client().Unsubscribe(nl.subscribeParam)
}
Expand Down
119 changes: 99 additions & 20 deletions registry/nacos/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,36 +169,95 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
if role != common.CONSUMER {
return nil
}
serviceName := getServiceName(url)
if serviceName == "*" {
// Subscribe to all services
for {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
return perrors.New("nacosRegistry is not available.")
}

for {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
return perrors.New("nacosRegistry is not available.")
}
services, err := nr.getAllSubscribeServiceNames()
if err != nil {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
return err
}
logger.Warnf("getAllServices() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}

listener, err := nr.subscribe(url)
defer metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
if err != nil {
for _, service := range services {
listener, err := nr.subscribeToService(url, service)
metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
if err != nil {
logger.Warnf("Failed to subscribe to service '%s': %v", service, err)
continue
}

nr.handleServiceEvents(listener, notifyListener)
}
}
} else {
// Subscribe to a specific service
for {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
return err
return perrors.New("nacosRegistry is not available.")
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}

for {
serviceEvent, err := listener.Next()
listener, err := nr.subscribe(url)
metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
return err
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
return err
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}
logger.Infof("[Nacos Registry] Update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
nr.handleServiceEvents(listener, notifyListener)
}
}
}

// getAllServices retrieves the list of all services from the registry
func (nr *nacosRegistry) getAllSubscribeServiceNames() ([]string, error) {
services, err := nr.namingClient.Client().GetAllServicesInfo(vo.GetAllServiceInfoParam{
GroupName: nr.GetParam(constant.RegistryGroupKey, defaultGroup),
PageNo: 1,
PageSize: 10,
})
subScribeServiceNames := []string{}
for _, dom := range services.Doms {
if strings.HasPrefix(dom, "providers:") {
subScribeServiceNames = append(subScribeServiceNames, dom)
}
}

return subScribeServiceNames, err
}

// subscribeToService subscribes to a specific service in the registry
func (nr *nacosRegistry) subscribeToService(url *common.URL, service string) (listener registry.Listener, err error) {
return NewNacosListenerWithServiceName(service, url, nr.URL, nr.namingClient)
}

// handleServiceEvents receives service events from the listener and notifies the notifyListener
func (nr *nacosRegistry) handleServiceEvents(listener registry.Listener, notifyListener registry.NotifyListener) {
for {
serviceEvent, err := listener.Next()
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
return
}
logger.Infof("[Nacos Registry] Update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
}

// UnSubscribe :
Expand Down Expand Up @@ -256,6 +315,26 @@ func createSubscribeParam(url, regUrl *common.URL, cb callback) *vo.SubscribePar
}
}

func createSubscribeParamWithServiceName(serviceName string, regUrl *common.URL, cb callback) *vo.SubscribeParam {
groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
if cb == nil {
v, ok := listenerCache.Load(serviceName + groupName)
if !ok {
return nil
}
listener, ok := v.(*nacosListener)
if !ok {
return nil
}
cb = listener.Callback
}
return &vo.SubscribeParam{
ServiceName: serviceName,
SubscribeCallback: cb,
GroupName: groupName,
}
}

// GetURL gets its registration URL
func (nr *nacosRegistry) GetURL() *common.URL {
return nr.URL
Expand Down

0 comments on commit 2e4f4b1

Please sign in to comment.