Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-license and export pkg/ingester WAL code to be used in Promtail's WAL #8315

Merged
merged 10 commits into from
Jan 30, 2023
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

##### Changes

* [8315](https://github.com/grafana/loki/pull/8315) **thepalbi** Relicense and export `pkg/ingester` WAL code to be used in Promtail's WAL.

#### Promtail

##### Enhancements
Expand Down
1 change: 1 addition & 0 deletions LICENSING.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The following folders and their subfolders are licensed under Apache-2.0:

```
clients/
pkg/ingester/wal
pkg/logproto/
pkg/loghttp/
pkg/logqlmodel/
Expand Down
1 change: 1 addition & 0 deletions clients/cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"reflect"
"sync"

// embed time zone data
_ "time/tzdata"

Expand Down
9 changes: 5 additions & 4 deletions clients/pkg/promtail/targets/gcplog/pull_target.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package gcplog

import (
"cloud.google.com/go/pubsub"
"context"
"io"
"sync"
"time"

"cloud.google.com/go/pubsub"
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"google.golang.org/api/option"
"io"
"sync"
"time"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
Expand Down
5 changes: 3 additions & 2 deletions clients/pkg/promtail/targets/gcplog/pull_target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package gcplog

import (
"context"
"github.com/grafana/dskit/backoff"
"github.com/pkg/errors"
"io"
"testing"
"time"

"github.com/grafana/dskit/backoff"
"github.com/pkg/errors"

"cloud.google.com/go/pubsub"
"github.com/go-kit/log"
"github.com/grafana/loki/clients/pkg/promtail/client/fake"
Expand Down
2 changes: 1 addition & 1 deletion cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func main() {
}

if *addr == "" {
_, _ = fmt.Fprintf(os.Stderr, "Must specify a Loki address with -addr or set the environemnt variable LOKI_ADDRESS\n")
_, _ = fmt.Fprintf(os.Stderr, "Must specify a Loki address with -addr or set the environment variable LOKI_ADDRESS\n")
os.Exit(1)
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/ingester/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strconv"
"time"

"github.com/grafana/loki/pkg/ingester/wal"
thepalbi marked this conversation as resolved.
Show resolved Hide resolved

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -124,15 +126,15 @@ func decodeCheckpointRecord(rec []byte, s *Series) error {
cpy := make([]byte, len(rec))
copy(cpy, rec)

switch RecordType(cpy[0]) {
case CheckpointRecord:
switch wal.RecordType(cpy[0]) {
case wal.CheckpointRecord:
return proto.Unmarshal(cpy[1:], s)
default:
return errors.Errorf("unexpected record type: %d", rec[0])
}
}

func encodeWithTypeHeader(m *Series, typ RecordType, buf []byte) ([]byte, error) {
func encodeWithTypeHeader(m *Series, typ wal.RecordType, buf []byte) ([]byte, error) {
size := m.Size()
if cap(buf) < size+1 {
buf = make([]byte, size+1)
Expand Down Expand Up @@ -370,7 +372,7 @@ func (w *WALCheckpointWriter) Write(s *Series) error {
size := s.Size() + 1 // +1 for header
buf := recordBufferPool.Get(size).([]byte)[:size]

b, err := encodeWithTypeHeader(s, CheckpointRecord, buf)
b, err := encodeWithTypeHeader(s, wal.CheckpointRecord, buf)
if err != nil {
return err
}
Expand Down
196 changes: 3 additions & 193 deletions pkg/ingester/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,205 +5,15 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/ingester/wal"
thepalbi marked this conversation as resolved.
Show resolved Hide resolved

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
)

func Test_Encoding_Series(t *testing.T) {
record := &WALRecord{
entryIndexMap: make(map[uint64]int),
UserID: "123",
Series: []record.RefSeries{
{
Ref: 456,
Labels: labels.FromMap(map[string]string{
"foo": "bar",
"bazz": "buzz",
}),
},
{
Ref: 789,
Labels: labels.FromMap(map[string]string{
"abc": "123",
"def": "456",
}),
},
},
}

buf := record.encodeSeries(nil)

decoded := recordPool.GetRecord()

err := decodeWALRecord(buf, decoded)
require.Nil(t, err)

// Since we use a pool, there can be subtle differentiations between nil slices and len(0) slices.
// Both are valid, so check length.
require.Equal(t, 0, len(decoded.RefEntries))
decoded.RefEntries = nil
require.Equal(t, record, decoded)
}

func Test_Encoding_Entries(t *testing.T) {
for _, tc := range []struct {
desc string
rec *WALRecord
version RecordType
}{
{
desc: "v1",
rec: &WALRecord{
entryIndexMap: make(map[uint64]int),
UserID: "123",
RefEntries: []RefEntries{
{
Ref: 456,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1000, 0),
Line: "first",
},
{
Timestamp: time.Unix(2000, 0),
Line: "second",
},
},
},
{
Ref: 789,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(3000, 0),
Line: "third",
},
{
Timestamp: time.Unix(4000, 0),
Line: "fourth",
},
},
},
},
},
version: WALRecordEntriesV1,
},
{
desc: "v2",
rec: &WALRecord{
entryIndexMap: make(map[uint64]int),
UserID: "123",
RefEntries: []RefEntries{
{
Ref: 456,
Counter: 1, // v2 uses counter for WAL replay
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1000, 0),
Line: "first",
},
{
Timestamp: time.Unix(2000, 0),
Line: "second",
},
},
},
{
Ref: 789,
Counter: 2, // v2 uses counter for WAL replay
Entries: []logproto.Entry{
{
Timestamp: time.Unix(3000, 0),
Line: "third",
},
{
Timestamp: time.Unix(4000, 0),
Line: "fourth",
},
},
},
},
},
version: WALRecordEntriesV2,
},
} {
decoded := recordPool.GetRecord()
buf := tc.rec.encodeEntries(tc.version, nil)
err := decodeWALRecord(buf, decoded)
require.Nil(t, err)
require.Equal(t, tc.rec, decoded)

}
}

func Benchmark_EncodeEntries(b *testing.B) {
var entries []logproto.Entry
for i := int64(0); i < 10000; i++ {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(0, i),
Line: fmt.Sprintf("long line with a lot of data like a log %d", i),
})
}
record := &WALRecord{
entryIndexMap: make(map[uint64]int),
UserID: "123",
RefEntries: []RefEntries{
{
Ref: 456,
Entries: entries,
},
{
Ref: 789,
Entries: entries,
},
},
}
b.ReportAllocs()
b.ResetTimer()
buf := recordPool.GetBytes()[:0]
defer recordPool.PutBytes(buf)

for n := 0; n < b.N; n++ {
record.encodeEntries(CurrentEntriesRec, buf)
}
}

func Benchmark_DecodeWAL(b *testing.B) {
var entries []logproto.Entry
for i := int64(0); i < 10000; i++ {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(0, i),
Line: fmt.Sprintf("long line with a lot of data like a log %d", i),
})
}
record := &WALRecord{
entryIndexMap: make(map[uint64]int),
UserID: "123",
RefEntries: []RefEntries{
{
Ref: 456,
Entries: entries,
},
{
Ref: 789,
Entries: entries,
},
},
}

buf := record.encodeEntries(CurrentEntriesRec, nil)
rec := recordPool.GetRecord()
b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
require.NoError(b, decodeWALRecord(buf, rec))
}
}

func fillChunk(t testing.TB, c chunkenc.Chunk) {
t.Helper()
var i int64
Expand Down Expand Up @@ -338,7 +148,7 @@ func Test_EncodingCheckpoint(t *testing.T) {
},
}

b, err := encodeWithTypeHeader(s, CheckpointRecord, nil)
b, err := encodeWithTypeHeader(s, wal.CheckpointRecord, nil)
require.Nil(t, err)

out := &Series{}
Expand Down
8 changes: 5 additions & 3 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/ingester/wal"
thepalbi marked this conversation as resolved.
Show resolved Hide resolved

gokitlog "github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
Expand Down Expand Up @@ -65,9 +67,9 @@ func TestChunkFlushingShutdown(t *testing.T) {

type fullWAL struct{}

func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Start() {}
func (fullWAL) Stop() error { return nil }
func (fullWAL) Log(_ *wal.Record) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Start() {}
func (fullWAL) Stop() error { return nil }

func Benchmark_FlushLoop(b *testing.B) {
var (
Expand Down
6 changes: 4 additions & 2 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"syscall"
"time"

"github.com/grafana/loki/pkg/ingester/wal"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -228,7 +230,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
return appendErr
}

func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord) (*stream, error) {
func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Record) (*stream, error) {
// record is only nil when replaying WAL. We don't want to drop data when replaying a WAL after
// reducing the stream limits, for instance.
var err error
Expand Down Expand Up @@ -315,7 +317,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *str
// getOrCreateStream returns the stream or creates it.
// It's safe to use this function if returned stream is not consistency sensitive to streamsMap(e.g. ingesterRecoverer),
// otherwise use streamsMap.LoadOrStoreNew with locking stream's chunkMtx inside.
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *WALRecord) (*stream, error) {
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *wal.Record) (*stream, error) {
s, _, err := i.streams.LoadOrStoreNew(pushReqStream.Labels, func() (*stream, error) {
return i.createStream(pushReqStream, record)
}, nil)
Expand Down
Loading