Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ type TemporalTaskQueueDescribeCommand struct {
Command cobra.Command
TaskQueue string
TaskQueueType StringEnum
Partitions int
}

func NewTemporalTaskQueueDescribeCommand(cctx *CommandContext, parent *TemporalTaskQueueCommand) *TemporalTaskQueueDescribeCommand {
Expand All @@ -505,6 +506,7 @@ func NewTemporalTaskQueueDescribeCommand(cctx *CommandContext, parent *TemporalT
_ = cobra.MarkFlagRequired(s.Command.Flags(), "task-queue")
s.TaskQueueType = NewStringEnum([]string{"workflow", "activity"}, "workflow")
s.Command.Flags().Var(&s.TaskQueueType, "task-queue-type", "Task Queue type. Accepted values: workflow, activity.")
s.Command.Flags().IntVar(&s.Partitions, "partitions", 1, "Query for all partitions up to this number (experimental+temporary feature).")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
60 changes: 55 additions & 5 deletions temporalcli/commands.taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package temporalcli

import (
"fmt"
commonpb "go.temporal.io/api/common/v1"
"time"

"github.com/fatih/color"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/tqname"
)

func (c *TemporalTaskQueueDescribeCommand) run(cctx *CommandContext, args []string) error {
Expand All @@ -25,14 +29,60 @@ func (c *TemporalTaskQueueDescribeCommand) run(cctx *CommandContext, args []stri
default:
return fmt.Errorf("unrecognized task queue type: %q", c.TaskQueueType.Value)
}
resp, err := cl.DescribeTaskQueue(cctx, c.TaskQueue, taskQueueType)

taskQueueName, err := tqname.FromBaseName(c.TaskQueue)
if err != nil {
return fmt.Errorf("failed describing task queue")
return fmt.Errorf("failed to parse task queue name: %w", err)
}
partitions := c.Partitions

type statusWithPartition struct {
Partition int `json:"partition"`
taskqueue.TaskQueueStatus
}
type pollerWithPartition struct {
Partition int `json:"partition"`
taskqueue.PollerInfo
// copy this out to display nicer in table or card, but not json
Versioning *commonpb.WorkerVersionCapabilities `json:"-"`
}

var statuses []*statusWithPartition
var pollers []*pollerWithPartition

// TODO: remove this when the server does partition fan-out
for p := 0; p < partitions; p++ {
resp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: c.Parent.Namespace,
TaskQueue: &taskqueue.TaskQueue{
Name: taskQueueName.WithPartition(p).FullName(),
Kind: enums.TASK_QUEUE_KIND_NORMAL,
},
TaskQueueType: taskQueueType,
IncludeTaskQueueStatus: true,
})
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
}
statuses = append(statuses, &statusWithPartition{
Partition: p,
TaskQueueStatus: *resp.TaskQueueStatus,
})
for _, pi := range resp.Pollers {
pollers = append(pollers, &pollerWithPartition{
Partition: p,
PollerInfo: *pi,
Versioning: pi.WorkerVersionCapabilities,
})
}
}

// For JSON, we'll just dump the proto
if cctx.JSONOutput {
return cctx.Printer.PrintStructured(resp, printer.StructuredOptions{})
return cctx.Printer.PrintStructured(map[string]any{
"taskQueues": statuses,
"pollers": pollers,
}, printer.StructuredOptions{})
}

// For text, we will use a table for pollers
Expand All @@ -41,8 +91,8 @@ func (c *TemporalTaskQueueDescribeCommand) run(cctx *CommandContext, args []stri
Identity string
LastAccessTime time.Time
RatePerSecond float64
}, len(resp.Pollers))
for i, poller := range resp.Pollers {
}, len(pollers))
for i, poller := range pollers {
items[i].Identity = poller.Identity
items[i].LastAccessTime = poller.LastAccessTime.AsTime()
items[i].RatePerSecond = poller.RatePerSecond
Expand Down
16 changes: 15 additions & 1 deletion temporalcli/commands.taskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,23 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() {
)
s.NoError(res.Err)
var jsonOut struct {
Pollers []map[string]any `json:"pollers"`
Pollers []map[string]any `json:"pollers"`
TaskQueues []map[string]any `json:"taskQueues"`
}
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal(1, len(jsonOut.TaskQueues))
// Check identity in the output
s.Equal(s.DevServer.Options.ClientOptions.Identity, jsonOut.Pollers[0]["identity"])

// Multiple partitions
res = s.Execute(
"task-queue", "describe",
"-o", "json",
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--partitions", "10",
)
s.NoError(res.Err)
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.GreaterOrEqual(10, len(jsonOut.TaskQueues))
}
1 change: 1 addition & 0 deletions temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ Use the options listed below to modify what this command returns.

* `--task-queue`, `-t` (string) - Task queue name. Required.
* `--task-queue-type` (string-enum) - Task Queue type. Options: workflow, activity. Default: workflow.
* `--partitions` (int) - Query for all partitions up to this number (experimental+temporary feature). Default: 1.

### temporal workflow: Start, list, and operate on Workflows.

Expand Down