Skip to content

Commit

Permalink
Added tail length limit to limit duration of live tailing of logs to …
Browse files Browse the repository at this point in the history
…1 hour (#756)

* Added tail length limit to limit duration of live tailing of logs to 1 hour

* Some code refactoring suggested from PR review for live tailing duration limit

* Fixed error messages in live tailing of logs

* Fixed lint errors
  • Loading branch information
sandeepsukhani authored and cyriltovena committed Jul 17, 2019
1 parent c08ec79 commit 3a2b569
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
4 changes: 3 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (

// Config for a querier.
type Config struct {
TailMaxDuration time.Duration `yaml:"tail_max_duration"`
}

// RegisterFlags register flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Limit the duration for which live tailing request would be served")
}

// Querier handlers queries.
Expand Down Expand Up @@ -262,7 +264,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
return q.queryDroppedStreams(ctx, req, from, to, labels)
}, func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
return q.tailDisconnectedIngesters(ctx, req, connectedIngestersAddr)
}), nil
}, q.cfg.TailMaxDuration), nil
}

// passed to tailer for querying dropped streams
Expand Down
39 changes: 25 additions & 14 deletions pkg/querier/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ type Tailer struct {
querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters
querierTailClientsMtx sync.Mutex

stopped bool
blocked bool
blockedMtx sync.RWMutex
delayFor time.Duration
responseChan chan *TailResponse
closeErrChan chan error
stopped bool
blocked bool
blockedMtx sync.RWMutex
delayFor time.Duration
responseChan chan *TailResponse
closeErrChan chan error
tailMaxDuration time.Duration

// when tail client is slow, drop entry and store its details in droppedEntries to notify client
droppedEntries []droppedEntry
Expand All @@ -103,22 +104,31 @@ func (t *Tailer) readTailClients() {
// keeps sending oldest entry to responseChan. If channel is blocked drop the entry
// When channel is unblocked, send details of dropped entries with current entry
func (t *Tailer) loop() {
ticker := time.NewTicker(checkConnectionsWithIngestersPeriod)
defer ticker.Stop()
checkConnectionTicker := time.NewTicker(checkConnectionsWithIngestersPeriod)
defer checkConnectionTicker.Stop()

tailMaxDurationTicker := time.NewTicker(t.tailMaxDuration)
defer tailMaxDurationTicker.Stop()

tailResponse := new(TailResponse)

for {
if t.stopped {
break
return
}

select {
case <-ticker.C:
case <-checkConnectionTicker.C:
// Try to reconnect dropped ingesters and connect to new ingesters
if err := t.checkIngesterConnections(); err != nil {
level.Error(util.Logger).Log("Error reconnecting to disconnected ingesters", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error reconnecting to disconnected ingesters", "err", err)
}
case <-tailMaxDurationTicker.C:
if err := t.close(); err != nil {
level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err)
}
t.closeErrChan <- errors.New("reached tail max duration limit")
return
default:
}

Expand All @@ -134,8 +144,8 @@ func (t *Tailer) loop() {
if err := t.close(); err != nil {
level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
}
t.closeErrChan <- errors.New("All ingesters closed the connection")
break
t.closeErrChan <- errors.New("all ingesters closed the connection")
return
}
time.Sleep(nextEntryWait)
continue
Expand Down Expand Up @@ -316,7 +326,7 @@ func (t *Tailer) getCloseErrorChan() <-chan error {

func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Querier_TailClient,
queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error),
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error)) *Tailer {
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailMaxDuration time.Duration) *Tailer {
t := Tailer{
openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD),
//droppedStreamsIterator: &droppedStreamsIterator{},
Expand All @@ -326,6 +336,7 @@ func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Qu
responseChan: make(chan *TailResponse, bufferSizeForTailResponse),
closeErrChan: make(chan error),
tailDisconnectedIngesters: tailDisconnectedIngesters,
tailMaxDuration: tailMaxDuration,
}

t.readTailClients()
Expand Down

0 comments on commit 3a2b569

Please sign in to comment.