From 818cdb73fa912c02a51648e84769eb276634fe48 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 10 Aug 2023 05:43:30 +0800 Subject: [PATCH] [CLI] fix a few things in domain migration command (#5374) What changed? add two methods in ClientFactory for migration countWorkflowCheck should be on current domain fix concurrency bug make template look slightly better Why? add two methods in ClientFactory for migration This is needed to be able to use environment flags for internal usage How did you test it? shengs@shengs-C02XN3VDJGH6 cadence % go run cmd/tools/cli/main.go --do domain-1 domain migration --destination_address localhost:7933 --destination_domain domain-2 --tasklist ts1 --search_attr someID:INT 2023/08/08 10:21:28 Loading configFiles=[config/base.yaml config/development.yaml] 2023/08/08 10:21:28 Loading configFiles=[config/base.yaml config/development.yaml] {"level":"info","ts":"2023-08-08T10:21:28.241-0700","msg":"Updated dynamic config","logging-call-at":"file_based_client.go:284"} Validation Check: - Domain Meta Data: true Current Domain: Name: domain-1 UUID: d3cc0a3c-8f55-40af-9820-c48f8a0f46e5 New Domain: Name: domain-2 UUID: 203f0088-1d0f-40ac-a7c3-fdcf57ec0a99 - Workflow Check: true Long Running Workflow Num: 0 - Dynamic Config Check: false - Config Key: matching.asyncTaskDispatchTimeout Current Response: Data: "50s" Filters: - Name: domainName Value: "domain-1" - Name: taskListName Value: ["ts1"] New Response: Data: "20s" Filters: - Name: domainName Value: "domain-2" - Name: taskListName Value: ["ts1"] - Search Attributes Check: false Missing Search Attributes in Current Domain: - someID:INT Missing Search Attributes in New Domain: - someID:INT Potential risks No Risk --- tools/cli/app_test.go | 8 + tools/cli/domain.go | 3 +- tools/cli/domainCommands.go | 488 +-------------------------- tools/cli/domainMigrationCommand.go | 506 ++++++++++++++++++++++++++++ tools/cli/factory.go | 79 +++-- tools/cli/utils.go | 4 +- 6 files changed, 575 insertions(+), 513 deletions(-) create mode 100644 tools/cli/domainMigrationCommand.go diff --git a/tools/cli/app_test.go b/tools/cli/app_test.go index a2bcb598e1b..da97e857044 100644 --- a/tools/cli/app_test.go +++ b/tools/cli/app_test.go @@ -60,6 +60,14 @@ func (m *clientFactoryMock) ServerAdminClient(c *cli.Context) admin.Client { return m.serverAdminClient } +func (m *clientFactoryMock) ServerFrontendClientForMigration(c *cli.Context) frontend.Client { + panic("not implemented") +} + +func (m *clientFactoryMock) ServerAdminClientForMigration(c *cli.Context) admin.Client { + panic("not implemented") +} + func (m *clientFactoryMock) ElasticSearchClient(c *cli.Context) *elastic.Client { panic("not implemented") } diff --git a/tools/cli/domain.go b/tools/cli/domain.go index c9f5b20ac55..4db0a20cb92 100644 --- a/tools/cli/domain.go +++ b/tools/cli/domain.go @@ -83,14 +83,13 @@ func newDomainCommands() []cli.Command { newDomainCLI(c, false).DescribeDomain(c) }, }, - { Name: "migration", Aliases: []string{"mi"}, Usage: "Migrate existing domain to new domain. This command only validates the settings. It does not perform actual data migration", Flags: migrateDomainFlags, Action: func(c *cli.Context) { - newDomainCLI(c, true).MigrateDomain(c) + newDomainMigrationCLIImpl(c).Validation(c) }, }, } diff --git a/tools/cli/domainCommands.go b/tools/cli/domainCommands.go index 91ef2ceb7da..ab2e359c9e7 100644 --- a/tools/cli/domainCommands.go +++ b/tools/cli/domainCommands.go @@ -25,10 +25,8 @@ import ( "encoding/json" "errors" "fmt" - "reflect" "strconv" "strings" - "sync" "time" "github.com/uber/cadence/client/admin" @@ -42,7 +40,6 @@ import ( "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/common" "github.com/uber/cadence/common/domain" - dc "github.com/uber/cadence/common/dynamicconfig" ) var ( @@ -52,12 +49,8 @@ var ( type ( domainCLIImpl struct { // used when making RPC call to frontend service - frontendClient frontend.Client - // used by migration command to make RPC call to frontend service of the destination domain - destinationClient frontend.Client - - frontendAdminClient admin.Client - destinationAdminClient admin.Client + frontendClient frontend.Client + frontendAdminClient admin.Client // act as admin to modify domain in DB directly domainHandler domain.Handler @@ -71,14 +64,8 @@ func newDomainCLI( ) *domainCLIImpl { d := &domainCLIImpl{} d.frontendClient = initializeFrontendClient(c) - d.destinationClient = newClientFactory(func(c *cli.Context) string { - return c.String(FlagDestinationAddress) - }).ServerFrontendClient(c) if isAdminMode { d.frontendAdminClient = initializeFrontendAdminClient(c) - d.destinationAdminClient = newClientFactory(func(c *cli.Context) string { - return c.String(FlagDestinationAddress) - }).ServerAdminClient(c) d.domainHandler = initializeAdminDomainHandler(c) } return d @@ -414,63 +401,6 @@ VisibilityArchivalURI: {{.}}{{end}} {{with .FailoverInfo}}Graceful failover info: {{table .}}{{end}}` -var newtemplateDomain = `Validation Check: -{{- range .}} -- {{.ValidationCheck}}: {{.ValidationResult}} -{{- with .ValidationDetails}} -{{- with .CurrentDomainRow}} -Current Domain: - Name: {{.DomainInfo.Name}} - UUID: {{.DomainInfo.UUID}} -{{- end}} -{{- with .NewDomainRow}} -New Domain: - Name: {{.DomainInfo.Name}} - UUID: {{.DomainInfo.UUID}} -{{- end}} -{{- if .LongRunningWorkFlowNum}} -Long Running Workflow Num: {{.LongRunningWorkFlowNum}} -{{- end}} -{{- if .MissingCurrSearchAttributes}} -Missing Search Attributes in Current Domain: -{{- range .MissingCurrSearchAttributes}} - - {{.}} -{{- end}} -{{- end}} -{{- if .MissingNewSearchAttributes}} -Missing Search Attributes in New Domain: -{{- range .MissingNewSearchAttributes}} - - {{.}} -{{- end}} -{{- end}} -{{- if ne (len .MismatchedDomainMetaData) 0 }} -Mismatched Domain Meta Data: {{.MismatchedDomainMetaData}} -{{- end }} -{{- range .MismatchedDynamicConfig}} -{{ $dynamicConfig := . }} -Mismatched Dynamic Config: -Config Key: {{.Key}} - {{- range $i, $v := .CurrValues}} - Current Response: - Data: {{ printf "%s" (index $dynamicConfig.CurrValues $i).Value.Data }} - Filters: - {{- range $filter := (index $dynamicConfig.CurrValues $i).Filters}} - - Name: {{ $filter.Name }} - Value: {{ printf "%s" $filter.Value.Data }} - {{- end}} - New Response: - Data: {{ printf "%s" (index $dynamicConfig.NewValues $i).Value.Data }} - Filters: - {{- range $filter := (index $dynamicConfig.NewValues $i).Filters}} - - Name: {{ $filter.Name }} - Value: {{ printf "%s" $filter.Value.Data }} - {{- end}} - {{- end}} -{{- end}} -{{- end}} -{{- end}} -` - // DescribeDomain updates a domain func (d *domainCLIImpl) DescribeDomain(c *cli.Context) { domainName := c.GlobalString(FlagDomain) @@ -515,10 +445,6 @@ func (d *domainCLIImpl) DescribeDomain(c *cli.Context) { }) } -func (d *domainCLIImpl) MigrateDomain(c *cli.Context) { - d.migrateDomain(c) -} - type BadBinaryRow struct { Checksum string `header:"Binary Checksum"` Operator string `header:"Operator"` @@ -777,399 +703,6 @@ func (d *domainCLIImpl) describeDomain( return d.domainHandler.DescribeDomain(ctx, request) } -func (d *domainCLIImpl) migrateDomain(c *cli.Context) { - var results []DomainMigrationRow - checkers := []func(*cli.Context) DomainMigrationRow{ - d.migrationDomainMetaDataCheck, - d.migrationDomainWorkFlowCheck, - d.migrationDynamicConfigCheck, - d.searchAttributesChecker, - } - wg := &sync.WaitGroup{} - for i := range checkers { - wg.Add(1) - go func(i int) { - defer wg.Done() - result := checkers[i](c) - results = append(results, result) - }(i) - } - wg.Wait() - - renderOpts := RenderOptions{ - DefaultTemplate: newtemplateDomain, - Color: true, - Border: true, - PrintDateTime: true, - } - - if err := Render(c, results, renderOpts); err != nil { - ErrorAndExit("Failed to render", err) - } -} - -func (d *domainCLIImpl) migrationDomainMetaDataCheck(c *cli.Context) DomainMigrationRow { - d.destinationClient = newClientFactory(func(c *cli.Context) string { - return c.String(FlagDestinationAddress) - }).ServerFrontendClient(c) - domain := c.GlobalString(FlagDomain) - newDomain := c.String(FlagDestinationDomain) - ctx, cancel := newContext(c) - defer cancel() - currResp, err := d.frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{ - Name: &domain, - }) - if err != nil { - ErrorAndExit(fmt.Sprintf("Could not describe old domain, Please check to see if old domain exists before migrating."), err) - } - newResp, err := d.destinationClient.DescribeDomain(ctx, &types.DescribeDomainRequest{ - Name: &newDomain, - }) - if err != nil { - ErrorAndExit(fmt.Sprintf("Could not describe new domain, Please check to see if new domain exists before migrating."), err) - } - validationResult, mismatchedMetaData := metaDataValidation(currResp, newResp) - validationRow := DomainMigrationRow{ - ValidationCheck: "Domain Meta Data", - ValidationResult: validationResult, - ValidationDetails: ValidationDetails{ - CurrentDomainRow: currResp, - NewDomainRow: newResp, - MismatchedDomainMetaData: mismatchedMetaData, - }, - } - return validationRow -} - -func metaDataValidation(currResp *types.DescribeDomainResponse, newResp *types.DescribeDomainResponse) (bool, string) { - if !reflect.DeepEqual(currResp.Configuration, newResp.Configuration) { - return false, "mismatched DomainConfiguration" - } - - if currResp.DomainInfo.OwnerEmail != newResp.DomainInfo.OwnerEmail { - return false, "mismatched OwnerEmail" - } - return true, "" -} - -func (d *domainCLIImpl) migrationDomainWorkFlowCheck(c *cli.Context) DomainMigrationRow { - d.destinationClient = newClientFactory(func(c *cli.Context) string { - return c.String(FlagDestinationAddress) - }).ServerFrontendClient(c) - countWorkFlows := d.countLongRunningWorkflowinDest(c) - check := countWorkFlows == 0 - return DomainMigrationRow{ - ValidationCheck: "Workflow Check", - ValidationResult: check, - ValidationDetails: ValidationDetails{ - LongRunningWorkFlowNum: &countWorkFlows, - }, - } -} - -func (d *domainCLIImpl) searchAttributesChecker(c *cli.Context) DomainMigrationRow { - ctx, cancel := newContext(c) - defer cancel() - - // getting user provided search attributes - searchAttributes := c.StringSlice(FlagSearchAttribute) - if len(searchAttributes) == 0 { - return DomainMigrationRow{ - ValidationCheck: "Search Attributes Check", - ValidationResult: true, - } - } - - // Parse the provided search attributes into a map[string]IndexValueType - requiredAttributes := make(map[string]types.IndexedValueType) - for _, attr := range searchAttributes { - parts := strings.SplitN(attr, ":", 2) - if len(parts) != 2 { - ErrorAndExit(fmt.Sprintf("Invalid search attribute format: %s", attr), nil) - } - key, valueType := parts[0], parts[1] - ivt, err := parseIndexedValueType(valueType) - if err != nil { - ErrorAndExit(fmt.Sprintf("Invalid search attribute type for %s: %s", key, valueType), err) - } - requiredAttributes[key] = ivt - } - - // getting search attributes for current domain - currentSearchAttributes, err := d.frontendClient.GetSearchAttributes(ctx) - if err != nil { - ErrorAndExit("Unable to get search attributes for current domain.", err) - } - - d.destinationClient = newClientFactory(func(c *cli.Context) string { - return c.String(FlagDestinationAddress) - }).ServerFrontendClient(c) - - // getting search attributes for new domain - destinationSearchAttributes, err := d.destinationClient.GetSearchAttributes(ctx) - if err != nil { - ErrorAndExit("Unable to get search attributes for new domain.", err) - } - - currentSearchAttrs := currentSearchAttributes.Keys - destinationSearchAttrs := destinationSearchAttributes.Keys - - // checking to see if search attributes exist - missingInCurrent := findMissingAttributes(requiredAttributes, currentSearchAttrs) - missingInNew := findMissingAttributes(requiredAttributes, destinationSearchAttrs) - - validationResult := len(missingInCurrent) == 0 && len(missingInNew) == 0 - - validationRow := DomainMigrationRow{ - ValidationCheck: "Search Attributes Check", - ValidationResult: validationResult, - ValidationDetails: ValidationDetails{ - MissingCurrSearchAttributes: missingInCurrent, - MissingNewSearchAttributes: missingInNew, - }, - } - - return validationRow -} - -// helper to parse types.IndexedValueType from string -func parseIndexedValueType(valueType string) (types.IndexedValueType, error) { - var result types.IndexedValueType - valueTypeBytes := []byte(valueType) - if err := result.UnmarshalText(valueTypeBytes); err != nil { - return 0, err - } - return result, nil -} - -// finds missing attributed in a map of existing attributed based on required attributes -func findMissingAttributes(requiredAttributes map[string]types.IndexedValueType, existingAttributes map[string]types.IndexedValueType) []string { - missingAttributes := make([]string, 0) - for key, requiredType := range requiredAttributes { - existingType, ok := existingAttributes[key] - if !ok || existingType != requiredType { - // construct the key:type string format - attr := fmt.Sprintf("%s:%s", key, requiredType) - missingAttributes = append(missingAttributes, attr) - } - } - return missingAttributes -} - -func (d *domainCLIImpl) migrationDynamicConfigCheck(c *cli.Context) DomainMigrationRow { - var mismatchedConfigs []MismatchedDynamicConfig - check := true - - resp := dynamicconfig.ListAllProductionKeys() - - currDomain := c.GlobalString(FlagDomain) - newDomain := c.String(FlagDestinationDomain) - - ctx, cancel := newContext(c) - defer cancel() - - currentDomainID := d.getDomainID(ctx, currDomain) - destinationDomainID := d.getDomainID(ctx, newDomain) - if currentDomainID == "" { - ErrorAndExit("Failed to get domainID for the current domain.", nil) - } - - if destinationDomainID == "" { - ErrorAndExit("Failed to get domainID for the destination domain.", nil) - } - - for _, configKey := range resp { - if len(configKey.Filters()) == 1 && configKey.Filters()[0] == dc.DomainName { - // Validate dynamic configs with only domainName filter - currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainFilter(currDomain), - }) - - newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainFilter(newDomain), - }) - - currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) - if err != nil { - // empty to indicate N/A - currResp = &types.GetDynamicConfigResponse{} - } - newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) - if err != nil { - // empty to indicate N/A - newResp = &types.GetDynamicConfigResponse{} - } - - if !reflect.DeepEqual(currResp.Value, newResp.Value) { - check = false - mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ - Key: configKey, - CurrValues: []*types.DynamicConfigValue{ - toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainName: currDomain, - }), - }, - NewValues: []*types.DynamicConfigValue{ - toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainName: newDomain, - }), - }, - }) - } - - } else if len(configKey.Filters()) == 1 && configKey.Filters()[0] == dc.DomainID { - // Validate dynamic configs with only domainID filter - currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainIDFilter(currentDomainID), - }) - - newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainIDFilter(destinationDomainID), - }) - - currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) - if err != nil { - // empty to indicate N/A - currResp = &types.GetDynamicConfigResponse{} - } - newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) - if err != nil { - // empty to indicate N/A - newResp = &types.GetDynamicConfigResponse{} - } - - if !reflect.DeepEqual(currResp.Value, newResp.Value) { - check = false - mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ - Key: configKey, - CurrValues: []*types.DynamicConfigValue{ - toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainID: currentDomainID, - }), - }, - NewValues: []*types.DynamicConfigValue{ - toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainID: destinationDomainID, - }), - }, - }) - } - - } else if containsFilter(configKey, dc.DomainName.String()) && containsFilter(configKey, dc.TaskListName.String()) { - // Validate dynamic configs with only domainName and TaskList filters - taskLists := c.StringSlice(FlagTaskList) - var mismatchedCurValues []*types.DynamicConfigValue - var mismatchedNewValues []*types.DynamicConfigValue - for _, taskList := range taskLists { - - currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainFilter(currDomain), - dynamicconfig.TaskListFilter(taskList), - }) - - newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ - dynamicconfig.DomainFilter(newDomain), - dynamicconfig.TaskListFilter(taskList), - }) - - currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) - if err != nil { - // empty to indicate N/A - currResp = &types.GetDynamicConfigResponse{} - } - newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) - if err != nil { - // empty to indicate N/A - newResp = &types.GetDynamicConfigResponse{} - } - - if !reflect.DeepEqual(currResp.Value, newResp.Value) { - check = false - mismatchedCurValues = append(mismatchedCurValues, toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainName: currDomain, - dynamicconfig.TaskListName: taskLists, - })) - mismatchedNewValues = append(mismatchedNewValues, toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ - dynamicconfig.DomainName: newDomain, - dynamicconfig.TaskListName: taskLists, - })) - - } - } - if len(mismatchedCurValues) > 0 && len(mismatchedNewValues) > 0 { - mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ - Key: configKey, - CurrValues: mismatchedCurValues, - NewValues: mismatchedNewValues, - }) - } - } - } - - validationRow := DomainMigrationRow{ - ValidationCheck: "Dynamic Config Check", - ValidationResult: check, - ValidationDetails: ValidationDetails{ - MismatchedDynamicConfig: mismatchedConfigs, - }, - } - - return validationRow -} - -func valueToDataBlob(value interface{}) *types.DataBlob { - if value == nil { - return nil - } - // No need to handle error as this is a private helper method - // where the correct value will always be passed regardless - data, _ := json.Marshal(value) - - return &types.DataBlob{ - EncodingType: types.EncodingTypeJSON.Ptr(), - Data: data, - } -} - -func toDynamicConfigValue(value *types.DataBlob, filterMaps map[dynamicconfig.Filter]interface{}) *types.DynamicConfigValue { - var configFilters []*types.DynamicConfigFilter - for filter, filterValue := range filterMaps { - configFilters = append(configFilters, &types.DynamicConfigFilter{ - Name: filter.String(), - Value: valueToDataBlob(filterValue), - }) - fmt.Println("Data:", string(configFilters[len(configFilters)-1].Value.Data)) - } - - return &types.DynamicConfigValue{ - Value: value, - Filters: configFilters, - } -} - -func containsFilter(key dynamicconfig.Key, value string) bool { - filters := key.Filters() - for _, filter := range filters { - if filter.String() == value { - return true - } - } - return false -} - -func (d *domainCLIImpl) getDomainID(c context.Context, domain string) string { - request := &types.DescribeDomainRequest{ - Name: &domain, - } - - resp, err := d.describeDomain(c, request) - if err != nil { - ErrorAndExit("Failed to describe domain.", err) - } - - return resp.DomainInfo.GetUUID() -} - func archivalStatus(c *cli.Context, statusFlagName string) *types.ArchivalStatus { if c.IsSet(statusFlagName) { switch c.String(statusFlagName) { @@ -1184,23 +717,6 @@ func archivalStatus(c *cli.Context, statusFlagName string) *types.ArchivalStatus return nil } -func (d *domainCLIImpl) countLongRunningWorkflowinDest(c *cli.Context) int { - domain := getRequiredOption(c, FlagDestinationDomain) - now := time.Now() - past14Days := now.Add(-14 * 24 * time.Hour) - request := &types.CountWorkflowExecutionsRequest{ - Domain: domain, - Query: "CloseTime=missing AND StartTime < " + strconv.FormatInt(past14Days.UnixNano(), 10), - } - ctx, cancel := newContextForLongPoll(c) - defer cancel() - response, err := d.destinationClient.CountWorkflowExecutions(ctx, request) - if err != nil { - ErrorAndExit("Failed to count workflow.", err) - } - return int(response.GetCount()) -} - func clustersToStrings(clusters []*types.ClusterReplicationConfiguration) []string { var res []string for _, cluster := range clusters { diff --git a/tools/cli/domainMigrationCommand.go b/tools/cli/domainMigrationCommand.go new file mode 100644 index 00000000000..540d46a5743 --- /dev/null +++ b/tools/cli/domainMigrationCommand.go @@ -0,0 +1,506 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package cli + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/urfave/cli" + + "github.com/uber/cadence/client/admin" + "github.com/uber/cadence/client/frontend" + "github.com/uber/cadence/common/dynamicconfig" + dc "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/types" +) + +var domainMigrationTemplate = `Validation Check: +{{- range .}} +- {{.ValidationCheck}}: {{.ValidationResult}} +{{- with .ValidationDetails}} + {{- with .CurrentDomainRow}} + Current Domain: + Name: {{.DomainInfo.Name}} + UUID: {{.DomainInfo.UUID}} + {{- end}} + {{- with .NewDomainRow}} + New Domain: + Name: {{.DomainInfo.Name}} + UUID: {{.DomainInfo.UUID}} + {{- end}} + {{- if ne (len .MismatchedDomainMetaData) 0 }} + Mismatched Domain Meta Data: {{.MismatchedDomainMetaData}} + {{- end }} + {{- if .LongRunningWorkFlowNum}} + Long Running Workflow Num: {{.LongRunningWorkFlowNum}} + {{- end}} + {{- if .MissingCurrSearchAttributes}} + Missing Search Attributes in Current Domain: + {{- range .MissingCurrSearchAttributes}} + - {{.}} + {{- end}} + {{- end}} + {{- if .MissingNewSearchAttributes}} + Missing Search Attributes in New Domain: + {{- range .MissingNewSearchAttributes}} + - {{.}} + {{- end}} + {{- end}} + {{- range .MismatchedDynamicConfig}} + {{- $dynamicConfig := . }} + - Config Key: {{.Key}} + {{- range $i, $v := .CurrValues}} + Current Response: + Data: {{ printf "%s" (index $dynamicConfig.CurrValues $i).Value.Data }} + Filters: + {{- range $filter := (index $dynamicConfig.CurrValues $i).Filters}} + - Name: {{ $filter.Name }} + Value: {{ printf "%s" $filter.Value.Data }} + {{- end}} + New Response: + Data: {{ printf "%s" (index $dynamicConfig.NewValues $i).Value.Data }} + Filters: + {{- range $filter := (index $dynamicConfig.NewValues $i).Filters}} + - Name: {{ $filter.Name }} + Value: {{ printf "%s" $filter.Value.Data }} + {{- end}} + {{- end}} + {{- end}} + {{- end}} +{{- end}} +` + +type domainMigrationCLIImpl struct { + frontendClient, destinationClient frontend.Client + frontendAdminClient, destinationAdminClient admin.Client +} + +func newDomainMigrationCLIImpl(c *cli.Context) *domainMigrationCLIImpl { + return &domainMigrationCLIImpl{ + frontendClient: cFactory.ServerFrontendClient(c), + destinationClient: cFactory.ServerFrontendClientForMigration(c), + frontendAdminClient: cFactory.ServerAdminClient(c), + destinationAdminClient: cFactory.ServerAdminClientForMigration(c), + } +} + +func (d *domainMigrationCLIImpl) Validation(c *cli.Context) { + checkers := []func(*cli.Context) DomainMigrationRow{ + d.migrationDomainMetaDataCheck, + d.migrationDomainWorkFlowCheck, + d.migrationDynamicConfigCheck, + d.searchAttributesChecker, + } + wg := &sync.WaitGroup{} + results := make([]DomainMigrationRow, len(checkers)) + for i := range checkers { + wg.Add(1) + go func(i int) { + defer wg.Done() + results[i] = checkers[i](c) + }(i) + } + wg.Wait() + + renderOpts := RenderOptions{ + DefaultTemplate: domainMigrationTemplate, + Color: true, + Border: true, + PrintDateTime: true, + } + + if err := Render(c, results, renderOpts); err != nil { + ErrorAndExit("Failed to render", err) + } +} + +func (d *domainMigrationCLIImpl) migrationDomainMetaDataCheck(c *cli.Context) DomainMigrationRow { + domain := c.GlobalString(FlagDomain) + newDomain := c.String(FlagDestinationDomain) + ctx, cancel := newContext(c) + defer cancel() + currResp, err := d.frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{ + Name: &domain, + }) + if err != nil { + ErrorAndExit(fmt.Sprintf("Could not describe old domain, Please check to see if old domain exists before migrating."), err) + } + newResp, err := d.destinationClient.DescribeDomain(ctx, &types.DescribeDomainRequest{ + Name: &newDomain, + }) + if err != nil { + ErrorAndExit(fmt.Sprintf("Could not describe new domain, Please check to see if new domain exists before migrating."), err) + } + validationResult, mismatchedMetaData := metaDataValidation(currResp, newResp) + validationRow := DomainMigrationRow{ + ValidationCheck: "Domain Meta Data", + ValidationResult: validationResult, + ValidationDetails: ValidationDetails{ + CurrentDomainRow: currResp, + NewDomainRow: newResp, + MismatchedDomainMetaData: mismatchedMetaData, + }, + } + return validationRow +} + +func metaDataValidation(currResp *types.DescribeDomainResponse, newResp *types.DescribeDomainResponse) (bool, string) { + if !reflect.DeepEqual(currResp.Configuration, newResp.Configuration) { + return false, "mismatched DomainConfiguration" + } + + if currResp.DomainInfo.OwnerEmail != newResp.DomainInfo.OwnerEmail { + return false, "mismatched OwnerEmail" + } + return true, "" +} + +func (d *domainMigrationCLIImpl) migrationDomainWorkFlowCheck(c *cli.Context) DomainMigrationRow { + countWorkFlows := d.countLongRunningWorkflow(c) + check := countWorkFlows == 0 + return DomainMigrationRow{ + ValidationCheck: "Workflow Check", + ValidationResult: check, + ValidationDetails: ValidationDetails{ + LongRunningWorkFlowNum: &countWorkFlows, + }, + } +} + +func (d *domainMigrationCLIImpl) countLongRunningWorkflow(c *cli.Context) int { + domain := c.GlobalString(FlagDomain) + now := time.Now() + past14Days := now.Add(-14 * 24 * time.Hour) + request := &types.CountWorkflowExecutionsRequest{ + Domain: domain, + Query: "CloseTime=missing AND StartTime < " + strconv.FormatInt(past14Days.UnixNano(), 10), + } + ctx, cancel := newContextForLongPoll(c) + defer cancel() + response, err := d.frontendClient.CountWorkflowExecutions(ctx, request) + if err != nil { + ErrorAndExit("Failed to count workflow.", err) + } + return int(response.GetCount()) +} + +func (d *domainMigrationCLIImpl) searchAttributesChecker(c *cli.Context) DomainMigrationRow { + ctx, cancel := newContext(c) + defer cancel() + + // getting user provided search attributes + searchAttributes := c.StringSlice(FlagSearchAttribute) + if len(searchAttributes) == 0 { + return DomainMigrationRow{ + ValidationCheck: "Search Attributes Check", + ValidationResult: true, + } + } + + // Parse the provided search attributes into a map[string]IndexValueType + requiredAttributes := make(map[string]types.IndexedValueType) + for _, attr := range searchAttributes { + parts := strings.SplitN(attr, ":", 2) + if len(parts) != 2 { + ErrorAndExit(fmt.Sprintf("Invalid search attribute format: %s", attr), nil) + } + key, valueType := parts[0], parts[1] + ivt, err := parseIndexedValueType(valueType) + if err != nil { + ErrorAndExit(fmt.Sprintf("Invalid search attribute type for %s: %s", key, valueType), err) + } + requiredAttributes[key] = ivt + } + + // getting search attributes for current domain + currentSearchAttributes, err := d.frontendClient.GetSearchAttributes(ctx) + if err != nil { + ErrorAndExit("Unable to get search attributes for current domain.", err) + } + + // getting search attributes for new domain + destinationSearchAttributes, err := d.destinationClient.GetSearchAttributes(ctx) + if err != nil { + ErrorAndExit("Unable to get search attributes for new domain.", err) + } + + currentSearchAttrs := currentSearchAttributes.Keys + destinationSearchAttrs := destinationSearchAttributes.Keys + + // checking to see if search attributes exist + missingInCurrent := findMissingAttributes(requiredAttributes, currentSearchAttrs) + missingInNew := findMissingAttributes(requiredAttributes, destinationSearchAttrs) + + validationResult := len(missingInCurrent) == 0 && len(missingInNew) == 0 + + validationRow := DomainMigrationRow{ + ValidationCheck: "Search Attributes Check", + ValidationResult: validationResult, + ValidationDetails: ValidationDetails{ + MissingCurrSearchAttributes: missingInCurrent, + MissingNewSearchAttributes: missingInNew, + }, + } + + return validationRow +} + +// helper to parse types.IndexedValueType from string +func parseIndexedValueType(valueType string) (types.IndexedValueType, error) { + var result types.IndexedValueType + valueTypeBytes := []byte(valueType) + if err := result.UnmarshalText(valueTypeBytes); err != nil { + return 0, err + } + return result, nil +} + +// finds missing attributed in a map of existing attributed based on required attributes +func findMissingAttributes(requiredAttributes map[string]types.IndexedValueType, existingAttributes map[string]types.IndexedValueType) []string { + missingAttributes := make([]string, 0) + for key, requiredType := range requiredAttributes { + existingType, ok := existingAttributes[key] + if !ok || existingType != requiredType { + // construct the key:type string format + attr := fmt.Sprintf("%s:%s", key, requiredType) + missingAttributes = append(missingAttributes, attr) + } + } + return missingAttributes +} + +func (d *domainMigrationCLIImpl) migrationDynamicConfigCheck(c *cli.Context) DomainMigrationRow { + var mismatchedConfigs []MismatchedDynamicConfig + check := true + + resp := dynamicconfig.ListAllProductionKeys() + + currDomain := c.GlobalString(FlagDomain) + newDomain := c.String(FlagDestinationDomain) + + ctx, cancel := newContext(c) + defer cancel() + + currentDomainID := getDomainID(ctx, currDomain, d.frontendClient) + destinationDomainID := getDomainID(ctx, newDomain, d.destinationClient) + if currentDomainID == "" { + ErrorAndExit("Failed to get domainID for the current domain.", nil) + } + + if destinationDomainID == "" { + ErrorAndExit("Failed to get domainID for the destination domain.", nil) + } + + for _, configKey := range resp { + if len(configKey.Filters()) == 1 && configKey.Filters()[0] == dc.DomainName { + // Validate dynamic configs with only domainName filter + currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainFilter(currDomain), + }) + + newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainFilter(newDomain), + }) + + currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) + if err != nil { + // empty to indicate N/A + currResp = &types.GetDynamicConfigResponse{} + } + newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) + if err != nil { + // empty to indicate N/A + newResp = &types.GetDynamicConfigResponse{} + } + + if !reflect.DeepEqual(currResp.Value, newResp.Value) { + check = false + mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ + Key: configKey, + CurrValues: []*types.DynamicConfigValue{ + toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainName: currDomain, + }), + }, + NewValues: []*types.DynamicConfigValue{ + toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainName: newDomain, + }), + }, + }) + } + + } else if len(configKey.Filters()) == 1 && configKey.Filters()[0] == dc.DomainID { + // Validate dynamic configs with only domainID filter + currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainIDFilter(currentDomainID), + }) + + newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainIDFilter(destinationDomainID), + }) + + currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) + if err != nil { + // empty to indicate N/A + currResp = &types.GetDynamicConfigResponse{} + } + newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) + if err != nil { + // empty to indicate N/A + newResp = &types.GetDynamicConfigResponse{} + } + + if !reflect.DeepEqual(currResp.Value, newResp.Value) { + check = false + mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ + Key: configKey, + CurrValues: []*types.DynamicConfigValue{ + toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainID: currentDomainID, + }), + }, + NewValues: []*types.DynamicConfigValue{ + toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainID: destinationDomainID, + }), + }, + }) + } + + } else if containsFilter(configKey, dc.DomainName.String()) && containsFilter(configKey, dc.TaskListName.String()) { + // Validate dynamic configs with only domainName and TaskList filters + taskLists := c.StringSlice(FlagTaskList) + var mismatchedCurValues []*types.DynamicConfigValue + var mismatchedNewValues []*types.DynamicConfigValue + for _, taskList := range taskLists { + + currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainFilter(currDomain), + dynamicconfig.TaskListFilter(taskList), + }) + + newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ + dynamicconfig.DomainFilter(newDomain), + dynamicconfig.TaskListFilter(taskList), + }) + + currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) + if err != nil { + // empty to indicate N/A + currResp = &types.GetDynamicConfigResponse{} + } + newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) + if err != nil { + // empty to indicate N/A + newResp = &types.GetDynamicConfigResponse{} + } + + if !reflect.DeepEqual(currResp.Value, newResp.Value) { + check = false + mismatchedCurValues = append(mismatchedCurValues, toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainName: currDomain, + dynamicconfig.TaskListName: taskLists, + })) + mismatchedNewValues = append(mismatchedNewValues, toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ + dynamicconfig.DomainName: newDomain, + dynamicconfig.TaskListName: taskLists, + })) + + } + } + if len(mismatchedCurValues) > 0 && len(mismatchedNewValues) > 0 { + mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ + Key: configKey, + CurrValues: mismatchedCurValues, + NewValues: mismatchedNewValues, + }) + } + } + } + + validationRow := DomainMigrationRow{ + ValidationCheck: "Dynamic Config Check", + ValidationResult: check, + ValidationDetails: ValidationDetails{ + MismatchedDynamicConfig: mismatchedConfigs, + }, + } + + return validationRow +} + +func getDomainID(c context.Context, domain string, client frontend.Client) string { + resp, err := client.DescribeDomain(c, &types.DescribeDomainRequest{Name: &domain}) + if err != nil { + ErrorAndExit("Failed to describe domain.", err) + } + + return resp.DomainInfo.GetUUID() +} + +func valueToDataBlob(value interface{}) *types.DataBlob { + if value == nil { + return nil + } + // No need to handle error as this is a private helper method + // where the correct value will always be passed regardless + data, _ := json.Marshal(value) + + return &types.DataBlob{ + EncodingType: types.EncodingTypeJSON.Ptr(), + Data: data, + } +} + +func toDynamicConfigValue(value *types.DataBlob, filterMaps map[dynamicconfig.Filter]interface{}) *types.DynamicConfigValue { + var configFilters []*types.DynamicConfigFilter + for filter, filterValue := range filterMaps { + configFilters = append(configFilters, &types.DynamicConfigFilter{ + Name: filter.String(), + Value: valueToDataBlob(filterValue), + }) + } + + return &types.DynamicConfigValue{ + Value: value, + Filters: configFilters, + } +} + +func containsFilter(key dynamicconfig.Key, value string) bool { + filters := key.Filters() + for _, filter := range filters { + if filter.String() == value { + return true + } + } + return false +} diff --git a/tools/cli/factory.go b/tools/cli/factory.go index 23c7cdd4a49..5f1bc4b8976 100644 --- a/tools/cli/factory.go +++ b/tools/cli/factory.go @@ -70,35 +70,31 @@ type ClientFactory interface { ServerFrontendClient(c *cli.Context) frontend.Client ServerAdminClient(c *cli.Context) admin.Client + // ServerFrontendClientForMigration frontend client of the migration destination + ServerFrontendClientForMigration(c *cli.Context) frontend.Client + // ServerAdminClientForMigration admin client of the migration destination + ServerAdminClientForMigration(c *cli.Context) admin.Client + ElasticSearchClient(c *cli.Context) *elastic.Client ServerConfig(c *cli.Context) (*config.Config, error) } type clientFactory struct { - addressFlagFunc func(c *cli.Context) string - hostPort string - dispatcher *yarpc.Dispatcher - logger *zap.Logger + dispatcher *yarpc.Dispatcher + dispatcherMigration *yarpc.Dispatcher + logger *zap.Logger } -// DEPRECATED don't use, only reserved for backward compatibility purposes // NewClientFactory creates a new ClientFactory func NewClientFactory() ClientFactory { - return newClientFactory(func(c *cli.Context) string { - return c.GlobalString(FlagAddress) - }) -} - -func newClientFactory(f func(c *cli.Context) string) ClientFactory { logger, err := zap.NewDevelopment() if err != nil { panic(err) } return &clientFactory{ - logger: logger, - addressFlagFunc: f, + logger: logger, } } @@ -139,6 +135,31 @@ func (b *clientFactory) ServerAdminClient(c *cli.Context) admin.Client { return admin.NewThriftClient(serverAdmin.New(clientConfig)) } +// ServerFrontendClientForMigration builds a frontend client (based on server side thrift interface) +func (b *clientFactory) ServerFrontendClientForMigration(c *cli.Context) frontend.Client { + b.ensureDispatcherForMigration(c) + clientConfig := b.dispatcherMigration.ClientConfig(cadenceFrontendService) + if c.GlobalString(FlagTransport) == grpcTransport { + return frontend.NewGRPCClient( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) + } + return frontend.NewThriftClient(serverFrontend.New(clientConfig)) +} + +// ServerAdminClientForMigration builds an admin client (based on server side thrift interface) +func (b *clientFactory) ServerAdminClientForMigration(c *cli.Context) admin.Client { + b.ensureDispatcherForMigration(c) + clientConfig := b.dispatcherMigration.ClientConfig(cadenceFrontendService) + if c.GlobalString(FlagTransport) == grpcTransport { + return admin.NewGRPCClient(adminv1.NewAdminAPIYARPCClient(clientConfig)) + } + return admin.NewThriftClient(serverAdmin.New(clientConfig)) +} + // ElasticSearchClient builds an ElasticSearch client func (b *clientFactory) ElasticSearchClient(c *cli.Context) *elastic.Client { url := getRequiredOption(c, FlagURL) @@ -159,19 +180,30 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) { if b.dispatcher != nil { return } + b.dispatcher = b.newClientDispatcher(c, c.GlobalString(FlagAddress)) +} + +func (b *clientFactory) ensureDispatcherForMigration(c *cli.Context) { + if b.dispatcherMigration != nil { + return + } + b.dispatcherMigration = b.newClientDispatcher(c, c.String(FlagDestinationAddress)) +} + +func (b *clientFactory) newClientDispatcher(c *cli.Context, hostPortOverride string) *yarpc.Dispatcher { shouldUseGrpc := c.GlobalString(FlagTransport) == grpcTransport - b.hostPort = tchannelPort + hostPort := tchannelPort if shouldUseGrpc { - b.hostPort = grpcPort + hostPort = grpcPort } - if addr := b.addressFlagFunc(c); addr != "" { - b.hostPort = addr + if hostPortOverride != "" { + hostPort = hostPortOverride } var outbounds transport.Outbounds if shouldUseGrpc { grpcTransport := grpc.NewTransport() - outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewSingleOutbound(b.hostPort)} + outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewSingleOutbound(hostPort)} tlsCertificatePath := c.GlobalString(FlagTLSCertPath) if tlsCertificatePath != "" { @@ -187,7 +219,7 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) { RootCAs: caCertPool, } tlsCreds := credentials.NewTLS(&tlsConfig) - tlsChooser := peer.NewSingle(hostport.Identify(b.hostPort), grpcTransport.NewDialer(grpc.DialerCredentials(tlsCreds))) + tlsChooser := peer.NewSingle(hostport.Identify(hostPort), grpcTransport.NewDialer(grpc.DialerCredentials(tlsCreds))) outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewOutbound(tlsChooser)} } } else { @@ -195,10 +227,10 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) { if err != nil { b.logger.Fatal("Failed to create transport channel", zap.Error(err)) } - outbounds = transport.Outbounds{Unary: ch.NewSingleOutbound(b.hostPort)} + outbounds = transport.Outbounds{Unary: ch.NewSingleOutbound(hostPort)} } - b.dispatcher = yarpc.NewDispatcher(yarpc.Config{ + dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: cadenceClientName, Outbounds: yarpc.Outbounds{cadenceFrontendService: outbounds}, OutboundMiddleware: yarpc.OutboundMiddleware{ @@ -206,10 +238,11 @@ func (b *clientFactory) ensureDispatcher(c *cli.Context) { }, }) - if err := b.dispatcher.Start(); err != nil { - b.dispatcher.Stop() + if err := dispatcher.Start(); err != nil { + dispatcher.Stop() b.logger.Fatal("Failed to create outbound transport channel: %v", zap.Error(err)) } + return dispatcher } type versionMiddleware struct { diff --git a/tools/cli/utils.go b/tools/cli/utils.go index c700558df07..631103313d9 100644 --- a/tools/cli/utils.go +++ b/tools/cli/utils.go @@ -768,8 +768,8 @@ func newIndefiniteContext(c *cli.Context) (context.Context, context.CancelFunc) } func newTimedContext(c *cli.Context, timeout time.Duration) (context.Context, context.CancelFunc) { - if c.GlobalIsSet(FlagContextTimeout) { - timeout = time.Duration(c.GlobalInt(FlagContextTimeout)) * time.Second + if overrideTimeout := c.GlobalInt(FlagContextTimeout); overrideTimeout > 0 { + timeout = time.Duration(overrideTimeout) * time.Second } ctx := populateContextFromCLIContext(context.Background(), c) return context.WithTimeout(ctx, timeout)