-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix concurrent harvesters #2541
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package prospector | ||
|
||
import ( | ||
"errors" | ||
"expvar" | ||
"fmt" | ||
"sync" | ||
|
@@ -116,18 +117,10 @@ func (p *Prospector) Run() { | |
logp.Info("Prospector channel stopped") | ||
return | ||
case event := <-p.harvesterChan: | ||
// Add ttl if cleanOlder is enabled | ||
if p.config.CleanInactive > 0 { | ||
event.State.TTL = p.config.CleanInactive | ||
} | ||
|
||
ok := p.outlet.OnEvent(event) | ||
if !ok { | ||
logp.Info("Prospector outlet closed") | ||
err := p.updateState(event) | ||
if err != nil { | ||
return | ||
} | ||
|
||
p.states.Update(event.State) | ||
} | ||
} | ||
}() | ||
|
@@ -147,6 +140,25 @@ func (p *Prospector) Run() { | |
} | ||
} | ||
|
||
// updateState updates the prospector state and forwards the event to the spooler | ||
// All state updates done by the prospector itself are synchronous to make sure not states are overwritten | ||
func (p *Prospector) updateState(event *input.Event) error { | ||
|
||
// Add ttl if cleanOlder is enabled | ||
if p.config.CleanInactive > 0 { | ||
event.State.TTL = p.config.CleanInactive | ||
} | ||
|
||
ok := p.outlet.OnEvent(event) | ||
if !ok { | ||
logp.Info("Prospector outlet closed") | ||
return errors.New("prospector outlet closed") | ||
} | ||
|
||
p.states.Update(event.State) | ||
return nil | ||
} | ||
|
||
func (p *Prospector) Stop() { | ||
logp.Info("Stopping Prospector") | ||
close(p.done) | ||
|
@@ -176,12 +188,27 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { | |
} | ||
|
||
state.Offset = offset | ||
// Set state to "not" finished to indicate that a harvester is running | ||
state.Finished = false | ||
|
||
// Create harvester with state | ||
h, err := p.createHarvester(state) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
reader, err := h.Setup() | ||
if err != nil { | ||
return fmt.Errorf("Error setting up harvester: %s", err) | ||
} | ||
|
||
// State is directly updated and not through channel to make state update immidiate | ||
// State is only updated after setup is completed successfully | ||
err = p.updateState(input.NewEvent(state)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
p.wg.Add(1) | ||
// startHarvester is not run concurrently, but atomic operations are need for the decrementing of the counter | ||
// inside the following go routine | ||
|
@@ -191,8 +218,9 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { | |
atomic.AddUint64(&p.harvesterCounter, ^uint64(0)) | ||
p.wg.Done() | ||
}() | ||
|
||
// Starts harvester and picks the right type. In case type is not set, set it to defeault (log) | ||
h.Harvest() | ||
h.Harvest(reader) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why have harvester create reader in setup and finally pass it to Harvest? Isn't the reader stored in harvester? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently reader is not stored in the Harvester but we could. |
||
}() | ||
|
||
return nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we want to filter out 'empty' events here or are empty events required for some state update somehwere else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
state updates will also be needed for registrar.