Skip to content

Commit

Permalink
Merge pull request #28231 from vrutkovs/in-cluster-fixes-v2
Browse files Browse the repository at this point in the history
OCPBUGS-18865: add monitortest: in-cluster disruption monitors
  • Loading branch information
openshift-merge-robot authored Sep 23, 2023
2 parents 2c06604 + 70a1738 commit ae51ff2
Show file tree
Hide file tree
Showing 26 changed files with 855 additions and 888 deletions.
7 changes: 7 additions & 0 deletions pkg/clioptions/iooptions/io_options.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package iooptions

import (
"fmt"
"io"
"os"
"path"

"github.com/spf13/pflag"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand Down Expand Up @@ -36,6 +38,11 @@ func (o *OutputFlags) ConfigureIOStreams(streams genericclioptions.IOStreams, st
return doNothing, nil
}

dir := path.Dir(o.OutFile)
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return doNothing, fmt.Errorf("failed to create parentdir %q: %w", dir, err)
}

f, err := os.OpenFile(o.OutFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return doNothing, err
Expand Down
2 changes: 0 additions & 2 deletions pkg/cmd/openshift-tests/monitor/run/run_monitor_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/spf13/pflag"

"github.com/openshift/origin/pkg/defaultmonitortests"
"github.com/openshift/origin/pkg/disruption/backend/sampler"
"github.com/openshift/origin/pkg/monitor"
"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand Down Expand Up @@ -123,7 +122,6 @@ func (f *RunMonitorOptions) Run() error {
go func() {
<-abortCh
fmt.Fprintf(f.ErrOut, "Interrupted, terminating\n")
sampler.TearDownInClusterMonitors(restConfig)
cancelFn()

sig := <-abortCh
Expand Down
296 changes: 182 additions & 114 deletions pkg/cmd/openshift-tests/run-disruption/disruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,51 @@ import (
"io"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

monitorserialization "github.com/openshift/origin/pkg/monitor/serialization"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"

"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/util/templates"
"k8s.io/apimachinery/pkg/fields"

"github.com/openshift/origin/pkg/clioptions/iooptions"
"github.com/openshift/origin/pkg/disruption/backend"
disruptionci "github.com/openshift/origin/pkg/disruption/ci"
"github.com/openshift/origin/pkg/monitor"
"github.com/openshift/origin/pkg/monitor/apiserveravailability"
"github.com/openshift/origin/pkg/monitor/monitorapi"
"github.com/openshift/origin/test/extended/util/disruption/controlplane"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
apimachinerywatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/watch"
"k8s.io/kubectl/pkg/util/templates"
)

// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor
type RunAPIDisruptionMonitorOptions struct {
Out, ErrOut io.Writer
type RunAPIDisruptionMonitorFlags struct {
ConfigFlags *genericclioptions.ConfigFlags
OutputFlags *iooptions.OutputFlags

ArtifactDir string
LoadBalancerType string
ExtraMessage string
ArtifactDir string
LoadBalancerType string
StopConfigMapName string

genericclioptions.IOStreams
}

func NewRunInClusterDisruptionMonitorOptions(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorOptions {
return &RunAPIDisruptionMonitorOptions{
Out: ioStreams.Out,
ErrOut: ioStreams.ErrOut,
func NewRunInClusterDisruptionMonitorFlags(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorFlags {
return &RunAPIDisruptionMonitorFlags{
ConfigFlags: genericclioptions.NewConfigFlags(false),
OutputFlags: iooptions.NewOutputOptions(),
IOStreams: ioStreams,
}
}

func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStreams) *cobra.Command {
disruptionOpt := NewRunInClusterDisruptionMonitorOptions(ioStreams)
f := NewRunInClusterDisruptionMonitorFlags(ioStreams)
cmd := &cobra.Command{
Use: "run-disruption",
Short: "Run API server disruption monitor",
Expand All @@ -54,122 +61,183 @@ func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStrea
SilenceUsage: true,
SilenceErrors: true,
RunE: func(cmd *cobra.Command, args []string) error {
return disruptionOpt.Run()
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
abortCh := make(chan os.Signal, 2)
go func() {
<-abortCh
fmt.Fprintf(f.ErrOut, "Interrupted, terminating\n")
cancelFn()

sig := <-abortCh
fmt.Fprintf(f.ErrOut, "Interrupted twice, exiting (%s)\n", sig)
switch sig {
case syscall.SIGINT:
os.Exit(130)
default:
os.Exit(0)
}
}()
signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM)

if err := f.Validate(); err != nil {
return err
}

o, err := f.ToOptions()
if err != nil {
return err
}

return o.Run(ctx)
},
}
cmd.Flags().StringVar(&disruptionOpt.ArtifactDir,
"artifact-dir", disruptionOpt.ArtifactDir,
"The directory where monitor events will be stored.")
cmd.Flags().StringVar(&disruptionOpt.LoadBalancerType,
"lb-type", disruptionOpt.LoadBalancerType,
"Set load balancer type, available options: internal-lb, service-network, external-lb (default)")
cmd.Flags().StringVar(&disruptionOpt.ExtraMessage,
"extra-message", disruptionOpt.ExtraMessage,
"Add custom label to disruption event message")

f.AddFlags(cmd.Flags())

return cmd
}

func (opt *RunAPIDisruptionMonitorOptions) Run() error {
restConfig, err := monitor.GetMonitorRESTConfig()
if err != nil {
return err
}
func (f *RunAPIDisruptionMonitorFlags) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&f.LoadBalancerType, "lb-type", f.LoadBalancerType, "Set load balancer type, available options: internal-lb, service-network, external-lb (default)")
flags.StringVar(&f.StopConfigMapName, "stop-configmap", f.StopConfigMapName, "the name of the configmap that indicates that this pod should stop all watchers.")

lb := backend.ParseStringToLoadBalancerType(opt.LoadBalancerType)
f.ConfigFlags.AddFlags(flags)
f.OutputFlags.BindFlags(flags)
}

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
abortCh := make(chan os.Signal, 2)
go func() {
<-abortCh
fmt.Fprintf(opt.ErrOut, "Interrupted, terminating\n")
// Give some time to store intervals on disk
time.Sleep(5 * time.Second)
cancelFn()
sig := <-abortCh
fmt.Fprintf(opt.ErrOut, "Interrupted twice, exiting (%s)\n", sig)
switch sig {
case syscall.SIGINT:
os.Exit(130)
default:
os.Exit(0)
}
}()
signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM)
// SetIOStreams replaces the IOStreams embedded in these flags with streams.
// NOTE(review): ToOptions hands f to OutputFlags.ConfigureIOStreams, presumably so that
// output redirection can be applied through this setter — confirm against iooptions.
func (f *RunAPIDisruptionMonitorFlags) SetIOStreams(streams genericclioptions.IOStreams) {
f.IOStreams = streams
}

recorder, err := StartAPIAvailability(ctx, restConfig, lb)
if err != nil {
return err
func (f *RunAPIDisruptionMonitorFlags) Validate() error {
if len(f.OutputFlags.OutFile) == 0 {
return fmt.Errorf("output-file must be specified")
}
if len(f.StopConfigMapName) == 0 {
return fmt.Errorf("stop-configmap must be specified")
}

go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
var last time.Time
done := false
for !done {
select {
case <-ticker.C:
case <-ctx.Done():
done = true
}
events := recorder.Intervals(last, time.Time{})
if len(events) > 0 {
for _, event := range events {
if !event.From.Equal(event.To) {
continue
}
fmt.Fprintln(opt.Out, event.String())
}
last = events[len(events)-1].From
}
}
}()

<-ctx.Done()
return nil
}

// Store intervals to artifact directory
intervals := recorder.Intervals(time.Time{}, time.Time{})
if len(opt.ExtraMessage) > 0 {
fmt.Fprintf(opt.Out, "\nAppending %s to recorded event message\n", opt.ExtraMessage)
for i, event := range intervals {
intervals[i].Message = fmt.Sprintf("%s user-provided-message=%s", event.Message, opt.ExtraMessage)
}
func (f *RunAPIDisruptionMonitorFlags) ToOptions() (*RunAPIDisruptionMonitorOptions, error) {
originalOutStream := f.IOStreams.Out
closeFn, err := f.OutputFlags.ConfigureIOStreams(f.IOStreams, f)
if err != nil {
return nil, err
}

eventDir := filepath.Join(opt.ArtifactDir, monitorapi.EventDir)
if err := os.MkdirAll(eventDir, os.ModePerm); err != nil {
fmt.Printf("Failed to create monitor-events directory, err: %v\n", err)
return err
namespace, _, err := f.ConfigFlags.ToRawKubeConfigLoader().Namespace()
if err != nil {
return nil, err
}
if len(namespace) == 0 {
return nil, fmt.Errorf("namespace must be specified")
}

timeSuffix := fmt.Sprintf("_%s", time.Now().UTC().Format("20060102-150405"))
if err := monitorserialization.EventsToFile(filepath.Join(eventDir, fmt.Sprintf("e2e-events%s.json", timeSuffix)), intervals); err != nil {
fmt.Printf("Failed to write event data, err: %v\n", err)
return err
restConfig, err := f.ConfigFlags.ToRESTConfig()
if err != nil {
return nil, err
}
kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}
fmt.Fprintf(opt.Out, "\nEvent data written, exiting\n")

return nil
return &RunAPIDisruptionMonitorOptions{
KubeClient: kubeClient,
KubeClientConfig: restConfig,
OutputFile: f.OutputFlags.OutFile,
LoadBalancerType: f.LoadBalancerType,
StopConfigMapName: f.StopConfigMapName,
Namespace: namespace,
CloseFn: closeFn,
OriginalOutFile: originalOutStream,
IOStreams: f.IOStreams,
}, nil
}

// StartAPIAvailability monitors just the cluster availability
func StartAPIAvailability(ctx context.Context, restConfig *rest.Config, lb backend.LoadBalancerType) (monitorapi.Recorder, error) {
recorder := monitor.NewRecorder()
// RunAPIDisruptionMonitorOptions holds the resolved settings used to run the
// API server disruption monitor. It is produced by RunAPIDisruptionMonitorFlags.ToOptions.
type RunAPIDisruptionMonitorOptions struct {
// KubeClient is handed to the disruption samplers and used to watch for the stop configmap.
KubeClient kubernetes.Interface
// KubeClientConfig is the REST config KubeClient was built from; Run passes it to the sampler setup.
KubeClientConfig *rest.Config
// OutputFile is the path taken from the output flags; Run reads any existing content from it at startup.
OutputFile string
// LoadBalancerType is the raw flag value; Run converts it with backend.ParseStringToLoadBalancerType.
LoadBalancerType string
// StopConfigMapName is the name of the configmap whose add/modify event ends WaitForStopSignal.
StopConfigMapName string
// Namespace is the namespace in which the stop configmap is watched.
Namespace string

// OriginalOutFile is the Out stream captured before the output flags reconfigured the streams;
// Run writes pre-existing OutputFile content to it.
OriginalOutFile io.Writer
// CloseFn is the function returned by OutputFlags.ConfigureIOStreams.
// NOTE(review): the Run method visible here does not invoke it — confirm who is expected to call it.
CloseFn iooptions.CloseFunc
// IOStreams carries the (possibly redirected) streams: Out receives recorder output and
// progress messages, ErrOut receives stop-wait failures.
genericclioptions.IOStreams
}

client, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
func (o *RunAPIDisruptionMonitorOptions) Run(ctx context.Context) error {
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()

fmt.Fprintf(o.Out, "Starting up.")

startingContent, err := os.ReadFile(o.OutputFile)
if err != nil && !os.IsNotExist(err) {
return err
}
if err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, restConfig, lb); err != nil {
return nil, err
if len(startingContent) > 0 {
// print starting content to the log so that we can simply scrape the log to find all entries at the end.
o.OriginalOutFile.Write(startingContent)
}

// read the state of the cluster apiserver client access issues *before* any test (like upgrade) begins
intervals, err := apiserveravailability.APIServerAvailabilityIntervalsFromCluster(client, time.Time{}, time.Time{})
lb := backend.ParseStringToLoadBalancerType(o.LoadBalancerType)

recorder := monitor.WrapWithJSONLRecorder(monitor.NewRecorder(), o.IOStreams.Out, nil)
samplers, err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, o.KubeClientConfig, o.KubeClient, lb)
if err != nil {
klog.Errorf("error reading initial apiserver availability: %v", err)
return err
}
recorder.AddIntervals(intervals...)
return recorder, nil

go func(ctx context.Context) {
defer cancelFn()
err := o.WaitForStopSignal(ctx)
if err != nil {
fmt.Fprintf(o.ErrOut, "failure waiting for stop: %v", err)
}
}(ctx)

<-ctx.Done()

fmt.Fprintf(o.Out, "waiting for samplers to stop")
wg := sync.WaitGroup{}
for i := range samplers {
wg.Add(1)
func(sampler disruptionci.Sampler) {
defer wg.Done()
sampler.Stop()
}(samplers[i])
}
wg.Wait()
fmt.Fprintf(o.Out, "samplers stopped")

return nil
}

// WaitForStopSignal blocks until a configmap named o.StopConfigMapName in
// o.Namespace is reported as added or modified, and returns whatever error
// watch.UntilWithSync produces (nil when the stop condition was met).
// Any other event type keeps the wait going.
func (o *RunAPIDisruptionMonitorOptions) WaitForStopSignal(ctx context.Context) error {
	defer utilruntime.HandleCrash()

	// Narrow the list/watch to the single configmap that acts as the stop marker.
	restClient := o.KubeClient.CoreV1().RESTClient()
	byName := fields.OneTermEqualSelector("metadata.name", o.StopConfigMapName)
	stopConfigMaps := cache.NewListWatchFromClient(restClient, "configmaps", o.Namespace, byName)

	// The mere appearance (or update) of the configmap is the signal; its content is not inspected.
	stopRequested := func(event apimachinerywatch.Event) (bool, error) {
		switch event.Type {
		case apimachinerywatch.Added, apimachinerywatch.Modified:
			return true, nil
		default:
			return false, nil
		}
	}

	_, err := watch.UntilWithSync(ctx, stopConfigMaps, &corev1.ConfigMap{}, nil, stopRequested)
	return err
}
Loading

0 comments on commit ae51ff2

Please sign in to comment.