Skip to content

Commit

Permalink
Really dumb chunk implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie committed Apr 16, 2018
1 parent c922e94 commit af93e20
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
.pkg
.cache
cmd/distributor/distributor
cmd/ingester/ingester
45 changes: 45 additions & 0 deletions pkg/ingester/chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package ingester

import (
"github.com/grafana/logish/pkg/logproto"
"github.com/pkg/errors"
)

const (
tmpNumEntries = 1024
)

var (
ErrChunkFull = errors.New("Chunk full")
ErrOutOfOrder = errors.New("Entry out of order")
)

type Chunk interface {
SpaceFor(*logproto.Entry) bool
Push(*logproto.Entry) error
}

func newChunk() Chunk {
return &dumbChunk{}
}

type dumbChunk struct {
entries []*logproto.Entry
}

func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool {
return len(c.entries) == tmpNumEntries
}

func (c *dumbChunk) Push(entry *logproto.Entry) error {
if len(c.entries) == tmpNumEntries {
return ErrChunkFull
}

if len(c.entries) > 0 && c.entries[len(c.entries)-1].Timestamp.After(entry.Timestamp) {
return ErrOutOfOrder
}

c.entries = append(c.entries, entry)
return nil
}
24 changes: 24 additions & 0 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,37 @@ package ingester

import (
"context"
"sync"

"github.com/grafana/logish/pkg/logproto"
)

type instance struct {
streamsMtx sync.Mutex
streams map[string]*stream
}

func newInstance() *instance {
return &instance{
streams: map[string]*stream{},
}
}

func (i *instance) Push(ctx context.Context, req *logproto.WriteRequest) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()

for _, s := range req.Streams {
stream, ok := i.streams[s.Labels]
if !ok {
stream = newStream()
i.streams[s.Labels] = stream
}

if err := stream.Push(ctx, s.Entries); err != nil {
return err
}
}

return nil
}
40 changes: 40 additions & 0 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ingester

import (
"context"

"github.com/grafana/logish/pkg/logproto"
)

const tmpMaxChunks = 3

type stream struct {
// Newest chunk at chunks[0].
// Not thread-safe; assume accesses to this are locked by caller.
chunks []Chunk
}

func newStream() *stream {
return &stream{}
}

func (s *stream) Push(ctx context.Context, entries []logproto.Entry) error {
if len(s.chunks) == 0 {
s.chunks = append(s.chunks, newChunk())
}

for i := range entries {
if !s.chunks[0].SpaceFor(&entries[i]) {
s.chunks = append([]Chunk{newChunk()}, s.chunks...)
}
if err := s.chunks[0].Push(&entries[i]); err != nil {
return err
}
}

// Temp; until we implement flushing, only keep N chunks in memory.
if len(s.chunks) > tmpMaxChunks {
s.chunks = s.chunks[:tmpMaxChunks]
}
return nil
}
5 changes: 3 additions & 2 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ syntax = "proto3";
package logproto;

import "google/protobuf/timestamp.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "google.golang.org/grpc/health/grpc_health_v1/health.proto";

service Aggregator {
Expand All @@ -19,10 +20,10 @@ message WriteResponse {

message Stream {
string labels = 1;
repeated Entry entries = 2;
repeated Entry entries = 2 [(gogoproto.nullable) = false];
}

message Entry {
google.protobuf.Timestamp timestamp = 1;
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string line = 2;
}

0 comments on commit af93e20

Please sign in to comment.