Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/temporalio/cli/app"
"github.com/temporalio/cli/client"
Expand Down Expand Up @@ -100,7 +99,6 @@ func (s *cliAppSuite) SetupTest() {
}

func (s *cliAppSuite) TearDownTest() {
s.mockCtrl.Finish() // assert mock’s expectations
}

func (s *cliAppSuite) TestTopLevelCommands() {
Expand All @@ -119,6 +117,19 @@ var describeTaskQueueResponse = &workflowservice.DescribeTaskQueueResponse{
{
LastAccessTime: timestamp.TimePtr(time.Now().UTC()),
Identity: "tester",
WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: "some-build-id",
UseVersioning: false,
},
},
},
TaskQueueStatus: &taskqueuepb.TaskQueueStatus{
BacklogCountHint: 0,
ReadLevel: 100000,
AckLevel: 100000,
TaskIdBlock: &taskqueuepb.TaskIdBlock{
StartId: 100001,
EndId: 200000,
},
},
}
Expand Down Expand Up @@ -176,17 +187,15 @@ func (s *cliAppSuite) TestAcceptStringSliceArgsWithCommas() {
}

func (s *cliAppSuite) TestDescribeTaskQueue() {
s.sdkClient.On("DescribeTaskQueue", mock.Anything, mock.Anything, mock.Anything).Return(describeTaskQueueResponse, nil).Once()
s.frontendClient.EXPECT().DescribeTaskQueue(gomock.Any(), gomock.Any()).Return(describeTaskQueueResponse, nil)
err := s.app.Run([]string{"", "task-queue", "describe", "--task-queue", "test-taskQueue", "--namespace", cliTestNamespace})
s.Nil(err)
s.sdkClient.AssertExpectations(s.T())
}

func (s *cliAppSuite) TestDescribeTaskQueue_Activity() {
s.sdkClient.On("DescribeTaskQueue", mock.Anything, mock.Anything, mock.Anything).Return(describeTaskQueueResponse, nil).Once()
s.frontendClient.EXPECT().DescribeTaskQueue(gomock.Any(), gomock.Any()).Return(describeTaskQueueResponse, nil)
err := s.app.Run([]string{"", "task-queue", "describe", "--namespace", cliTestNamespace, "--task-queue", "test-taskQueue", "--task-queue-type", "activity"})
s.Nil(err)
s.sdkClient.AssertExpectations(s.T())
}

// TestFlagCategory_IsSet verifies that command flags have Category set
Expand Down
1 change: 1 addition & 0 deletions common/defs-flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ const (
// Task Queue flags
FlagTaskQueueName = "Name of the Task Queue."
FlagTaskQueueTypeDefinition = "Task Queue type [workflow|activity]"
FlagPartitionsDefinition = "Query for all partitions up to this number (experimental+temporary feature)"

// Namespace update flags
FlagActiveClusterDefinition = "Active cluster name."
Expand Down
1 change: 1 addition & 0 deletions common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
FlagOverlapPolicy = "overlap-policy"
FlagOwnerEmail = "email"
FlagParallelism = "input-parallelism"
FlagPartitions = "partitions"
FlagPause = "pause"
FlagPauseOnFailure = "pause-on-failure"
FlagPort = "port"
Expand Down
7 changes: 7 additions & 0 deletions taskqueue/task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ func NewTaskQueueCommands() []*cli.Command {
Usage: common.FlagTaskQueueTypeDefinition,
Category: common.CategoryMain,
},
// TOOD: remove this when the server does partition fan-out
&cli.IntFlag{
Name: common.FlagPartitions,
Value: 1,
Usage: common.FlagPartitionsDefinition,
Category: common.CategoryMain,
},
}, common.FlagsForFormatting...),
Action: func(c *cli.Context) error {
return DescribeTaskQueue(c)
Expand Down
85 changes: 74 additions & 11 deletions taskqueue/task_queue_commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package taskqueue

import (
"encoding/json"
"fmt"
"strings"

Expand All @@ -9,37 +10,99 @@ import (
"github.com/temporalio/tctl-kit/pkg/color"
"github.com/temporalio/tctl-kit/pkg/output"
"github.com/urfave/cli/v2"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/tqname"
)

// DescribeTaskQueue show pollers info of a given taskqueue
func DescribeTaskQueue(c *cli.Context) error {
sdkClient, err := client.GetSDKClient(c)
taskQueue := c.String(common.FlagTaskQueue)
tqName, err := tqname.FromBaseName(taskQueue)
if err != nil {
return err
}
taskQueue := c.String(common.FlagTaskQueue)
taskQueueType := strToTaskQueueType(c.String(common.FlagTaskQueueType))
partitions := c.Int(common.FlagPartitions)

ctx, cancel := common.NewContext(c)
defer cancel()
resp, err := sdkClient.DescribeTaskQueue(ctx, taskQueue, taskQueueType)

frontendClient := client.Factory(c.App).FrontendClient(c)
namespace, err := common.RequiredFlag(c, common.FlagNamespace)
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
return err
}

type statusWithPartition struct {
Partition int `json:"partition"`
taskqueuepb.TaskQueueStatus
}
type pollerWithPartition struct {
Partition int `json:"partition"`
taskqueuepb.PollerInfo
// copy this out to display nicer in table or card, but not json
Versioning *commonpb.WorkerVersionCapabilities `json:"-"`
}

var statuses []any
var pollers []any

// TOOD: remove this when the server does partition fan-out
for p := 0; p < partitions; p++ {
resp, err := frontendClient.DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: namespace,
TaskQueue: &taskqueuepb.TaskQueue{
Name: tqName.WithPartition(p).FullName(),
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},
TaskQueueType: taskQueueType,
IncludeTaskQueueStatus: true,
})
// note that even if it doesn't exist before this call, DescribeTaskQueue will return something
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
}
statuses = append(statuses, &statusWithPartition{
Partition: p,
TaskQueueStatus: *resp.TaskQueueStatus,
})
for _, pi := range resp.Pollers {
pollers = append(pollers, &pollerWithPartition{
Partition: p,
PollerInfo: *pi,
Versioning: pi.WorkerVersionCapabilities,
})
}
}

if output.OutputOption(c.String(output.FlagOutput)) == output.JSON {
// handle specially so we output a single object instead of two
b, err := json.MarshalIndent(map[string]any{
"taskQueues": statuses,
"pollers": pollers,
}, "", " ")
if err != nil {
return err
}
_, err = fmt.Println(string(b))
return err
}

opts := &output.PrintOptions{
// TODO enable when versioning feature is out
// Fields: []string{"Identity", "LastAccessTime", "RatePerSecond", "WorkerVersioningId"},
Fields: []string{"Identity", "LastAccessTime", "RatePerSecond"},
Fields: []string{"Partition", "TaskQueueStatus.RatePerSecond", "TaskQueueStatus.BacklogCountHint", "TaskQueueStatus.ReadLevel", "TaskQueueStatus.AckLevel", "TaskQueueStatus.TaskIdBlock"},
}
var items []interface{}
for _, e := range resp.Pollers {
items = append(items, e)
err = output.PrintItems(c, statuses, opts)
if err != nil {
return err
}
return output.PrintItems(c, items, opts)

opts = &output.PrintOptions{
Fields: []string{"Partition", "PollerInfo.Identity", "PollerInfo.LastAccessTime", "PollerInfo.RatePerSecond", "Versioning.BuildId", "Versioning.UseVersioning"},
}
return output.PrintItems(c, pollers, opts)
}

// ListTaskQueuePartitions gets all the taskqueue partition and host information.
Expand Down