diff --git a/winlogbeat/beater/eventlogger.go b/winlogbeat/beater/eventlogger.go new file mode 100644 index 000000000000..e68068a88ee4 --- /dev/null +++ b/winlogbeat/beater/eventlogger.go @@ -0,0 +1,128 @@ +package beater + +import ( + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/publisher/bc/publisher" + "github.com/elastic/beats/libbeat/publisher/beat" + "github.com/elastic/beats/winlogbeat/checkpoint" + "github.com/elastic/beats/winlogbeat/eventlog" +) + +type eventLogger struct { + source eventlog.EventLog + eventMeta common.EventMetadata + processors beat.ProcessorList +} + +type eventLoggerConfig struct { + common.EventMetadata `config:",inline"` // Fields and tags to add to events. + Processors processors.PluginConfig `config:"processors"` +} + +func newEventLogger( + source eventlog.EventLog, + options *common.Config, +) (*eventLogger, error) { + config := eventLoggerConfig{} + if err := options.Unpack(&config); err != nil { + return nil, err + } + + processors, err := processors.New(config.Processors) + if err != nil { + return nil, err + } + + return &eventLogger{ + source: source, + eventMeta: config.EventMetadata, + processors: processors, + }, nil +} + +func (e *eventLogger) connect(pipeline publisher.Publisher) (beat.Client, error) { + api := e.source.Name() + return pipeline.ConnectX(beat.ClientConfig{ + PublishMode: beat.GuaranteedSend, + EventMetadata: e.eventMeta, + Meta: nil, // TODO: configure modules/ES ingest pipeline? + Processor: e.processors, + ACKCount: func(n int) { + addPublished(api, n) + logp.Info("EventLog[%s] successfully published %d events", api, n) + }, + }) +} + +func (e *eventLogger) run( + done <-chan struct{}, + pipeline publisher.Publisher, + state checkpoint.EventLogState, +) { + api := e.source + + // Initialize per event log metrics. + initMetrics(api.Name()) + + client, err := e.connect(pipeline) + if err != nil { + logp.Warn("EventLog[%s] Pipeline error. Failed to connect to publisher pipeline", + api.Name()) + return + } + + // close client on function return or when `done` is triggered (unblock client) + defer client.Close() + go func() { + <-done + client.Close() + }() + + err = api.Open(state.RecordNumber) + if err != nil { + logp.Warn("EventLog[%s] Open() error. No events will be read from "+ + "this source. %v", api.Name(), err) + return + } + defer func() { + logp.Info("EventLog[%s] Stop processing.", api.Name()) + + if err := api.Close(); err != nil { + logp.Warn("EventLog[%s] Close() error. %v", api.Name(), err) + return + } + }() + + debugf("EventLog[%s] opened successfully", api.Name()) + + for { + select { + case <-done: + return + default: + } + + // Read from the event. + records, err := api.Read() + if err != nil { + logp.Warn("EventLog[%s] Read() error: %v", api.Name(), err) + break + } + + debugf("EventLog[%s] Read() returned %d records", api.Name(), len(records)) + if len(records) == 0 { + // TODO: Consider implementing notifications using + // NotifyChangeEventLog instead of polling. + time.Sleep(time.Second) + continue + } + + for _, lr := range records { + client.Publish(lr.ToEvent()) + } + } +} diff --git a/winlogbeat/beater/metrics.go b/winlogbeat/beater/metrics.go new file mode 100644 index 000000000000..1a1c6030678e --- /dev/null +++ b/winlogbeat/beater/metrics.go @@ -0,0 +1,20 @@ +package beater + +import "expvar" + +// Metrics that can retrieved through the expvar web interface. Metrics must be +// enable through configuration in order for the web service to be started. +var ( + publishedEvents = expvar.NewMap("published_events") +) + +func initMetrics(namespace string) { + // Initialize metrics. + publishedEvents.Add(namespace, 0) +} + +func addPublished(namespace string, n int) { + numEvents := int64(n) + publishedEvents.Add("total", numEvents) + publishedEvents.Add(namespace, numEvents) +} diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index 26ce5da6dcf4..8ab5f7abd063 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -15,19 +15,13 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/publisher/bc/publisher" + pub "github.com/elastic/beats/libbeat/publisher/beat" "github.com/elastic/beats/winlogbeat/checkpoint" "github.com/elastic/beats/winlogbeat/config" "github.com/elastic/beats/winlogbeat/eventlog" ) -// Metrics that can retrieved through the expvar web interface. Metrics must be -// enable through configuration in order for the web service to be started. -var ( - publishedEvents = expvar.NewMap("published_events") - ignoredEvents = expvar.NewMap("ignored_events") -) - func init() { expvar.Publish("uptime", expvar.Func(uptime)) } @@ -44,9 +38,9 @@ var startTime = time.Now().UTC() type Winlogbeat struct { beat *beat.Beat // Common beat information. config *config.Settings // Configuration settings. - eventLogs []eventlog.EventLog // List of all event logs being monitored. + eventLogs []*eventLogger // List of all event logs being monitored. done chan struct{} // Channel to initiate shutdown of main event loop. - client publisher.Client // Interface to publish event. + pipeline publisher.Publisher // Interface to publish event. checkpoint *checkpoint.Checkpoint // Persists event log state to disk. } @@ -86,7 +80,7 @@ func (eb *Winlogbeat) init(b *beat.Beat) error { // Create the event logs. This will validate the event log specific // configuration. - eb.eventLogs = make([]eventlog.EventLog, 0, len(config.EventLogs)) + eb.eventLogs = make([]*eventLogger, 0, len(config.EventLogs)) for _, config := range config.EventLogs { eventLog, err := eventlog.New(config) if err != nil { @@ -94,7 +88,12 @@ func (eb *Winlogbeat) init(b *beat.Beat) error { } debugf("Initialized EventLog[%s]", eventLog.Name()) - eb.eventLogs = append(eb.eventLogs, eventLog) + logger, err := newEventLogger(eventLog, config) + if err != nil { + return fmt.Errorf("Failed to create new event log. %v", err) + } + + eb.eventLogs = append(eb.eventLogs, logger) } return nil @@ -105,14 +104,13 @@ func (eb *Winlogbeat) init(b *beat.Beat) error { func (eb *Winlogbeat) setup(b *beat.Beat) error { config := &eb.config.Winlogbeat - eb.client = b.Publisher.Connect() - var err error eb.checkpoint, err = checkpoint.NewCheckpoint(config.RegistryFile, 10, 5*time.Second) if err != nil { return err } + eb.pipeline = b.Publisher return nil } @@ -125,16 +123,33 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error { persistedState := eb.checkpoint.States() // Initialize metrics. - publishedEvents.Add("total", 0) - ignoredEvents.Add("total", 0) + initMetrics("total") + + // setup global event ACK handler + err := eb.pipeline.SetACKHandler(pub.PipelineACKHandler{ + ACKLastEvents: func(events []pub.Event) { + for _, event := range events { + priv := event.Private + if priv == nil { + continue + } + + st, ok := priv.(checkpoint.EventLogState) + if !ok { + continue + } + + eb.checkpoint.PersistState(st) + } + }, + }) + if err != nil { + return err + } var wg sync.WaitGroup for _, log := range eb.eventLogs { - state, _ := persistedState[log.Name()] - - // Initialize per event log metrics. - publishedEvents.Add(log.Name(), 0) - ignoredEvents.Add(log.Name(), 0) + state, _ := persistedState[log.source.Name()] // Start a goroutine for each event log. wg.Add(1) @@ -150,79 +165,17 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error { func (eb *Winlogbeat) Stop() { logp.Info("Stopping Winlogbeat") if eb.done != nil { - eb.client.Close() close(eb.done) } } func (eb *Winlogbeat) processEventLog( wg *sync.WaitGroup, - api eventlog.EventLog, + logger *eventLogger, state checkpoint.EventLogState, ) { defer wg.Done() - - err := api.Open(state.RecordNumber) - if err != nil { - logp.Warn("EventLog[%s] Open() error. No events will be read from "+ - "this source. %v", api.Name(), err) - return - } - defer func() { - logp.Info("EventLog[%s] Stop processing.", api.Name()) - - if err := api.Close(); err != nil { - logp.Warn("EventLog[%s] Close() error. %v", api.Name(), err) - return - } - }() - - debugf("EventLog[%s] opened successfully", api.Name()) - - for { - select { - case <-eb.done: - return - default: - } - - // Read from the event. - records, err := api.Read() - if err != nil { - logp.Warn("EventLog[%s] Read() error: %v", api.Name(), err) - break - } - debugf("EventLog[%s] Read() returned %d records", api.Name(), len(records)) - if len(records) == 0 { - // TODO: Consider implementing notifications using - // NotifyChangeEventLog instead of polling. - time.Sleep(time.Second) - continue - } - - events := make([]common.MapStr, 0, len(records)) - for _, lr := range records { - events = append(events, lr.ToMapStr()) - } - - // Publish events. - numEvents := int64(len(events)) - ok := eb.client.PublishEvents(events, publisher.Sync, publisher.Guaranteed) - if !ok { - // due to using Sync and Guaranteed the ok will only be false on shutdown. - // Do not update the internal state and return in this case - return - } - - publishedEvents.Add("total", numEvents) - publishedEvents.Add(api.Name(), numEvents) - logp.Info("EventLog[%s] Successfully published %d events", - api.Name(), numEvents) - - eb.checkpoint.Persist(api.Name(), - records[len(records)-1].RecordID, - records[len(records)-1].TimeCreated.SystemTime.UTC()) - } + logger.run(eb.done, eb.pipeline, state) } // uptime returns a map of uptime related metrics. diff --git a/winlogbeat/checkpoint/checkpoint.go b/winlogbeat/checkpoint/checkpoint.go index d521c68f43ba..46a312295ebb 100644 --- a/winlogbeat/checkpoint/checkpoint.go +++ b/winlogbeat/checkpoint/checkpoint.go @@ -156,11 +156,16 @@ func (c *Checkpoint) States() map[string]EventLogState { // Persist queues the given event log state information to be written to disk. func (c *Checkpoint) Persist(name string, recordNumber uint64, ts time.Time) { - c.save <- EventLogState{ + c.PersistState(EventLogState{ Name: name, RecordNumber: recordNumber, Timestamp: ts, - } + }) +} + +// PersistState queues the given event log state to be written to disk. +func (c *Checkpoint) PersistState(st EventLogState) { + c.save <- st } // persist writes the current state to disk if the in-memory state is dirty. diff --git a/winlogbeat/config/config.go b/winlogbeat/config/config.go index fd78464026fe..d8e65fc19b0b 100644 --- a/winlogbeat/config/config.go +++ b/winlogbeat/config/config.go @@ -4,6 +4,7 @@ package config import ( "fmt" + "github.com/elastic/beats/libbeat/common" "github.com/joeshaw/multierror" ) @@ -43,8 +44,8 @@ func (s Settings) Validate() error { // WinlogbeatConfig contains all of Winlogbeat configuration data. type WinlogbeatConfig struct { - EventLogs []map[string]interface{} `config:"event_logs"` - RegistryFile string `config:"registry_file"` + EventLogs []*common.Config `config:"event_logs"` + RegistryFile string `config:"registry_file"` } // Validate validates the WinlogbeatConfig data and returns an error describing diff --git a/winlogbeat/config/config_test.go b/winlogbeat/config/config_test.go index f9f5be79559b..15b136ec4411 100644 --- a/winlogbeat/config/config_test.go +++ b/winlogbeat/config/config_test.go @@ -5,6 +5,7 @@ package config import ( "testing" + "github.com/elastic/beats/libbeat/common" "github.com/stretchr/testify/assert" ) @@ -31,8 +32,10 @@ func TestConfigValidate(t *testing.T) { // Top-level config { WinlogbeatConfig{ - EventLogs: []map[string]interface{}{ - {"Name": "App"}, + EventLogs: []*common.Config{ + newConfig(map[string]interface{}{ + "Name": "App", + }), }, }, "", // No Error @@ -48,3 +51,11 @@ func TestConfigValidate(t *testing.T) { test.run(t) } } + +func newConfig(from map[string]interface{}) *common.Config { + cfg, err := common.NewConfigFrom(from) + if err != nil { + panic(err) + } + return cfg +} diff --git a/winlogbeat/eventlog/bench_test.go b/winlogbeat/eventlog/bench_test.go index 8aca84d3006a..f09be4014b3c 100644 --- a/winlogbeat/eventlog/bench_test.go +++ b/winlogbeat/eventlog/bench_test.go @@ -53,27 +53,13 @@ func TestBenchmarkBatchReadSize(t *testing.T) { } } - setup := func(t testing.TB, batchReadSize int) (EventLog, func()) { - eventlog, err := newWinEventLog(map[string]interface{}{"name": providerName, "batch_read_size": batchReadSize}) - if err != nil { - t.Fatal(err) - } - err = eventlog.Open(0) - if err != nil { - t.Fatal(err) - } - return eventlog, func() { - err := eventlog.Close() - if err != nil { - t.Fatal(err) - } - } - } - benchTest := func(batchSize int) { var err error result := testing.Benchmark(func(b *testing.B) { - eventlog, tearDown := setup(b, batchSize) + eventlog, tearDown := setupWinEventLog(t, 0, map[string]interface{}{ + "name": providerName, + "batch_read_size": batchSize, + }) defer tearDown() b.ResetTimer() diff --git a/winlogbeat/eventlog/common_test.go b/winlogbeat/eventlog/common_test.go new file mode 100644 index 000000000000..d5f4b35e47fe --- /dev/null +++ b/winlogbeat/eventlog/common_test.go @@ -0,0 +1,30 @@ +package eventlog + +import ( + "testing" + + "github.com/elastic/beats/libbeat/common" +) + +type factory func(*common.Config) (EventLog, error) +type teardown func() + +func fatalErr(t *testing.T, err error) { + if err != nil { + t.Fatal(err) + } +} + +func newTestEventLog(t *testing.T, factory factory, options map[string]interface{}) EventLog { + config, err := common.NewConfigFrom(options) + fatalErr(t, err) + eventLog, err := factory(config) + fatalErr(t, err) + return eventLog +} + +func setupEventLog(t *testing.T, factory factory, recordID uint64, options map[string]interface{}) (EventLog, teardown) { + eventLog := newTestEventLog(t, factory, options) + fatalErr(t, eventLog.Open(recordID)) + return eventLog, func() { fatalErr(t, eventLog.Close()) } +} diff --git a/winlogbeat/eventlog/eventlog.go b/winlogbeat/eventlog/eventlog.go index 2839cfb1b596..31f0842d1904 100644 --- a/winlogbeat/eventlog/eventlog.go +++ b/winlogbeat/eventlog/eventlog.go @@ -9,6 +9,8 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/publisher/beat" + "github.com/elastic/beats/winlogbeat/checkpoint" "github.com/elastic/beats/winlogbeat/sys" ) @@ -53,22 +55,19 @@ type EventLog interface { // Record represents a single event from the log. type Record struct { sys.Event - common.EventMetadata // Fields and tags to add to the event. - API string // The event log API type used to read the record. - XML string // XML representation of the event. + API string // The event log API type used to read the record. + XML string // XML representation of the event. } // ToMapStr returns a new MapStr containing the data from this Record. -func (e Record) ToMapStr() common.MapStr { +func (e Record) ToEvent() beat.Event { m := common.MapStr{ - "type": e.API, - common.EventMetadataKey: e.EventMetadata, - "@timestamp": common.Time(e.TimeCreated.SystemTime), - "log_name": e.Channel, - "source_name": e.Provider.Name, - "computer_name": e.Computer, - "record_number": strconv.FormatUint(e.RecordID, 10), - "event_id": e.EventIdentifier.ID, + "type": e.API, + "log_name": e.Channel, + "source_name": e.Provider.Name, + "computer_name": e.Computer, + "record_number": strconv.FormatUint(e.RecordID, 10), + "event_id": e.EventIdentifier.ID, } addOptional(m, "xml", e.XML) @@ -109,7 +108,15 @@ func (e Record) ToMapStr() common.MapStr { userData := addPairs(m, "user_data", e.UserData.Pairs) addOptional(userData, "xml_name", e.UserData.Name.Local) - return m + return beat.Event{ + Timestamp: e.TimeCreated.SystemTime, + Fields: m, + Private: checkpoint.EventLogState{ + Name: e.API, + RecordNumber: e.RecordID, + Timestamp: e.TimeCreated.SystemTime, + }, + } } // addOptional adds a key and value to the given MapStr if the value is not the diff --git a/winlogbeat/eventlog/eventlogging.go b/winlogbeat/eventlog/eventlogging.go index ba77d7085fa6..32164cb4bc2e 100644 --- a/winlogbeat/eventlog/eventlogging.go +++ b/winlogbeat/eventlog/eventlogging.go @@ -58,14 +58,13 @@ var _ EventLog = &eventLogging{} // eventLogging implements the EventLog interface for reading from the Event // Logging API. type eventLogging struct { - config eventLoggingConfig - name string // Name of the log that is opened. - handle win.Handle // Handle to the event log. - readBuf []byte // Buffer for reading in events. - formatBuf []byte // Buffer for formatting messages. - handles *messageFilesCache // Cached mapping of source name to event message file handles. - logPrefix string // Prefix to add to all log entries. - eventMetadata common.EventMetadata // Fields and tags to add to each event. + config eventLoggingConfig + name string // Name of the log that is opened. + handle win.Handle // Handle to the event log. + readBuf []byte // Buffer for reading in events. + formatBuf []byte // Buffer for formatting messages. + handles *messageFilesCache // Cached mapping of source name to event message file handles. + logPrefix string // Prefix to add to all log entries. recordNumber uint32 // First record number to read. seek bool // Read should use seek. @@ -168,9 +167,8 @@ func (l *eventLogging) Read() ([]Record, error) { } records = append(records, Record{ - API: eventLoggingAPIName, - EventMetadata: l.eventMetadata, - Event: e, + API: eventLoggingAPIName, + Event: e, }) } @@ -251,7 +249,7 @@ func (l *eventLogging) ignoreOlder(r *Record) bool { // newEventLogging creates and returns a new EventLog for reading event logs // using the Event Logging API. -func newEventLogging(options map[string]interface{}) (EventLog, error) { +func newEventLogging(options *common.Config) (EventLog, error) { c := eventLoggingConfig{ ReadBufferSize: win.MaxEventBufferSize, FormatBufferSize: win.MaxFormatMessageBufferSize, @@ -265,10 +263,9 @@ func newEventLogging(options map[string]interface{}) (EventLog, error) { name: c.Name, handles: newMessageFilesCache(c.Name, win.QueryEventMessageFiles, win.FreeLibrary), - logPrefix: fmt.Sprintf("EventLogging[%s]", c.Name), - readBuf: make([]byte, 0, c.ReadBufferSize), - formatBuf: make([]byte, c.FormatBufferSize), - eventMetadata: c.EventMetadata, + logPrefix: fmt.Sprintf("EventLogging[%s]", c.Name), + readBuf: make([]byte, 0, c.ReadBufferSize), + formatBuf: make([]byte, c.FormatBufferSize), }, nil } diff --git a/winlogbeat/eventlog/eventlogging_test.go b/winlogbeat/eventlog/eventlogging_test.go index 98be3c28aa79..a6e69dd38cb8 100644 --- a/winlogbeat/eventlog/eventlogging_test.go +++ b/winlogbeat/eventlog/eventlogging_test.go @@ -179,20 +179,9 @@ func TestRead(t *testing.T) { } // Read messages: - eventlog, err := newEventLogging(map[string]interface{}{"name": providerName}) - if err != nil { - t.Fatal(err) - } - err = eventlog.Open(0) - if err != nil { - t.Fatal(err) - } - defer func() { - err := eventlog.Close() - if err != nil { - t.Fatal(err) - } - }() + eventlog, teardown := setupEventLogging(t, 0, map[string]interface{}{"name": providerName}) + defer teardown() + records, err := eventlog.Read() if err != nil { t.Fatal(err) @@ -244,24 +233,13 @@ func TestFormatMessageWithLargeMessage(t *testing.T) { requiredBufferSize := len(message+"\r\n")*2 + 2 // Read messages: - eventlog, err := newEventLogging(map[string]interface{}{ + eventlog, teardown := setupEventLogging(t, 0, map[string]interface{}{ "name": providerName, // Use a buffer smaller than what is required. "format_buffer_size": requiredBufferSize / 2, }) - if err != nil { - t.Fatal(err) - } - err = eventlog.Open(0) - if err != nil { - t.Fatal(err) - } - defer func() { - err := eventlog.Close() - if err != nil { - t.Fatal(err) - } - }() + defer teardown() + records, err := eventlog.Read() if err != nil { t.Fatal(err) @@ -299,20 +277,9 @@ func TestReadUnknownEventId(t *testing.T) { } // Read messages: - eventlog, err := newEventLogging(map[string]interface{}{"name": providerName}) - if err != nil { - t.Fatal(err) - } - err = eventlog.Open(0) - if err != nil { - t.Fatal(err) - } - defer func() { - err := eventlog.Close() - if err != nil { - t.Fatal(err) - } - }() + eventlog, teardown := setupEventLogging(t, 0, map[string]interface{}{"name": providerName}) + defer teardown() + records, err := eventlog.Read() if err != nil { t.Fatal(err) @@ -356,20 +323,9 @@ func TestReadTriesMultipleEventMsgFiles(t *testing.T) { } // Read messages: - eventlog, err := newEventLogging(map[string]interface{}{"name": providerName}) - if err != nil { - t.Fatal(err) - } - err = eventlog.Open(0) - if err != nil { - t.Fatal(err) - } - defer func() { - err := eventlog.Close() - if err != nil { - t.Fatal(err) - } - }() + eventlog, teardown := setupEventLogging(t, 0, map[string]interface{}{"name": providerName}) + defer teardown() + records, err := eventlog.Read() if err != nil { t.Fatal(err) @@ -412,20 +368,9 @@ func TestReadMultiParameterMsg(t *testing.T) { } // Read messages: - eventlog, err := newEventLogging(map[string]interface{}{"name": providerName}) - if err != nil { - t.Fatal(err) - } - err = eventlog.Open(0) - if err != nil { - t.Fatal(err) - } - defer func() { - err := eventlog.Close() - if err != nil { - t.Fatal(err) - } - }() + eventlog, teardown := setupEventLogging(t, 0, map[string]interface{}{"name": providerName}) + defer teardown() + records, err := eventlog.Read() if err != nil { t.Fatal(err) @@ -447,13 +392,10 @@ func TestOpenInvalidProvider(t *testing.T) { configureLogp() - el, err := newEventLogging(map[string]interface{}{"name": "nonExistentProvider"}) - if err != nil { - t.Fatal(err) - } + el := newTestEventLogging(t, map[string]interface{}{"name": "nonExistentProvider"}) assert.NoError(t, el.Open(0), "Calling Open() on an unknown provider "+ "should automatically open Application.") - _, err = el.Read() + _, err := el.Read() assert.NoError(t, err) } @@ -481,20 +423,9 @@ func TestReadNoParameterMsg(t *testing.T) { } // Read messages: - eventlog, err := newEventLogging(map[string]interface{}{"name": providerName}) - if err != nil { - t.Fatal(err) - } - err = eventlog.Open(0) - if err != nil { - t.Fatal(err) - } - defer func() { - err := eventlog.Close() - if err != nil { - t.Fatal(err) - } - }() + eventlog, teardown := setupEventLogging(t, 0, map[string]interface{}{"name": providerName}) + defer teardown() + records, err := eventlog.Read() if err != nil { t.Fatal(err) @@ -526,20 +457,8 @@ func TestReadWhileCleared(t *testing.T) { } }() - eventlog, err := newEventLogging(map[string]interface{}{"name": providerName}) - if err != nil { - t.Fatal(err) - } - err = eventlog.Open(0) - if err != nil { - t.Fatal(err) - } - defer func() { - err := eventlog.Close() - if err != nil { - t.Fatal(err) - } - }() + eventlog, teardown := setupEventLogging(t, 0, map[string]interface{}{"name": providerName}) + defer teardown() log.Info(1, "Message 1") log.Info(2, "Message 2") @@ -561,6 +480,14 @@ func TestReadWhileCleared(t *testing.T) { } } +func newTestEventLogging(t *testing.T, options map[string]interface{}) EventLog { + return newTestEventLog(t, newEventLogging, options) +} + +func setupEventLogging(t *testing.T, recordID uint64, options map[string]interface{}) (EventLog, func()) { + return setupEventLog(t, newEventLogging, recordID, options) +} + // TODO: Add more test cases: // - Record number rollover (there may be an issue with this if ++ is used anywhere) // - Reading from a source name instead of provider name (can't be done according to docs). diff --git a/winlogbeat/eventlog/factory.go b/winlogbeat/eventlog/factory.go index c30eabc3b03d..cd8dd9ce06b8 100644 --- a/winlogbeat/eventlog/factory.go +++ b/winlogbeat/eventlog/factory.go @@ -15,9 +15,8 @@ var commonConfigKeys = []string{"api", "name", "fields", "fields_under_root", "t // EventLog. Each implementation is free to support additional configuration // options. type ConfigCommon struct { - API string `config:"api"` // Name of the API to use. Optional. - Name string `config:"name"` // Name of the event log or channel. - common.EventMetadata `config:",inline"` // Fields and tags to add to each event. + API string `config:"api"` // Name of the API to use. Optional. + Name string `config:"name"` // Name of the event log or channel. } type validator interface { @@ -25,15 +24,10 @@ type validator interface { } func readConfig( - data map[string]interface{}, + c *common.Config, config interface{}, validKeys []string, ) error { - c, err := common.NewConfigFrom(data) - if err != nil { - return fmt.Errorf("Failed reading config. %v", err) - } - if err := c.Unpack(config); err != nil { return fmt.Errorf("Failed unpacking config. %v", err) } @@ -43,7 +37,7 @@ func readConfig( sort.Strings(validKeys) // Check for invalid keys. - for k := range data { + for _, k := range c.GetFields() { k = strings.ToLower(k) i := sort.SearchStrings(validKeys, k) if i >= len(validKeys) || validKeys[i] != k { @@ -63,7 +57,7 @@ func readConfig( } // Producer produces a new event log instance for reading event log records. -type producer func(map[string]interface{}) (EventLog, error) +type producer func(*common.Config) (EventLog, error) // Channels lists the available channels (event logs). type channels func() ([]string, error) @@ -99,7 +93,7 @@ func Register(apiName string, priority int, producer producer, channels channels // New creates and returns a new EventLog instance based on the given config // and the registered EventLog producers. -func New(options map[string]interface{}) (EventLog, error) { +func New(options *common.Config) (EventLog, error) { if len(eventLogs) == 0 { return nil, fmt.Errorf("No event log API is available on this system") } diff --git a/winlogbeat/eventlog/wineventlog.go b/winlogbeat/eventlog/wineventlog.go index 69a3b8abbc0a..3ca7d43c32d4 100644 --- a/winlogbeat/eventlog/wineventlog.go +++ b/winlogbeat/eventlog/wineventlog.go @@ -81,8 +81,7 @@ type winEventLog struct { outputBuf *sys.ByteBuffer // Buffer for receiving XML cache *messageFilesCache // Cached mapping of source name to event message file handles. - logPrefix string // String to prefix on log messages. - eventMetadata common.EventMetadata // Field and tags to add to each event. + logPrefix string // String to prefix on log messages. } // Name returns the name of the event log (i.e. Application, Security, etc.). @@ -221,9 +220,8 @@ func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record, } r := Record{ - API: winEventLogAPIName, - EventMetadata: l.eventMetadata, - Event: e, + API: winEventLogAPIName, + Event: e, } if l.config.IncludeXML { @@ -235,7 +233,7 @@ func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record, // newWinEventLog creates and returns a new EventLog for reading event logs // using the Windows Event Log. -func newWinEventLog(options map[string]interface{}) (EventLog, error) { +func newWinEventLog(options *common.Config) (EventLog, error) { c := defaultWinEventLogConfig if err := readConfig(options, &c, winEventLogConfigKeys); err != nil { return nil, err @@ -269,15 +267,14 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) { } l := &winEventLog{ - config: c, - query: query, - channelName: c.Name, - maxRead: c.BatchReadSize, - renderBuf: make([]byte, renderBufferSize), - outputBuf: sys.NewByteBuffer(renderBufferSize), - cache: newMessageFilesCache(c.Name, eventMetadataHandle, freeHandle), - logPrefix: fmt.Sprintf("WinEventLog[%s]", c.Name), - eventMetadata: c.EventMetadata, + config: c, + query: query, + channelName: c.Name, + maxRead: c.BatchReadSize, + renderBuf: make([]byte, renderBufferSize), + outputBuf: sys.NewByteBuffer(renderBufferSize), + cache: newMessageFilesCache(c.Name, eventMetadataHandle, freeHandle), + logPrefix: fmt.Sprintf("WinEventLog[%s]", c.Name), } // Forwarded events should be rendered using RenderEventXML. It is more diff --git a/winlogbeat/eventlog/wineventlog_test.go b/winlogbeat/eventlog/wineventlog_test.go index 68896477343c..c2ce142867bd 100644 --- a/winlogbeat/eventlog/wineventlog_test.go +++ b/winlogbeat/eventlog/wineventlog_test.go @@ -33,20 +33,8 @@ func TestWinEventLogBatchReadSize(t *testing.T) { } batchReadSize := 2 - eventlog, err := newWinEventLog(map[string]interface{}{"name": providerName, "batch_read_size": batchReadSize}) - if err != nil { - t.Fatal(err) - } - err = eventlog.Open(0) - if err != nil { - t.Fatal(err) - } - defer func() { - err := eventlog.Close() - if err != nil { - t.Fatal(err) - } - }() + eventlog, teardown := setupWinEventLog(t, 0, map[string]interface{}{"name": providerName, "batch_read_size": batchReadSize}) + defer teardown() records, err := eventlog.Read() if err != nil { @@ -83,20 +71,8 @@ func TestReadLargeBatchSize(t *testing.T) { } } - eventlog, err := newWinEventLog(map[string]interface{}{"name": providerName, "batch_read_size": 1024}) - if err != nil { - t.Fatal(err) - } - err = eventlog.Open(0) - if err != nil { - t.Fatal(err) - } - defer func() { - err := eventlog.Close() - if err != nil { - t.Fatal(err) - } - }() + eventlog, teardown := setupWinEventLog(t, 0, map[string]interface{}{"name": providerName, "batch_read_size": 1024}) + defer teardown() var eventCount int for eventCount < totalEvents { @@ -121,3 +97,7 @@ func TestReadLargeBatchSize(t *testing.T) { } }) } + +func setupWinEventLog(t *testing.T, recordID uint64, options map[string]interface{}) (EventLog, func()) { + return setupEventLog(t, newWinEventLog, recordID, options) +} diff --git a/winlogbeat/tests/system/test_config.py b/winlogbeat/tests/system/test_config.py index bd2804cb5690..13b51694cd6b 100644 --- a/winlogbeat/tests/system/test_config.py +++ b/winlogbeat/tests/system/test_config.py @@ -33,7 +33,8 @@ def test_invalid_ignore_older(self): ) self.start_beat(extra_args=["-configtest"]).check_wait(exit_code=1) assert self.log_contains( - "unknown unit hour in duration 1 hour accessing 'ignore_older'") + "unknown unit hour in duration 1 hour " + "accessing 'winlogbeat.event_logs.0.ignore_older'") def test_invalid_level(self): """ @@ -58,5 +59,5 @@ def test_invalid_api(self): ] ) self.start_beat(extra_args=["-configtest"]).check_wait(exit_code=1) - assert self.log_contains(("Failed to create new event log. file API is " - "not available")) + assert self.log_contains("Failed to create new event log. " + "file API is not available")