Skip to content

Commit

Permalink
Fix visibility producer init process (#1408)
Browse files Browse the repository at this point in the history
* Fix visibility producer init

* Fix consumer name
  • Loading branch information
vancexu authored Jan 24, 2019
1 parent 3ba1bec commit d27f279
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 24 deletions.
6 changes: 3 additions & 3 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ kafka:
cluster: test
applications:
visibility:
topic: cadence-visibility
dlq-topic: cadence-visibility-dlq
topic: cadence-visibility-dev
dlq-topic: cadence-visibility-dev-dlq

elasticsearch:
enable: false
url:
scheme: "http"
host: "127.0.0.1:9200"
indices:
visibility: visibility-dev
visibility: cadence-visibility-dev

11 changes: 10 additions & 1 deletion service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type (
config *Config
historyEventNotifier historyEventNotifier
publisher messaging.Producer
visibilityProducer messaging.Producer
rateLimiter common.TokenBucket
service.Service
}
Expand Down Expand Up @@ -151,6 +152,14 @@ func (h *Handler) Start() error {
}
}

if h.config.EnableVisibilityToKafka() {
var err error
h.visibilityProducer, err = h.GetMessagingClient().NewProducer(common.VisibilityAppName)
if err != nil {
h.GetLogger().Fatalf("Creating visibility producer failed: %v", err)
}
}

h.domainCache = cache.NewDomainCache(h.metadataMgr, h.GetClusterMetadata(), h.GetMetricsClient(), h.GetLogger())
h.domainCache.Start()
h.controller = newShardController(h.Service, h.GetHostInfo(), hServiceResolver, h.shardManager, h.historyMgr, h.historyV2Mgr,
Expand Down Expand Up @@ -181,7 +190,7 @@ func (h *Handler) Stop() {
// CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard
func (h *Handler) CreateEngine(context ShardContext) Engine {
return NewEngineWithShardContext(context, h.visibilityMgr, h.matchingServiceClient, h.historyServiceClient,
h.frontendServiceClient, h.historyEventNotifier, h.publisher, h.GetMessagingClient(), h.config)
h.frontendServiceClient, h.historyEventNotifier, h.publisher, h.visibilityProducer, h.config)
}

// Health is for health check
Expand Down
21 changes: 2 additions & 19 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NewEngineWithShardContext(
frontendClient frontend.Client,
historyEventNotifier historyEventNotifier,
publisher messaging.Producer,
messagingClient messaging.Client,
visibilityProducer messaging.Producer,
config *Config,
) Engine {
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
Expand Down Expand Up @@ -169,10 +169,7 @@ func NewEngineWithShardContext(
config: config,
archivalClient: sysworkflow.NewArchivalClient(frontendClient, shard.GetConfig().NumSysWorkflows),
}
var visibilityProducer messaging.Producer
if config.EnableVisibilityToKafka() {
visibilityProducer = getVisibilityProducer(messagingClient, shard.GetMetricsClient())
}

txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, visibilityProducer, matching, historyClient, logger)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, matching, visibilityProducer, logger)
historyEngImpl.txProcessor = txProcessor
Expand Down Expand Up @@ -3328,17 +3325,3 @@ func getWorkflowAlreadyStartedError(errMsg string, createRequestID string, workf
RunId: common.StringPtr(fmt.Sprintf("%v", runID)),
}
}

func getVisibilityProducer(messagingClient messaging.Client, metricsClient metrics.Client) messaging.Producer {
if messagingClient == nil {
return nil
}
visibilityProducer, err := messagingClient.NewProducer(common.VisibilityAppName)
if err != nil {
panic(err)
}
if metricsClient != nil {
visibilityProducer = messaging.NewMetricProducer(visibilityProducer, metricsClient)
}
return visibilityProducer
}
2 changes: 1 addition & 1 deletion service/worker/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewIndexer(config *Config, client messaging.Client, esClient *elastic.Clien
// Start indexer
func (x Indexer) Start() error {
visibilityApp := common.VisibilityAppName
visConsumerName := getConsumerName(visibilityApp)
visConsumerName := getConsumerName(x.visibilityIndexName)
x.visibilityProcessor = newIndexProcessor(visibilityApp, visConsumerName, x.kafkaClient, x.esClient,
visibilityProcessorName, x.visibilityIndexName, x.config, x.logger, x.metricsClient)
return x.visibilityProcessor.Start()
Expand Down

0 comments on commit d27f279

Please sign in to comment.