Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mod: update the comments in registy directory #589

Merged
merged 6 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config_center/configuration_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

// ConfigurationListener for changing listener's event
type ConfigurationListener interface {
// Process the notification event once there's any change happens on the config
Process(*ConfigChangeEvent)
}

Expand Down
12 changes: 6 additions & 6 deletions registry/base_configuration_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ import (
"github.com/apache/dubbo-go/remoting"
)

// BaseConfigurationListener ...
// nolint
type BaseConfigurationListener struct {
configurators []config_center.Configurator
dynamicConfiguration config_center.DynamicConfiguration
defaultConfiguratorFunc func(url *common.URL) config_center.Configurator
}

// Configurators ...
// Configurators gets Configurator from config center
func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator {
return bcl.configurators
}

// InitWith ...
// InitWith will init BaseConfigurationListener by @key+@Listener+@f
func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) {
bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration()
if bcl.dynamicConfiguration == nil {
Expand All @@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente
}
}

// Process ...
// Process the notification event once there's any change happens on the config.
func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value)
if event.ConfigType == remoting.EventTypeDel {
Expand All @@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin
return nil
}

// OverrideUrl ...
// OverrideUrl gets existing configuration rule and overrides provider url before exporting.
func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) {
for _, v := range bcl.configurators {
v.Configure(url)
}
}

// ToConfigurators ...
// ToConfigurators converts @urls by @f to config_center.Configurators
func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator {
if len(urls) == 0 {
return nil
Expand Down
2 changes: 2 additions & 0 deletions registry/consul/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) {
}
}

// Next returns the service event from consul.
func (l *consulListener) Next() (*registry.ServiceEvent, error) {
select {
case event := <-l.eventCh:
Expand All @@ -196,6 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) {
}
}

// Close closes this listener
func (l *consulListener) Close() {
close(l.done)
l.plan.Stop()
Expand Down
11 changes: 8 additions & 3 deletions registry/consul/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ import (
)

const (
// RegistryConnDelay ...
RegistryConnDelay = 3
registryConnDelay = 3
)

func init() {
Expand Down Expand Up @@ -74,6 +73,7 @@ func newConsulRegistry(url *common.URL) (registry.Registry, error) {
return r, nil
}

// Register service to consul registry center
func (r *consulRegistry) Register(url common.URL) error {
var err error

Expand All @@ -95,6 +95,7 @@ func (r *consulRegistry) register(url common.URL) error {
return r.client.Agent().ServiceRegister(service)
}

// Unregister service from consul registry center
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unregister deletes service from consul registry center

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Unregister service from consul registry center is a full sentence.

func (r *consulRegistry) Unregister(url common.URL) error {
var err error

Expand All @@ -112,6 +113,7 @@ func (r *consulRegistry) unregister(url common.URL) error {
return r.client.Agent().ServiceDeregister(buildId(url))
}

// Subscribe service from consul registry center
func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if role == common.CONSUMER {
Expand All @@ -133,7 +135,7 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti
return
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
time.Sleep(time.Duration(registryConnDelay) * time.Second)
continue
}

Expand All @@ -156,10 +158,12 @@ func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error)
return listener, err
}

// GetUrl get registry URL of consul registry center
func (r *consulRegistry) GetUrl() common.URL {
return *r.URL
}

// IsAvailable checks consul registry center whether is available
func (r *consulRegistry) IsAvailable() bool {
select {
case <-r.done:
Expand All @@ -169,6 +173,7 @@ func (r *consulRegistry) IsAvailable() bool {
}
}

// Destroy consul registry center
func (r *consulRegistry) Destroy() {
close(r.done)
}
2 changes: 2 additions & 0 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func init() {
extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
}

// RegistryDirectory implementation of Directory:
// Invoker list returned from this Directory's list method have been filtered by Routers
type RegistryDirectory struct {
directory.BaseDirectory
cacheInvokers []protocol.Invoker
Expand Down
7 changes: 6 additions & 1 deletion registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ type dataListener struct {
listener config_center.ConfigurationListener
}

// NewRegistryDataListener
// NewRegistryDataListener creates a data listener for etcd
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
}

// AddInterestedURL adds a registration @url to listen
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}

// DataChange processes the data change event from registry center of etcd
func (l *dataListener) DataChange(eventType remoting.Event) bool {
zouyx marked this conversation as resolved.
Show resolved Hide resolved

index := strings.Index(eventType.Path, "/providers/")
Expand Down Expand Up @@ -88,10 +90,12 @@ func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}

// Process data change event from config center of etcd
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}

// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
Expand All @@ -114,6 +118,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}

// Close etcd registry center
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
}
6 changes: 6 additions & 0 deletions registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,32 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
return r, nil
}

// InitListeners init listeners of etcd registry center
func (r *etcdV3Registry) InitListeners() {
r.listener = etcdv3.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
}

// DoRegister actually do the register job in the registry center of etcd
func (r *etcdV3Registry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}

// CloseAndNilClient closes listeners and clear client
func (r *etcdV3Registry) CloseAndNilClient() {
r.client.Close()
r.client = nil
}

// CloseListener closes listeners
func (r *etcdV3Registry) CloseListener() {
if r.configListener != nil {
r.configListener.Close()
}
}

// CreatePath create the path in the registry center of etcd
func (r *etcdV3Registry) CreatePath(k string) error {
var tmpPath string
for _, str := range strings.Split(k, "/")[1:] {
Expand All @@ -137,6 +142,7 @@ func (r *etcdV3Registry) CreatePath(k string) error {
return nil
}

// DoSubscribe actually subscribe the provider URL
func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {

var (
Expand Down
2 changes: 1 addition & 1 deletion registry/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func init() {
// service event
// ////////////////////////////////////////

// ServiceEvent ...
// ServiceEvent includes create, update, delete event
type ServiceEvent struct {
Action remoting.EventType
Service common.URL
Expand Down
1 change: 1 addition & 0 deletions registry/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type ConditionalEventListener interface {
Accept(e Event) bool
}

// ServiceInstancesChangedListener is used when the Service Discovery Changed
// TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct {
ServiceName string
Expand Down
8 changes: 6 additions & 2 deletions registry/kubernetes/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ type dataListener struct {
listener config_center.ConfigurationListener
}

// NewRegistryDataListener
// NewRegistryDataListener creates a data listener for kubernetes
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
}

// AddInterestedURL
// AddInterestedURL adds the @url of registry center to the listener
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
Expand Down Expand Up @@ -91,10 +91,12 @@ func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}

// Process processes the data change event from config center of kubernetes
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}

// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
Expand All @@ -116,6 +118,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}
}

// Close kubernetes registry center
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
}
10 changes: 10 additions & 0 deletions registry/kubernetes/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,28 @@ type kubernetesRegistry struct {
configListener *configurationListener
}

// Client gets the etcdv3 kubernetes
func (r *kubernetesRegistry) Client() *kubernetes.Client {
r.cltLock.RLock()
client := r.client
r.cltLock.RUnlock()
return client
}

// SetClient sets the kubernetes client
func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
r.cltLock.Lock()
r.client = client
r.cltLock.Unlock()
}

// CloseAndNilClient closes listeners and clear client
func (r *kubernetesRegistry) CloseAndNilClient() {
r.client.Close()
r.client = nil
}

// CloseListener closes listeners
func (r *kubernetesRegistry) CloseListener() {

r.cltLock.Lock()
Expand All @@ -96,17 +101,20 @@ func (r *kubernetesRegistry) CloseListener() {
r.configListener = nil
}

// CreatePath create the path in the registry center of kubernetes
func (r *kubernetesRegistry) CreatePath(k string) error {
if err := r.client.Create(k, ""); err != nil {
return perrors.WithMessagef(err, "create path %s in kubernetes", k)
}
return nil
}

// DoRegister actually do the register job in the registry center of kubernetes
func (r *kubernetesRegistry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}

// DoSubscribe actually subscribe the provider URL
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {

var (
Expand Down Expand Up @@ -139,6 +147,7 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er
return configListener, nil
}

// InitListeners init listeners of kubernetes registry center
func (r *kubernetesRegistry) InitListeners() {
r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
Expand Down Expand Up @@ -183,6 +192,7 @@ func newMockKubernetesRegistry(
return r, nil
}

// HandleClientRestart will reconnect to kubernetes registry center
func (r *kubernetesRegistry) HandleClientRestart() {

var (
Expand Down
Loading