diff --git a/tools/cli/app_test.go b/tools/cli/app_test.go index a2bcb598e1b..da97e857044 100644 --- a/tools/cli/app_test.go +++ b/tools/cli/app_test.go @@ -60,6 +60,14 @@ func (m *clientFactoryMock) ServerAdminClient(c *cli.Context) admin.Client { return m.serverAdminClient } +func (m *clientFactoryMock) ServerFrontendClientForMigration(c *cli.Context) frontend.Client { + panic("not implemented") +} + +func (m *clientFactoryMock) ServerAdminClientForMigration(c *cli.Context) admin.Client { + panic("not implemented") +} + func (m *clientFactoryMock) ElasticSearchClient(c *cli.Context) *elastic.Client { panic("not implemented") } diff --git a/tools/cli/domain.go b/tools/cli/domain.go index c9f5b20ac55..4db0a20cb92 100644 --- a/tools/cli/domain.go +++ b/tools/cli/domain.go @@ -83,14 +83,13 @@ func newDomainCommands() []cli.Command { newDomainCLI(c, false).DescribeDomain(c) }, }, - { Name: "migration", Aliases: []string{"mi"}, Usage: "Migrate existing domain to new domain. This command only validates the settings. It does not perform actual data migration", Flags: migrateDomainFlags, Action: func(c *cli.Context) { - newDomainCLI(c, true).MigrateDomain(c) + newDomainMigrationCLIImpl(c).Validation(c) }, }, } diff --git a/tools/cli/domainCommands.go b/tools/cli/domainCommands.go index 91ef2ceb7da..ab2e359c9e7 100644 --- a/tools/cli/domainCommands.go +++ b/tools/cli/domainCommands.go @@ -25,10 +25,8 @@ import ( "encoding/json" "errors" "fmt" - "reflect" "strconv" "strings" - "sync" "time" "github.com/uber/cadence/client/admin" @@ -42,7 +40,6 @@ import ( "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/common" "github.com/uber/cadence/common/domain" - dc "github.com/uber/cadence/common/dynamicconfig" ) var ( @@ -52,12 +49,8 @@ var ( type ( domainCLIImpl struct { // used when making RPC call to frontend service - frontendClient frontend.Client - // used by migration command to make RPC call to frontend service of the destination domain - destinationClient frontend.Client - - frontendAdminClient admin.Client - destinationAdminClient admin.Client + frontendClient frontend.Client + frontendAdminClient admin.Client // act as admin to modify domain in DB directly domainHandler domain.Handler @@ -71,14 +64,8 @@ func newDomainCLI( ) *domainCLIImpl { d := &domainCLIImpl{} d.frontendClient = initializeFrontendClient(c) - d.destinationClient = newClientFactory(func(c *cli.Context) string { - return c.String(FlagDestinationAddress) - }).ServerFrontendClient(c) if isAdminMode { d.frontendAdminClient = initializeFrontendAdminClient(c) - d.destinationAdminClient = newClientFactory(func(c *cli.Context) string { - return c.String(FlagDestinationAddress) - }).ServerAdminClient(c) d.domainHandler = initializeAdminDomainHandler(c) } return d @@ -414,63 +401,6 @@ VisibilityArchivalURI: {{.}}{{end}} {{with .FailoverInfo}}Graceful failover info: {{table .}}{{end}}` -var newtemplateDomain = `Validation Check: -{{- range .}} -- {{.ValidationCheck}}: {{.ValidationResult}} -{{- with .ValidationDetails}} -{{- with .CurrentDomainRow}} -Current Domain: - Name: {{.DomainInfo.Name}} - UUID: {{.DomainInfo.UUID}} -{{- end}} -{{- with .NewDomainRow}} -New Domain: - Name: {{.DomainInfo.Name}} - UUID: {{.DomainInfo.UUID}} -{{- end}} -{{- if .LongRunningWorkFlowNum}} -Long Running Workflow Num: {{.LongRunningWorkFlowNum}} -{{- end}} -{{- if .MissingCurrSearchAttributes}} -Missing Search Attributes in Current Domain: -{{- range .MissingCurrSearchAttributes}} - - {{.}} -{{- end}} -{{- end}} -{{- if .MissingNewSearchAttributes}} -Missing Search Attributes in New Domain: -{{- range .MissingNewSearchAttributes}} - - {{.}} -{{- end}} -{{- end}} -{{- if ne (len .MismatchedDomainMetaData) 0 }} -Mismatched Domain Meta Data: {{.MismatchedDomainMetaData}} -{{- end }} -{{- range .MismatchedDynamicConfig}} -{{ $dynamicConfig := . }} -Mismatched Dynamic Config: -Config Key: {{.Key}} - {{- range $i, $v := .CurrValues}} - Current Response: - Data: {{ printf "%s" (index $dynamicConfig.CurrValues $i).Value.Data }} - Filters: - {{- range $filter := (index $dynamicConfig.CurrValues $i).Filters}} - - Name: {{ $filter.Name }} - Value: {{ printf "%s" $filter.Value.Data }} - {{- end}} - New Response: - Data: {{ printf "%s" (index $dynamicConfig.NewValues $i).Value.Data }} - Filters: - {{- range $filter := (index $dynamicConfig.NewValues $i).Filters}} - - Name: {{ $filter.Name }} - Value: {{ printf "%s" $filter.Value.Data }} - {{- end}} - {{- end}} -{{- end}} -{{- end}} -{{- end}} -` - // DescribeDomain updates a domain func (d *domainCLIImpl) DescribeDomain(c *cli.Context) { domainName := c.GlobalString(FlagDomain) @@ -515,10 +445,6 @@ func (d *domainCLIImpl) DescribeDomain(c *cli.Context) { }) } -func (d *domainCLIImpl) MigrateDomain(c *cli.Context) { - d.migrateDomain(c) -} - type BadBinaryRow struct { Checksum string `header:"Binary Checksum"` Operator string `header:"Operator"` @@ -777,399 +703,6 @@ func (d *domainCLIImpl) describeDomain( return d.domainHandler.DescribeDomain(ctx, request) } -func (d *domainCLIImpl) migrateDomain(c *cli.Context) { - var results []DomainMigrationRow - checkers := []func(*cli.Context) DomainMigrationRow{ - d.migrationDomainMetaDataCheck, - d.migrationDomainWorkFlowCheck, - d.migrationDynamicConfigCheck, - d.searchAttributesChecker, - } - wg := &sync.WaitGroup{} - for i := range checkers { - wg.Add(1) - go func(i int) { - defer wg.Done() - result := checkers[i](c) - results = append(results, result) - }(i) - } - wg.Wait() - - renderOpts := RenderOptions{ - DefaultTemplate: newtemplateDomain, - Color: true, - Border: true, - PrintDateTime: true, - } - - if err := Render(c, results, renderOpts); err != nil { - ErrorAndExit("Failed to render", err) - } -} - -func (d *domainCLIImpl) migrationDomainMetaDataCheck(c *cli.Context) DomainMigrationRow { - d.destinationClient = newClientFactory(func(c *cli.Context) string { - return c.String(FlagDestinationAddress) - }).ServerFrontendClient(c) - domain := c.GlobalString(FlagDomain) - newDomain := c.String(FlagDestinationDomain) - ctx, cancel := newContext(c) - defer cancel() - currResp, err := d.frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{ - Name: &domain, - }) - if err != nil { - ErrorAndExit(fmt.Sprintf("Could not describe old domain, Please check to see if old domain exists before migrating."), err) - } - newResp, err := d.destinationClient.DescribeDomain(ctx, &types.DescribeDomainRequest{ - Name: &newDomain, - }) - if err != nil { - ErrorAndExit(fmt.Sprintf("Could not describe new domain, Please check to see if new domain exists before migrating."), err) - } - validationResult, mismatchedMetaData := metaDataValidation(currResp, newResp) - validationRow := DomainMigrationRow{ - ValidationCheck: "Domain Meta Data", - ValidationResult: validationResult, - ValidationDetails: ValidationDetails{ - CurrentDomainRow: currResp, - NewDomainRow: newResp, - MismatchedDomainMetaData: mismatchedMetaData, - }, - } - return validationRow -} - -func metaDataValidation(currResp *types.DescribeDomainResponse, newResp *types.DescribeDomainResponse) (bool, string) { - if !reflect.DeepEqual(currResp.Configuration, newResp.Configuration) { - return false, "mismatched DomainConfiguration" - } - - if currResp.DomainInfo.OwnerEmail != newResp.DomainInfo.OwnerEmail { - return false, "mismatched OwnerEmail" - } - return true, "" -} - -func (d *domainCLIImpl) migrationDomainWorkFlowCheck(c *cli.Context) DomainMigrationRow { - d.destinationClient = newClientFactory(func(c *cli.Context) string { - return c.String(FlagDestinationAddress) - }).ServerFrontendClient(c) - countWorkFlows := d.countLongRunningWorkflowinDest(c) - check := countWorkFlows == 0 - return DomainMigrationRow{ - ValidationCheck: "Workflow Check", - ValidationResult: check, - ValidationDetails: ValidationDetails{ - LongRunningWorkFlowNum: &countWorkFlows, - }, - } -} - -func (d *domainCLIImpl) searchAttributesChecker(c *cli.Context) DomainMigrationRow { - ctx, cancel := newContext(c) - defer cancel() - - // getting user provided search attributes - searchAttributes := c.StringSlice(FlagSearchAttribute) - if len(searchAttributes) == 0 { - return DomainMigrationRow{ - ValidationCheck: "Search Attributes Check", - ValidationResult: true, - } - } - - // Parse the provided search attributes into a map[string]IndexValueType - requiredAttributes := make(map[string]types.IndexedValueType) - for _, attr := range searchAttributes { - parts := strings.SplitN(attr, ":", 2) - if len(parts) != 2 { - ErrorAndExit(fmt.Sprintf("Invalid search attribute format: %s", attr), nil) - } - key, valueType := parts[0], parts[1] - ivt, err := parseIndexedValueType(valueType) - if err != nil { - ErrorAndExit(fmt.Sprintf("Invalid search attribute type for %s: %s", key, valueType), err) - } - requiredAttributes[key] = ivt - } - - // getting search attributes for current domain - currentSearchAttributes, err := d.frontendClient.GetSearchAttributes(ctx) - if err != nil { - ErrorAndExit("Unable to get search attributes for current domain.", err) - } - - d.destinationClient = newClientFactory(func(c *cli.Context) string { - return c.String(FlagDestinationAddress) - }).ServerFrontendClient(c) - - // getting search attributes for new domain - destinationSearchAttributes, err := d.destinationClient.GetSearchAttributes(ctx) - if err != nil { - ErrorAndExit("Unable to get search attributes for new domain.", err) - } - - currentSearchAttrs := currentSearchAttributes.Keys - destinationSearchAttrs := destinationSearchAttributes.Keys - - // checking to see if search attributes exist - missingInCurrent := findMissingAttributes(requiredAttributes, currentSearchAttrs) - missingInNew := findMissingAttributes(requiredAttributes, destinationSearchAttrs) - - validationResult := len(missingInCurrent) == 0 && len(missingInNew) == 0 - - validationRow := DomainMigrationRow{ - ValidationCheck: "Search Attributes Check", - ValidationResult: validationResult, - ValidationDetails: ValidationDetails{ - MissingCurrSearchAttributes: missingInCurrent, - MissingNewSearchAttributes: missingInNew, - }, - } - - return validationRow -} - -// helper to parse types.IndexedValueType from string -func parseIndexedValueType(valueType string) (types.IndexedValueType, error) { - var result types.IndexedValueType - valueTypeBytes := []byte(valueType) - if err := result.UnmarshalText(valueTypeBytes); err != nil { - return 0, err - } - return result, nil -} - -// finds missing attributed in a map of existing attributed based on required attributes -func findMissingAttributes(requiredAttributes map[string]types.IndexedValueType, existingAttributes map[string]types.IndexedValueType) []string { - missingAttributes := make([]string, 0) - for key, requiredType := range requiredAttributes { - existingType, ok := existingAttributes[key] - if !ok || existingType != requiredType { - // construct the key:type string format - attr := fmt.Sprintf("%s:%s", key, requiredType) - missingAttributes = append(missingAttributes, attr) - } - } - return missingAttributes -} - -func (d *domainCLIImpl) migrationDynamicConfigCheck(c *cli.Context) DomainMigrationRow { - var mismatchedConfigs []MismatchedDynamicConfig - check := true - - resp := dynamicconfig.ListAllProductionKeys() - - currDomain := c.GlobalString(FlagDomain) - newDomain := c.String(FlagDestinationDomain) - - ctx, cancel := newContext(c) - defer cancel() - - currentDomainID := d.getDomainID(ctx, currDomain) - destinationDomainID := d.getDomainID(ctx, newDomain) - if currentDomainID == "" { - ErrorAndExit("Failed to get domainID for the current domain.", nil) - } - - if destinationDomainID == "" { - ErrorAndExit("Failed to get domainID for the destination domain.", nil) - } - - for _, configKey := range resp { - if len(configKey.Filters()) == 1 && configKey.Filters()[0] == dc.DomainName { - // Validate dynamic configs with only domainName filter - currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainFilter(currDomain), - }) - - newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainFilter(newDomain), - }) - - currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) - if err != nil { - // empty to indicate N/A - currResp = &types.GetDynamicConfigResponse{} - } - newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) - if err != nil { - // empty to indicate N/A - newResp = &types.GetDynamicConfigResponse{} - } - - if !reflect.DeepEqual(currResp.Value, newResp.Value) { - check = false - mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ - Key: configKey, - CurrValues: []*types.DynamicConfigValue{ - toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainName: currDomain, - }), - }, - NewValues: []*types.DynamicConfigValue{ - toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainName: newDomain, - }), - }, - }) - } - - } else if len(configKey.Filters()) == 1 && configKey.Filters()[0] == dc.DomainID { - // Validate dynamic configs with only domainID filter - currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainIDFilter(currentDomainID), - }) - - newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainIDFilter(destinationDomainID), - }) - - currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) - if err != nil { - // empty to indicate N/A - currResp = &types.GetDynamicConfigResponse{} - } - newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) - if err != nil { - // empty to indicate N/A - newResp = &types.GetDynamicConfigResponse{} - } - - if !reflect.DeepEqual(currResp.Value, newResp.Value) { - check = false - mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ - Key: configKey, - CurrValues: []*types.DynamicConfigValue{ - toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainID: currentDomainID, - }), - }, - NewValues: []*types.DynamicConfigValue{ - toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainID: destinationDomainID, - }), - }, - }) - } - - } else if containsFilter(configKey, dc.DomainName.String()) && containsFilter(configKey, dc.TaskListName.String()) { - // Validate dynamic configs with only domainName and TaskList filters - taskLists := c.StringSlice(FlagTaskList) - var mismatchedCurValues []*types.DynamicConfigValue - var mismatchedNewValues []*types.DynamicConfigValue - for _, taskList := range taskLists { - - currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainFilter(currDomain), - dynamicconfig.TaskListFilter(taskList), - }) - - newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainFilter(newDomain), - dynamicconfig.TaskListFilter(taskList), - }) - - currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) - if err != nil { - // empty to indicate N/A - currResp = &types.GetDynamicConfigResponse{} - } - newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) - if err != nil { - // empty to indicate N/A - newResp = &types.GetDynamicConfigResponse{} - } - - if !reflect.DeepEqual(currResp.Value, newResp.Value) { - check = false - mismatchedCurValues = append(mismatchedCurValues, toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainName: currDomain, - dynamicconfig.TaskListName: taskLists, - })) - mismatchedNewValues = append(mismatchedNewValues, toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainName: newDomain, - dynamicconfig.TaskListName: taskLists, - })) - - } - } - if len(mismatchedCurValues) > 0 && len(mismatchedNewValues) > 0 { - mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ - Key: configKey, - CurrValues: mismatchedCurValues, - NewValues: mismatchedNewValues, - }) - } - } - } - - validationRow := DomainMigrationRow{ - ValidationCheck: "Dynamic Config Check", - ValidationResult: check, - ValidationDetails: ValidationDetails{ - MismatchedDynamicConfig: mismatchedConfigs, - }, - } - - return validationRow -} - -func valueToDataBlob(value interface{}) *types.DataBlob { - if value == nil { - return nil - } - // No need to handle error as this is a private helper method - // where the correct value will always be passed regardless - data, _ := json.Marshal(value) - - return &types.DataBlob{ - EncodingType: types.EncodingTypeJSON.Ptr(), - Data: data, - } -} - -func toDynamicConfigValue(value *types.DataBlob, filterMaps map[dynamicconfig.Filter]interface{}) *types.DynamicConfigValue { - var configFilters []*types.DynamicConfigFilter - for filter, filterValue := range filterMaps { - configFilters = append(configFilters, &types.DynamicConfigFilter{ - Name: filter.String(), - Value: valueToDataBlob(filterValue), - }) - fmt.Println("Data:", string(configFilters[len(configFilters)-1].Value.Data)) - } - - return &types.DynamicConfigValue{ - Value: value, - Filters: configFilters, - } -} - -func containsFilter(key dynamicconfig.Key, value string) bool { - filters := key.Filters() - for _, filter := range filters { - if filter.String() == value { - return true - } - } - return false -} - -func (d *domainCLIImpl) getDomainID(c context.Context, domain string) string { - request := &types.DescribeDomainRequest{ - Name: &domain, - } - - resp, err := d.describeDomain(c, request) - if err != nil { - ErrorAndExit("Failed to describe domain.", err) - } - - return resp.DomainInfo.GetUUID() -} - func archivalStatus(c *cli.Context, statusFlagName string) *types.ArchivalStatus { if c.IsSet(statusFlagName) { switch c.String(statusFlagName) { @@ -1184,23 +717,6 @@ func archivalStatus(c *cli.Context, statusFlagName string) *types.ArchivalStatus return nil } -func (d *domainCLIImpl) countLongRunningWorkflowinDest(c *cli.Context) int { - domain := getRequiredOption(c, FlagDestinationDomain) - now := time.Now() - past14Days := now.Add(-14 * 24 * time.Hour) - request := &types.CountWorkflowExecutionsRequest{ - Domain: domain, - Query: "CloseTime=missing AND StartTime < " + strconv.FormatInt(past14Days.UnixNano(), 10), - } - ctx, cancel := newContextForLongPoll(c) - defer cancel() - response, err := d.destinationClient.CountWorkflowExecutions(ctx, request) - if err != nil { - ErrorAndExit("Failed to count workflow.", err) - } - return int(response.GetCount()) -} - func clustersToStrings(clusters []*types.ClusterReplicationConfiguration) []string { var res []string for _, cluster := range clusters { diff --git a/tools/cli/domainMigrationCommand.go b/tools/cli/domainMigrationCommand.go new file mode 100644 index 00000000000..540d46a5743 --- /dev/null +++ b/tools/cli/domainMigrationCommand.go @@ -0,0 +1,506 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package cli + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/urfave/cli" + + "github.com/uber/cadence/client/admin" + "github.com/uber/cadence/client/frontend" + "github.com/uber/cadence/common/dynamicconfig" + dc "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/types" +) + +var domainMigrationTemplate = `Validation Check: +{{- range .}} +- {{.ValidationCheck}}: {{.ValidationResult}} +{{- with .ValidationDetails}} + {{- with .CurrentDomainRow}} + Current Domain: + Name: {{.DomainInfo.Name}} + UUID: {{.DomainInfo.UUID}} + {{- end}} + {{- with .NewDomainRow}} + New Domain: + Name: {{.DomainInfo.Name}} + UUID: {{.DomainInfo.UUID}} + {{- end}} + {{- if ne (len .MismatchedDomainMetaData) 0 }} + Mismatched Domain Meta Data: {{.MismatchedDomainMetaData}} + {{- end }} + {{- if .LongRunningWorkFlowNum}} + Long Running Workflow Num: {{.LongRunningWorkFlowNum}} + {{- end}} + {{- if .MissingCurrSearchAttributes}} + Missing Search Attributes in Current Domain: + {{- range .MissingCurrSearchAttributes}} + - {{.}} + {{- end}} + {{- end}} + {{- if .MissingNewSearchAttributes}} + Missing Search Attributes in New Domain: + {{- range .MissingNewSearchAttributes}} + - {{.}} + {{- end}} + {{- end}} + {{- range .MismatchedDynamicConfig}} + {{- $dynamicConfig := . }} + - Config Key: {{.Key}} + {{- range $i, $v := .CurrValues}} + Current Response: + Data: {{ printf "%s" (index $dynamicConfig.CurrValues $i).Value.Data }} + Filters: + {{- range $filter := (index $dynamicConfig.CurrValues $i).Filters}} + - Name: {{ $filter.Name }} + Value: {{ printf "%s" $filter.Value.Data }} + {{- end}} + New Response: + Data: {{ printf "%s" (index $dynamicConfig.NewValues $i).Value.Data }} + Filters: + {{- range $filter := (index $dynamicConfig.NewValues $i).Filters}} + - Name: {{ $filter.Name }} + Value: {{ printf "%s" $filter.Value.Data }} + {{- end}} + {{- end}} + {{- end}} + {{- end}} +{{- end}} +` + +type domainMigrationCLIImpl struct { + frontendClient, destinationClient frontend.Client + frontendAdminClient, destinationAdminClient admin.Client +} + +func newDomainMigrationCLIImpl(c *cli.Context) *domainMigrationCLIImpl { + return &domainMigrationCLIImpl{ + frontendClient: cFactory.ServerFrontendClient(c), + destinationClient: cFactory.ServerFrontendClientForMigration(c), + frontendAdminClient: cFactory.ServerAdminClient(c), + destinationAdminClient: cFactory.ServerAdminClientForMigration(c), + } +} + +func (d *domainMigrationCLIImpl) Validation(c *cli.Context) { + checkers := []func(*cli.Context) DomainMigrationRow{ + d.migrationDomainMetaDataCheck, + d.migrationDomainWorkFlowCheck, + d.migrationDynamicConfigCheck, + d.searchAttributesChecker, + } + wg := &sync.WaitGroup{} + results := make([]DomainMigrationRow, len(checkers)) + for i := range checkers { + wg.Add(1) + go func(i int) { + defer wg.Done() + results[i] = checkers[i](c) + }(i) + } + wg.Wait() + + renderOpts := RenderOptions{ + DefaultTemplate: domainMigrationTemplate, + Color: true, + Border: true, + PrintDateTime: true, + } + + if err := Render(c, results, renderOpts); err != nil { + ErrorAndExit("Failed to render", err) + } +} + +func (d *domainMigrationCLIImpl) migrationDomainMetaDataCheck(c *cli.Context) DomainMigrationRow { + domain := c.GlobalString(FlagDomain) + newDomain := c.String(FlagDestinationDomain) + ctx, cancel := newContext(c) + defer cancel() + currResp, err := d.frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{ + Name: &domain, + }) + if err != nil { + ErrorAndExit(fmt.Sprintf("Could not describe old domain, Please check to see if old domain exists before migrating."), err) + } + newResp, err := d.destinationClient.DescribeDomain(ctx, &types.DescribeDomainRequest{ + Name: &newDomain, + }) + if err != nil { + ErrorAndExit(fmt.Sprintf("Could not describe new domain, Please check to see if new domain exists before migrating."), err) + } + validationResult, mismatchedMetaData := metaDataValidation(currResp, newResp) + validationRow := DomainMigrationRow{ + ValidationCheck: "Domain Meta Data", + ValidationResult: validationResult, + ValidationDetails: ValidationDetails{ + CurrentDomainRow: currResp, + NewDomainRow: newResp, + MismatchedDomainMetaData: mismatchedMetaData, + }, + } + return validationRow +} + +func metaDataValidation(currResp *types.DescribeDomainResponse, newResp *types.DescribeDomainResponse) (bool, string) { + if !reflect.DeepEqual(currResp.Configuration, newResp.Configuration) { + return false, "mismatched DomainConfiguration" + } + + if currResp.DomainInfo.OwnerEmail != newResp.DomainInfo.OwnerEmail { + return false, "mismatched OwnerEmail" + } + return true, "" +} + +func (d *domainMigrationCLIImpl) migrationDomainWorkFlowCheck(c *cli.Context) DomainMigrationRow { + countWorkFlows := d.countLongRunningWorkflow(c) + check := countWorkFlows == 0 + return DomainMigrationRow{ + ValidationCheck: "Workflow Check", + ValidationResult: check, + ValidationDetails: ValidationDetails{ + LongRunningWorkFlowNum: &countWorkFlows, + }, + } +} + +func (d *domainMigrationCLIImpl) countLongRunningWorkflow(c *cli.Context) int { + domain := c.GlobalString(FlagDomain) + now := time.Now() + past14Days := now.Add(-14 * 24 * time.Hour) + request := &types.CountWorkflowExecutionsRequest{ + Domain: domain, + Query: "CloseTime=missing AND StartTime < " + strconv.FormatInt(past14Days.UnixNano(), 10), + } + ctx, cancel := newContextForLongPoll(c) + defer cancel() + response, err := d.frontendClient.CountWorkflowExecutions(ctx, request) + if err != nil { + ErrorAndExit("Failed to count workflow.", err) + } + return int(response.GetCount()) +} + +func (d *domainMigrationCLIImpl) searchAttributesChecker(c *cli.Context) DomainMigrationRow { + ctx, cancel := newContext(c) + defer cancel() + + // getting user provided search attributes + searchAttributes := c.StringSlice(FlagSearchAttribute) + if len(searchAttributes) == 0 { + return DomainMigrationRow{ + ValidationCheck: "Search Attributes Check", + ValidationResult: true, + } + } + + // Parse the provided search attributes into a map[string]IndexValueType + requiredAttributes := make(map[string]types.IndexedValueType) + for _, attr := range searchAttributes { + parts := strings.SplitN(attr, ":", 2) + if len(parts) != 2 { + ErrorAndExit(fmt.Sprintf("Invalid search attribute format: %s", attr), nil) + } + key, valueType := parts[0], parts[1] + ivt, err := parseIndexedValueType(valueType) + if err != nil { + ErrorAndExit(fmt.Sprintf("Invalid search attribute type for %s: %s", key, valueType), err) + } + requiredAttributes[key] = ivt + } + + // getting search attributes for current domain + currentSearchAttributes, err := d.frontendClient.GetSearchAttributes(ctx) + if err != nil { + ErrorAndExit("Unable to get search attributes for current domain.", err) + } + + // getting search attributes for new domain + destinationSearchAttributes, err := d.destinationClient.GetSearchAttributes(ctx) + if err != nil { + ErrorAndExit("Unable to get search attributes for new domain.", err) + } + + currentSearchAttrs := currentSearchAttributes.Keys + destinationSearchAttrs := destinationSearchAttributes.Keys + + // checking to see if search attributes exist + missingInCurrent := findMissingAttributes(requiredAttributes, currentSearchAttrs) + missingInNew := findMissingAttributes(requiredAttributes, destinationSearchAttrs) + + validationResult := len(missingInCurrent) == 0 && len(missingInNew) == 0 + + validationRow := DomainMigrationRow{ + ValidationCheck: "Search Attributes Check", + ValidationResult: validationResult, + ValidationDetails: ValidationDetails{ + MissingCurrSearchAttributes: missingInCurrent, + MissingNewSearchAttributes: missingInNew, + }, + } + + return validationRow +} + +// helper to parse types.IndexedValueType from string +func parseIndexedValueType(valueType string) (types.IndexedValueType, error) { + var result types.IndexedValueType + valueTypeBytes := []byte(valueType) + if err := result.UnmarshalText(valueTypeBytes); err != nil { + return 0, err + } + return result, nil +} + +// finds missing attributed in a map of existing attributed based on required attributes +func findMissingAttributes(requiredAttributes map[string]types.IndexedValueType, existingAttributes map[string]types.IndexedValueType) []string { + missingAttributes := make([]string, 0) + for key, requiredType := range requiredAttributes { + existingType, ok := existingAttributes[key] + if !ok || existingType != requiredType { + // construct the key:type string format + attr := fmt.Sprintf("%s:%s", key, requiredType) + missingAttributes = append(missingAttributes, attr) + } + } + return missingAttributes +} + +func (d *domainMigrationCLIImpl) migrationDynamicConfigCheck(c *cli.Context) DomainMigrationRow { + var mismatchedConfigs []MismatchedDynamicConfig + check := true + + resp := dynamicconfig.ListAllProductionKeys() + + currDomain := c.GlobalString(FlagDomain) + newDomain := c.String(FlagDestinationDomain) + + ctx, cancel := newContext(c) + defer cancel() + + currentDomainID := getDomainID(ctx, currDomain, d.frontendClient) + destinationDomainID := getDomainID(ctx, newDomain, d.destinationClient) + if currentDomainID == "" { + ErrorAndExit("Failed to get domainID for the current domain.", nil) + } + + if destinationDomainID == "" { + ErrorAndExit("Failed to get domainID for the destination domain.", nil) + } + + for _, configKey := range resp { + if len(configKey.Filters()) == 1 && configKey.Filters()[0] == dc.DomainName { + // Validate dynamic configs with only domainName filter + currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainFilter(currDomain), + }) + + newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainFilter(newDomain), + }) + + currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) + if err != nil { + // empty to indicate N/A + currResp = &types.GetDynamicConfigResponse{} + } + newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) + if err != nil { + // empty to indicate N/A + newResp = &types.GetDynamicConfigResponse{} + } + + if !reflect.DeepEqual(currResp.Value, newResp.Value) { + check = false + mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ + Key: configKey, + CurrValues: []*types.DynamicConfigValue{ + toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainName: currDomain, + }), + }, + NewValues: []*types.DynamicConfigValue{ + toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainName: newDomain, + }), + }, + }) + } + + } else if len(configKey.Filters()) == 1 && configKey.Filters()[0] == dc.DomainID { + // Validate dynamic configs with only domainID filter + currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainIDFilter(currentDomainID), + }) + + newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainIDFilter(destinationDomainID), + }) + + currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) + if err != nil { + // empty to indicate N/A + currResp = &types.GetDynamicConfigResponse{} + } + newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) + if err != nil { + // empty to indicate N/A + newResp = &types.GetDynamicConfigResponse{} + } + + if !reflect.DeepEqual(currResp.Value, newResp.Value) { + check = false + mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ + Key: configKey, + CurrValues: []*types.DynamicConfigValue{ + toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainID: currentDomainID, + }), + }, + NewValues: []*types.DynamicConfigValue{ + toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainID: destinationDomainID, + }), + }, + }) + } + + } else if containsFilter(configKey, dc.DomainName.String()) && containsFilter(configKey, dc.TaskListName.String()) { + // Validate dynamic configs with only domainName and TaskList filters + taskLists := c.StringSlice(FlagTaskList) + var mismatchedCurValues []*types.DynamicConfigValue + var mismatchedNewValues []*types.DynamicConfigValue + for _, taskList := range taskLists { + + currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainFilter(currDomain), + dynamicconfig.TaskListFilter(taskList), + }) + + newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainFilter(newDomain), + dynamicconfig.TaskListFilter(taskList), + }) + + currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) + if err != nil { + // empty to indicate N/A + currResp = &types.GetDynamicConfigResponse{} + } + newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) + if err != nil { + // empty to indicate N/A + newResp = &types.GetDynamicConfigResponse{} + } + + if !reflect.DeepEqual(currResp.Value, newResp.Value) { + check = false + mismatchedCurValues = append(mismatchedCurValues, toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainName: currDomain, + dynamicconfig.TaskListName: taskLists, + })) + mismatchedNewValues = append(mismatchedNewValues, toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainName: newDomain, + dynamicconfig.TaskListName: taskLists, + })) + + } + } + if len(mismatchedCurValues) > 0 && len(mismatchedNewValues) > 0 { + mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ + Key: configKey, + CurrValues: mismatchedCurValues, + NewValues: mismatchedNewValues, + }) + } + } + } + + validationRow := DomainMigrationRow{ + ValidationCheck: "Dynamic Config Check", + ValidationResult: check, + ValidationDetails: ValidationDetails{ + MismatchedDynamicConfig: mismatchedConfigs, + }, + } + + return validationRow +} + +func getDomainID(c context.Context, domain string, client frontend.Client) string { + resp, err := client.DescribeDomain(c, &types.DescribeDomainRequest{Name: &domain}) + if err != nil { + ErrorAndExit("Failed to describe domain.", err) + } + + return resp.DomainInfo.GetUUID() +} + +func valueToDataBlob(value interface{}) *types.DataBlob { + if value == nil { + return nil + } + // No need to handle error as this is a private helper method + // where the correct value will always be passed regardless + data, _ := json.Marshal(value) + + return &types.DataBlob{ + EncodingType: types.EncodingTypeJSON.Ptr(), + Data: data, + } +} + +func toDynamicConfigValue(value *types.DataBlob, filterMaps map[dynamicconfig.Filter]interface{}) *types.DynamicConfigValue { + var configFilters []*types.DynamicConfigFilter + for filter, filterValue := range filterMaps { + configFilters = append(configFilters, &types.DynamicConfigFilter{ + Name: filter.String(), + Value: valueToDataBlob(filterValue), + }) + } + + return &types.DynamicConfigValue{ + Value: value, + Filters: configFilters, + } +} + +func containsFilter(key dynamicconfig.Key, value string) bool { + filters := key.Filters() + for _, filter := range filters { + if filter.String() == value { + return true + } + } + return false +} diff --git a/tools/cli/factory.go b/tools/cli/factory.go index 23c7cdd4a49..5f1bc4b8976 100644 --- a/tools/cli/factory.go +++ b/tools/cli/factory.go @@ -70,35 +70,31 @@ type ClientFactory interface { ServerFrontendClient(c *cli.Context) frontend.Client ServerAdminClient(c *cli.Context) admin.Client + // ServerFrontendClientForMigration frontend client of the migration destination + ServerFrontendClientForMigration(c *cli.Context) frontend.Client + // ServerAdminClientForMigration admin client of the migration destination + ServerAdminClientForMigration(c *cli.Context) admin.Client + ElasticSearchClient(c *cli.Context) *elastic.Client ServerConfig(c *cli.Context) (*config.Config, error) } type clientFactory struct { - addressFlagFunc func(c *cli.Context) string - hostPort string - dispatcher *yarpc.Dispatcher - logger *zap.Logger + dispatcher *yarpc.Dispatcher + dispatcherMigration *yarpc.Dispatcher + logger *zap.Logger } -// DEPRECATED don't use, only reserved for backward compatibility purposes // NewClientFactory creates a new ClientFactory func NewClientFactory() ClientFactory { - return newClientFactory(func(c *cli.Context) string { - return c.GlobalString(FlagAddress) - }) -} - -func newClientFactory(f func(c *cli.Context) string) ClientFactory { logger, err := zap.NewDevelopment() if err != nil { panic(err) } return &clientFactory{ - logger: logger, - addressFlagFunc: f, + logger: logger, } } @@ -139,6 +135,31 @@ func (b *clientFactory) ServerAdminClient(c *cli.Context) admin.Client { return admin.NewThriftClient(serverAdmin.New(clientConfig)) } +// ServerFrontendClientForMigration builds a frontend client (based on server side thrift interface) +func (b *clientFactory) ServerFrontendClientForMigration(c *cli.Context) frontend.Client { + b.ensureDispatcherForMigration(c) + clientConfig := b.dispatcherMigration.ClientConfig(cadenceFrontendService) + if c.GlobalString(FlagTransport) == grpcTransport { + return frontend.NewGRPCClient( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) + } + return frontend.NewThriftClient(serverFrontend.New(clientConfig)) +} + +// ServerAdminClientForMigration builds an admin client (based on server side thrift interface) +func (b *clientFactory) ServerAdminClientForMigration(c *cli.Context) admin.Client { + b.ensureDispatcherForMigration(c) + clientConfig := b.dispatcherMigration.ClientConfig(cadenceFrontendService) + if c.GlobalString(FlagTransport) == grpcTransport { + return admin.NewGRPCClient(adminv1.NewAdminAPIYARPCClient(clientConfig)) + } + return admin.NewThriftClient(serverAdmin.New(clientConfig)) +} + // ElasticSearchClient builds an ElasticSearch client func (b *clientFactory) ElasticSearchClient(c *cli.Context) *elastic.Client { url := getRequiredOption(c, FlagURL) @@ -159,19 +180,30 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) { if b.dispatcher != nil { return } + b.dispatcher = b.newClientDispatcher(c, c.GlobalString(FlagAddress)) +} + +func (b *clientFactory) ensureDispatcherForMigration(c *cli.Context) { + if b.dispatcherMigration != nil { + return + } + b.dispatcherMigration = b.newClientDispatcher(c, c.String(FlagDestinationAddress)) +} + +func (b *clientFactory) newClientDispatcher(c *cli.Context, hostPortOverride string) *yarpc.Dispatcher { shouldUseGrpc := c.GlobalString(FlagTransport) == grpcTransport - b.hostPort = tchannelPort + hostPort := tchannelPort if shouldUseGrpc { - b.hostPort = grpcPort + hostPort = grpcPort } - if addr := b.addressFlagFunc(c); addr != "" { - b.hostPort = addr + if hostPortOverride != "" { + hostPort = hostPortOverride } var outbounds transport.Outbounds if shouldUseGrpc { grpcTransport := grpc.NewTransport() - outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewSingleOutbound(b.hostPort)} + outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewSingleOutbound(hostPort)} tlsCertificatePath := c.GlobalString(FlagTLSCertPath) if tlsCertificatePath != "" { @@ -187,7 +219,7 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) { RootCAs: caCertPool, } tlsCreds := credentials.NewTLS(&tlsConfig) - tlsChooser := peer.NewSingle(hostport.Identify(b.hostPort), grpcTransport.NewDialer(grpc.DialerCredentials(tlsCreds))) + tlsChooser := peer.NewSingle(hostport.Identify(hostPort), grpcTransport.NewDialer(grpc.DialerCredentials(tlsCreds))) outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewOutbound(tlsChooser)} } } else { @@ -195,10 +227,10 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) { if err != nil { b.logger.Fatal("Failed to create transport channel", zap.Error(err)) } - outbounds = transport.Outbounds{Unary: ch.NewSingleOutbound(b.hostPort)} + outbounds = transport.Outbounds{Unary: ch.NewSingleOutbound(hostPort)} } - b.dispatcher = yarpc.NewDispatcher(yarpc.Config{ + dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: cadenceClientName, Outbounds: yarpc.Outbounds{cadenceFrontendService: outbounds}, OutboundMiddleware: yarpc.OutboundMiddleware{ @@ -206,10 +238,11 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) { }, }) - if err := b.dispatcher.Start(); err != nil { - b.dispatcher.Stop() + if err := dispatcher.Start(); err != nil { + dispatcher.Stop() b.logger.Fatal("Failed to create outbound transport channel: %v", zap.Error(err)) } + return dispatcher } type versionMiddleware struct { diff --git a/tools/cli/utils.go b/tools/cli/utils.go index c700558df07..631103313d9 100644 --- a/tools/cli/utils.go +++ b/tools/cli/utils.go @@ -768,8 +768,8 @@ func newIndefiniteContext(c *cli.Context) (context.Context, context.CancelFunc) } func newTimedContext(c *cli.Context, timeout time.Duration) (context.Context, context.CancelFunc) { - if c.GlobalIsSet(FlagContextTimeout) { - timeout = time.Duration(c.GlobalInt(FlagContextTimeout)) * time.Second + if overrideTimeout := c.GlobalInt(FlagContextTimeout); overrideTimeout > 0 { + timeout = time.Duration(overrideTimeout) * time.Second } ctx := populateContextFromCLIContext(context.Background(), c) return context.WithTimeout(ctx, timeout)