Skip to content

Commit

Permalink
Improved CLI DLQ read command (uber#4780)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Mar 25, 2022
1 parent 950f5ac commit b445012
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 140 deletions.
97 changes: 27 additions & 70 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,42 +749,41 @@ func newAdminClusterCommands() []cli.Command {
}
}

// getDLQFlags builds the set of flags common to the DLQ admin subcommands:
// shard selection, DLQ type (defaulting to "history"), source cluster and the
// upper message ID boundary. A fresh slice is returned on every call, so
// callers may append command-specific flags to it.
func getDLQFlags() []cli.Flag {
	shardsFlag := cli.StringFlag{
		Name:  FlagShards,
		Usage: "Comma separated shard IDs or inclusive ranges. Example: \"2,5-6,10\". Alternatively, feed one shard ID per line via STDIN.",
	}
	dlqTypeFlag := cli.StringFlag{
		Name:  FlagDLQTypeWithAlias,
		Usage: "Type of DLQ to manage. (Options: domain, history)",
		Value: "history",
	}
	sourceClusterFlag := cli.StringFlag{
		Name:  FlagSourceCluster,
		Usage: "The cluster where the task is generated",
	}
	lastMessageIDFlag := cli.IntFlag{
		Name:  FlagLastMessageIDWithAlias,
		Usage: "The upper boundary of the read message",
	}

	return []cli.Flag{
		shardsFlag,
		dlqTypeFlag,
		sourceClusterFlag,
		lastMessageIDFlag,
	}
}

func newAdminDLQCommands() []cli.Command {
return []cli.Command{
{
Name: "read",
Aliases: []string{"r"},
Usage: "Read DLQ Messages",
Flags: []cli.Flag{
cli.StringFlag{
Name: FlagDLQTypeWithAlias,
Usage: "Type of DLQ to manage. (Options: domain, history)",
},
cli.StringFlag{
Name: FlagSourceCluster,
Usage: "The cluster where the task is generated",
},
cli.IntFlag{
Name: FlagShardIDWithAlias,
Usage: "ShardID",
},
Flags: append(getDLQFlags(),
cli.IntFlag{
Name: FlagMaxMessageCountWithAlias,
Usage: "Max message size to fetch",
},
cli.IntFlag{
Name: FlagLastMessageIDWithAlias,
Usage: "The upper boundary of the read message",
},
cli.StringFlag{
Name: FlagOutputFilenameWithAlias,
Usage: "Output file to write to, if not provided output is written to stdout",
},
cli.BoolFlag{
Name: FlagDLQRawTask,
Usage: "Show DLQ raw task information",
},
},
getFormatFlag(),
),
Action: func(c *cli.Context) {
AdminGetDLQMessages(c)
},
Expand All @@ -793,28 +792,7 @@ func newAdminDLQCommands() []cli.Command {
Name: "purge",
Aliases: []string{"p"},
Usage: "Delete DLQ messages with equal or smaller ids than the provided task id",
Flags: []cli.Flag{
cli.StringFlag{
Name: FlagDLQTypeWithAlias,
Usage: "Type of DLQ to manage. (Options: domain, history)",
},
cli.StringFlag{
Name: FlagSourceCluster,
Usage: "The cluster where the task is generated",
},
cli.IntFlag{
Name: FlagLowerShardBound,
Usage: "lower bound of shard to merge (inclusive)",
},
cli.IntFlag{
Name: FlagUpperShardBound,
Usage: "upper bound of shard to merge (inclusive)",
},
cli.IntFlag{
Name: FlagLastMessageIDWithAlias,
Usage: "The upper boundary of the read message",
},
},
Flags: getDLQFlags(),
Action: func(c *cli.Context) {
AdminPurgeDLQMessages(c)
},
Expand All @@ -823,28 +801,7 @@ func newAdminDLQCommands() []cli.Command {
Name: "merge",
Aliases: []string{"m"},
Usage: "Merge DLQ messages with equal or smaller ids than the provided task id",
Flags: []cli.Flag{
cli.StringFlag{
Name: FlagDLQTypeWithAlias,
Usage: "Type of DLQ to manage. (Options: domain, history)",
},
cli.StringFlag{
Name: FlagSourceCluster,
Usage: "The cluster where the task is generated",
},
cli.IntFlag{
Name: FlagLowerShardBound,
Usage: "lower bound of shard to merge (inclusive)",
},
cli.IntFlag{
Name: FlagUpperShardBound,
Usage: "upper bound of shard to merge (inclusive)",
},
cli.IntFlag{
Name: FlagLastMessageIDWithAlias,
Usage: "The upper boundary of the read message",
},
},
Flags: getDLQFlags(),
Action: func(c *cli.Context) {
AdminMergeDLQMessages(c)
},
Expand Down
199 changes: 130 additions & 69 deletions tools/cli/adminDLQCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package cli

import (
"bufio"
"encoding/json"
"fmt"
"io"
"os"
Expand All @@ -33,7 +32,6 @@ import (
"github.com/urfave/cli"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)
Expand All @@ -42,21 +40,40 @@ const (
defaultPageSize = 1000
)

// DLQRow is one DLQ message as collected by AdminGetDLQMessages and handed to
// Render. Each row is built from a ReplicationTaskInfo entry, combined with the
// ReplicationTask whose SourceTaskID equals the info's TaskID, if the response
// contained one.
//
// NOTE(review): fields carrying a `header` tag are presumably the columns shown
// by the table renderer, while `json` tags name the fields in JSON output;
// confirm against Render.
type DLQRow struct {
// Identification of the shard, domain, workflow and task. DomainName is
// looked up from DomainID via DescribeDomain; the rest comes from the
// replication task info.
ShardID int `header:"Shard ID" json:"shardID"`
DomainName string `header:"Domain Name" json:"domainName"`
DomainID string `header:"Domain ID" json:"domainID"`
WorkflowID string `header:"Workflow ID" json:"workflowID"`
RunID string `header:"Run ID" json:"runID"`
TaskID int64 `header:"Task ID" json:"taskID"`
// TaskType is taken from the matching replication task and stays nil when
// no such task was returned.
TaskType *types.ReplicationTaskType `header:"Task Type" json:"taskType"`
// Remaining values copied from the replication task info.
Version int64 `json:"version"`
FirstEventID int64 `json:"firstEventID"`
NextEventID int64 `json:"nextEventID"`
ScheduledID int64 `json:"scheduledID"`
// ReplicationTask is the matching replication task; nil when none was
// returned for this TaskID.
ReplicationTask *types.ReplicationTask `json:"replicationTask"`

// History events deserialized from the replication task's HistoryTaskV2
// attributes (Events and NewRunEvents blobs respectively).
Events []*types.HistoryEvent `json:"events"`
NewRunEvents []*types.HistoryEvent `json:"newRunEvents,omitempty"`

// Only the IDs of the events above, for a compact table representation.
// These fields have no json tag.
EventIDs []int64 `header:"Event IDs"`
NewRunEventIDs []int64 `header:"New Run Event IDs"`
}

// AdminGetDLQMessages gets DLQ metadata
func AdminGetDLQMessages(c *cli.Context) {
ctx, cancel := newContext(c)
defer cancel()

client := cFactory.ServerFrontendClient(c)
adminClient := cFactory.ServerAdminClient(c)
dlqType := getRequiredOption(c, FlagDLQType)

dlqType := toQueueType(getRequiredOption(c, FlagDLQType))
sourceCluster := getRequiredOption(c, FlagSourceCluster)
shardID := getRequiredIntOption(c, FlagShardID)
serializer := persistence.NewPayloadSerializer()
outputFile := getOutputFile(c.String(FlagOutputFilename))
defer outputFile.Close()

showRawTask := c.Bool(FlagDLQRawTask)
var rawTasksInfo []*types.ReplicationTaskInfo
remainingMessageCount := common.EndMessageID
if c.IsSet(FlagMaxMessageCount) {
remainingMessageCount = c.Int64(FlagMaxMessageCount)
Expand All @@ -66,75 +83,96 @@ func AdminGetDLQMessages(c *cli.Context) {
lastMessageID = c.Int64(FlagLastMessageID)
}

paginationFunc := func(paginationToken []byte) ([]interface{}, []byte, error) {
resp, err := adminClient.ReadDLQMessages(ctx, &types.ReadDLQMessagesRequest{
Type: toQueueType(dlqType),
SourceCluster: sourceCluster,
ShardID: int32(shardID),
InclusiveEndMessageID: common.Int64Ptr(lastMessageID),
MaximumPageSize: defaultPageSize,
NextPageToken: paginationToken,
})
if err != nil {
return nil, nil, err
}
var paginateItems []interface{}
for _, item := range resp.GetReplicationTasks() {
paginateItems = append(paginateItems, item)
}
if showRawTask {
rawTasksInfo = append(rawTasksInfo, resp.GetReplicationTasksInfo()...)
}

return paginateItems, resp.GetNextPageToken(), err
}

iterator := collection.NewPagingIterator(paginationFunc)
var lastReadMessageID int
for iterator.HasNext() && remainingMessageCount > 0 {
item, err := iterator.Next()
if err != nil {
ErrorAndExit(fmt.Sprintf("fail to read dlq message. Last read message id: %v", lastReadMessageID), err)
// Cache for domain names
domainNames := map[string]string{}
getDomainName := func(domainId string) string {
if domainName, ok := domainNames[domainId]; ok {
return domainName
}

task := item.(*types.ReplicationTask)
taskStr, err := decodeReplicationTask(task, serializer)
resp, err := client.DescribeDomain(ctx, &types.DescribeDomainRequest{UUID: common.StringPtr(domainId)})
if err != nil {
ErrorAndExit(fmt.Sprintf("fail to encode dlq message. Last read message id: %v", lastReadMessageID), err)
}

lastReadMessageID = int(task.SourceTaskID)
remainingMessageCount--
_, err = outputFile.WriteString(fmt.Sprintf("%v\n", string(taskStr)))
if err != nil {
ErrorAndExit("fail to print dlq messages.", err)
ErrorAndExit("failed to describe domain", err)
}
domainNames[domainId] = resp.DomainInfo.Name
return resp.DomainInfo.Name
}

if showRawTask {
_, err := outputFile.WriteString("#### REPLICATION DLQ RAW TASKS INFO ####\n")
if err != nil {
ErrorAndExit("fail to print dlq raw tasks.", err)
}
for _, info := range rawTasksInfo {
str, err := json.Marshal(info)
readShard := func(shardID int) []DLQRow {
var rows []DLQRow
var pageToken []byte

for {
resp, err := adminClient.ReadDLQMessages(ctx, &types.ReadDLQMessagesRequest{
Type: dlqType,
SourceCluster: sourceCluster,
ShardID: int32(shardID),
InclusiveEndMessageID: common.Int64Ptr(lastMessageID),
MaximumPageSize: defaultPageSize,
NextPageToken: pageToken,
})
if err != nil {
ErrorAndExit("fail to encode dlq raw tasks.", err)
ErrorAndExit(fmt.Sprintf("fail to read dlq message for shard: %d", shardID), err)
}

if _, err = outputFile.WriteString(fmt.Sprintf("%v\n", string(str))); err != nil {
ErrorAndExit("fail to print dlq raw tasks.", err)
replicationTasks := map[int64]*types.ReplicationTask{}
for _, task := range resp.ReplicationTasks {
replicationTasks[task.SourceTaskID] = task
}
}
} else {
if lastReadMessageID == 0 && len(rawTasksInfo) > 0 {
if _, err := outputFile.WriteString(
fmt.Sprintf("WARN: Received empty replication task but metadata is not empty. Please use %v to show metadata task.\n", FlagDLQRawTask),
); err != nil {
ErrorAndExit("fail to print warning message.", err)

for _, info := range resp.ReplicationTasksInfo {
task := replicationTasks[info.TaskID]

var taskType *types.ReplicationTaskType
if task != nil {
taskType = task.TaskType
}

events := deserializeBatchEvents(task.GetHistoryTaskV2Attributes().GetEvents())
newRunEvents := deserializeBatchEvents(task.GetHistoryTaskV2Attributes().GetNewRunEvents())

rows = append(rows, DLQRow{
ShardID: shardID,
DomainName: getDomainName(info.DomainID),
DomainID: info.DomainID,
WorkflowID: info.WorkflowID,
RunID: info.RunID,
TaskType: taskType,
TaskID: info.TaskID,
Version: info.Version,
FirstEventID: info.FirstEventID,
NextEventID: info.NextEventID,
ScheduledID: info.ScheduledID,
ReplicationTask: task,
Events: events,
EventIDs: collectEventIDs(events),
NewRunEvents: newRunEvents,
NewRunEventIDs: collectEventIDs(newRunEvents),
})

remainingMessageCount--
if remainingMessageCount <= 0 {
return rows
}
}

if len(resp.NextPageToken) == 0 {
break
}
pageToken = resp.NextPageToken
}
return rows
}

table := []DLQRow{}
for shardID := range getShards(c) {
if remainingMessageCount <= 0 {
break
}
table = append(table, readShard(shardID)...)
}

Render(c, table, RenderOptions{DefaultTemplate: templateTable, Color: true})
}

// AdminPurgeDLQMessages deletes messages from DLQ
Expand Down Expand Up @@ -217,10 +255,13 @@ func getShards(c *cli.Context) chan int {
func generateShardRangeFromFlags(c *cli.Context) chan int {
shards := make(chan int)
go func() {
lower := c.Int(FlagLowerShardBound)
upper := c.Int(FlagUpperShardBound)
for shard := lower; shard <= upper; shard++ {
shards <- shard
shardRange, err := parseIntMultiRange(c.String(FlagShards))
if err != nil {
fmt.Printf("failed to parse shard range: %q\n", c.String(FlagShards))
} else {
for _, shard := range shardRange {
shards <- shard
}
}
close(shards)
}()
Expand Down Expand Up @@ -263,3 +304,23 @@ func toQueueType(dlqType string) *types.DLQType {
}
return nil
}

// deserializeBatchEvents decodes a batch of history events from the given data
// blob using a payload serializer. A nil blob yields a nil slice. A decoding
// failure is reported through ErrorAndExit.
func deserializeBatchEvents(blob *types.DataBlob) []*types.HistoryEvent {
	if blob == nil {
		return nil
	}

	internalBlob := persistence.NewDataBlobFromInternal(blob)
	decoded, decodeErr := persistence.NewPayloadSerializer().DeserializeBatchEvents(internalBlob)
	if decodeErr != nil {
		ErrorAndExit("Failed to decode DLQ history replication events", decodeErr)
	}
	return decoded
}

// collectEventIDs returns the ID of every event in events, preserving order.
// The result is always a non-nil slice with one entry per input event, so it
// is empty (not nil) when events is nil or empty.
func collectEventIDs(events []*types.HistoryEvent) []int64 {
	eventIDs := make([]int64, len(events))
	for i := range events {
		eventIDs[i] = events[i].ID
	}
	return eventIDs
}
Loading

0 comments on commit b445012

Please sign in to comment.