Merge branch 'master' into reduce-config-option
daixiang0 committed Jun 19, 2019
2 parents 2715306 + 7794e21 commit 274170f
Showing 23 changed files with 2,152 additions and 170 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -188,6 +188,7 @@ jobs:
name: Install k3s
command: |
curl -sfL https://get.k3s.io | sh -
+ sudo chmod 755 /etc/rancher/k3s/k3s.yaml
mkdir -p ~/.kube
cp /etc/rancher/k3s/k3s.yaml ~/.kube/config
- run:
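The added chmod is presumably needed because k3s writes /etc/rancher/k3s/k3s.yaml readable only by root; relaxing the mode lets the unprivileged CI user copy it into ~/.kube/config in the following steps.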
4 changes: 2 additions & 2 deletions cmd/logcli/client.go
@@ -21,7 +21,7 @@ const (
queryPath = "/api/prom/query?query=%s&limit=%d&start=%d&end=%d&direction=%s&regexp=%s"
labelsPath = "/api/prom/label"
labelValuesPath = "/api/prom/label/%s/values"
tailPath = "/api/prom/tail?query=%s&regexp=%s"
tailPath = "/api/prom/tail?query=%s&regexp=%s&delay_for=%d"
)

func query(from, through time.Time, direction logproto.Direction) (*logproto.QueryResponse, error) {
@@ -98,7 +98,7 @@ func doRequest(path string, out interface{}) error {
}

func liveTailQueryConn() (*websocket.Conn, error) {
- path := fmt.Sprintf(tailPath, url.QueryEscape(*queryStr), url.QueryEscape(*regexpStr))
+ path := fmt.Sprintf(tailPath, url.QueryEscape(*queryStr), url.QueryEscape(*regexpStr), *delayFor)
return wsConnect(path)
}

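For context, here is a minimal sketch of how the new delay_for parameter travels from logcli to the server, assuming gorilla/websocket (the library behind logcli's wsConnect helper); the host argument and function names here are hypothetical:

package logclisketch

import (
    "fmt"
    "net/url"

    "github.com/gorilla/websocket"
)

const tailPath = "/api/prom/tail?query=%s&regexp=%s&delay_for=%d"

// tailURL escapes the user-supplied query and regexp but passes
// delayFor through as a plain integer, matching the diff above.
func tailURL(host, query, re string, delayFor int) string {
    return "ws://" + host + fmt.Sprintf(tailPath, url.QueryEscape(query), url.QueryEscape(re), delayFor)
}

// dialTail opens the tailing websocket; the server uses delay_for to
// accumulate logs for that many seconds before pushing a batch.
func dialTail(host, query, re string, delayFor int) (*websocket.Conn, error) {
    conn, _, err := websocket.DefaultDialer.Dial(tailURL(host, query, re, delayFor), nil)
    return conn, err
}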
1 change: 1 addition & 0 deletions cmd/logcli/main.go
@@ -26,6 +26,7 @@ var (
since = queryCmd.Flag("since", "Lookback window.").Default("1h").Duration()
forward = queryCmd.Flag("forward", "Scan forwards through logs.").Default("false").Bool()
tail = queryCmd.Flag("tail", "Tail the logs").Short('t').Default("false").Bool()
+ delayFor = queryCmd.Flag("delay-for", "Delay in tailing by number of seconds to accumulate logs").Default("0").Int()
noLabels = queryCmd.Flag("no-labels", "Do not print any labels").Default("false").Bool()
ignoreLabelsKey = queryCmd.Flag("exclude-label", "Exclude labels given the provided key during output.").Strings()
showLabelsKey = queryCmd.Flag("include-label", "Include labels given the provided key during output.").Strings()
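The flag defaults to 0, so existing --tail invocations behave as before. A tail that lets the server accumulate five seconds of logs per batch would look something like logcli query --tail --delay-for=5 '{job="myapp"}' (the selector is only an illustration).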
6 changes: 6 additions & 0 deletions cmd/logcli/tail.go
@@ -59,5 +59,11 @@ func tailQuery() {
printLogEntry(entry.Timestamp, labels, entry.Line)
}
}
+ if len(tailReponse.DroppedEntries) != 0 {
+ log.Println("Server dropped following entries due to slow client")
+ for _, d := range tailReponse.DroppedEntries {
+ log.Println(d.Timestamp, d.Labels)
+ }
+ }
}
}
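The loop above touches only two fields of the tail response; judging from this usage, the relevant logproto additions look roughly like the sketch below (reconstructed from this file, not the actual protobuf definitions):

package logprotosketch

import "time"

// DroppedEntry records an entry the server discarded because this
// client could not drain the websocket fast enough.
type DroppedEntry struct {
    Timestamp time.Time
    Labels    string
}

// TailResponse is what each websocket read yields; the streamed entries
// themselves are elided here, since only the drop-reporting fields are
// exercised by the loop above.
type TailResponse struct {
    DroppedEntries []DroppedEntry
}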
10 changes: 1 addition & 9 deletions pkg/distributor/distributor.go
@@ -3,7 +3,6 @@ package distributor
import (
"context"
"flag"
"hash/fnv"
"sync/atomic"
"time"

@@ -148,7 +147,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}
stream.Entries = entries
- keys = append(keys, tokenFor(userID, stream.Labels))
+ keys = append(keys, util.TokenFor(userID, stream.Labels))
streams = append(streams, streamTracker{
stream: stream,
})
@@ -264,13 +263,6 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
return err
}

- func tokenFor(userID, labels string) uint32 {
- h := fnv.New32()
- _, _ = h.Write([]byte(userID))
- _, _ = h.Write([]byte(labels))
- return h.Sum32()
- }

// Check implements the grpc healthcheck
func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
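The deleted tokenFor helper was evidently promoted to the shared util package (its new home is in one of the changed files not shown on this page). Assuming it moved unchanged, util.TokenFor is just the removed body under an exported name:

package util

import "hash/fnv"

// TokenFor computes the ring token for a tenant/labels pair. Writes to
// an FNV hash never fail, so the errors are deliberately discarded.
func TokenFor(userID, labels string) uint32 {
    h := fnv.New32()
    _, _ = h.Write([]byte(userID))
    _, _ = h.Write([]byte(labels))
    return h.Sum32()
}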
39 changes: 37 additions & 2 deletions pkg/ingester/ingester.go
@@ -2,6 +2,7 @@ package ingester

import (
"context"
"errors"
"flag"
"net/http"
"sync"
@@ -60,8 +61,9 @@ type Ingester struct {
lifecycler *ring.Lifecycler
store ChunkStore

- done sync.WaitGroup
- quit chan struct{}
+ done sync.WaitGroup
+ quit chan struct{}
+ quitting chan struct{}

// One queue per flush thread. Fingerprint is used to
// pick a queue.
@@ -82,6 +84,7 @@ func New(cfg Config, store ChunkStore) (*Ingester, error) {
store: store,
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
+ quitting: make(chan struct{}),
}

i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
@@ -127,6 +130,14 @@ func (i *Ingester) Shutdown() {
i.lifecycler.Shutdown()
}

+ // Stopping helps cleaning up resources before actual shutdown
+ func (i *Ingester) Stopping() {
+ close(i.quitting)
+ for _, instance := range i.getInstances() {
+ instance.closeTailers()
+ }
+ }

// StopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) StopIncomingRequests() {

@@ -230,3 +241,27 @@ func (i *Ingester) getInstances() []*instance {
}
return instances
}

+ // Tail logs matching given query
+ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error {
+ select {
+ case <-i.quitting:
+ return errors.New("Ingester is stopping")
+ default:
+ }
+
+ instanceID, err := user.ExtractOrgID(queryServer.Context())
+ if err != nil {
+ return err
+ }
+
+ instance := i.getOrCreateInstance(instanceID)
+ tailer, err := newTailer(instanceID, req.Query, req.Regex, queryServer)
+ if err != nil {
+ return err
+ }
+
+ instance.addNewTailer(tailer)
+ tailer.loop()
+ return nil
+ }
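Tail turns away new clients once Stopping has closed the quitting channel. A closed channel is always ready to receive, so the guard is a non-blocking select that costs nothing on the hot path; a toy version of the same idiom:

package ingestersketch

import "errors"

type server struct {
    quitting chan struct{} // closed (never written to) to broadcast shutdown
}

func (s *server) handle() error {
    select {
    case <-s.quitting: // receives immediately once the channel is closed
        return errors.New("server is stopping")
    default: // channel still open: fall through without blocking
    }
    // ... serve the request ...
    return nil
}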
53 changes: 52 additions & 1 deletion pkg/ingester/instance.go
@@ -53,6 +53,8 @@ type instance struct {
streamsRemovedTotal prometheus.Counter

blockSize int
+ tailers map[uint32]*tailer
+ tailerMtx sync.RWMutex
}

func newInstance(instanceID string, blockSize int) *instance {
@@ -65,6 +67,7 @@ func newInstance(instanceID string, blockSize int) *instance {
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),

blockSize: blockSize,
+ tailers: map[uint32]*tailer{},
}
}

@@ -87,6 +90,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
i.index.Add(labels, fp)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
+ i.addTailersToNewStream(stream)
}

if err := stream.Push(ctx, s.Entries); err != nil {
@@ -126,7 +130,7 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return sendBatches(iter, queryServer, req.Limit)
}

- func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
+ func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
var labels []string
if req.Values {
values := i.index.LabelValues(req.Name)
@@ -175,6 +179,53 @@ outer:
return iterators, nil
}

+ func (i *instance) addNewTailer(t *tailer) {
+ i.streamsMtx.RLock()
+ for _, stream := range i.streams {
+ if stream.matchesTailer(t) {
+ stream.addTailer(t)
+ }
+ }
+ i.streamsMtx.RUnlock()
+
+ i.tailerMtx.Lock()
+ defer i.tailerMtx.Unlock()
+ i.tailers[t.getID()] = t
+ }
+
+ func (i *instance) addTailersToNewStream(stream *stream) {
+ closedTailers := []uint32{}
+
+ i.tailerMtx.RLock()
+ for _, t := range i.tailers {
+ if t.isClosed() {
+ closedTailers = append(closedTailers, t.getID())
+ continue
+ }
+
+ if stream.matchesTailer(t) {
+ stream.addTailer(t)
+ }
+ }
+ i.tailerMtx.RUnlock()
+
+ if len(closedTailers) != 0 {
+ i.tailerMtx.Lock()
+ defer i.tailerMtx.Unlock()
+ for _, closedTailer := range closedTailers {
+ delete(i.tailers, closedTailer)
+ }
+ }
+ }
+
+ func (i *instance) closeTailers() {
+ i.tailerMtx.Lock()
+ defer i.tailerMtx.Unlock()
+ for _, t := range i.tailers {
+ t.close()
+ }
+ }

func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():
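addNewTailer and addTailersToNewStream share a lock discipline worth noting: scan under the read lock, merely collect the IDs of closed tailers, and retake the write lock only if there is something to delete. A generic sketch of that pattern:

package sweepsketch

import "sync"

type entry struct{ closed bool }

type registry struct {
    mtx   sync.RWMutex
    items map[uint32]*entry
}

// sweep deletes closed entries without holding the write lock during
// the scan, so concurrent readers are only briefly blocked.
func (r *registry) sweep() {
    dead := []uint32{}

    r.mtx.RLock()
    for id, e := range r.items {
        if e.closed {
            dead = append(dead, id)
        }
    }
    r.mtx.RUnlock()

    if len(dead) != 0 {
        r.mtx.Lock()
        defer r.mtx.Unlock()
        for _, id := range dead {
            delete(r.items, id)
        }
    }
}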
55 changes: 52 additions & 3 deletions pkg/ingester/stream.go
@@ -3,6 +3,7 @@ package ingester
import (
"context"
"net/http"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -49,6 +50,9 @@ type stream struct {
fp model.Fingerprint
labels []client.LabelAdapter
blockSize int

+ tailers map[uint32]*tailer
+ tailerMtx sync.RWMutex
}

type chunkDesc struct {
@@ -64,6 +68,7 @@ func newStream(fp model.Fingerprint, labels []client.LabelAdapter, blockSize int
fp: fp,
labels: labels,
blockSize: blockSize,
+ tailers: map[uint32]*tailer{},
}
}

@@ -75,6 +80,8 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
chunksCreatedTotal.Inc()
}

+ storedEntries := []logproto.Entry{}
+
// Don't fail on the first append error - if samples are sent out of order,
// we still want to append the later ones.
var appendErr error
@@ -93,10 +100,40 @@
}
if err := chunk.chunk.Append(&entries[i]); err != nil {
appendErr = err
+ } else {
+ // send only stored entries to tailers
+ storedEntries = append(storedEntries, entries[i])
}
chunk.lastUpdated = time.Now()
}

+ if len(storedEntries) != 0 {
+ go func() {
+ stream := logproto.Stream{Labels: client.FromLabelAdaptersToLabels(s.labels).String(), Entries: storedEntries}
+
+ closedTailers := []uint32{}
+
+ s.tailerMtx.RLock()
+ for _, tailer := range s.tailers {
+ if tailer.isClosed() {
+ closedTailers = append(closedTailers, tailer.getID())
+ continue
+ }
+ tailer.send(stream)
+ }
+ s.tailerMtx.RUnlock()
+
+ if len(closedTailers) != 0 {
+ s.tailerMtx.Lock()
+ defer s.tailerMtx.Unlock()
+
+ for _, closedTailerID := range closedTailers {
+ delete(s.tailers, closedTailerID)
+ }
+ }
+ }()
+ }

if appendErr == chunkenc.ErrOutOfOrder {
return httpgrpc.Errorf(http.StatusBadRequest, "entry out of order for stream: %s", client.FromLabelAdaptersToLabels(s.labels).String())
}
@@ -108,12 +145,12 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
iterators := make([]iter.EntryIterator, 0, len(s.chunks))
for _, c := range s.chunks {
- iter, err := c.chunk.Iterator(from, through, direction)
+ itr, err := c.chunk.Iterator(from, through, direction)
if err != nil {
return nil, err
}
- if iter != nil {
- iterators = append(iterators, iter)
+ if itr != nil {
+ iterators = append(iterators, itr)
}
}

@@ -125,3 +162,15 @@ func (s *stream) Iterator(from, through time.Time, direction logproto.Direction)

return iter.NewNonOverlappingIterator(iterators, client.FromLabelAdaptersToLabels(s.labels).String()), nil
}

+ func (s *stream) addTailer(t *tailer) {
+ s.tailerMtx.Lock()
+ defer s.tailerMtx.Unlock()
+
+ s.tailers[t.getID()] = t
+ }
+
+ func (s *stream) matchesTailer(t *tailer) bool {
+ metric := client.FromLabelAdaptersToMetric(s.labels)
+ return t.isWatchingLabels(metric)
+ }
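Design note: fanning stored entries out to tailers on a fresh goroutine keeps the ingest path from blocking on slow websocket clients; tailer.send is presumably buffered or lossy, given the DroppedEntries plumbing elsewhere in this commit. Closed tailers are pruned lazily during the fan-out, mirroring the sweep in instance.go, and matchesTailer converts the stream's labels to a metric so the tailer (defined in new tailer code not shown on this page) can evaluate its selector against them.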