-
Notifications
You must be signed in to change notification settings - Fork 800
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CLI] fix a few things in domain migration command #5374
Changes from 4 commits
c7b79a1
81920bd
b607975
27f804d
aeb355f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,35 +70,31 @@ type ClientFactory interface { | |
ServerFrontendClient(c *cli.Context) frontend.Client | ||
ServerAdminClient(c *cli.Context) admin.Client | ||
|
||
// ServerFrontendClientForMigration frontend client of the migration destination | ||
ServerFrontendClientForMigration(c *cli.Context) frontend.Client | ||
// ServerAdminClientForMigration admin client of the migration destination | ||
ServerAdminClientForMigration(c *cli.Context) admin.Client | ||
|
||
ElasticSearchClient(c *cli.Context) *elastic.Client | ||
|
||
ServerConfig(c *cli.Context) (*config.Config, error) | ||
} | ||
|
||
type clientFactory struct { | ||
addressFlagFunc func(c *cli.Context) string | ||
hostPort string | ||
dispatcher *yarpc.Dispatcher | ||
logger *zap.Logger | ||
dispatcher *yarpc.Dispatcher | ||
dispatcherMigration *yarpc.Dispatcher | ||
logger *zap.Logger | ||
} | ||
|
||
// DEPRECATED don't use, only reserved for backward compatibility purposes | ||
// NewClientFactory creates a new ClientFactory | ||
func NewClientFactory() ClientFactory { | ||
return newClientFactory(func(c *cli.Context) string { | ||
return c.GlobalString(FlagAddress) | ||
}) | ||
} | ||
|
||
func newClientFactory(f func(c *cli.Context) string) ClientFactory { | ||
logger, err := zap.NewDevelopment() | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
return &clientFactory{ | ||
logger: logger, | ||
addressFlagFunc: f, | ||
logger: logger, | ||
} | ||
} | ||
|
||
|
@@ -139,6 +135,31 @@ func (b *clientFactory) ServerAdminClient(c *cli.Context) admin.Client { | |
return admin.NewThriftClient(serverAdmin.New(clientConfig)) | ||
} | ||
|
||
// ServerFrontendClientForMigration builds a frontend client (based on server side thrift interface) | ||
func (b *clientFactory) ServerFrontendClientForMigration(c *cli.Context) frontend.Client { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the code is exactly the same as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch. fixed |
||
b.ensureDispatcherForMigration(c) | ||
clientConfig := b.dispatcherMigration.ClientConfig(cadenceFrontendService) | ||
if c.GlobalString(FlagTransport) == grpcTransport { | ||
return frontend.NewGRPCClient( | ||
apiv1.NewDomainAPIYARPCClient(clientConfig), | ||
apiv1.NewWorkflowAPIYARPCClient(clientConfig), | ||
apiv1.NewWorkerAPIYARPCClient(clientConfig), | ||
apiv1.NewVisibilityAPIYARPCClient(clientConfig), | ||
) | ||
} | ||
return frontend.NewThriftClient(serverFrontend.New(clientConfig)) | ||
} | ||
|
||
// ServerAdminClientForMigration builds an admin client (based on server side thrift interface) | ||
func (b *clientFactory) ServerAdminClientForMigration(c *cli.Context) admin.Client { | ||
b.ensureDispatcherForMigration(c) | ||
clientConfig := b.dispatcherMigration.ClientConfig(cadenceFrontendService) | ||
if c.GlobalString(FlagTransport) == grpcTransport { | ||
return admin.NewGRPCClient(adminv1.NewAdminAPIYARPCClient(clientConfig)) | ||
} | ||
return admin.NewThriftClient(serverAdmin.New(clientConfig)) | ||
} | ||
|
||
// ElasticSearchClient builds an ElasticSearch client | ||
func (b *clientFactory) ElasticSearchClient(c *cli.Context) *elastic.Client { | ||
url := getRequiredOption(c, FlagURL) | ||
|
@@ -159,19 +180,30 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) { | |
if b.dispatcher != nil { | ||
return | ||
} | ||
b.dispatcher = b.newClientDispatcher(c, c.GlobalString(FlagAddress)) | ||
} | ||
|
||
func (b *clientFactory) ensureDispatcherForMigration(c *cli.Context) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this method is not used anywhere There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. used now |
||
if b.dispatcherMigration != nil { | ||
return | ||
} | ||
b.dispatcherMigration = b.newClientDispatcher(c, c.String(FlagDestinationAddress)) | ||
} | ||
|
||
func (b *clientFactory) newClientDispatcher(c *cli.Context, hostPortOverride string) *yarpc.Dispatcher { | ||
shouldUseGrpc := c.GlobalString(FlagTransport) == grpcTransport | ||
|
||
b.hostPort = tchannelPort | ||
hostPort := tchannelPort | ||
if shouldUseGrpc { | ||
b.hostPort = grpcPort | ||
hostPort = grpcPort | ||
} | ||
if addr := b.addressFlagFunc(c); addr != "" { | ||
b.hostPort = addr | ||
if hostPortOverride != "" { | ||
hostPort = hostPortOverride | ||
} | ||
var outbounds transport.Outbounds | ||
if shouldUseGrpc { | ||
grpcTransport := grpc.NewTransport() | ||
outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewSingleOutbound(b.hostPort)} | ||
outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewSingleOutbound(hostPort)} | ||
|
||
tlsCertificatePath := c.GlobalString(FlagTLSCertPath) | ||
if tlsCertificatePath != "" { | ||
|
@@ -187,29 +219,30 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) { | |
RootCAs: caCertPool, | ||
} | ||
tlsCreds := credentials.NewTLS(&tlsConfig) | ||
tlsChooser := peer.NewSingle(hostport.Identify(b.hostPort), grpcTransport.NewDialer(grpc.DialerCredentials(tlsCreds))) | ||
tlsChooser := peer.NewSingle(hostport.Identify(hostPort), grpcTransport.NewDialer(grpc.DialerCredentials(tlsCreds))) | ||
outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewOutbound(tlsChooser)} | ||
} | ||
} else { | ||
ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(cadenceClientName), tchannel.ListenAddr("127.0.0.1:0")) | ||
if err != nil { | ||
b.logger.Fatal("Failed to create transport channel", zap.Error(err)) | ||
} | ||
outbounds = transport.Outbounds{Unary: ch.NewSingleOutbound(b.hostPort)} | ||
outbounds = transport.Outbounds{Unary: ch.NewSingleOutbound(hostPort)} | ||
} | ||
|
||
b.dispatcher = yarpc.NewDispatcher(yarpc.Config{ | ||
dispatcher := yarpc.NewDispatcher(yarpc.Config{ | ||
Name: cadenceClientName, | ||
Outbounds: yarpc.Outbounds{cadenceFrontendService: outbounds}, | ||
OutboundMiddleware: yarpc.OutboundMiddleware{ | ||
Unary: &versionMiddleware{}, | ||
}, | ||
}) | ||
|
||
if err := b.dispatcher.Start(); err != nil { | ||
b.dispatcher.Stop() | ||
if err := dispatcher.Start(); err != nil { | ||
dispatcher.Stop() | ||
b.logger.Fatal("Failed to create outbound transport channel: %v", zap.Error(err)) | ||
} | ||
return dispatcher | ||
} | ||
|
||
type versionMiddleware struct { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method was marked as deprecated, is there a way to not use it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was deprecated in the previous CLI pr but then I realize it should not be deprecated.