From ee4bdc26985c0328dac4d5c8196304fc12c9050d Mon Sep 17 00:00:00 2001 From: Katie Knister Date: Thu, 9 Apr 2015 15:52:31 -0700 Subject: [PATCH] Improvements to events integration test --- api/handler.go | 2 +- client/client.go | 8 ++-- client/clientexample/main.go | 10 ++--- events/handler.go | 1 + integration/framework/framework.go | 19 ++++++---- integration/tests/api/event_test.go | 58 ++++++++++++++++++++--------- manager/manager.go | 2 +- utils/oomparser/oomparser.go | 1 - 8 files changed, 65 insertions(+), 36 deletions(-) diff --git a/api/handler.go b/api/handler.go index 6015fd5624..9c78c986fe 100644 --- a/api/handler.go +++ b/api/handler.go @@ -151,7 +151,7 @@ func streamResults(eventChannel *events.EventChannel, w http.ResponseWriter, r * for { select { case <-cn.CloseNotify(): - glog.V(3).Infof("Received CloseNotify event") + glog.V(3).Infof("Received CloseNotify event. About to return from api/handler:streamResults") m.CloseEventChannel(eventChannel.GetWatchId()) return nil case ev := <-eventChannel.GetChannel(): diff --git a/client/client.go b/client/client.go index 6313b746d3..4336e9a125 100644 --- a/client/client.go +++ b/client/client.go @@ -205,14 +205,16 @@ func (self *Client) getEventStreamingData(url string, einfo chan *info.Event) er } dec := json.NewDecoder(resp.Body) - var m *info.Event + var m *info.Event = &info.Event{} for { - err := dec.Decode(&m) + err := dec.Decode(m) + glog.V(3).Infof("received m as %v", m) if err != nil { if err == io.EOF { break } - return err + // if called without &stream=true will not be able to parse event and will trigger fatal + glog.Fatalf("Received error %v", err) } einfo <- m } diff --git a/client/clientexample/main.go b/client/clientexample/main.go index 1d38666a4c..ed9e5dc91c 100644 --- a/client/clientexample/main.go +++ b/client/clientexample/main.go @@ -28,7 +28,7 @@ func staticClientExample() { glog.Errorf("tried to make client and got error %v", err) return } - einfo, err := staticClient.EventStaticInfo("?oom_events=true&historical=true") + einfo, err := staticClient.EventStaticInfo("?oom_events=true") if err != nil { glog.Errorf("got error retrieving event info: %v", err) return @@ -38,7 +38,7 @@ func staticClientExample() { } } -func streamingClientExample() { +func streamingClientExample(url string) { streamingClient, err := client.NewClient("http://localhost:8080/") if err != nil { glog.Errorf("tried to make client and got error %v", err) @@ -46,14 +46,14 @@ func streamingClientExample() { } einfo := make(chan *info.Event) go func() { - err = streamingClient.EventStreamingInfo("?oom_events=true", einfo) + err = streamingClient.EventStreamingInfo(url, einfo) if err != nil { glog.Errorf("got error retrieving event info: %v", err) return } }() for ev := range einfo { - glog.Infof("streaming einfo: %v", ev) + glog.Infof("streaming einfo: %v\n", ev) } } @@ -61,5 +61,5 @@ func streamingClientExample() { func main() { flag.Parse() staticClientExample() - streamingClientExample() + streamingClientExample("?creation_events=true&stream=true") } diff --git a/events/handler.go b/events/handler.go index 48886b18ce..24fa7c1db9 100644 --- a/events/handler.go +++ b/events/handler.go @@ -270,6 +270,7 @@ func (self *events) AddEvent(e *info.Event) error { for _, watchObject := range watchesToSend { watchObject.eventChannel.GetChannel() <- e } + glog.V(1).Infof("Added event %v", e) return nil } diff --git a/integration/framework/framework.go b/integration/framework/framework.go index 2dab54b83f..e5992c98c8 100644 --- a/integration/framework/framework.go +++ b/integration/framework/framework.go @@ -111,7 +111,7 @@ type DockerActions interface { // Run(DockerRunArgs{Image: "busybox"}, "ping", "www.google.com") // -> docker run busybox ping www.google.com Run(args DockerRunArgs, cmd ...string) string - RunStress(args DockerRunArgs, cmd ...string) + RunStress(args DockerRunArgs, cmd ...string) string } type ShellActions interface { @@ -240,19 +240,22 @@ func (self dockerActions) Run(args DockerRunArgs, cmd ...string) string { return containerId } -func (self dockerActions) RunStress(args DockerRunArgs, cmd ...string) { - dockerCommand := append(append(append(append([]string{"docker", "run", "-m=4M"}, args.Args...), args.Image), args.InnerArgs...), cmd...) +func (self dockerActions) RunStress(args DockerRunArgs, cmd ...string) string { + dockerCommand := append(append(append(append([]string{"docker", "run", "-m=4M", "-d", "-t", "-i"}, args.Args...), args.Image), args.InnerArgs...), cmd...) - self.fm.Shell().RunStress("sudo", dockerCommand...) + output, _ := self.fm.Shell().RunStress("sudo", dockerCommand...) - if len(args.Args) < 2 { - self.fm.T().Fatalf("need 2 arguments in DockerRunArgs %v to get the name but have %v", args, len(args.Args)) + // The last line is the container ID. + if len(output) < 1 { + self.fm.T().Fatalf("need 1 arguments in output %v to get the name but have %v", output, len(output)) } - containerId := args.Args[1] + elements := strings.Fields(output) + containerId := elements[len(elements)-1] self.fm.cleanups = append(self.fm.cleanups, func() { self.fm.Shell().Run("sudo", "docker", "rm", "-f", containerId) }) + return containerId } func (self shellActions) Run(command string, args ...string) (string, string) { @@ -292,7 +295,7 @@ func (self shellActions) RunStress(command string, args ...string) (string, stri err := cmd.Run() if err != nil { self.fm.T().Logf("Ran %q %v in %q and received error: %q. Stdout: %q, Stderr: %s", command, args, self.fm.Hostname().Host, err, stdout.String(), stderr.String()) - return "", "" + return stdout.String(), stderr.String() } return stdout.String(), stderr.String() } diff --git a/integration/tests/api/event_test.go b/integration/tests/api/event_test.go index 5b9d00d9a3..5be22f688b 100644 --- a/integration/tests/api/event_test.go +++ b/integration/tests/api/event_test.go @@ -15,16 +15,15 @@ package api import ( - "encoding/json" "fmt" "os" "strconv" + "strings" "testing" "time" info "github.com/google/cadvisor/info/v1" "github.com/google/cadvisor/integration/framework" - "github.com/google/cadvisor/utils/oomparser" "github.com/stretchr/testify/require" ) @@ -34,13 +33,13 @@ func TestStreamingEventInformationIsReturned(t *testing.T) { einfo := make(chan *info.Event) go func() { - err := fm.Cadvisor().Client().EventStreamingInfo("?oom_events=true", einfo) - t.Logf("Started event streaming with error %v", err) + err := fm.Cadvisor().Client().EventStreamingInfo("?oom_events=true&stream=true", einfo) + t.Logf("tried to stream events but got error %v", err) require.NoError(t, err) }() containerName := fmt.Sprintf("test-basic-docker-container-%d", os.Getpid()) - fm.Docker().RunStress(framework.DockerRunArgs{ + containerId := fm.Docker().RunStress(framework.DockerRunArgs{ Image: "bernardo/stress", Args: []string{"--name", containerName}, InnerArgs: []string{ @@ -52,24 +51,49 @@ func TestStreamingEventInformationIsReturned(t *testing.T) { }, }) + waitForStreamingEvent(containerId, "?deletion_events=true&stream=true", t, fm, info.EventContainerDeletion) + waitForStaticEvent(containerId, "?creation_events=true", t, fm, info.EventContainerCreation) +} + +func waitForStreamingEvent(containerId string, urlRequest string, t *testing.T, fm framework.Framework, typeEvent info.EventType) { timeout := make(chan bool, 1) go func() { time.Sleep(60 * time.Second) timeout <- true }() - select { - case ev := <-einfo: - if ev.EventType == 0 { - marshaledData, err := json.Marshal(ev.EventData) - require.Nil(t, err) - var oomEvent *oomparser.OomInstance - err = json.Unmarshal(marshaledData, &oomEvent) - require.Nil(t, err) - require.True(t, oomEvent.ProcessName == "stress") + einfo := make(chan *info.Event) + go func() { + err := fm.Cadvisor().Client().EventStreamingInfo(urlRequest, einfo) + require.NoError(t, err) + }() + for { + select { + case ev := <-einfo: + if ev.EventType == typeEvent { + if strings.Contains(strings.Trim(ev.ContainerName, "/ "), strings.Trim(containerId, "/ ")) { + return + } + } + case <-timeout: + t.Fatal( + "timeout happened before destruction event was detected") + } + } +} + +func waitForStaticEvent(containerId string, urlRequest string, t *testing.T, fm framework.Framework, typeEvent info.EventType) { + einfo, err := fm.Cadvisor().Client().EventStaticInfo(urlRequest) + require.NoError(t, err) + + found := false + for _, ev := range einfo { + if ev.EventType == typeEvent { + if strings.Contains(strings.Trim(ev.ContainerName, "/ "), strings.Trim(containerId, "/ ")) { + found = true + break + } } - case <-timeout: - t.Fatal( - "timeout happened before event was detected") } + require.True(t, found) } diff --git a/manager/manager.go b/manager/manager.go index 6fb5bca595..b3fe8be16e 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -889,7 +889,7 @@ func (self *manager) watchForNewOoms() error { }, }, } - glog.V(1).Infof("Created an oom event: %v", newEvent) + glog.V(2).Infof("Created an oom event: %v", newEvent) err := self.eventHandler.AddEvent(newEvent) if err != nil { glog.Errorf("failed to add event %v, got error: %v", newEvent, err) diff --git a/utils/oomparser/oomparser.go b/utils/oomparser/oomparser.go index 84936e08b8..3a4b32f6fd 100644 --- a/utils/oomparser/oomparser.go +++ b/utils/oomparser/oomparser.go @@ -153,7 +153,6 @@ func (self *OomParser) StreamOoms(outStream chan *OomInstance) { line = <-lineChannel } in_oom_kernel_log = false - glog.V(1).Infof("Sending an oomInstance: %v", oomCurrentInstance) outStream <- oomCurrentInstance } }