Skip to content

Commit

Permalink
fix mockbeat stop functionality (#40619)
Browse files Browse the repository at this point in the history
* Refactor mockbeat Stop into two functions: 
    * `stopNonsynced`: performs the actual stop logic
    * `Stop`: synchronizes the `stop` function, allowing usage within methods that have already acquired the `cmdMutex`
* Implement wait logic in `stopNonsynced` to ensure the beat process fully stops before proceeding. This relies on the `"beatName stopped."` log message for confirmation.
* Remove the `time.Sleep` workaround from `./libbeat/tests/integration/http_test.go`, which was previously necessary due to incomplete beat stopping between tests.
  • Loading branch information
AndersonQ authored Aug 27, 2024
1 parent 6d25d46 commit 76ea160
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 25 deletions.
4 changes: 2 additions & 2 deletions filebeat/tests/integration/filebeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestFilebeatRunsAndLogsJSONToFile(t *testing.T) {
// 2. Create the log file
integration.GenerateLogFile(t, logFilePath, 10, false)

// 3. Write configuration file ans start Filebeat
// 3. Write configuration file and start Filebeat
filebeat.WriteConfigFile(fmt.Sprintf(filebeatBasicConfig, logFilePath, tempDir))
filebeat.Start()

Expand All @@ -78,7 +78,7 @@ func TestFilebeatRunsAndLogsJSONToFile(t *testing.T) {
require.Eventuallyf(t, func() bool {
f, err = os.Open(filebeatLogFile)
return err == nil
}, 10*time.Second, time.Millisecond, "could not read log file '%s'", filebeatLogFile)
}, 10*time.Second, 100*time.Millisecond, "could not read log file '%s'", filebeatLogFile)
defer f.Close()

r := bufio.NewScanner(f)
Expand Down
59 changes: 43 additions & 16 deletions libbeat/tests/integration/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"testing"
Expand All @@ -50,6 +51,7 @@ type BeatProc struct {
RestartOnBeatOnExit bool
beatName string
cmdMutex sync.Mutex
waitingMutex sync.Mutex
configFile string
fullPath string
logFileOffset int64
Expand Down Expand Up @@ -202,13 +204,8 @@ func (b *BeatProc) Start(args ...string) {

t.Cleanup(func() {
b.cmdMutex.Lock()
// 1. Kill the Beat
if err := b.Process.Signal(os.Interrupt); err != nil {
if !errors.Is(err, os.ErrProcessDone) {
t.Fatalf("could not stop process with PID: %d, err: %s",
b.Process.Pid, err)
}
}
// 1. Send an interrupt signal to the Beat
b.stopNonsynced()

// Make sure the goroutine restarting the Beat has exited
if b.RestartOnBeatOnExit {
Expand All @@ -220,7 +217,7 @@ func (b *BeatProc) Start(args ...string) {
// wg.Wait() or there is a possibility of
// deadlock.
b.cmdMutex.Unlock()
// 4. Wait for the goroutine to finish, this helps ensuring
// 4. Wait for the goroutine to finish, this helps to ensure
// no other Beat process was started
wg.Wait()
} else {
Expand Down Expand Up @@ -257,29 +254,59 @@ func (b *BeatProc) startBeat() {
b.Process = cmd.Process
}

// waitBeatToExit blocks until the Beat exits, it returns
// the process' exit code.
// waitBeatToExit blocks until the Beat exits.
// `startBeat` must be called before this method.
func (b *BeatProc) waitBeatToExit() int {
func (b *BeatProc) waitBeatToExit() {
if !b.waitingMutex.TryLock() {
// b.stopNonsynced must be waiting on the process already. Nothing to do.
return
}
defer b.waitingMutex.Unlock()

processState, err := b.Process.Wait()
if err != nil {
b.t.Fatalf("error waiting for %q to finish: %s. Exit code: %d",
b.beatName, err, processState.ExitCode())
exitCode := "unknown"
if processState != nil {
exitCode = strconv.Itoa(processState.ExitCode())
}

b.t.Fatalf("error waiting for %q to finish: %s. Exit code: %s",
b.beatName, err, exitCode)
}

return processState.ExitCode()
return
}

// Stop stops the Beat process
// Start adds Cleanup function to stop when test ends, only run this if you want to inspect logs after beat shutsdown
func (b *BeatProc) Stop() {
b.cmdMutex.Lock()
defer b.cmdMutex.Unlock()
b.stopNonsynced()
}

// stopNonsynced is the actual stop code, but without locking so it can be reused
// by methods that have already acquired the lock.
func (b *BeatProc) stopNonsynced() {
if err := b.Process.Signal(os.Interrupt); err != nil {
if errors.Is(err, os.ErrProcessDone) {
return
}
b.t.Fatalf("could not stop process with PID: %d, err: %s", b.Process.Pid, err)
b.t.Fatalf("could not send interrupt signal to process with PID: %d, err: %s",
b.Process.Pid, err)
}

if !b.waitingMutex.TryLock() {
// b.waitBeatToExit must be waiting on the process already. Nothing to do.
return
}
defer b.waitingMutex.Unlock()
ps, err := b.Process.Wait()
if err != nil {
b.t.Logf("[WARN] got an error waiting mockbeat to top: %v", err)
}
if !ps.Success() {
b.t.Logf("[WARN] mockbeat did not stopped successfully: %v", ps.String())
}
}

Expand Down Expand Up @@ -629,7 +656,7 @@ func (b *BeatProc) LoadMeta() (Meta, error) {

// RemoveAllCLIArgs removes all CLI arguments configured.
// It will also remove all configuration for home path and
// logs, there fore some methods, like the ones that read logs,
// logs, therefore some methods, like the ones that read logs,
// might fail if Filebeat is not configured the way this framework
// expects.
//
Expand Down
11 changes: 4 additions & 7 deletions libbeat/tests/integration/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ output.console:
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")

body, err := io.ReadAll(r.Body)
r.Body.Close()
_ = r.Body.Close()
require.NoError(t, err)
var m map[string]interface{}
err = json.Unmarshal(body, &m)
Expand All @@ -90,14 +90,13 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/stats") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")

body, err := io.ReadAll(r.Body)
r.Body.Close()
_ = r.Body.Close()
require.NoError(t, err)
var m Stats

Expand Down Expand Up @@ -125,11 +124,10 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/not-exist") //nolint:noctx // fine for tests
r.Body.Close()
require.NoError(t, err)
_ = r.Body.Close()
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}

Expand All @@ -149,10 +147,9 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/debug/pprof/") //nolint:noctx // fine for tests
r.Body.Close()
require.NoError(t, err)
_ = r.Body.Close()
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}

0 comments on commit 76ea160

Please sign in to comment.