Skip to content

Commit

Permalink
Move iterators to own package; check in protos and parser to make CLI…
Browse files Browse the repository at this point in the history
… go gettable.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
  • Loading branch information
tomwilkie committed Jun 25, 2018
1 parent c1aa93d commit 248f9e2
Show file tree
Hide file tree
Showing 11 changed files with 2,819 additions and 27 deletions.
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
.uptodate
*.pb.go
!vendor/**/*.pb.go
vendor/github.com/weaveworks/cortex/pkg/ingester/client/cortex.pb.go
vendor/github.com/weaveworks/cortex/pkg/ring/ring.pb.go
pkg/parser/labels.go
.pkg
.cache
cmd/distributor/distributor
Expand Down
4 changes: 2 additions & 2 deletions cmd/logcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/grafana/logish/pkg/iter"
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/parser"
"github.com/grafana/logish/pkg/querier"
)

var (
Expand Down Expand Up @@ -108,7 +108,7 @@ func query() {
if *forward {
d = logproto.FORWARD
}
iter := querier.NewQueryResponseIterator(&queryResponse, d)
iter := iter.NewQueryResponseIterator(&queryResponse, d)
for iter.Next() {
ls := labelsCache[iter.Labels()]
ls = subtract(commonLabels, ls)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (

"github.com/pkg/errors"

"github.com/grafana/logish/pkg/iter"
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/querier"
)

const (
Expand All @@ -24,7 +24,7 @@ type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Push(*logproto.Entry) error
Iterator(from, through time.Time, direction logproto.Direction) querier.EntryIterator
Iterator(from, through time.Time, direction logproto.Direction) iter.EntryIterator
Size() int
}

Expand Down Expand Up @@ -66,7 +66,7 @@ func (c *dumbChunk) Size() int {

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) querier.EntryIterator {
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) iter.EntryIterator {
i := sort.Search(len(c.entries), func(i int) bool {
return !from.After(c.entries[i].Timestamp)
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"testing"
"time"

"github.com/grafana/logish/pkg/iter"
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/querier"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func testIteratorForward(t *testing.T, iter querier.EntryIterator, from, through int64) {
func testIteratorForward(t *testing.T, iter iter.EntryIterator, from, through int64) {
i := from
for iter.Next() {
entry := iter.Entry()
Expand All @@ -24,7 +24,7 @@ func testIteratorForward(t *testing.T, iter querier.EntryIterator, from, through
assert.NoError(t, iter.Error())
}

func testIteratorBackward(t *testing.T, iter querier.EntryIterator, from, through int64) {
func testIteratorBackward(t *testing.T, iter iter.EntryIterator, from, through int64) {
i := through - 1
for iter.Next() {
entry := iter.Entry()
Expand Down
9 changes: 5 additions & 4 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/pkg/errors"

"github.com/grafana/logish/pkg/iter"
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/parser"
"github.com/grafana/logish/pkg/querier"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
// TODO: lock smell
i.streamsMtx.Lock()
ids := i.index.lookup(matchers)
iterators := make([]querier.EntryIterator, len(ids))
iterators := make([]iter.EntryIterator, len(ids))
for j := range ids {
stream, ok := i.streams[ids[j]]
if !ok {
Expand All @@ -78,12 +79,12 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
}
i.streamsMtx.Unlock()

iterator := querier.NewHeapIterator(iterators, req.Direction)
iterator := iter.NewHeapIterator(iterators, req.Direction)
defer iterator.Close()

if req.Regex != "" {
var err error
iterator, err = querier.NewRegexpFilter(req.Regex, iterator)
iterator, err = iter.NewRegexpFilter(req.Regex, iterator)
if err != nil {
return err
}
Expand All @@ -109,7 +110,7 @@ func isDone(ctx context.Context) bool {
}
}

func sendBatches(i querier.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
sent := uint32(0)
for sent < limit && !isDone(queryServer.Context()) {
batch, batchSize, err := querier.ReadBatch(i, util.MinUint32(queryBatchSize, limit-sent))
Expand Down
10 changes: 5 additions & 5 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/logish/pkg/iter"
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/querier"
)

const tmpMaxChunks = 3
Expand Down Expand Up @@ -47,8 +47,8 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry) error {
}

// Returns an iterator.
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) querier.EntryIterator {
iterators := make([]querier.EntryIterator, 0, len(s.chunks))
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) iter.EntryIterator {
iterators := make([]iter.EntryIterator, 0, len(s.chunks))
for _, c := range s.chunks {
iter := c.Iterator(from, through, direction)
if iter != nil {
Expand All @@ -71,8 +71,8 @@ func (s *stream) Iterator(from, through time.Time, direction logproto.Direction)
type nonOverlappingIterator struct {
labels string
i int
iterators []querier.EntryIterator
curr querier.EntryIterator
iterators []iter.EntryIterator
curr iter.EntryIterator
}

func (i *nonOverlappingIterator) Next() bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/iterator.go → pkg/iter/iterator.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package querier
package iter

import (
"container/heap"
Expand Down Expand Up @@ -201,7 +201,7 @@ type queryClientIterator struct {
curr EntryIterator
}

func newQueryClientIterator(client logproto.Querier_QueryClient, direction logproto.Direction) EntryIterator {
func NewQueryClientIterator(client logproto.Querier_QueryClient, direction logproto.Direction) EntryIterator {
return &queryClientIterator{
client: client,
direction: direction,
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/iterator_test.go → pkg/iter/iterator_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package querier
package iter

import (
"testing"
Expand Down
Loading

0 comments on commit 248f9e2

Please sign in to comment.