Skip to content

Commit

Permalink
Move ringpop setup to common/membership (uber#4638)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Nov 15, 2021
1 parent 2957a70 commit 7e14102
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 248 deletions.
6 changes: 4 additions & 2 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/messaging/kafka"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/resource"
Expand Down Expand Up @@ -155,13 +156,14 @@ func (s *server) startService() common.Daemon {
)
rpcFactory := rpc.NewFactory(params.Logger, rpcParams)
params.RPCFactory = rpcFactory
params.MembershipMonitor, err = s.cfg.Ringpop.NewMonitor(
params.MembershipMonitor, err = membership.NewMonitor(
&s.cfg.Ringpop,
rpcFactory.GetChannel(),
params.Name,
params.Logger,
)
if err != nil {
log.Fatalf("error creating ringpop monitor: %v", err)
log.Fatalf("error creating membership monitor: %v", err)
}
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)

Expand Down
23 changes: 2 additions & 21 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ import (

"github.com/uber-go/tally/m3"
"github.com/uber-go/tally/prometheus"
"github.com/uber/ringpop-go/discovery"

"github.com/uber/cadence/common/dynamicconfig"
c "github.com/uber/cadence/common/dynamicconfig/configstore/config"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
)

type (
// Config contains the configuration for a set of cadence services
Config struct {
// Ringpop is the ringpop related configuration
Ringpop Ringpop `yaml:"ringpop"`
Ringpop membership.RingpopConfig `yaml:"ringpop"`
// Persistence contains the configuration for cadence datastores
Persistence Persistence `yaml:"persistence"`
// Log is the logging config
Expand Down Expand Up @@ -153,22 +153,6 @@ type (
OutputDirectory string `yaml:"outputDirectory"`
}

// Ringpop contains the ringpop config items
Ringpop struct {
// Name to be used in ringpop advertisement
Name string `yaml:"name" validate:"nonzero"`
// BootstrapMode is a enum that defines the ringpop bootstrap method, currently supports: hosts, files, custom, dns, and dns-srv
BootstrapMode BootstrapMode `yaml:"bootstrapMode"`
// BootstrapHosts is a list of seed hosts to be used for ringpop bootstrap
BootstrapHosts []string `yaml:"bootstrapHosts"`
// BootstrapFile is the file path to be used for ringpop bootstrap
BootstrapFile string `yaml:"bootstrapFile"`
// MaxJoinDuration is the max wait time to join the ring
MaxJoinDuration time.Duration `yaml:"maxJoinDuration"`
// Custom discovery provider, cannot be specified through yaml
DiscoveryProvider discovery.DiscoverProvider `yaml:"-"`
}

// Persistence contains the configuration for data store / persistence layer
Persistence struct {
// DefaultStore is the name of the default data store to use
Expand Down Expand Up @@ -510,9 +494,6 @@ type (
// URI is the domain default URI for visibility archiver
URI string `yaml:"URI"`
}

// BootstrapMode is an enum type for ringpop bootstrap mode
BootstrapMode int
)

// ValidateAndFillDefaults validates this config and fills default values if needed
Expand Down
129 changes: 128 additions & 1 deletion common/membership/interfaces.go → common/membership/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go -self_package github.com/uber/cadence/common/membership
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination monitor_mock.go -self_package github.com/uber/cadence/common/membership

package membership

import (
"fmt"
"sync/atomic"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -84,3 +89,125 @@ type (
Members() []*HostInfo
}
)

// NewRingpopMonitor returns a ringpop-based membership monitor
func NewRingpopMonitor(
serviceName string,
services []string,
rp *RingpopWrapper,
logger log.Logger,
) *RingpopMonitor {

rpo := &RingpopMonitor{
status: common.DaemonStatusInitialized,
serviceName: serviceName,
ringpopWrapper: rp,
logger: logger,
rings: make(map[string]*ringpopServiceResolver),
}
for _, s := range services {
rpo.rings[s] = newRingpopServiceResolver(s, rp, logger)
}
return rpo
}

func (rpo *RingpopMonitor) Start() {
if !atomic.CompareAndSwapInt32(
&rpo.status,
common.DaemonStatusInitialized,
common.DaemonStatusStarted,
) {
return
}

rpo.ringpopWrapper.Start()

labels, err := rpo.ringpopWrapper.Labels()
if err != nil {
rpo.logger.Fatal("unable to get ring pop labels", tag.Error(err))
}

if err = labels.Set(RoleKey, rpo.serviceName); err != nil {
rpo.logger.Fatal("unable to set ring pop labels", tag.Error(err))
}

for _, ring := range rpo.rings {
ring.Start()
}
}

func (rpo *RingpopMonitor) Stop() {
if !atomic.CompareAndSwapInt32(
&rpo.status,
common.DaemonStatusStarted,
common.DaemonStatusStopped,
) {
return
}

for _, ring := range rpo.rings {
ring.Stop()
}

rpo.ringpopWrapper.Stop()
}

func (rpo *RingpopMonitor) WhoAmI() (*HostInfo, error) {
address, err := rpo.ringpopWrapper.WhoAmI()
if err != nil {
return nil, err
}
labels, err := rpo.ringpopWrapper.Labels()
if err != nil {
return nil, err
}
return NewHostInfo(address, labels.AsMap()), nil
}

func (rpo *RingpopMonitor) EvictSelf() error {
return rpo.ringpopWrapper.SelfEvict()
}

func (rpo *RingpopMonitor) GetResolver(service string) (ServiceResolver, error) {
ring, found := rpo.rings[service]
if !found {
return nil, fmt.Errorf("service %q is not tracked by Monitor", service)
}
return ring, nil
}

func (rpo *RingpopMonitor) Lookup(service string, key string) (*HostInfo, error) {
ring, err := rpo.GetResolver(service)
if err != nil {
return nil, err
}
return ring.Lookup(key)
}

func (rpo *RingpopMonitor) AddListener(service string, name string, notifyChannel chan<- *ChangedEvent) error {
ring, err := rpo.GetResolver(service)
if err != nil {
return err
}
return ring.AddListener(name, notifyChannel)
}

func (rpo *RingpopMonitor) RemoveListener(service string, name string) error {
ring, err := rpo.GetResolver(service)
if err != nil {
return err
}
return ring.RemoveListener(name)
}

func (rpo *RingpopMonitor) GetReachableMembers() ([]string, error) {
return rpo.ringpopWrapper.GetReachableMembers()
}

func (rpo *RingpopMonitor) GetMemberCount(service string) (int, error) {
ring, err := rpo.GetResolver(service)
if err != nil {
return 0, err
}
return ring.MemberCount(), nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (
type ringpopServiceResolver struct {
status int32
service string
rp *RingPop
rp *RingpopWrapper
refreshChan chan struct{}
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
Expand All @@ -68,7 +68,7 @@ var _ ServiceResolver = (*ringpopServiceResolver)(nil)

func newRingpopServiceResolver(
service string,
rp *RingPop,
rp *RingpopWrapper,
logger log.Logger,
) *ringpopServiceResolver {

Expand Down
16 changes: 8 additions & 8 deletions common/membership/ringpop.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,22 @@ import (
)

type (
// RingPop is a simple wrapper
RingPop struct {
// RingpopWrapper is a simple wrapper
RingpopWrapper struct {
status int32
*ringpop.Ringpop
bootParams *swim.BootstrapOptions
logger log.Logger
}
)

// NewRingPop create a new ring pop wrapper
func NewRingPop(
// NewRingpopWraper create a new ring pop wrapper
func NewRingpopWraper(
ringPop *ringpop.Ringpop,
bootParams *swim.BootstrapOptions,
logger log.Logger,
) *RingPop {
return &RingPop{
) *RingpopWrapper {
return &RingpopWrapper{
status: common.DaemonStatusInitialized,
Ringpop: ringPop,
bootParams: bootParams,
Expand All @@ -56,7 +56,7 @@ func NewRingPop(
}

// Start start ring pop
func (r *RingPop) Start() {
func (r *RingpopWrapper) Start() {
if !atomic.CompareAndSwapInt32(
&r.status,
common.DaemonStatusInitialized,
Expand All @@ -72,7 +72,7 @@ func (r *RingPop) Start() {
}

// Stop stop ring pop
func (r *RingPop) Stop() {
func (r *RingpopWrapper) Stop() {
if !atomic.CompareAndSwapInt32(
&r.status,
common.DaemonStatusStarted,
Expand Down
Loading

0 comments on commit 7e14102

Please sign in to comment.