Skip to content

Commit

Permalink
Add flag for changing rps limit in istioctl bug-report (istio#40725)
Browse files Browse the repository at this point in the history
* Remove unused method

* Add option for changing rps limit in istioctl bug-report

* Add release notes
  • Loading branch information
morepork authored Sep 23, 2022
1 parent 0670533 commit 95b2417
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 127 deletions.
8 changes: 8 additions & 0 deletions releasenotes/notes/bug-report-rps-limit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: release-notes/v2
kind: feature
area: istioctl
releaseNotes:
- |
**Added** `--rps-limit` flag to `istioctl bug-report` that allows increasing
the requests-per-second limit to the Kubernetes API server, which can greatly
reduce the time to collect bug reports.
65 changes: 21 additions & 44 deletions tools/bug-report/pkg/bugreport/bugreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package bugreport

import (
"context"
"errors"
"fmt"
"os"
"path"
Expand All @@ -29,7 +28,6 @@ import (

"github.com/kr/pretty"
"github.com/spf13/cobra"
"k8s.io/client-go/tools/clientcmd"

"istio.io/istio/operator/pkg/util"
"istio.io/istio/pkg/kube"
Expand Down Expand Up @@ -101,7 +99,8 @@ var (
)

func runBugReportCommand(_ *cobra.Command, logOpts *log.Options) error {
kubectlcmd.ReportRunningTasks()
runner := kubectlcmd.NewRunner(gConfig.RequestsPerSecondLimit)
runner.ReportRunningTasks()
if err := configLogs(logOpts); err != nil {
return err
}
Expand All @@ -112,7 +111,7 @@ func runBugReportCommand(_ *cobra.Command, logOpts *log.Options) error {
clusterCtxStr := ""
if config.Context == "" {
var err error
clusterCtxStr, err = content.GetClusterContext(config.KubeConfigPath)
clusterCtxStr, err = content.GetClusterContext(runner, config.KubeConfigPath)
if err != nil {
return err
}
Expand All @@ -123,15 +122,16 @@ func runBugReportCommand(_ *cobra.Command, logOpts *log.Options) error {
common.LogAndPrintf("\nTarget cluster context: %s\n", clusterCtxStr)
common.LogAndPrintf("Running with the following config: \n\n%s\n\n", config)

clientConfig, clientset, err := kubeclient.New(config.KubeConfigPath, config.Context)
restConfig, clientset, err := kubeclient.New(config.KubeConfigPath, config.Context, gConfig.RequestsPerSecondLimit)
if err != nil {
return fmt.Errorf("could not initialize k8s client: %s ", err)
}
client, err := kube.NewCLIClient(clientConfig, "")
client, err := kube.NewCLIClient(kube.NewClientConfigForRestConfig(restConfig), "")
if err != nil {
return err
}
common.LogAndPrintf("\nCluster endpoint: %s\n", client.RESTConfig().Host)
runner.SetClient(client)

clusterResourcesCtx, getClusterResourcesCancel := context.WithTimeout(context.Background(), commandTimeout)
curTime := time.Now()
Expand All @@ -157,7 +157,7 @@ func runBugReportCommand(_ *cobra.Command, logOpts *log.Options) error {

common.LogAndPrintf("\n\nFetching proxy logs for the following containers:\n\n%s\n", strings.Join(paths, "\n"))

gatherInfo(client, config, resources, paths)
gatherInfo(runner, config, resources, paths)
if len(gErrors) != 0 {
log.Error(gErrors.ToError())
}
Expand Down Expand Up @@ -267,7 +267,7 @@ func getIstioVersion(kubeconfig, configContext, istioNamespace, revision string)
// gatherInfo fetches all logs, resources, debug etc. using goroutines.
// proxy logs and info are saved in logs/stats/importance global maps.
// Errors are reported through gErrors.
func gatherInfo(client kube.CLIClient, config *config.BugReportConfig, resources *cluster2.Resources, paths []string) {
func gatherInfo(runner *kubectlcmd.Runner, config *config.BugReportConfig, resources *cluster2.Resources, paths []string) {
// no timeout on mandatoryWg.
var mandatoryWg sync.WaitGroup
cmdTimer := time.NewTimer(time.Duration(config.CommandTimeout))
Expand All @@ -276,7 +276,7 @@ func gatherInfo(client kube.CLIClient, config *config.BugReportConfig, resources
clusterDir := archive.ClusterInfoPath(tempDir)

params := &content.Params{
Client: client,
Runner: runner,
DryRun: config.DryRun,
KubeConfig: config.KubeConfigPath,
KubeContext: config.Context,
Expand Down Expand Up @@ -306,14 +306,14 @@ func gatherInfo(client kube.CLIClient, config *config.BugReportConfig, resources
getFromCluster(content.GetCoredumps, cp, filepath.Join(proxyDir, "cores"), &mandatoryWg)
getFromCluster(content.GetNetstat, cp, proxyDir, &mandatoryWg)
getFromCluster(content.GetProxyInfo, cp, archive.ProxyOutputPath(tempDir, namespace, pod), &optionalWg)
getProxyLogs(client, config, resources, p, namespace, pod, container, &optionalWg)
getProxyLogs(runner, config, resources, p, namespace, pod, container, &optionalWg)

case resources.IsDiscoveryContainer(params.ClusterVersion, namespace, pod, container):
getFromCluster(content.GetIstiodInfo, cp, archive.IstiodPath(tempDir, namespace, pod), &mandatoryWg)
getIstiodLogs(client, config, resources, namespace, pod, &mandatoryWg)
getIstiodLogs(runner, config, resources, namespace, pod, &mandatoryWg)

case common.IsOperatorContainer(params.ClusterVersion, container):
getOperatorLogs(client, config, resources, namespace, pod, &optionalWg)
getOperatorLogs(runner, config, resources, namespace, pod, &optionalWg)
}
}

Expand Down Expand Up @@ -355,14 +355,14 @@ func getFromCluster(f func(params *content.Params) (map[string]string, error), p
// getProxyLogs fetches proxy logs for the given namespace/pod/container and stores the output in global structs.
// Runs in a goroutine, with errors reported through gErrors.
// TODO(stewartbutler): output the logs to a more robust/complete structure.
func getProxyLogs(client kube.CLIClient, config *config.BugReportConfig, resources *cluster2.Resources,
func getProxyLogs(runner *kubectlcmd.Runner, config *config.BugReportConfig, resources *cluster2.Resources,
path, namespace, pod, container string, wg *sync.WaitGroup,
) {
wg.Add(1)
log.Infof("Waiting on logs %s", pod)
go func() {
defer wg.Done()
clog, cstat, imp, err := getLog(client, resources, config, namespace, pod, container)
clog, cstat, imp, err := getLog(runner, resources, config, namespace, pod, container)
appendGlobalErr(err)
lock.Lock()
if err == nil {
Expand All @@ -375,46 +375,46 @@ func getProxyLogs(client kube.CLIClient, config *config.BugReportConfig, resourc

// getIstiodLogs fetches Istiod logs for the given namespace/pod and writes the output.
// Runs in a goroutine, with errors reported through gErrors.
func getIstiodLogs(client kube.CLIClient, config *config.BugReportConfig, resources *cluster2.Resources,
func getIstiodLogs(runner *kubectlcmd.Runner, config *config.BugReportConfig, resources *cluster2.Resources,
namespace, pod string, wg *sync.WaitGroup,
) {
wg.Add(1)
log.Infof("Waiting on logs %s", pod)
go func() {
defer wg.Done()
clog, _, _, err := getLog(client, resources, config, namespace, pod, common.DiscoveryContainerName)
clog, _, _, err := getLog(runner, resources, config, namespace, pod, common.DiscoveryContainerName)
appendGlobalErr(err)
writeFile(filepath.Join(archive.IstiodPath(tempDir, namespace, pod), "discovery.log"), clog)
log.Infof("Done with logs %s", pod)
}()
}

// getOperatorLogs fetches istio-operator logs for the given namespace/pod and writes the output.
func getOperatorLogs(client kube.CLIClient, config *config.BugReportConfig, resources *cluster2.Resources,
func getOperatorLogs(runner *kubectlcmd.Runner, config *config.BugReportConfig, resources *cluster2.Resources,
namespace, pod string, wg *sync.WaitGroup,
) {
wg.Add(1)
log.Infof("Waiting on logs %s", pod)
go func() {
defer wg.Done()
clog, _, _, err := getLog(client, resources, config, namespace, pod, common.OperatorContainerName)
clog, _, _, err := getLog(runner, resources, config, namespace, pod, common.OperatorContainerName)
appendGlobalErr(err)
writeFile(filepath.Join(archive.OperatorPath(tempDir, namespace, pod), "operator.log"), clog)
log.Infof("Done with logs %s", pod)
}()
}

// getLog fetches the logs for the given namespace/pod/container and returns the log text and stats for it.
func getLog(client kube.CLIClient, resources *cluster2.Resources, config *config.BugReportConfig,
func getLog(runner *kubectlcmd.Runner, resources *cluster2.Resources, config *config.BugReportConfig,
namespace, pod, container string,
) (string, *processlog.Stats, int, error) {
log.Infof("Getting logs for %s/%s/%s...", namespace, pod, container)
clog, err := kubectlcmd.Logs(client, namespace, pod, container, false, config.DryRun)
clog, err := runner.Logs(namespace, pod, container, false, config.DryRun)
if err != nil {
return "", nil, 0, err
}
if resources.ContainerRestarts(namespace, pod, container) > 0 {
pclog, err := kubectlcmd.Logs(client, namespace, pod, container, true, config.DryRun)
pclog, err := runner.Logs(namespace, pod, container, true, config.DryRun)
if err != nil {
return "", nil, 0, err
}
Expand Down Expand Up @@ -472,29 +472,6 @@ func appendGlobalErr(err error) {
lock.Unlock()
}

// BuildClientsFromConfig creates a kube.Client from the raw bytes of a
// kubeconfig file.
//
// It returns an error when kubeConfig is empty, when clientcmd.Load cannot
// parse the bytes, when the loaded config fails clientcmd.Validate, or when
// kube.NewClient fails. Underlying errors are wrapped with %w so callers can
// inspect the cause with errors.Is / errors.As; the message text is unchanged.
func BuildClientsFromConfig(kubeConfig []byte) (kube.Client, error) {
	if len(kubeConfig) == 0 {
		return nil, errors.New("kubeconfig is empty")
	}

	rawConfig, err := clientcmd.Load(kubeConfig)
	if err != nil {
		return nil, fmt.Errorf("kubeconfig cannot be loaded: %w", err)
	}

	if err := clientcmd.Validate(*rawConfig); err != nil {
		return nil, fmt.Errorf("kubeconfig is not valid: %w", err)
	}

	// Empty overrides: the kubeconfig is used exactly as supplied.
	clientConfig := clientcmd.NewDefaultClientConfig(*rawConfig, &clientcmd.ConfigOverrides{})

	clients, err := kube.NewClient(clientConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create kube clients: %w", err)
	}
	return clients, nil
}

func configLogs(opt *log.Options) error {
logDir := filepath.Join(archive.OutputRootDir(tempDir), "bug-report.log")
mkdirOrExit(logDir)
Expand Down
5 changes: 5 additions & 0 deletions tools/bug-report/pkg/bugreport/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ func addFlags(cmd *cobra.Command, args *config2.BugReportConfig) {
// output/working dir
cmd.PersistentFlags().StringVar(&tempDir, "dir", "",
"Set a specific directory for temporary artifact storage.")

// requests per second limit
cmd.PersistentFlags().IntVar(&args.RequestsPerSecondLimit, "rps-limit", 0,
"Requests per second limit to the Kubernetes API server, defaults to 10."+
"A higher limit can make bug report collection much faster.")
}

func parseConfig() (*config2.BugReportConfig, error) {
Expand Down
3 changes: 3 additions & 0 deletions tools/bug-report/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ type BugReportConfig struct {
// IgnoredErrors are glob error patterns which are ignored when
// calculating the error heuristic for a log.
IgnoredErrors []string `json:"ignoredErrors,omitempty"`

// RequestsPerSecondLimit controls the RPS limit to the API server.
RequestsPerSecondLimit int `json:"requestsPerSecondLimit,omitempty"`
}

func (b *BugReportConfig) String() string {
Expand Down
44 changes: 19 additions & 25 deletions tools/bug-report/pkg/content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (

// Params contains parameters for running a kubectl fetch command.
type Params struct {
Client kube.CLIClient
Runner *kubectlcmd.Runner
DryRun bool
Verbose bool
ClusterVersion string
Expand All @@ -48,12 +48,6 @@ type Params struct {
KubeContext string
}

// SetClient returns a shallow copy of p with its Client field set to client.
// The receiver itself is not modified.
func (p *Params) SetClient(client kube.CLIClient) *Params {
	clone := new(Params)
	*clone = *p
	clone.Client = client
	return clone
}

func (p *Params) SetDryRun(dryRun bool) *Params {
out := *p
out.DryRun = dryRun
Expand Down Expand Up @@ -101,7 +95,7 @@ func retMap(filename, text string, err error) (map[string]string, error) {

// GetK8sResources returns all k8s cluster resources.
func GetK8sResources(p *Params) (map[string]string, error) {
out, err := kubectlcmd.RunCmd("get --all-namespaces "+
out, err := p.Runner.RunCmd("get --all-namespaces "+
"all,namespaces,jobs,ingresses,endpoints,customresourcedefinitions,configmaps,events,"+
"mutatingwebhookconfigurations,validatingwebhookconfigurations "+
"-o yaml", "", p.KubeConfig, p.KubeContext, p.DryRun)
Expand All @@ -114,7 +108,7 @@ func GetSecrets(p *Params) (map[string]string, error) {
if p.Verbose {
cmdStr += " -o yaml"
}
out, err := kubectlcmd.RunCmd(cmdStr, "", p.KubeConfig, p.KubeContext, p.DryRun)
out, err := p.Runner.RunCmd(cmdStr, "", p.KubeConfig, p.KubeContext, p.DryRun)
return retMap("secrets", out, err)
}

Expand All @@ -124,20 +118,20 @@ func GetCRs(p *Params) (map[string]string, error) {
if err != nil {
return nil, err
}
out, err := kubectlcmd.RunCmd("get --all-namespaces "+strings.Join(crds, ",")+" -o yaml", "", p.KubeConfig, p.KubeContext, p.DryRun)
out, err := p.Runner.RunCmd("get --all-namespaces "+strings.Join(crds, ",")+" -o yaml", "", p.KubeConfig, p.KubeContext, p.DryRun)
return retMap("crs", out, err)
}

// GetClusterInfo returns the cluster info.
func GetClusterInfo(p *Params) (map[string]string, error) {
out, err := kubectlcmd.RunCmd("config current-context", "", p.KubeConfig, p.KubeContext, p.DryRun)
out, err := p.Runner.RunCmd("config current-context", "", p.KubeConfig, p.KubeContext, p.DryRun)
if err != nil {
return nil, err
}
ret := make(map[string]string)
// Add the endpoint to the context
ret["cluster-context"] = out + p.Client.RESTConfig().Host + "\n"
out, err = kubectlcmd.RunCmd("version", "", p.KubeConfig, p.KubeContext, p.DryRun)
ret["cluster-context"] = out + p.Runner.Client.RESTConfig().Host + "\n"
out, err = p.Runner.RunCmd("version", "", p.KubeConfig, p.KubeContext, p.DryRun)
if err != nil {
return nil, err
}
Expand All @@ -146,13 +140,13 @@ func GetClusterInfo(p *Params) (map[string]string, error) {
}

// GetClusterContext returns the cluster context.
func GetClusterContext(kubeConfig string) (string, error) {
return kubectlcmd.RunCmd("config current-context", "", kubeConfig, "", false)
func GetClusterContext(runner *kubectlcmd.Runner, kubeConfig string) (string, error) {
return runner.RunCmd("config current-context", "", kubeConfig, "", false)
}

// GetNodeInfo returns node information.
func GetNodeInfo(p *Params) (map[string]string, error) {
out, err := kubectlcmd.RunCmd("describe nodes", "", p.KubeConfig, p.KubeContext, p.DryRun)
out, err := p.Runner.RunCmd("describe nodes", "", p.KubeConfig, p.KubeContext, p.DryRun)
return retMap("nodes", out, err)
}

Expand All @@ -161,13 +155,13 @@ func GetDescribePods(p *Params) (map[string]string, error) {
if p.IstioNamespace == "" {
return nil, fmt.Errorf("getDescribePods requires the Istio namespace")
}
out, err := kubectlcmd.RunCmd("describe pods", p.IstioNamespace, p.KubeConfig, p.KubeContext, p.DryRun)
out, err := p.Runner.RunCmd("describe pods", p.IstioNamespace, p.KubeConfig, p.KubeContext, p.DryRun)
return retMap("describe-pods", out, err)
}

// GetEvents returns events for all namespaces.
func GetEvents(p *Params) (map[string]string, error) {
out, err := kubectlcmd.RunCmd("get events --all-namespaces -o wide", "", p.KubeConfig, p.KubeContext, p.DryRun)
out, err := p.Runner.RunCmd("get events --all-namespaces -o wide", "", p.KubeConfig, p.KubeContext, p.DryRun)
return retMap("events", out, err)
}

Expand All @@ -178,7 +172,7 @@ func GetIstiodInfo(p *Params) (map[string]string, error) {
}
ret := make(map[string]string)
for _, url := range common.IstiodDebugURLs(p.ClusterVersion) {
out, err := kubectlcmd.Exec(p.Client, p.Namespace, p.Pod, common.DiscoveryContainerName, fmt.Sprintf(`pilot-discovery request GET %s`, url), p.DryRun)
out, err := p.Runner.Exec(p.Namespace, p.Pod, common.DiscoveryContainerName, fmt.Sprintf(`pilot-discovery request GET %s`, url), p.DryRun)
if err != nil {
return nil, err
}
Expand All @@ -194,7 +188,7 @@ func GetProxyInfo(p *Params) (map[string]string, error) {
}
ret := make(map[string]string)
for _, url := range common.ProxyDebugURLs(p.ClusterVersion) {
out, err := kubectlcmd.EnvoyGet(p.Client, p.Namespace, p.Pod, url, p.DryRun)
out, err := p.Runner.EnvoyGet(p.Namespace, p.Pod, url, p.DryRun)
if err != nil {
return nil, err
}
Expand All @@ -209,7 +203,7 @@ func GetNetstat(p *Params) (map[string]string, error) {
return nil, fmt.Errorf("getNetstat requires namespace and pod")
}

out, err := kubectlcmd.Exec(p.Client, p.Namespace, p.Pod, common.ProxyContainerName, "netstat -natpw", p.DryRun)
out, err := p.Runner.Exec(p.Namespace, p.Pod, common.ProxyContainerName, "netstat -natpw", p.DryRun)
if err != nil {
return nil, err
}
Expand All @@ -221,7 +215,7 @@ func GetAnalyze(p *Params, timeout time.Duration) (map[string]string, error) {
out := make(map[string]string)
sa := local.NewSourceAnalyzer(analyzers.AllCombined(), resource.Namespace(p.Namespace), resource.Namespace(p.IstioNamespace), nil, true, timeout)

k, err := kube.NewClient(kube.NewClientConfigForRestConfig(p.Client.RESTConfig()))
k, err := kube.NewClient(kube.NewClientConfigForRestConfig(p.Runner.Client.RESTConfig()))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -289,7 +283,7 @@ func GetCoredumps(p *Params) (map[string]string, error) {
ret := make(map[string]string)
log.Infof("%s/%s/%s has %d coredumps", p.Namespace, p.Pod, p.Container, len(cds))
for idx, cd := range cds {
outStr, err := kubectlcmd.Cat(p.Client, p.Namespace, p.Pod, p.Container, cd, p.DryRun)
outStr, err := p.Runner.Cat(p.Namespace, p.Pod, p.Container, cd, p.DryRun)
if err != nil {
log.Warn(err)
continue
Expand All @@ -300,7 +294,7 @@ func GetCoredumps(p *Params) (map[string]string, error) {
}

func getCoredumpList(p *Params) ([]string, error) {
out, err := kubectlcmd.Exec(p.Client, p.Namespace, p.Pod, p.Container, fmt.Sprintf("find %s -name core.*", coredumpDir), p.DryRun)
out, err := p.Runner.Exec(p.Namespace, p.Pod, p.Container, fmt.Sprintf("find %s -name core.*", coredumpDir), p.DryRun)
if err != nil {
return nil, err
}
Expand All @@ -314,7 +308,7 @@ func getCoredumpList(p *Params) ([]string, error) {
}

func getCRDList(p *Params) ([]string, error) {
crdStr, err := kubectlcmd.RunCmd("get customresourcedefinitions --no-headers", "", p.KubeConfig, p.KubeContext, p.DryRun)
crdStr, err := p.Runner.RunCmd("get customresourcedefinitions --no-headers", "", p.KubeConfig, p.KubeContext, p.DryRun)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 95b2417

Please sign in to comment.