Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLI] fix a few things in domain migration command #5374

Merged
merged 5 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 56 additions & 73 deletions tools/cli/domainCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,10 @@ func newDomainCLI(
) *domainCLIImpl {
d := &domainCLIImpl{}
d.frontendClient = initializeFrontendClient(c)
d.destinationClient = newClientFactory(func(c *cli.Context) string {
return c.String(FlagDestinationAddress)
}).ServerFrontendClient(c)
d.destinationClient = cFactory.ServerFrontendClientForMigration(c)
if isAdminMode {
d.frontendAdminClient = initializeFrontendAdminClient(c)
d.destinationAdminClient = newClientFactory(func(c *cli.Context) string {
return c.String(FlagDestinationAddress)
}).ServerAdminClient(c)
d.destinationAdminClient = cFactory.ServerAdminClientForMigration(c)
d.domainHandler = initializeAdminDomainHandler(c)
}
return d
Expand Down Expand Up @@ -414,60 +410,59 @@ VisibilityArchivalURI: {{.}}{{end}}
{{with .FailoverInfo}}Graceful failover info:
{{table .}}{{end}}`

var newtemplateDomain = `Validation Check:
var domainMigrationTemplate = `Validation Check:
{{- range .}}
- {{.ValidationCheck}}: {{.ValidationResult}}
{{- with .ValidationDetails}}
{{- with .CurrentDomainRow}}
Current Domain:
Name: {{.DomainInfo.Name}}
UUID: {{.DomainInfo.UUID}}
{{- end}}
{{- with .NewDomainRow}}
New Domain:
Name: {{.DomainInfo.Name}}
UUID: {{.DomainInfo.UUID}}
{{- end}}
{{- if .LongRunningWorkFlowNum}}
Long Running Workflow Num: {{.LongRunningWorkFlowNum}}
{{- end}}
{{- if .MissingCurrSearchAttributes}}
Missing Search Attributes in Current Domain:
{{- range .MissingCurrSearchAttributes}}
- {{.}}
{{- end}}
{{- end}}
{{- if .MissingNewSearchAttributes}}
Missing Search Attributes in New Domain:
{{- range .MissingNewSearchAttributes}}
- {{.}}
{{- end}}
{{- end}}
{{- if ne (len .MismatchedDomainMetaData) 0 }}
Mismatched Domain Meta Data: {{.MismatchedDomainMetaData}}
{{- end }}
{{- range .MismatchedDynamicConfig}}
{{ $dynamicConfig := . }}
Mismatched Dynamic Config:
Config Key: {{.Key}}
{{- range $i, $v := .CurrValues}}
Current Response:
Data: {{ printf "%s" (index $dynamicConfig.CurrValues $i).Value.Data }}
Filters:
{{- range $filter := (index $dynamicConfig.CurrValues $i).Filters}}
- Name: {{ $filter.Name }}
Value: {{ printf "%s" $filter.Value.Data }}
{{- with .CurrentDomainRow}}
Current Domain:
Name: {{.DomainInfo.Name}}
UUID: {{.DomainInfo.UUID}}
{{- end}}
{{- with .NewDomainRow}}
New Domain:
Name: {{.DomainInfo.Name}}
UUID: {{.DomainInfo.UUID}}
{{- end}}
{{- if ne (len .MismatchedDomainMetaData) 0 }}
Mismatched Domain Meta Data: {{.MismatchedDomainMetaData}}
{{- end }}
{{- if .LongRunningWorkFlowNum}}
Long Running Workflow Num: {{.LongRunningWorkFlowNum}}
{{- end}}
{{- if .MissingCurrSearchAttributes}}
Missing Search Attributes in Current Domain:
{{- range .MissingCurrSearchAttributes}}
- {{.}}
{{- end}}
New Response:
Data: {{ printf "%s" (index $dynamicConfig.NewValues $i).Value.Data }}
Filters:
{{- range $filter := (index $dynamicConfig.NewValues $i).Filters}}
- Name: {{ $filter.Name }}
Value: {{ printf "%s" $filter.Value.Data }}
{{- end}}
{{- if .MissingNewSearchAttributes}}
Missing Search Attributes in New Domain:
{{- range .MissingNewSearchAttributes}}
- {{.}}
{{- end}}
{{- end}}
{{- end}}
{{- end}}
{{- range .MismatchedDynamicConfig}}
{{- $dynamicConfig := . }}
- Config Key: {{.Key}}
{{- range $i, $v := .CurrValues}}
Current Response:
Data: {{ printf "%s" (index $dynamicConfig.CurrValues $i).Value.Data }}
Filters:
{{- range $filter := (index $dynamicConfig.CurrValues $i).Filters}}
- Name: {{ $filter.Name }}
Value: {{ printf "%s" $filter.Value.Data }}
{{- end}}
New Response:
Data: {{ printf "%s" (index $dynamicConfig.NewValues $i).Value.Data }}
Filters:
{{- range $filter := (index $dynamicConfig.NewValues $i).Filters}}
- Name: {{ $filter.Name }}
Value: {{ printf "%s" $filter.Value.Data }}
{{- end}}
{{- end}}
{{- end}}
{{- end}}
{{- end}}
`

Expand Down Expand Up @@ -778,26 +773,25 @@ func (d *domainCLIImpl) describeDomain(
}

func (d *domainCLIImpl) migrateDomain(c *cli.Context) {
var results []DomainMigrationRow
checkers := []func(*cli.Context) DomainMigrationRow{
d.migrationDomainMetaDataCheck,
d.migrationDomainWorkFlowCheck,
d.migrationDynamicConfigCheck,
d.searchAttributesChecker,
}
wg := &sync.WaitGroup{}
results := make([]DomainMigrationRow, len(checkers))
for i := range checkers {
wg.Add(1)
go func(i int) {
defer wg.Done()
result := checkers[i](c)
results = append(results, result)
results[i] = checkers[i](c)
}(i)
}
wg.Wait()

renderOpts := RenderOptions{
DefaultTemplate: newtemplateDomain,
DefaultTemplate: domainMigrationTemplate,
Color: true,
Border: true,
PrintDateTime: true,
Expand All @@ -809,9 +803,6 @@ func (d *domainCLIImpl) migrateDomain(c *cli.Context) {
}

func (d *domainCLIImpl) migrationDomainMetaDataCheck(c *cli.Context) DomainMigrationRow {
d.destinationClient = newClientFactory(func(c *cli.Context) string {
return c.String(FlagDestinationAddress)
}).ServerFrontendClient(c)
domain := c.GlobalString(FlagDomain)
newDomain := c.String(FlagDestinationDomain)
ctx, cancel := newContext(c)
Expand Down Expand Up @@ -853,10 +844,7 @@ func metaDataValidation(currResp *types.DescribeDomainResponse, newResp *types.D
}

func (d *domainCLIImpl) migrationDomainWorkFlowCheck(c *cli.Context) DomainMigrationRow {
d.destinationClient = newClientFactory(func(c *cli.Context) string {
return c.String(FlagDestinationAddress)
}).ServerFrontendClient(c)
countWorkFlows := d.countLongRunningWorkflowinDest(c)
countWorkFlows := d.countLongRunningWorkflow(c)
check := countWorkFlows == 0
return DomainMigrationRow{
ValidationCheck: "Workflow Check",
Expand Down Expand Up @@ -901,10 +889,6 @@ func (d *domainCLIImpl) searchAttributesChecker(c *cli.Context) DomainMigrationR
ErrorAndExit("Unable to get search attributes for current domain.", err)
}

d.destinationClient = newClientFactory(func(c *cli.Context) string {
return c.String(FlagDestinationAddress)
}).ServerFrontendClient(c)

// getting search attributes for new domain
destinationSearchAttributes, err := d.destinationClient.GetSearchAttributes(ctx)
if err != nil {
Expand Down Expand Up @@ -1138,7 +1122,6 @@ func toDynamicConfigValue(value *types.DataBlob, filterMaps map[dynamicconfig.Fi
Name: filter.String(),
Value: valueToDataBlob(filterValue),
})
fmt.Println("Data:", string(configFilters[len(configFilters)-1].Value.Data))
}

return &types.DynamicConfigValue{
Expand Down Expand Up @@ -1184,8 +1167,8 @@ func archivalStatus(c *cli.Context, statusFlagName string) *types.ArchivalStatus
return nil
}

func (d *domainCLIImpl) countLongRunningWorkflowinDest(c *cli.Context) int {
domain := getRequiredOption(c, FlagDestinationDomain)
func (d *domainCLIImpl) countLongRunningWorkflow(c *cli.Context) int {
domain := c.GlobalString(FlagDomain)
now := time.Now()
past14Days := now.Add(-14 * 24 * time.Hour)
request := &types.CountWorkflowExecutionsRequest{
Expand All @@ -1194,7 +1177,7 @@ func (d *domainCLIImpl) countLongRunningWorkflowinDest(c *cli.Context) int {
}
ctx, cancel := newContextForLongPoll(c)
defer cancel()
response, err := d.destinationClient.CountWorkflowExecutions(ctx, request)
response, err := d.frontendClient.CountWorkflowExecutions(ctx, request)
if err != nil {
ErrorAndExit("Failed to count workflow.", err)
}
Expand Down
79 changes: 56 additions & 23 deletions tools/cli/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,35 +70,31 @@ type ClientFactory interface {
ServerFrontendClient(c *cli.Context) frontend.Client
ServerAdminClient(c *cli.Context) admin.Client

// ServerFrontendClientForMigration frontend client of the migration destination
ServerFrontendClientForMigration(c *cli.Context) frontend.Client
// ServerAdminClientForMigration admin client of the migration destination
ServerAdminClientForMigration(c *cli.Context) admin.Client

ElasticSearchClient(c *cli.Context) *elastic.Client

ServerConfig(c *cli.Context) (*config.Config, error)
}

type clientFactory struct {
addressFlagFunc func(c *cli.Context) string
hostPort string
dispatcher *yarpc.Dispatcher
logger *zap.Logger
dispatcher *yarpc.Dispatcher
dispatcherMigration *yarpc.Dispatcher
logger *zap.Logger
}

// DEPRECATED don't use, only reserved for backward compatibility purposes
// NewClientFactory creates a new ClientFactory
func NewClientFactory() ClientFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method was marked as deprecated, is there a way to not use it?

Copy link
Contributor Author

@shijiesheng shijiesheng Aug 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was deprecated in the previous CLI pr but then I realize it should not be deprecated.

return newClientFactory(func(c *cli.Context) string {
return c.GlobalString(FlagAddress)
})
}

func newClientFactory(f func(c *cli.Context) string) ClientFactory {
logger, err := zap.NewDevelopment()
if err != nil {
panic(err)
}

return &clientFactory{
logger: logger,
addressFlagFunc: f,
logger: logger,
}
}

Expand Down Expand Up @@ -139,6 +135,31 @@ func (b *clientFactory) ServerAdminClient(c *cli.Context) admin.Client {
return admin.NewThriftClient(serverAdmin.New(clientConfig))
}

// ServerFrontendClientForMigration builds a frontend client (based on server side thrift interface)
func (b *clientFactory) ServerFrontendClientForMigration(c *cli.Context) frontend.Client {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the code is exactly the same as ServerFrontendClient, should dispatcher be replaced with something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. fixed

b.ensureDispatcherForMigration(c)
clientConfig := b.dispatcherMigration.ClientConfig(cadenceFrontendService)
if c.GlobalString(FlagTransport) == grpcTransport {
return frontend.NewGRPCClient(
apiv1.NewDomainAPIYARPCClient(clientConfig),
apiv1.NewWorkflowAPIYARPCClient(clientConfig),
apiv1.NewWorkerAPIYARPCClient(clientConfig),
apiv1.NewVisibilityAPIYARPCClient(clientConfig),
)
}
return frontend.NewThriftClient(serverFrontend.New(clientConfig))
}

// ServerAdminClientForMigration builds an admin client (based on server side thrift interface)
func (b *clientFactory) ServerAdminClientForMigration(c *cli.Context) admin.Client {
b.ensureDispatcherForMigration(c)
clientConfig := b.dispatcherMigration.ClientConfig(cadenceFrontendService)
if c.GlobalString(FlagTransport) == grpcTransport {
return admin.NewGRPCClient(adminv1.NewAdminAPIYARPCClient(clientConfig))
}
return admin.NewThriftClient(serverAdmin.New(clientConfig))
}

// ElasticSearchClient builds an ElasticSearch client
func (b *clientFactory) ElasticSearchClient(c *cli.Context) *elastic.Client {
url := getRequiredOption(c, FlagURL)
Expand All @@ -159,19 +180,30 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) {
if b.dispatcher != nil {
return
}
b.dispatcher = b.newClientDispatcher(c, c.GlobalString(FlagAddress))
}

func (b *clientFactory) ensureDispatcherForMigration(c *cli.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is not used anywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used now

if b.dispatcherMigration != nil {
return
}
b.dispatcherMigration = b.newClientDispatcher(c, c.String(FlagDestinationAddress))
}

func (b *clientFactory) newClientDispatcher(c *cli.Context, hostPortOverride string) *yarpc.Dispatcher {
shouldUseGrpc := c.GlobalString(FlagTransport) == grpcTransport

b.hostPort = tchannelPort
hostPort := tchannelPort
if shouldUseGrpc {
b.hostPort = grpcPort
hostPort = grpcPort
}
if addr := b.addressFlagFunc(c); addr != "" {
b.hostPort = addr
if hostPortOverride != "" {
hostPort = hostPortOverride
}
var outbounds transport.Outbounds
if shouldUseGrpc {
grpcTransport := grpc.NewTransport()
outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewSingleOutbound(b.hostPort)}
outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewSingleOutbound(hostPort)}

tlsCertificatePath := c.GlobalString(FlagTLSCertPath)
if tlsCertificatePath != "" {
Expand All @@ -187,29 +219,30 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) {
RootCAs: caCertPool,
}
tlsCreds := credentials.NewTLS(&tlsConfig)
tlsChooser := peer.NewSingle(hostport.Identify(b.hostPort), grpcTransport.NewDialer(grpc.DialerCredentials(tlsCreds)))
tlsChooser := peer.NewSingle(hostport.Identify(hostPort), grpcTransport.NewDialer(grpc.DialerCredentials(tlsCreds)))
outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewOutbound(tlsChooser)}
}
} else {
ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(cadenceClientName), tchannel.ListenAddr("127.0.0.1:0"))
if err != nil {
b.logger.Fatal("Failed to create transport channel", zap.Error(err))
}
outbounds = transport.Outbounds{Unary: ch.NewSingleOutbound(b.hostPort)}
outbounds = transport.Outbounds{Unary: ch.NewSingleOutbound(hostPort)}
}

b.dispatcher = yarpc.NewDispatcher(yarpc.Config{
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: cadenceClientName,
Outbounds: yarpc.Outbounds{cadenceFrontendService: outbounds},
OutboundMiddleware: yarpc.OutboundMiddleware{
Unary: &versionMiddleware{},
},
})

if err := b.dispatcher.Start(); err != nil {
b.dispatcher.Stop()
if err := dispatcher.Start(); err != nil {
dispatcher.Stop()
b.logger.Fatal("Failed to create outbound transport channel: %v", zap.Error(err))
}
return dispatcher
}

type versionMiddleware struct {
Expand Down