Merge branch 'master' into qlong-open-search
meiliang86 authored Sep 27, 2021
2 parents 0f40ea0 + 132768a commit 03945e2
Showing 4 changed files with 280 additions and 278 deletions.
213 changes: 0 additions & 213 deletions common/service/service.go
@@ -21,22 +21,14 @@
package service

import (
"math/rand"
"os"
"sync/atomic"
"time"

"github.com/uber-go/tally"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/yarpc"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/authorization"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
Expand All @@ -46,7 +38,6 @@ import (
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/rpc"
)

@@ -86,85 +77,8 @@ type (
        // GetMembershipMonitor returns a membership monitor
        GetMembershipMonitor() (membership.Monitor, error)
    }

    // Service contains the objects specific to this service
    serviceImpl struct {
        status                int32
        sName                 string
        hostName              string
        hostInfo              *membership.HostInfo
        dispatcher            *yarpc.Dispatcher
        membershipFactory     MembershipMonitorFactory
        membershipMonitor     membership.Monitor
        rpcFactory            common.RPCFactory
        pprofInitializer      common.PProfInitializer
        clientBean            client.Bean
        timeSource            clock.TimeSource
        numberOfHistoryShards int

        logger          log.Logger
        throttledLogger log.Logger

        metricsScope           tally.Scope
        runtimeMetricsReporter *metrics.RuntimeMetricsReporter
        metricsClient          metrics.Client
        clusterMetadata        cluster.Metadata
        messagingClient        messaging.Client
        blobstoreClient        blobstore.Client
        dynamicCollection      *dynamicconfig.Collection
        dispatcherProvider     rpc.DispatcherProvider
        archivalMetadata       archiver.ArchivalMetadata
        archiverProvider       provider.ArchiverProvider
        serializer             persistence.PayloadSerializer
    }
)

var _ Service = (*serviceImpl)(nil)

// New instantiates a Service Instance
// TODO: have a better name for Service.
func New(params *BootstrapParams) Service {
    sVice := &serviceImpl{
        status:                common.DaemonStatusInitialized,
        sName:                 params.Name,
        logger:                params.Logger,
        throttledLogger:       params.ThrottledLogger,
        rpcFactory:            params.RPCFactory,
        membershipFactory:     params.MembershipFactory,
        pprofInitializer:      params.PProfInitializer,
        timeSource:            clock.NewRealTimeSource(),
        metricsScope:          params.MetricScope,
        numberOfHistoryShards: params.PersistenceConfig.NumHistoryShards,
        clusterMetadata:       params.ClusterMetadata,
        metricsClient:         params.MetricsClient,
        messagingClient:       params.MessagingClient,
        blobstoreClient:       params.BlobstoreClient,
        dispatcherProvider:    params.DispatcherProvider,
        dynamicCollection: dynamicconfig.NewCollection(
            params.DynamicConfig,
            params.Logger,
            dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()),
        ),
        archivalMetadata: params.ArchivalMetadata,
        archiverProvider: params.ArchiverProvider,
        serializer:       persistence.NewPayloadSerializer(),
    }

    sVice.runtimeMetricsReporter = metrics.NewRuntimeMetricsReporter(params.MetricScope, time.Minute, sVice.GetLogger(), params.InstanceID)
    sVice.dispatcher = sVice.rpcFactory.GetDispatcher()
    if sVice.dispatcher == nil {
        sVice.logger.Fatal("Unable to create yarpc dispatcher")
    }

    // Get the host name and set it on the service. This is used for emitting metrics with a hostname tag.
    if hostName, err := os.Hostname(); err != nil {
        sVice.logger.WithTags(tag.Error(err)).Fatal("Error getting hostname")
    } else {
        sVice.hostName = hostName
    }
    return sVice
}

// UpdateLoggerWithServiceName tags logging with the service name from the top level
func (params *BootstrapParams) UpdateLoggerWithServiceName(name string) {
    if params.Logger != nil {
@@ -175,133 +89,6 @@ func (params *BootstrapParams) UpdateLoggerWithServiceName(name string) {
    }
}

// GetHostName returns the name of the host running the service
func (h *serviceImpl) GetHostName() string {
    return h.hostName
}

// Start starts a yarpc service
func (h *serviceImpl) Start() {
    if !atomic.CompareAndSwapInt32(&h.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
        return
    }

    var err error

    h.metricsScope.Counter(metrics.RestartCount).Inc(1)
    h.runtimeMetricsReporter.Start()

    if err := h.pprofInitializer.Start(); err != nil {
        h.logger.WithTags(tag.Error(err)).Fatal("Failed to start pprof")
    }

    if err := h.dispatcher.Start(); err != nil {
        h.logger.WithTags(tag.Error(err)).Fatal("Failed to start yarpc dispatcher")
    }

    h.membershipMonitor, err = h.membershipFactory.GetMembershipMonitor()
    if err != nil {
        h.logger.WithTags(tag.Error(err)).Fatal("Membership monitor creation failed")
    }

    h.membershipMonitor.Start()

    hostInfo, err := h.membershipMonitor.WhoAmI()
    if err != nil {
        h.logger.WithTags(tag.Error(err)).Fatal("failed to get host info from membership monitor")
    }
    h.hostInfo = hostInfo

    h.clientBean, err = client.NewClientBean(
        client.NewRPCClientFactory(h.rpcFactory, h.membershipMonitor, h.metricsClient, h.dynamicCollection, h.numberOfHistoryShards, h.logger),
        h.dispatcherProvider,
        h.clusterMetadata,
    )
    if err != nil {
        h.logger.WithTags(tag.Error(err)).Fatal("fail to initialize client bean")
    }

    // The service is now started up
    h.logger.Info("service started")
    // seed the random generator once for this service
    rand.Seed(time.Now().UTC().UnixNano())
}

// Stop closes the associated transport
func (h *serviceImpl) Stop() {
    if !atomic.CompareAndSwapInt32(&h.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
        return
    }

    if h.membershipMonitor != nil {
        h.membershipMonitor.Stop()
    }

    if h.dispatcher != nil {
        h.dispatcher.Stop() //nolint:errcheck
    }

    h.runtimeMetricsReporter.Stop()
}

func (h *serviceImpl) GetLogger() log.Logger {
    return h.logger
}

func (h *serviceImpl) GetThrottledLogger() log.Logger {
    return h.throttledLogger
}

func (h *serviceImpl) GetMetricsClient() metrics.Client {
    return h.metricsClient
}

func (h *serviceImpl) GetClientBean() client.Bean {
    return h.clientBean
}

func (h *serviceImpl) GetTimeSource() clock.TimeSource {
    return h.timeSource
}

func (h *serviceImpl) GetMembershipMonitor() membership.Monitor {
    return h.membershipMonitor
}

func (h *serviceImpl) GetHostInfo() *membership.HostInfo {
    return h.hostInfo
}

func (h *serviceImpl) GetDispatcher() *yarpc.Dispatcher {
    return h.dispatcher
}

// GetClusterMetadata returns the service cluster metadata
func (h *serviceImpl) GetClusterMetadata() cluster.Metadata {
    return h.clusterMetadata
}

// GetMessagingClient returns the messaging client against Kafka
func (h *serviceImpl) GetMessagingClient() messaging.Client {
    return h.messagingClient
}

func (h *serviceImpl) GetBlobstoreClient() blobstore.Client {
    return h.blobstoreClient
}

func (h *serviceImpl) GetArchivalMetadata() archiver.ArchivalMetadata {
    return h.archivalMetadata
}

func (h *serviceImpl) GetArchiverProvider() provider.ArchiverProvider {
    return h.archiverProvider
}

func (h *serviceImpl) GetPayloadSerializer() persistence.PayloadSerializer {
    return h.serializer
}

// GetMetricsServiceIdx returns the metrics name
func GetMetricsServiceIdx(serviceName string, logger log.Logger) metrics.ServiceIdx {
    switch serviceName {
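The body of that switch is cut off by the diff view. As a hedged sketch, it presumably maps each known service name to its metrics.ServiceIdx; the specific cases, log message, and fallback below are assumptions based on the well-known Cadence service-name and metrics constants, not on anything shown here:

// Hedged reconstruction of the truncated switch above; the exact cases are
// assumptions, not part of this diff.
func GetMetricsServiceIdx(serviceName string, logger log.Logger) metrics.ServiceIdx {
    switch serviceName {
    case common.FrontendServiceName:
        return metrics.Frontend
    case common.HistoryServiceName:
        return metrics.History
    case common.MatchingServiceName:
        return metrics.Matching
    case common.WorkerServiceName:
        return metrics.Worker
    default:
        logger.Fatal("unknown service name: " + serviceName)
    }
    // Unreachable: Fatal exits the process, but Go requires a terminating return.
    return metrics.NumServices
}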
61 changes: 0 additions & 61 deletions common/service/serviceinterfaces.go

This file was deleted.
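The rendered diff does not show the 61 deleted lines, but the `var _ Service = (*serviceImpl)(nil)` assertion and the getters removed from service.go suggest this file declared the Service interface that serviceImpl satisfied. A speculative reconstruction, with every method name taken from the deleted implementation above (imports would match the block deleted from service.go):

// Hypothetical contents of the deleted serviceinterfaces.go, inferred from the
// serviceImpl methods removed in common/service/service.go.
package service

type Service interface {
    // Start and Stop drive the daemon lifecycle guarded by serviceImpl.status.
    Start()
    Stop()

    GetHostName() string
    GetHostInfo() *membership.HostInfo
    GetDispatcher() *yarpc.Dispatcher
    GetMembershipMonitor() membership.Monitor
    GetLogger() log.Logger
    GetThrottledLogger() log.Logger
    GetMetricsClient() metrics.Client
    GetClientBean() client.Bean
    GetTimeSource() clock.TimeSource
    GetClusterMetadata() cluster.Metadata
    GetMessagingClient() messaging.Client
    GetBlobstoreClient() blobstore.Client
    GetArchivalMetadata() archiver.ArchivalMetadata
    GetArchiverProvider() provider.ArchiverProvider
    GetPayloadSerializer() persistence.PayloadSerializer
}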

8 changes: 4 additions & 4 deletions host/onebox.go
@@ -597,7 +597,7 @@ func (c *cadenceImpl) startWorker(hosts map[string][]string, startWG *sync.WaitG
c.logger.Fatal("Failed to get dispatcher for worker", tag.Error(err))
}
params.PublicClient = cwsc.New(dispatcher.ClientConfig(common.FrontendServiceName))
service := service.New(params)
service := NewService(params)
service.Start()

var replicatorDomainCache cache.DomainCache
@@ -631,7 +631,7 @@ func (c *cadenceImpl) startWorker(hosts map[string][]string, startWG *sync.WaitG
    c.shutdownWG.Done()
}

-func (c *cadenceImpl) startWorkerReplicator(params *service.BootstrapParams, service service.Service, domainCache cache.DomainCache) {
+func (c *cadenceImpl) startWorkerReplicator(params *service.BootstrapParams, service Service, domainCache cache.DomainCache) {
    serviceResolver, err := service.GetMembershipMonitor().GetResolver(common.WorkerServiceName)
    if err != nil {
        c.logger.Fatal("Fail to start replicator when start worker", tag.Error(err))
@@ -653,7 +653,7 @@ func (c *cadenceImpl) startWorkerReplicator(params *service.BootstrapParams, ser
    }
}

-func (c *cadenceImpl) startWorkerClientWorker(params *service.BootstrapParams, service service.Service, domainCache cache.DomainCache) {
+func (c *cadenceImpl) startWorkerClientWorker(params *service.BootstrapParams, service Service, domainCache cache.DomainCache) {
    workerConfig := worker.NewConfig(params)
    workerConfig.ArchiverConfig.ArchiverConcurrency = dynamicconfig.GetIntPropertyFn(10)
    historyArchiverBootstrapContainer := &carchiver.HistoryBootstrapContainer{
@@ -684,7 +684,7 @@ func (c *cadenceImpl) startWorkerClientWorker(params *service.BootstrapParams, s
    }
}

-func (c *cadenceImpl) startWorkerIndexer(params *service.BootstrapParams, service service.Service) {
+func (c *cadenceImpl) startWorkerIndexer(params *service.BootstrapParams, service Service) {
    params.DynamicConfig.UpdateValue(dynamicconfig.AdvancedVisibilityWritingMode, common.AdvancedVisibilityWritingModeDual)
    workerConfig := worker.NewConfig(params)
    c.indexer = indexer.NewIndexer(
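Taken together, the onebox.go hunks replace the deleted common/service constructor with a package-local one: service.New(params) becomes NewService(params), and the startWorker* helpers accept a host-local Service instead of service.Service. A minimal sketch of the host-side contract implied by these call sites, assuming the deleted implementation was relocated into the host package rather than rewritten (names and placement are inferred, not shown in this diff):

// Hypothetical host-package declarations implied by the call sites above.
// Only the methods these hunks actually exercise are listed; the real
// interface presumably exposes the full getter surface deleted from service.go.
package host

import "github.com/uber/cadence/common/membership"

type Service interface {
    Start()                                   // called in startWorker
    GetMembershipMonitor() membership.Monitor // called in startWorkerReplicator
}

Under that assumption, NewService(params) would return this interface from the same *service.BootstrapParams the old constructor consumed, so the worker bootstrapping keeps its wiring while the host tests drop their dependency on the deleted common/service implementation.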