From af93e2019cb78f555991a54068f4ba4fe2d81af7 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 16 Apr 2018 11:25:39 +0100 Subject: [PATCH] Really dumb chunk implementation. --- .gitignore | 1 + pkg/ingester/chunk.go | 45 +++++++++++++++++++++++++++++++++++++ pkg/ingester/instance.go | 24 ++++++++++++++++++++ pkg/ingester/stream.go | 40 +++++++++++++++++++++++++++++++++ pkg/logproto/logproto.proto | 5 +++-- 5 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 pkg/ingester/chunk.go create mode 100644 pkg/ingester/stream.go diff --git a/.gitignore b/.gitignore index 9189c00d7680..d91bdd4ed926 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .pkg .cache cmd/distributor/distributor +cmd/ingester/ingester diff --git a/pkg/ingester/chunk.go b/pkg/ingester/chunk.go new file mode 100644 index 000000000000..7c9d635d2e09 --- /dev/null +++ b/pkg/ingester/chunk.go @@ -0,0 +1,45 @@ +package ingester + +import ( + "github.com/grafana/logish/pkg/logproto" + "github.com/pkg/errors" +) + +const ( + tmpNumEntries = 1024 +) + +var ( + ErrChunkFull = errors.New("Chunk full") + ErrOutOfOrder = errors.New("Entry out of order") +) + +type Chunk interface { + SpaceFor(*logproto.Entry) bool + Push(*logproto.Entry) error +} + +func newChunk() Chunk { + return &dumbChunk{} +} + +type dumbChunk struct { + entries []*logproto.Entry +} + +func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool { + return len(c.entries) == tmpNumEntries +} + +func (c *dumbChunk) Push(entry *logproto.Entry) error { + if len(c.entries) == tmpNumEntries { + return ErrChunkFull + } + + if len(c.entries) > 0 && c.entries[len(c.entries)-1].Timestamp.After(entry.Timestamp) { + return ErrOutOfOrder + } + + c.entries = append(c.entries, entry) + return nil +} diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index b77428083069..37582063826f 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -2,13 +2,37 @@ package ingester import ( "context" + "sync" "github.com/grafana/logish/pkg/logproto" ) type instance struct { + streamsMtx sync.Mutex + streams map[string]*stream +} + +func newInstance() *instance { + return &instance{ + streams: map[string]*stream{}, + } } func (i *instance) Push(ctx context.Context, req *logproto.WriteRequest) error { + i.streamsMtx.Lock() + defer i.streamsMtx.Unlock() + + for _, s := range req.Streams { + stream, ok := i.streams[s.Labels] + if !ok { + stream = newStream() + i.streams[s.Labels] = stream + } + + if err := stream.Push(ctx, s.Entries); err != nil { + return err + } + } + return nil } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go new file mode 100644 index 000000000000..8d3a40ace753 --- /dev/null +++ b/pkg/ingester/stream.go @@ -0,0 +1,40 @@ +package ingester + +import ( + "context" + + "github.com/grafana/logish/pkg/logproto" +) + +const tmpMaxChunks = 3 + +type stream struct { + // Newest chunk at chunks[0]. + // Not thread-safe; assume accesses to this are locked by caller. + chunks []Chunk +} + +func newStream() *stream { + return &stream{} +} + +func (s *stream) Push(ctx context.Context, entries []logproto.Entry) error { + if len(s.chunks) == 0 { + s.chunks = append(s.chunks, newChunk()) + } + + for i := range entries { + if !s.chunks[0].SpaceFor(&entries[i]) { + s.chunks = append([]Chunk{newChunk()}, s.chunks...) + } + if err := s.chunks[0].Push(&entries[i]); err != nil { + return err + } + } + + // Temp; until we implement flushing, only keep N chunks in memory. + if len(s.chunks) > tmpMaxChunks { + s.chunks = s.chunks[:tmpMaxChunks] + } + return nil +} diff --git a/pkg/logproto/logproto.proto b/pkg/logproto/logproto.proto index 774c42293e60..51abd5771454 100644 --- a/pkg/logproto/logproto.proto +++ b/pkg/logproto/logproto.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package logproto; import "google/protobuf/timestamp.proto"; +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; import "google.golang.org/grpc/health/grpc_health_v1/health.proto"; service Aggregator { @@ -19,10 +20,10 @@ message WriteResponse { message Stream { string labels = 1; - repeated Entry entries = 2; + repeated Entry entries = 2 [(gogoproto.nullable) = false]; } message Entry { - google.protobuf.Timestamp timestamp = 1; + google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; string line = 2; }