Skip to content

Commit

Permalink
Move winlogbeat to new publisher pipeline
Browse files Browse the repository at this point in the history
- Move winlogbeat to publisher pipeline
- move fields + tags processing to pipeline client
- introduce local `processors` setting for each configured event log.
- minor cleanups in tests
  • Loading branch information
urso authored and andrewkroh committed Jul 19, 2017
1 parent 6093150 commit 6dd6290
Show file tree
Hide file tree
Showing 15 changed files with 335 additions and 298 deletions.
128 changes: 128 additions & 0 deletions winlogbeat/beater/eventlogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package beater

import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/libbeat/publisher/beat"
"github.com/elastic/beats/winlogbeat/checkpoint"
"github.com/elastic/beats/winlogbeat/eventlog"
)

type eventLogger struct {
source eventlog.EventLog
eventMeta common.EventMetadata
processors beat.ProcessorList
}

type eventLoggerConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Processors processors.PluginConfig `config:"processors"`
}

func newEventLogger(
source eventlog.EventLog,
options *common.Config,
) (*eventLogger, error) {
config := eventLoggerConfig{}
if err := options.Unpack(&config); err != nil {
return nil, err
}

processors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}

return &eventLogger{
source: source,
eventMeta: config.EventMetadata,
processors: processors,
}, nil
}

func (e *eventLogger) connect(pipeline publisher.Publisher) (beat.Client, error) {
api := e.source.Name()
return pipeline.ConnectX(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: e.eventMeta,
Meta: nil, // TODO: configure modules/ES ingest pipeline?
Processor: e.processors,
ACKCount: func(n int) {
addPublished(api, n)
logp.Info("EventLog[%s] successfully published %d events", api, n)
},
})
}

func (e *eventLogger) run(
done <-chan struct{},
pipeline publisher.Publisher,
state checkpoint.EventLogState,
) {
api := e.source

// Initialize per event log metrics.
initMetrics(api.Name())

client, err := e.connect(pipeline)
if err != nil {
logp.Warn("EventLog[%s] Pipeline error. Failed to connect to publisher pipeline",
api.Name())
return
}

// close client on function return or when `done` is triggered (unblock client)
defer client.Close()
go func() {
<-done
client.Close()
}()

err = api.Open(state.RecordNumber)
if err != nil {
logp.Warn("EventLog[%s] Open() error. No events will be read from "+
"this source. %v", api.Name(), err)
return
}
defer func() {
logp.Info("EventLog[%s] Stop processing.", api.Name())

if err := api.Close(); err != nil {
logp.Warn("EventLog[%s] Close() error. %v", api.Name(), err)
return
}
}()

debugf("EventLog[%s] opened successfully", api.Name())

for {
select {
case <-done:
return
default:
}

// Read from the event.
records, err := api.Read()
if err != nil {
logp.Warn("EventLog[%s] Read() error: %v", api.Name(), err)
break
}

debugf("EventLog[%s] Read() returned %d records", api.Name(), len(records))
if len(records) == 0 {
// TODO: Consider implementing notifications using
// NotifyChangeEventLog instead of polling.
time.Sleep(time.Second)
continue
}

for _, lr := range records {
client.Publish(lr.ToEvent())
}
}
}
20 changes: 20 additions & 0 deletions winlogbeat/beater/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package beater

import "expvar"

// Metrics that can retrieved through the expvar web interface. Metrics must be
// enable through configuration in order for the web service to be started.
var (
publishedEvents = expvar.NewMap("published_events")
)

func initMetrics(namespace string) {
// Initialize metrics.
publishedEvents.Add(namespace, 0)
}

func addPublished(namespace string, n int) {
numEvents := int64(n)
publishedEvents.Add("total", numEvents)
publishedEvents.Add(namespace, numEvents)
}
121 changes: 37 additions & 84 deletions winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,13 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
pub "github.com/elastic/beats/libbeat/publisher/beat"

"github.com/elastic/beats/winlogbeat/checkpoint"
"github.com/elastic/beats/winlogbeat/config"
"github.com/elastic/beats/winlogbeat/eventlog"
)

// Metrics that can retrieved through the expvar web interface. Metrics must be
// enable through configuration in order for the web service to be started.
var (
publishedEvents = expvar.NewMap("published_events")
ignoredEvents = expvar.NewMap("ignored_events")
)

func init() {
expvar.Publish("uptime", expvar.Func(uptime))
}
Expand All @@ -44,9 +38,9 @@ var startTime = time.Now().UTC()
type Winlogbeat struct {
beat *beat.Beat // Common beat information.
config *config.Settings // Configuration settings.
eventLogs []eventlog.EventLog // List of all event logs being monitored.
eventLogs []*eventLogger // List of all event logs being monitored.
done chan struct{} // Channel to initiate shutdown of main event loop.
client publisher.Client // Interface to publish event.
pipeline publisher.Publisher // Interface to publish event.
checkpoint *checkpoint.Checkpoint // Persists event log state to disk.
}

Expand Down Expand Up @@ -86,15 +80,20 @@ func (eb *Winlogbeat) init(b *beat.Beat) error {

// Create the event logs. This will validate the event log specific
// configuration.
eb.eventLogs = make([]eventlog.EventLog, 0, len(config.EventLogs))
eb.eventLogs = make([]*eventLogger, 0, len(config.EventLogs))
for _, config := range config.EventLogs {
eventLog, err := eventlog.New(config)
if err != nil {
return fmt.Errorf("Failed to create new event log. %v", err)
}
debugf("Initialized EventLog[%s]", eventLog.Name())

eb.eventLogs = append(eb.eventLogs, eventLog)
logger, err := newEventLogger(eventLog, config)
if err != nil {
return fmt.Errorf("Failed to create new event log. %v", err)
}

eb.eventLogs = append(eb.eventLogs, logger)
}

return nil
Expand All @@ -105,14 +104,13 @@ func (eb *Winlogbeat) init(b *beat.Beat) error {
func (eb *Winlogbeat) setup(b *beat.Beat) error {
config := &eb.config.Winlogbeat

eb.client = b.Publisher.Connect()

var err error
eb.checkpoint, err = checkpoint.NewCheckpoint(config.RegistryFile, 10, 5*time.Second)
if err != nil {
return err
}

eb.pipeline = b.Publisher
return nil
}

Expand All @@ -125,16 +123,33 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error {
persistedState := eb.checkpoint.States()

// Initialize metrics.
publishedEvents.Add("total", 0)
ignoredEvents.Add("total", 0)
initMetrics("total")

// setup global event ACK handler
err := eb.pipeline.SetACKHandler(pub.PipelineACKHandler{
ACKLastEvents: func(events []pub.Event) {
for _, event := range events {
priv := event.Private
if priv == nil {
continue
}

st, ok := priv.(checkpoint.EventLogState)
if !ok {
continue
}

eb.checkpoint.PersistState(st)
}
},
})
if err != nil {
return err
}

var wg sync.WaitGroup
for _, log := range eb.eventLogs {
state, _ := persistedState[log.Name()]

// Initialize per event log metrics.
publishedEvents.Add(log.Name(), 0)
ignoredEvents.Add(log.Name(), 0)
state, _ := persistedState[log.source.Name()]

// Start a goroutine for each event log.
wg.Add(1)
Expand All @@ -150,79 +165,17 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error {
func (eb *Winlogbeat) Stop() {
logp.Info("Stopping Winlogbeat")
if eb.done != nil {
eb.client.Close()
close(eb.done)
}
}

func (eb *Winlogbeat) processEventLog(
wg *sync.WaitGroup,
api eventlog.EventLog,
logger *eventLogger,
state checkpoint.EventLogState,
) {
defer wg.Done()

err := api.Open(state.RecordNumber)
if err != nil {
logp.Warn("EventLog[%s] Open() error. No events will be read from "+
"this source. %v", api.Name(), err)
return
}
defer func() {
logp.Info("EventLog[%s] Stop processing.", api.Name())

if err := api.Close(); err != nil {
logp.Warn("EventLog[%s] Close() error. %v", api.Name(), err)
return
}
}()

debugf("EventLog[%s] opened successfully", api.Name())

for {
select {
case <-eb.done:
return
default:
}

// Read from the event.
records, err := api.Read()
if err != nil {
logp.Warn("EventLog[%s] Read() error: %v", api.Name(), err)
break
}
debugf("EventLog[%s] Read() returned %d records", api.Name(), len(records))
if len(records) == 0 {
// TODO: Consider implementing notifications using
// NotifyChangeEventLog instead of polling.
time.Sleep(time.Second)
continue
}

events := make([]common.MapStr, 0, len(records))
for _, lr := range records {
events = append(events, lr.ToMapStr())
}

// Publish events.
numEvents := int64(len(events))
ok := eb.client.PublishEvents(events, publisher.Sync, publisher.Guaranteed)
if !ok {
// due to using Sync and Guaranteed the ok will only be false on shutdown.
// Do not update the internal state and return in this case
return
}

publishedEvents.Add("total", numEvents)
publishedEvents.Add(api.Name(), numEvents)
logp.Info("EventLog[%s] Successfully published %d events",
api.Name(), numEvents)

eb.checkpoint.Persist(api.Name(),
records[len(records)-1].RecordID,
records[len(records)-1].TimeCreated.SystemTime.UTC())
}
logger.run(eb.done, eb.pipeline, state)
}

// uptime returns a map of uptime related metrics.
Expand Down
9 changes: 7 additions & 2 deletions winlogbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,16 @@ func (c *Checkpoint) States() map[string]EventLogState {

// Persist queues the given event log state information to be written to disk.
func (c *Checkpoint) Persist(name string, recordNumber uint64, ts time.Time) {
c.save <- EventLogState{
c.PersistState(EventLogState{
Name: name,
RecordNumber: recordNumber,
Timestamp: ts,
}
})
}

// PersistState queues the given event log state to be written to disk.
func (c *Checkpoint) PersistState(st EventLogState) {
c.save <- st
}

// persist writes the current state to disk if the in-memory state is dirty.
Expand Down
5 changes: 3 additions & 2 deletions winlogbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package config
import (
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/joeshaw/multierror"
)

Expand Down Expand Up @@ -43,8 +44,8 @@ func (s Settings) Validate() error {

// WinlogbeatConfig contains all of Winlogbeat configuration data.
type WinlogbeatConfig struct {
EventLogs []map[string]interface{} `config:"event_logs"`
RegistryFile string `config:"registry_file"`
EventLogs []*common.Config `config:"event_logs"`
RegistryFile string `config:"registry_file"`
}

// Validate validates the WinlogbeatConfig data and returns an error describing
Expand Down
Loading

0 comments on commit 6dd6290

Please sign in to comment.