Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@
<repository>
<id>ossrh-staging</id>
<name>OSS Sonatype Staging</name>
<url>https://oss.sonatype.org/content/groups/staging/</url>
<url>https://central.sonatype.com/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
Expand All @@ -467,6 +467,17 @@

<repository>
<id>snapshots-repo</id>
<url>https://central.sonatype.com/repository/maven-snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>

<repository>
<id>snapshots-repo-old</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases>
<enabled>false</enabled>
Expand Down
107 changes: 64 additions & 43 deletions pkg/cmd/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,7 @@ service, type and participant. Specify -T to set type outgoing or incoming and -
printWatchHeader(cmd)
sb.WriteString(FormatCurrentCluster(connection))

sb.WriteString("\nFEDERATION DETAILS\n")
sb.WriteString("------------------\n")

sb.WriteString(fmt.Sprintf("Service: %s\n", service))
sb.WriteString(fmt.Sprintf("Type: %s\n", describeFederationType))
sb.WriteString(fmt.Sprintf("Participant: %s\n\n", participant))
sb.WriteString(getDescribeFederationHeader(service, describeFederationType, participant))

if verboseOutput {
for _, v := range results {
Expand Down Expand Up @@ -355,6 +350,21 @@ service, type and participant. Specify -T to set type outgoing or incoming and -
},
}

func getDescribeFederationHeader(serviceName, federationType, participantName string) string {
var sb strings.Builder

if !monitorCluster {
sb.WriteString("\nFEDERATION DETAILS\n")
sb.WriteString("------------------\n")
sb.WriteString(fmt.Sprintf("Service: %s\n", serviceName))
}

sb.WriteString(fmt.Sprintf("Type: %s\n", federationType))
sb.WriteString(fmt.Sprintf("Participant: %s\n\n", participantName))

return sb.String()
}

func decodeFederationData(results [][]byte) ([]config.FederationDescription, error) {
var federationData = make([]config.FederationDescription, 0)
for _, v := range results {
Expand Down Expand Up @@ -438,47 +448,15 @@ func getFederationConnectionDetails(cmd *cobra.Command, service, federationType
return err
}
} else {
var sb strings.Builder

printWatchHeader(cmd)
sb.WriteString(FormatCurrentCluster(connection))

textDirection := "OUTGOING"
if describeFederationType == origins {
textDirection = "INCOMING"
}

sb.WriteString("\n" + textDirection + " FEDERATION CONNECTIONS\n")
sb.WriteString("------------------------------\n")

sb.WriteString(fmt.Sprintf("Service: %s\n", service))
sb.WriteString(fmt.Sprintf("Type: %s\n", describeFederationType))
sb.WriteString(fmt.Sprintf("Participant: %s\n", participant))
sb.WriteString("** Showing destination member details\n\n")
cmd.Println(FormatCurrentCluster(connection))

// encode the mapMembers
federationData, err := decodeFederationData(results)
if err != nil {
return err
output, err2 := getFederationConnectionData(results, service, describeFederationType, participant)
if err2 != nil {
return err2
}

mapAllIncoming := make([]string, 0)
for _, v := range federationData {
for _, v2 := range v.MapMembers {
mapAllIncoming = append(mapAllIncoming, v2)
}
if v.Member != "" && v.Member != "N/A" {
mapAllIncoming = append(mapAllIncoming, v.Member)
}
}
incomingList, err1 := decodeDepartedMembers(mapAllIncoming)
if err1 != nil {
return err1
}

sb.WriteString(FormatDepartedMembers(incomingList))

cmd.Println(sb.String())
cmd.Println(output)
}

// check to see if we should exit if we are not watching
Expand All @@ -493,6 +471,49 @@ func getFederationConnectionDetails(cmd *cobra.Command, service, federationType
return nil
}

func getFederationConnectionData(results [][]byte, service, federationType, participantName string) (string, error) {
var sb strings.Builder

textDirection := "OUTGOING"
if federationType == origins {
textDirection = "INCOMING"
}

if !monitorCluster {
sb.WriteString("\n" + textDirection + " FEDERATION CONNECTIONS\n")
sb.WriteString("-------------------------------\n")
sb.WriteString(fmt.Sprintf("Service: %s\n", service))
}

sb.WriteString(fmt.Sprintf("Type: %s\n", federationType))
sb.WriteString(fmt.Sprintf("Participant: %s\n", participantName))
sb.WriteString("** Showing destination member details\n\n")

// encode the mapMembers
federationData, err := decodeFederationData(results)
if err != nil {
return "", err
}

mapAllIncoming := make([]string, 0)
for _, v := range federationData {
for _, v2 := range v.MapMembers {
mapAllIncoming = append(mapAllIncoming, v2)
}
if v.Member != "" && v.Member != "N/A" {
mapAllIncoming = append(mapAllIncoming, v.Member)
}
}
incomingList, err1 := decodeDepartedMembers(mapAllIncoming)
if err1 != nil {
return "", err1
}

sb.WriteString(FormatDepartedMembers(incomingList))

return sb.String(), nil
}

func encodeFinalData(results [][]byte) []byte {
numResults := len(results)

Expand Down
74 changes: 72 additions & 2 deletions pkg/cmd/monitor_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
"default-members": "members:machines,departed-members:network-stats",
}
errSelectService = errors.New("you must provide a service name via -S option")
errSelectParticipant = errors.New("you must provide a participant name via -p option")
errSelectCache = errors.New("you must provide a cache using the -C option")
errSelectTopic = errors.New("you must provide a topic using the -T option")
errSelectSubscriber = errors.New("you must provide a subscriber using the -B option")
Expand Down Expand Up @@ -183,6 +184,10 @@ var validPanels = []panelImpl{
createContentPanel(4, "federation-all", "Federation All", "show all federation details", federationAllContent, federationPanelData),
createContentPanel(3, "federation-dest", "Federation Destinations", "show federation destinations", federationDestinationsContent, federationPanelData),
createContentPanel(3, "federation-origins", "Federation Origins", "show federation origins", federationOriginsContent, federationPanelData),
createContentPanel(7, "federation-con-outgoing", "Federation Connections Outgoing (%SERVICE)", "show federation connections outgoing", federationOutgoing),
createContentPanel(7, "federation-con-incoming", "Federation Connections Incoming (%SERVICE)", "show federation connections incoming", federationIncoming),
createContentPanel(7, "federation-outgoing", "Federation Details Outgoing (%SERVICE)", "show federation details outgoing", federationDetailsOutgoing),
createContentPanel(7, "federation-incoming", "Federation Details Incoming (%SERVICE)", "show federation details incoming", federationDetailsIncoming),
createContentPanel(7, "http-servers", "HTTP Servers", "show HTTP servers", httpServersContent, proxiesPanelData),
createContentPanel(7, "http-sessions", "HTTP Sessions", "show HTTP sessions", httpSessionsContent, httpSessionsPanelData),
createContentPanel(7, "machines", "Machines", "show machines", machinesContent, memberPanelData, storagePanelData),
Expand Down Expand Up @@ -1049,6 +1054,70 @@ var federationAllContent = func(_ fetcher.Fetcher, clusterSummary clusterSummary
return noContentArray, nil
}

var federationOutgoing = func(dataFetcher fetcher.Fetcher, _ clusterSummaryInfo) ([]string, error) {
return federationOutgoingAndIncoming(dataFetcher, outgoing)
}

var federationIncoming = func(dataFetcher fetcher.Fetcher, _ clusterSummaryInfo) ([]string, error) {
return federationOutgoingAndIncoming(dataFetcher, incoming)
}

func federationOutgoingAndIncoming(dataFetcher fetcher.Fetcher, federationDirection string) ([]string, error) {
if serviceName == "" {
return emptyStringArray, errSelectService
}
if participant == all {
return emptyStringArray, errSelectParticipant
}

results, err := retrieveFederationDetails(dataFetcher, serviceName, federationDirection)
if err != nil {
return noContentArray, nil
}

output, err2 := getFederationConnectionData(results, serviceName, federationDirection, participant)
if err2 != nil {
return noContentArray, nil
}

return strings.Split(output, "\n"), nil
}

func federationDetailsOutgoingAndIncoming(dataFetcher fetcher.Fetcher, federationDirection string) ([]string, error) {
if serviceName == "" {
return emptyStringArray, errSelectService
}
if participant == all {
return emptyStringArray, errSelectParticipant
}

results, err := retrieveFederationDetails(dataFetcher, serviceName, federationDirection)
if err != nil {
return noContentArray, nil
}

var sb strings.Builder

sb.WriteString(getDescribeFederationHeader(serviceName, federationDirection, participant))

federationData, err := decodeFederationData(results)
if err != nil {
return noContentArray, nil
}

sb.WriteString(FormatFederationDetails(federationData, describeFederationType))

return strings.Split(sb.String(), "\n"), nil
}

var federationDetailsOutgoing = func(dataFetcher fetcher.Fetcher, _ clusterSummaryInfo) ([]string, error) {
return federationDetailsOutgoingAndIncoming(dataFetcher, outgoing)
}

var federationDetailsIncoming = func(dataFetcher fetcher.Fetcher, _ clusterSummaryInfo) ([]string, error) {
return federationDetailsOutgoingAndIncoming(dataFetcher, incoming)
}

var federationDestinationsContent = func(_ fetcher.Fetcher, clusterSummary clusterSummaryInfo) ([]string, error) {
if len(clusterSummary.finalSummariesDestinations) > 0 {
return strings.Split(FormatFederationSummary(clusterSummary.finalSummariesDestinations, destinations), "\n"), nil
Expand Down Expand Up @@ -1669,12 +1738,12 @@ func getValidPanelTypes() string {
var sb strings.Builder
sb.WriteString("Default panels\n--------------\n")
for k, v := range defaultMap {
sb.WriteString(fmt.Sprintf("%-22s: %s\n", k, v))
sb.WriteString(fmt.Sprintf("%-25s: %s\n", k, v))
}

sb.WriteString("\nIndividual panels\n-----------------\n")
for _, p := range validPanels {
sb.WriteString(fmt.Sprintf("%-22s: %s\n", p.GetPanelName(), p.GetDescription()))
sb.WriteString(fmt.Sprintf("%-25s: %s\n", p.GetPanelName(), p.GetDescription()))
}

return sb.String()
Expand Down Expand Up @@ -1705,6 +1774,7 @@ func init() {
monitorClusterCmd.Flags().BoolVarP(&ignoreRESTErrors, "ignore-errors", "I", false, "ignore errors after initial refresh")
monitorClusterCmd.Flags().BoolVarP(&disablePadding, "disable-padding", "D", false, "disable padding of panels by default")
monitorClusterCmd.Flags().StringVarP(&serviceName, serviceNameOption, "S", "", serviceNameDescription)
monitorClusterCmd.Flags().StringVarP(&participant, "participant", "p", all, participantMessage)
monitorClusterCmd.Flags().StringVarP(&selectedCache, "cache-name", "C", "", "cache name")
monitorClusterCmd.Flags().StringVarP(&selectedTopic, "topic-name", "T", "", "topic name")
monitorClusterCmd.Flags().StringVarP(&colorStyleParam, "style", "", "", "color style")
Expand Down
Loading