Fix concurrent harvesters #2541

Merged 2 commits on Sep 14, 2016
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -63,6 +63,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
*Filebeat*
- Fix processor failure in Filebeat when using regex, contain, or equals with the message field. {issue}2178[2178]
- Fix async publisher sending empty events {pull}2455[2455]
- Fix potential issue with multiple harvesters per file on large numbers of files or slow output {pull}2541[2541]

*Winlogbeat*
- Fix corrupt registry file that occurs on power loss by disabling file write caching. {issue}2313[2313]
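
For context on the Filebeat changelog entry above, here is a minimal sketch of the race it refers to, using simplified hypothetical types rather than the actual Filebeat code: when the "harvester running" state update travels through a channel that a slow output drains late, the next prospector scan still sees the file as finished and starts a second harvester for the same file.

```go
package main

import "fmt"

// state is a simplified stand-in for Filebeat's file state (an assumption
// for this sketch, not the real type).
type state struct {
	Source   string
	Finished bool
}

func main() {
	// In-memory registry of known file states, keyed by path.
	states := map[string]state{
		"test.log": {Source: "test.log", Finished: true},
	}

	// Before the fix, the "harvester running" update was sent through a
	// channel and only applied once the (possibly slow) pipeline drained it.
	pendingUpdates := make(chan state, 16)

	// Scan 1: the state says Finished, so a harvester is started and the
	// Finished=false update is merely queued, not applied.
	if states["test.log"].Finished {
		fmt.Println("scan 1: starting harvester for test.log")
		pendingUpdates <- state{Source: "test.log", Finished: false}
	}

	// Scan 2 runs before the queued update has been consumed: the registry
	// still says Finished, so a second harvester is started for the same file.
	if states["test.log"].Finished {
		fmt.Println("scan 2: starting a duplicate harvester for test.log")
	}

	// The fix applies the update synchronously during scan 1 instead, e.g.:
	//   states["test.log"] = state{Source: "test.log", Finished: false}
}
```
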
43 changes: 21 additions & 22 deletions filebeat/harvester/log.go
@@ -28,38 +28,37 @@ var (
filesTruncated = expvar.NewInt("filebeat.harvester.files.truncated")
)

// Log harvester reads files line by line and sends events to the defined output
func (h *Harvester) Harvest() {
// Setup opens the file handler and creates the reader for the harvester
func (h *Harvester) Setup() (reader.Reader, error) {
err := h.open()
if err != nil {
return nil, fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err)
}

r, err := h.newLogFileReader()
if err != nil {
if h.file != nil {
h.file.Close()
}
return nil, fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
}

return r, nil

}

// Harvest reads files line by line and sends events to the defined output
func (h *Harvester) Harvest(r reader.Reader) {

harvesterStarted.Add(1)
harvesterRunning.Add(1)
defer harvesterRunning.Add(-1)

h.state.Finished = false

// Makes sure file is properly closed when the harvester is stopped
defer h.close()

err := h.open()
if err != nil {
logp.Err("Stop Harvesting. Unexpected file opening error: %s", err)
return
}

logp.Info("Harvester started for file: %s", h.state.Source)

r, err := h.newLogFileReader()
if err != nil {
logp.Err("Stop Harvesting. Unexpected encoding line reader error: %s", err)
return
}

// Always report the state before starting a harvester
// This is useful in case the file was renamed
if !h.sendStateUpdate() {
return
}

for {
select {
case <-h.done:
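
The diff above splits the old Harvest into Setup and Harvest. A reduced sketch of that two-phase lifecycle, with simplified stand-in types rather than Filebeat's real harvester: Setup does everything that can fail (opening the file, building the reader) and reports errors synchronously, while Harvest only runs the read loop.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
)

// harvester is a simplified stand-in for the real type.
type harvester struct {
	path string
	file *os.File
	done chan struct{}
}

// Setup performs every step that can fail up front: it opens the file and
// builds the line reader, returning any error synchronously to the caller.
func (h *harvester) Setup() (*bufio.Scanner, error) {
	f, err := os.Open(h.path)
	if err != nil {
		return nil, fmt.Errorf("harvester setup failed: %v", err)
	}
	h.file = f
	return bufio.NewScanner(f), nil
}

// Harvest only runs the read loop; it no longer opens files or builds
// readers, so it cannot fail before the first line is read.
func (h *harvester) Harvest(r *bufio.Scanner) {
	defer h.file.Close()
	for r.Scan() {
		select {
		case <-h.done: // the harvester was asked to stop
			return
		default:
		}
		fmt.Println("event:", r.Text()) // stand-in for publishing the line
	}
}

func main() {
	// Write a small sample file so the sketch runs end to end.
	tmp, err := os.CreateTemp("", "harvest-*.log")
	if err != nil {
		panic(err)
	}
	defer os.Remove(tmp.Name())
	fmt.Fprintln(tmp, "first line")
	fmt.Fprintln(tmp, "second line")
	tmp.Close()

	h := &harvester{path: tmp.Name(), done: make(chan struct{})}
	r, err := h.Setup()
	if err != nil {
		fmt.Println("not starting harvester:", err)
		return
	}
	h.Harvest(r) // the prospector runs this in its own goroutine
}
```

Because Harvest can no longer fail at startup, the caller is free to record state changes before launching it in a goroutine, which is what the prospector changes below rely on.
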
50 changes: 39 additions & 11 deletions filebeat/prospector/prospector.go
@@ -1,6 +1,7 @@
package prospector

import (
"errors"
"expvar"
"fmt"
"sync"
@@ -116,18 +117,10 @@ func (p *Prospector) Run() {
logp.Info("Prospector channel stopped")
return
case event := <-p.harvesterChan:
// Add ttl if cleanOlder is enabled
if p.config.CleanInactive > 0 {
event.State.TTL = p.config.CleanInactive
}

ok := p.outlet.OnEvent(event)
if !ok {
logp.Info("Prospector outlet closed")
err := p.updateState(event)
if err != nil {
return
}

p.states.Update(event.State)
}
}
}()
@@ -147,6 +140,25 @@ func (p *Prospector) Run() {
}
}

// updateState updates the prospector state and forwards the event to the spooler
// All state updates done by the prospector itself are synchronous to make sure no states are overwritten
func (p *Prospector) updateState(event *input.Event) error {

// Add ttl if cleanOlder is enabled
if p.config.CleanInactive > 0 {
event.State.TTL = p.config.CleanInactive
}

ok := p.outlet.OnEvent(event)

Review comment: Do we want to filter out 'empty' events here, or are empty events required for some state update somewhere else?

Contributor Author: State updates will also be needed for the registrar.

if !ok {
logp.Info("Prospector outlet closed")
return errors.New("prospector outlet closed")
}

p.states.Update(event.State)
return nil
}

func (p *Prospector) Stop() {
logp.Info("Stopping Prospector")
close(p.done)
@@ -176,12 +188,27 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error {
}

state.Offset = offset
// Set state to "not" finished to indicate that a harvester is running
state.Finished = false

// Create harvester with state
h, err := p.createHarvester(state)
if err != nil {
return err
}

reader, err := h.Setup()
if err != nil {
return fmt.Errorf("Error setting up harvester: %s", err)
}

// State is directly updated and not through the channel to make the state update immediate
// State is only updated after setup is completed successfully
err = p.updateState(input.NewEvent(state))
if err != nil {
return err
}

p.wg.Add(1)
// startHarvester is not run concurrently, but atomic operations are needed for decrementing the counter
// inside the following goroutine
@@ -191,8 +218,9 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error {
atomic.AddUint64(&p.harvesterCounter, ^uint64(0))
p.wg.Done()
}()

// Starts harvester and picks the right type. In case type is not set, set it to default (log)
h.Harvest()
h.Harvest(reader)

Review comment: Why have the harvester create the reader in Setup and then pass it to Harvest? Isn't the reader stored in the harvester?

Contributor Author: Currently the reader is not stored in the Harvester, but we could.

}()

return nil
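
The ordering that startHarvester now follows, as a hedged sketch with trimmed-down hypothetical types rather than the real prospector: everything that can fail runs in Setup, the state is applied synchronously through updateState, and only then is the read loop launched, so a concurrent scan can never see the file as unharvested.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Simplified stand-ins for the Filebeat types; the names here are
// assumptions made for this sketch only.
type state struct {
	Source   string
	Finished bool
}

type prospector struct {
	mu     sync.Mutex
	states map[string]state
	wg     sync.WaitGroup
	closed bool
}

// updateState applies a state change synchronously so that the next scan is
// guaranteed to see it; before the fix the update was queued on a channel.
func (p *prospector) updateState(s state) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errors.New("prospector outlet closed")
	}
	p.states[s.Source] = s
	return nil
}

// startHarvester sets up the harvester, records the state change, and only
// then launches the long-running read loop in a goroutine.
func (p *prospector) startHarvester(s state, setup func() (func(), error)) error {
	// Mark the file as being harvested before anything else happens.
	s.Finished = false

	// Everything that can fail happens here, before any state is touched.
	harvest, err := setup()
	if err != nil {
		return fmt.Errorf("error setting up harvester: %v", err)
	}

	// The state is updated before Harvest starts, so a concurrent scan can
	// never conclude the file is unharvested and start a second harvester.
	if err := p.updateState(s); err != nil {
		return err
	}

	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		harvest()
	}()
	return nil
}

func main() {
	p := &prospector{states: map[string]state{}}
	err := p.startHarvester(state{Source: "test.log"}, func() (func(), error) {
		return func() { fmt.Println("harvesting test.log") }, nil
	})
	fmt.Println("harvester started:", err == nil)
	p.wg.Wait()
}
```

Having updateState return an error when the outlet is closed is what lets startHarvester abort before launching the goroutine instead of silently dropping the state update.
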
10 changes: 8 additions & 2 deletions filebeat/prospector/prospector_log.go
@@ -75,7 +75,10 @@ func (p *ProspectorLog) Run() {
if state.Finished {
state.TTL = 0
event := input.NewEvent(state)
p.Prospector.harvesterChan <- event
err := p.Prospector.updateState(event)
if err != nil {
logp.Err("File cleanup state update error: %s", err)
}
logp.Debug("prospector", "Remove state for file as file removed: %s", state.Source)
} else {
logp.Debug("prospector", "State for file not removed because not finished: %s", state.Source)
@@ -232,7 +235,10 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
// Update state because of file rotation
oldState.Source = newState.Source
event := input.NewEvent(oldState)
p.Prospector.harvesterChan <- event
err := p.Prospector.updateState(event)
if err != nil {
logp.Err("File rotation state update error: %s", err)
}

filesRenamed.Add(1)
} else {
8 changes: 7 additions & 1 deletion filebeat/prospector/prospector_stdin.go
@@ -5,6 +5,7 @@ import (

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/logp"
)

type ProspectorStdin struct {
@@ -36,7 +37,12 @@ func (p *ProspectorStdin) Run() {

// Make sure stdin harvester is only started once
if !p.started {
go p.harvester.Harvest()
reader, err := p.harvester.Setup()
if err != nil {
logp.Err("Error starting stdin harvester: %s", err)
return
}
go p.harvester.Harvest(reader)
p.started = true
}
}
48 changes: 48 additions & 0 deletions filebeat/tests/system/test_load.py
@@ -3,6 +3,7 @@
import logging
import logging.handlers
import json
import time
import unittest
from nose.plugins.skip import Skip, SkipTest
from nose.plugins.attrib import attr
@@ -146,3 +147,50 @@ def test_large_number_of_files(self):
assert len(data) == number_of_files


@unittest.skipUnless(LOAD_TESTS, "load test")
@attr('load')
def test_concurrent_harvesters(self):
"""
Test that a large number of files on startup does not cause harvester overlap that would create too many events
"""
number_of_files = 5000
lines_per_file = 10

# Create content for each file
content = ""
for n in range(lines_per_file):
content += "Line " + str(n+1) + "\n"

os.mkdir(self.working_dir + "/log/")
testfile = self.working_dir + "/log/test"

for n in range(number_of_files):
with open(testfile + "-" + str(n+1), 'w') as f:
f.write(content)


self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
rotate_every_kb=number_of_files * lines_per_file * 12 * 2,
)
filebeat = self.start_beat()

total_lines = number_of_files * lines_per_file

print total_lines
# wait until all lines are read
self.wait_until(
lambda: self.output_has(lines=total_lines),
max_timeout=120)

time.sleep(2)

# make sure no further lines were read
self.wait_until(
lambda: self.output_has(lines=total_lines),
max_timeout=120)

filebeat.check_kill_and_wait()

data = self.get_registry()
assert len(data) == number_of_files