Skip to content

Commit

Permalink
Showing 3 changed files with 100 additions and 3 deletions.
19 changes: 17 additions & 2 deletions x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
@@ -66,6 +66,9 @@ type osquerybeat struct {
// Beat lifecycle context, cancelled on Stop
cancel context.CancelFunc
mx sync.Mutex

// parent process watcher
watcher *Watcher
}

// New creates an instance of osquerybeat.
@@ -87,14 +90,19 @@ func New(b *beat.Beat, cfg *conf.C) (beat.Beater, error) {
return bt, nil
}

func (bt *osquerybeat) initContext() (context.Context, error) {
func (bt *osquerybeat) init() (context.Context, error) {
bt.mx.Lock()
defer bt.mx.Unlock()
if bt.cancel != nil {
return nil, ErrAlreadyRunning
}
var ctx context.Context
ctx, bt.cancel = context.WithCancel(context.Background())

if bt.watcher != nil {
bt.watcher.Close()
}
bt.watcher = NewWatcher(bt.log)
return ctx, nil
}

@@ -108,11 +116,18 @@ func (bt *osquerybeat) close() {
bt.cancel()
bt.cancel = nil
}

// Start watching the parent process.
// The beat exits if the process gets orphaned.
if bt.watcher != nil {
go bt.watcher.Run()
bt.watcher = nil
}
}

// Run starts osquerybeat.
func (bt *osquerybeat) Run(b *beat.Beat) error {
ctx, err := bt.initContext()
ctx, err := bt.init()
if err != nil {
return err
}
79 changes: 79 additions & 0 deletions x-pack/osquerybeat/beater/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package beater

import (
"context"
"os"
"sync"
"time"

"github.com/elastic/elastic-agent-libs/logp"
)

const watchFrequency = 10 * time.Second

type Watcher struct {
log *logp.Logger
ppid int

mx sync.Mutex
cancel context.CancelFunc
}

func NewWatcher(log *logp.Logger) *Watcher {
w := &Watcher{
log: log,
ppid: os.Getppid(),
}
return w
}

func (w *Watcher) Start() {
go w.Run()
}

func (w *Watcher) Run() {
w.mx.Lock()
defer w.mx.Unlock()

if w.cancel != nil {
w.log.Debug("watcher is already running")
return
}

var ctx context.Context
ctx, w.cancel = context.WithCancel(context.Background())

ticker := time.NewTicker(watchFrequency)
defer ticker.Stop()

f := func() {
ppid := os.Getppid()
if ppid != w.ppid {
w.log.Errorf("orphaned osquerybeat, expected ppid: %v, found ppid: %v, quitting", w.ppid, ppid)
os.Exit(1)
}
}

for {
select {
case <-ticker.C:
f()
case <-ctx.Done():
w.log.Info("exit watcher on context done")
}
}
}

func (w *Watcher) Close() {
w.mx.Lock()
defer w.mx.Unlock()

if w.cancel != nil {
w.cancel()
w.cancel = nil
}
}
5 changes: 4 additions & 1 deletion x-pack/osquerybeat/internal/osqd/osqueryd_unix.go
Original file line number Diff line number Diff line change
@@ -58,5 +58,8 @@ func setpgid() *syscall.SysProcAttr {
// For clean process tree kill
func killProcessGroup(cmd *exec.Cmd) error {
err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
return fmt.Errorf("kill process group %d, %w", cmd.Process.Pid, err)
if err != nil {
return fmt.Errorf("kill process group %d, %w", cmd.Process.Pid, err)
}
return nil
}

0 comments on commit 5479958

Please sign in to comment.