Skip to content

Commit

Permalink
Update admin rereplicate command to support SQL databases (uber#4227)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Jun 11, 2021
1 parent 99fb216 commit 263227f
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 13 deletions.
2 changes: 1 addition & 1 deletion tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func newAdminKafkaCommands() []cli.Command {
{
Name: "rereplicate",
Aliases: []string{"rrp"},
Usage: "Rereplicate replication tasks to target topic from history tables",
Usage: "Rereplicate replication tasks from history tables",
Flags: append(getDBFlags(),
cli.StringFlag{
Name: FlagSourceCluster,
Expand Down
16 changes: 4 additions & 12 deletions tools/cli/adminKafkaCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ import (
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/cassandra"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
)
Expand Down Expand Up @@ -462,14 +459,9 @@ func doRereplicate(
endEventID int64,
endEventVersion int64,
sourceCluster string,
cqlClient gocql.Client,
session gocql.Session,
adminClient admin.Client,
exeMgr persistence.ExecutionManager,
) {

exeM, _ := cassandra.NewWorkflowExecutionPersistence(shardID, cqlClient, session, loggerimpl.NewNopLogger())
exeMgr := persistence.NewExecutionManagerImpl(exeM, loggerimpl.NewNopLogger())

fmt.Printf("Start rereplicate for wid: %v, rid:%v \n", wid, rid)
resp, err := exeMgr.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
DomainID: domainID,
Expand Down Expand Up @@ -517,7 +509,6 @@ func AdminRereplicate(c *cli.Context) {
ErrorAndExit("End event version is not defined", nil)
}

client, session := connectToCassandra(c)
adminClient := cFactory.ServerAdminClient(c)
endEventID := c.Int64(FlagMaxEventID)
endVersion := c.Int64(FlagEndEventVersion)
Expand All @@ -526,6 +517,8 @@ func AdminRereplicate(c *cli.Context) {
rid := getRequiredOption(c, FlagRunID)
shardID := common.WorkflowIDToHistoryShard(wid, numberOfShards)
contextTimeout := defaultResendContextTimeout
executionManager := initializeExecutionStore(c, shardID, 0)
defer executionManager.Close()
if c.GlobalIsSet(FlagContextTimeout) {
contextTimeout = time.Duration(c.GlobalInt(FlagContextTimeout)) * time.Second
}
Expand All @@ -541,9 +534,8 @@ func AdminRereplicate(c *cli.Context) {
endEventID,
endVersion,
sourceCluster,
client,
session,
adminClient,
executionManager,
)
}

Expand Down

0 comments on commit 263227f

Please sign in to comment.