Skip to content

Commit

Permalink
feat: support batch get eventbus info (#72)
Browse files Browse the repository at this point in the history
* feat: support batch get eventbus info
  • Loading branch information
wenfengwang authored May 26, 2022
1 parent 9633ab8 commit 0bd008c
Showing 1 changed file with 81 additions and 63 deletions.
144 changes: 81 additions & 63 deletions vsctl/command/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
metapb "github.com/linkall-labs/vsproto/pkg/meta"
"github.com/spf13/cobra"
"os"
"strings"
)

func NewEventbusCommand() *cobra.Command {
Expand Down Expand Up @@ -96,75 +97,90 @@ func deleteEventbusCommand() *cobra.Command {

func getEventbusInfoCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "info <eventbus-name> ",
Use: "info [flag] ",
Short: "get the eventbus info",
Run: func(cmd *cobra.Command, args []string) {
if args[0] == "" {
if eventbus == "" && (len(args) == 0 || args[0] == "") {
cmdFailedf("the eventbus must be set")
}
var buses []string
if len(args) > 0 && args[0] != "" {
buses = append(buses, args[0])
} else {
buses = strings.Split(eventbus, ",")
}
ctx := context.Background()
grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd)
defer func() {
_ = grpcConn.Close()
}()

cli := ctrlpb.NewEventBusControllerClient(grpcConn)
res, err := cli.GetEventBus(ctx, &metapb.EventBus{Name: args[0]})
if err != nil {
cmdFailedf("get eventbus failed: %s", err)
}

busMetas := make([]*metapb.EventBus, 0)
segs := make(map[uint64][]*metapb.Segment)
if showSegment || showBlock {
logCli := ctrlpb.NewEventLogControllerClient(grpcConn)
logs := res.GetLogs()
for idx := range logs {
segRes, err := logCli.ListSegment(ctx, &ctrlpb.ListSegmentRequest{
EventBusId: res.Id,
EventLogId: logs[idx].EventLogId,
})
if err != nil {
cmdFailedf("get segments failed: %s", err)
for idx := range buses {
res, err := cli.GetEventBus(ctx, &metapb.EventBus{Name: buses[idx]})
if err != nil {
cmdFailedf("get eventbus failed: %s", err)
}

busMetas = append(busMetas, res)
if showSegment || showBlock {
logCli := ctrlpb.NewEventLogControllerClient(grpcConn)
logs := res.GetLogs()
for idx := range logs {
segRes, err := logCli.ListSegment(ctx, &ctrlpb.ListSegmentRequest{
EventBusId: res.Id,
EventLogId: logs[idx].EventLogId,
})
if err != nil {
cmdFailedf("get segments failed: %s", err)
}
segs[logs[idx].EventLogId] = segRes.Segments
}
segs[logs[idx].EventLogId] = segRes.Segments
}
}

t := table.NewWriter()

if !showSegment && !showBlock {
t.AppendHeader(table.Row{"Eventbus", "Eventlog", "Segment Number"})
for idx := 0; idx < len(res.Logs); idx++ {
if idx == 0 {
t.AppendRow(table.Row{res.Name, res.Logs[idx].EventLogId, res.Logs[idx].CurrentSegmentNumbers})
} else {
t.AppendRow(table.Row{"", res.Logs[idx].EventLogId, res.Logs[idx].CurrentSegmentNumbers})
for _, res := range busMetas {
for idx := 0; idx < len(res.Logs); idx++ {
if idx == 0 {
t.AppendRow(table.Row{res.Name, res.Logs[idx].EventLogId, res.Logs[idx].CurrentSegmentNumbers})
} else {
t.AppendRow(table.Row{"", res.Logs[idx].EventLogId, res.Logs[idx].CurrentSegmentNumbers})
}
}
t.SetColumnConfigs([]table.ColumnConfig{
{Number: 1, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter},
{Number: 2, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter},
{Number: 3, Align: text.AlignCenter, AlignHeader: text.AlignCenter},
})
}
t.SetColumnConfigs([]table.ColumnConfig{
{Number: 1, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter},
{Number: 2, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter},
{Number: 3, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter},
})
} else {
if !showBlock {
t.AppendHeader(table.Row{"Eventbus", "Eventlog", "Segment", "Start", "End"})
for idx := 0; idx < len(res.Logs); idx++ {
segOfEL := segs[res.Logs[idx].EventLogId]
for sIdx, v := range segOfEL {
if idx == 0 && sIdx == 0 {
t.AppendRow(table.Row{res.Name, res.Logs[idx].EventLogId, v.Id, v.StartOffsetInLog,
v.EndOffsetInLog})
} else if sIdx == 0 {
t.AppendRow(table.Row{"", res.Logs[idx].EventLogId, v.Id, v.StartOffsetInLog,
v.EndOffsetInLog})
} else {
t.AppendRow(table.Row{"", "", v.Id, v.StartOffsetInLog,
v.EndOffsetInLog})
for _, res := range busMetas {
for idx := 0; idx < len(res.Logs); idx++ {
segOfEL := segs[res.Logs[idx].EventLogId]
for sIdx, v := range segOfEL {
if idx == 0 && sIdx == 0 {
t.AppendRow(table.Row{res.Name, res.Logs[idx].EventLogId, v.Id, v.StartOffsetInLog,
v.EndOffsetInLog})
} else if sIdx == 0 {
t.AppendRow(table.Row{"", res.Logs[idx].EventLogId, v.Id, v.StartOffsetInLog,
v.EndOffsetInLog})
} else {
t.AppendRow(table.Row{"", "", v.Id, v.StartOffsetInLog,
v.EndOffsetInLog})
}
}
t.AppendSeparator()
}
t.AppendSeparator()
}

t.SetColumnConfigs([]table.ColumnConfig{
{Number: 1, VAlign: text.VAlignMiddle, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter},
{Number: 2, VAlign: text.VAlignMiddle, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter},
Expand All @@ -175,30 +191,32 @@ func getEventbusInfoCommand() *cobra.Command {
} else {
t.AppendHeader(table.Row{"Eventbus", "Eventlog", "Segment", "Start", "End", "Block", "Leader", "Volume", "Endpoint"})
multiReplica := false
for idx := 0; idx < len(res.Logs); idx++ {
segOfEL := segs[res.Logs[idx].EventLogId]
for sIdx, seg := range segOfEL {
tIdx := 0
if !multiReplica && len(seg.Replicas) > 1 {
multiReplica = true
}
for _, blk := range seg.Replicas {
if idx == 0 && sIdx == 0 && tIdx == 0 {
t.AppendRow(table.Row{res.Name, res.Logs[idx].EventLogId, seg.Id, seg.StartOffsetInLog,
seg.EndOffsetInLog, blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint})
} else if sIdx == 0 && tIdx == 0 {
t.AppendRow(table.Row{"", res.Logs[idx].EventLogId, seg.Id, seg.StartOffsetInLog,
seg.EndOffsetInLog, blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint})
} else if tIdx == 0 {
t.AppendRow(table.Row{"", "", seg.Id, seg.StartOffsetInLog,
seg.EndOffsetInLog, blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint})
} else {
t.AppendRow(table.Row{"", "", "", "", "", blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint})
for _, res := range busMetas {
for idx := 0; idx < len(res.Logs); idx++ {
segOfEL := segs[res.Logs[idx].EventLogId]
for sIdx, seg := range segOfEL {
tIdx := 0
if !multiReplica && len(seg.Replicas) > 1 {
multiReplica = true
}
for _, blk := range seg.Replicas {
if idx == 0 && sIdx == 0 && tIdx == 0 {
t.AppendRow(table.Row{res.Name, res.Logs[idx].EventLogId, seg.Id, seg.StartOffsetInLog,
seg.EndOffsetInLog, blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint})
} else if sIdx == 0 && tIdx == 0 {
t.AppendRow(table.Row{"", res.Logs[idx].EventLogId, seg.Id, seg.StartOffsetInLog,
seg.EndOffsetInLog, blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint})
} else if tIdx == 0 {
t.AppendRow(table.Row{"", "", seg.Id, seg.StartOffsetInLog,
seg.EndOffsetInLog, blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint})
} else {
t.AppendRow(table.Row{"", "", "", "", "", blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint})
}
tIdx++
}
tIdx++
}
t.AppendSeparator()
}
t.AppendSeparator()
}
t.SetColumnConfigs([]table.ColumnConfig{
{Number: 1, VAlign: text.VAlignMiddle, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter},
Expand All @@ -220,6 +238,7 @@ func getEventbusInfoCommand() *cobra.Command {
t.Render()
},
}
cmd.Flags().StringVar(&eventbus, "eventbus", "", "eventbus to show, use , to separate")
cmd.Flags().BoolVar(&showSegment, "segment", false, "if show all segment of eventlog")
cmd.Flags().BoolVar(&showBlock, "block", false, "if show all block of segment")
return cmd
Expand Down Expand Up @@ -247,6 +266,5 @@ func listEventbusInfoCommand() *cobra.Command {
color.Green("%s", out.String())
},
}
cmd.Flags().StringVar(&eventbus, "name", "", "eventbus name to deleting")
return cmd
}

0 comments on commit 0bd008c

Please sign in to comment.