Domain replication followup changes (uber#2648)
* Check host identity before processing domain replication tasks
* Add metrics for domain replication queue ack level
meiliang86 authored Oct 9, 2019
1 parent f756049 commit 4854135
Showing 8 changed files with 97 additions and 14 deletions.
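
The first change adds a ring-ownership check to the domain replication message processor so that, in steady state, only one worker pulls tasks from a given source cluster. The following is a minimal, self-contained sketch of that pattern, not the committed code: membership.ServiceResolver, membership.HostInfo, and log.Logger are the real Cadence types used in the diff below, while the processorSketch type and ownsSourceCluster method are simplified names introduced here for illustration.

package replicator

import (
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/membership"
)

// processorSketch carries only the fields needed for the ownership check.
type processorSketch struct {
	sourceCluster   string
	hostInfo        *membership.HostInfo
	serviceResolver membership.ServiceResolver
	logger          log.Logger
}

// ownsSourceCluster reports whether this host currently owns the source
// cluster's key on the worker ring. The check is best effort: while the ring
// is being reconfigured, two or more hosts may briefly both believe they are
// the owner, which is tolerated because domain replication task processing is
// protected by a version check.
func (p *processorSketch) ownsSourceCluster() bool {
	info, err := p.serviceResolver.Lookup(p.sourceCluster)
	if err != nil {
		p.logger.Info("Failed to lookup host info, skipping this run")
		return false
	}
	return info.Identity() == p.hostInfo.Identity()
}

The committed version of this logic is the check added to getAndHandleDomainReplicationTasks in domainReplicationMessageProcessor.go, shown further down.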
18 changes: 15 additions & 3 deletions common/metrics/defs.go
@@ -241,7 +241,7 @@ const (
PersistenceDeleteQueueMessagesScope
// PersistenceUpdateAckLevelScope tracks UpdateAckLevel calls made by service to persistence layer
PersistenceUpdateAckLevelScope
// PersistenceGetAckLevelScope tracks GetAckLeve calls made by service to persistence layer
// PersistenceGetAckLevelScope tracks GetAckLevel calls made by service to persistence layer
PersistenceGetAckLevelScope
// HistoryClientStartWorkflowExecutionScope tracks RPC calls to history service
HistoryClientStartWorkflowExecutionScope
@@ -666,6 +666,8 @@ const (
FrontendGetDomainReplicationMessagesScope
// FrontendReapplyEventsScope is the metric scope for frontend.ReapplyEvents
FrontendReapplyEventsScope
// FrontendDomainReplicationQueueScope is the metrics scope for domain replication queue
FrontendDomainReplicationQueueScope

NumFrontendScopes
)
@@ -1200,6 +1202,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FrontendGetReplicationMessagesScope: {operation: "GetReplicationMessages"},
FrontendGetDomainReplicationMessagesScope: {operation: "GetDomainReplicationMessages"},
FrontendReapplyEventsScope: {operation: "ReapplyEvents"},
FrontendDomainReplicationQueueScope: {operation: "DomainReplicationQueue"},
},
// History Scope Names
History: {
@@ -1423,9 +1426,16 @@ const (
NumCommonMetrics // Needs to be last on this list for iota numbering
)

// Frontend Metrics enum
const (
DomainReplicationTaskAckLevel = iota + NumCommonMetrics

NumFrontendMetrics
)

// History Metrics enum
const (
TaskRequests = iota + NumCommonMetrics
TaskRequests = iota + NumFrontendMetrics
TaskLatency
TaskFailures
TaskDiscarded
@@ -1727,7 +1737,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
MatchingClientForwardedCounter: {metricName: "forwarded", metricType: Counter},
MatchingClientInvalidTaskListName: {metricName: "invalid_task_list_name", metricType: Counter},
},
Frontend: {},
Frontend: {
DomainReplicationTaskAckLevel: {metricName: "domain_replication_task_ack_level", metricType: Gauge},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
TaskLatency: {metricName: "task_latency", metricType: Timer},
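
The defs.go change above also explains why TaskRequests now offsets from NumFrontendMetrics rather than NumCommonMetrics: the new frontend metric enum is inserted between the common and history ID ranges, and each enum continues the chain from the previous one's Num* sentinel. Below is a condensed sketch of that iota-chaining pattern; DomainReplicationTaskAckLevel, NumFrontendMetrics, NumCommonMetrics, TaskRequests, and TaskLatency are the real identifiers from the diff, while FirstCommonMetric and SecondCommonMetric are placeholders standing in for the much longer real lists.

package metrics

const (
	FirstCommonMetric  = iota // 0 (placeholder for the real common metrics)
	SecondCommonMetric        // 1
	NumCommonMetrics          // 2 -- last entry, so it equals the count
)

// Frontend metrics start right after the common range.
const (
	DomainReplicationTaskAckLevel = iota + NumCommonMetrics // 2

	NumFrontendMetrics // 3
)

// History metrics now continue from NumFrontendMetrics
// (previously they continued directly from NumCommonMetrics).
const (
	TaskRequests = iota + NumFrontendMetrics // 3
	TaskLatency                              // 4
)
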
18 changes: 16 additions & 2 deletions common/persistence/domainReplicationQueue.go
@@ -30,6 +30,7 @@ import (
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
)

const (
@@ -39,9 +40,16 @@ const (
var _ DomainReplicationQueue = (*domainReplicationQueueImpl)(nil)

// NewDomainReplicationQueue creates a new DomainReplicationQueue instance
func NewDomainReplicationQueue(queue Queue, logger log.Logger) DomainReplicationQueue {
func NewDomainReplicationQueue(
queue Queue,
clusterName string,
metricsClient metrics.Client,
logger log.Logger,
) DomainReplicationQueue {
return &domainReplicationQueueImpl{
queue: queue,
clusterName: clusterName,
metricsClient: metricsClient,
logger: logger,
encoder: codec.NewThriftRWEncoder(),
ackNotificationChan: make(chan bool),
@@ -51,8 +59,10 @@ func NewDomainReplicationQueue(queue Queue, logger log.Logger) DomainReplication

type (
domainReplicationQueueImpl struct {
logger log.Logger
queue Queue
clusterName string
metricsClient metrics.Client
logger log.Logger
encoder codec.BinaryEncoder
ackLevelUpdated bool
ackNotificationChan chan bool
@@ -137,6 +147,10 @@ func (q *domainReplicationQueueImpl) purgeAckedMessages() error {
return fmt.Errorf("failed to purge messages: %v", err)
}

q.metricsClient.
Scope(metrics.FrontendDomainReplicationQueueScope).
UpdateGauge(metrics.DomainReplicationTaskAckLevel, float64(minAckLevel))

q.ackLevelUpdated = false

return nil
4 changes: 3 additions & 1 deletion common/persistence/persistence-factory/factory.go
@@ -89,6 +89,7 @@ type (
metricsClient metrics.Client
logger log.Logger
datastores map[storeType]Datastore
clusterName string
}

storeType int
@@ -131,6 +132,7 @@ func New(
config: cfg,
metricsClient: metricsClient,
logger: logger,
clusterName: clusterName,
}
limiters := buildRatelimiters(cfg)
factory.init(clusterName, limiters)
@@ -279,7 +281,7 @@ func (f *factoryImpl) NewDomainReplicationQueue() (p.DomainReplicationQueue, err
result = p.NewQueuePersistenceMetricsClient(result, f.metricsClient, f.logger)
}

return p.NewDomainReplicationQueue(result, f.logger), nil
return p.NewDomainReplicationQueue(result, f.clusterName, f.metricsClient, f.logger), nil
}

// Close closes this factory
2 changes: 1 addition & 1 deletion common/persistence/sql/storage/mysql/queue.go
@@ -23,8 +23,8 @@ package mysql
import (
"database/sql"
"encoding/json"
"github.com/uber/cadence/common"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence/sql/storage/sqldb"
)

9 changes: 8 additions & 1 deletion host/onebox.go
@@ -697,6 +697,10 @@ func (c *cadenceImpl) startWorkerReplicator(params *service.BootstrapParams, ser
metadataManager := persistence.NewMetadataPersistenceMetricsClient(c.metadataMgr, service.GetMetricsClient(), c.logger)
workerConfig := worker.NewConfig(params)
workerConfig.ReplicationCfg.ReplicatorMessageConcurrency = dynamicconfig.GetIntPropertyFn(10)
serviceResolver, err := service.GetMembershipMonitor().GetResolver(common.WorkerServiceName)
if err != nil {
c.logger.Fatal("Fail to start replicator when start worker", tag.Error(err))
}
c.replicator = replicator.NewReplicator(
c.clusterMetadata,
metadataManager,
@@ -705,7 +709,10 @@
workerConfig.ReplicationCfg,
c.messagingClient,
c.logger,
service.GetMetricsClient())
service.GetMetricsClient(),
service.GetHostInfo(),
serviceResolver,
)
if err := c.replicator.Start(); err != nil {
c.replicator.Stop()
c.logger.Fatal("Fail to start replicator when start worker", tag.Error(err))
25 changes: 24 additions & 1 deletion service/worker/replicator/domainReplicationMessageProcessor.go
@@ -22,6 +22,7 @@ package replicator

import (
"context"
"fmt"
"sync/atomic"
"time"

@@ -31,6 +32,7 @@ import (
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/metrics"
)

@@ -49,12 +51,16 @@
remotePeer workflowserviceclient.Interface,
metricsClient metrics.Client,
domainReplicator DomainReplicator,
hostInfo *membership.HostInfo,
serviceResolver membership.ServiceResolver,
) *domainReplicationMessageProcessor {
retryPolicy := backoff.NewExponentialRetryPolicy(taskProcessorErrorRetryWait)
retryPolicy.SetBackoffCoefficient(taskProcessorErrorRetryBackoffCoefficient)
retryPolicy.SetMaximumAttempts(taskProcessorErrorRetryMaxAttampts)

return &domainReplicationMessageProcessor{
hostInfo: hostInfo,
serviceResolver: serviceResolver,
status: common.DaemonStatusInitialized,
sourceCluster: sourceCluster,
logger: logger,
@@ -70,6 +76,8 @@

type (
domainReplicationMessageProcessor struct {
hostInfo *membership.HostInfo
serviceResolver membership.ServiceResolver
status int32
sourceCluster string
logger log.Logger
@@ -91,7 +99,6 @@
go p.processorLoop()
}

// TODO: need to make sure only one worker is processing per source DC
func (p *domainReplicationMessageProcessor) processorLoop() {
timer := time.NewTimer(getWaitDuration())

@@ -108,6 +115,22 @@
}

func (p *domainReplicationMessageProcessor) getAndHandleDomainReplicationTasks() {
// The following is a best effort to make sure only one worker is processing tasks for a
// particular source cluster. When the ring is under reconfiguration, it is possible that
// for a small period of time two or more workers think they are the owner and try to execute
// the processing logic. This will not result in correctness issue as domain replication task
// processing will be protected by version check.
info, err := p.serviceResolver.Lookup(p.sourceCluster)
if err != nil {
p.logger.Info("Failed to lookup host info. Skip current run.")
return
}

if info.Identity() != p.hostInfo.Identity() {
p.logger.Info(fmt.Sprintf("Worker not responsible for source cluster %v.", p.sourceCluster))
return
}

ctx, cancel := context.WithTimeout(context.Background(), fetchTaskRequestTimeout)
request := &replicator.GetDomainReplicationMessagesRequest{
LastRetrivedMessageId: common.Int64Ptr(p.lastRetrievedMessageID),
25 changes: 21 additions & 4 deletions service/worker/replicator/replicator.go
@@ -34,6 +34,7 @@ import (
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
@@ -58,6 +59,8 @@ type (
logger log.Logger
metricsClient metrics.Client
historySerializer persistence.PayloadSerializer
hostInfo *membership.HostInfo
serviceResolver membership.ServiceResolver
}

// Config contains all the replication config for worker
@@ -78,12 +81,23 @@
)

// NewReplicator creates a new replicator for processing replication tasks
func NewReplicator(clusterMetadata cluster.Metadata, metadataManagerV2 persistence.MetadataManager,
domainCache cache.DomainCache, clientBean client.Bean, config *Config,
client messaging.Client, logger log.Logger, metricsClient metrics.Client) *Replicator {
func NewReplicator(
clusterMetadata cluster.Metadata,
metadataManagerV2 persistence.MetadataManager,
domainCache cache.DomainCache,
clientBean client.Bean,
config *Config,
client messaging.Client,
logger log.Logger,
metricsClient metrics.Client,
hostInfo *membership.HostInfo,
serviceResolver membership.ServiceResolver,
) *Replicator {

logger = logger.WithTags(tag.ComponentReplicator)
return &Replicator{
hostInfo: hostInfo,
serviceResolver: serviceResolver,
domainCache: domainCache,
clusterMetadata: clusterMetadata,
domainReplicator: NewDomainReplicator(metadataManagerV2, logger),
@@ -112,7 +126,10 @@ func (r *Replicator) Start() error {
clusterName,
r.logger.WithTags(tag.ComponentReplicationTaskProcessor, tag.SourceCluster(clusterName)),
r.clientBean.GetRemoteFrontendClient(clusterName),
r.metricsClient, r.domainReplicator,
r.metricsClient,
r.domainReplicator,
r.hostInfo,
r.serviceResolver,
)
r.domainProcessors = append(r.domainProcessors, processor)
} else {
10 changes: 9 additions & 1 deletion service/worker/service.go
@@ -243,6 +243,11 @@ func (s *Service) startReplicator(base service.Service, pFactory persistencefact
domainCache := cache.NewDomainCache(metadataV2Mgr, base.GetClusterMetadata(), s.metricsClient, s.logger)
domainCache.Start()

serviceResolver, err := base.GetMembershipMonitor().GetResolver(common.WorkerServiceName)
if err != nil {
s.logger.Fatal("failed to get service resolver", tag.Error(err))
}

replicator := replicator.NewReplicator(
base.GetClusterMetadata(),
metadataV2Mgr,
@@ -251,7 +256,10 @@
s.config.ReplicationCfg,
base.GetMessagingClient(),
s.logger,
s.metricsClient)
s.metricsClient,
base.GetHostInfo(),
serviceResolver,
)
if err := replicator.Start(); err != nil {
replicator.Stop()
s.logger.Fatal("fail to start replicator", tag.Error(err))
