Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-license and export pkg/ingester WAL code to be used in Promtail's WAL #8315

Merged
merged 10 commits into from
Jan 30, 2023
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

##### Changes

* [8315](https://github.com/grafana/loki/pull/8315) **thepalbi** Relicense and export `pkg/ingester` WAL code to be used in Promtail's WAL.

#### Promtail

##### Enhancements
Expand Down
1 change: 1 addition & 0 deletions LICENSING.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The following folders and their subfolders are licensed under Apache-2.0:

```
clients/
pkg/ingester/wal/
pkg/logproto/
pkg/loghttp/
pkg/logqlmodel/
Expand Down
10 changes: 6 additions & 4 deletions pkg/ingester/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strconv"
"time"

"github.com/grafana/loki/pkg/ingester/wal"
thepalbi marked this conversation as resolved.
Show resolved Hide resolved

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -124,15 +126,15 @@ func decodeCheckpointRecord(rec []byte, s *Series) error {
cpy := make([]byte, len(rec))
copy(cpy, rec)

switch RecordType(cpy[0]) {
case CheckpointRecord:
switch wal.RecordType(cpy[0]) {
case wal.CheckpointRecord:
return proto.Unmarshal(cpy[1:], s)
default:
return errors.Errorf("unexpected record type: %d", rec[0])
}
}

func encodeWithTypeHeader(m *Series, typ RecordType, buf []byte) ([]byte, error) {
func encodeWithTypeHeader(m *Series, typ wal.RecordType, buf []byte) ([]byte, error) {
size := m.Size()
if cap(buf) < size+1 {
buf = make([]byte, size+1)
Expand Down Expand Up @@ -370,7 +372,7 @@ func (w *WALCheckpointWriter) Write(s *Series) error {
size := s.Size() + 1 // +1 for header
buf := recordBufferPool.Get(size).([]byte)[:size]

b, err := encodeWithTypeHeader(s, CheckpointRecord, buf)
b, err := encodeWithTypeHeader(s, wal.CheckpointRecord, buf)
if err != nil {
return err
}
Expand Down
196 changes: 3 additions & 193 deletions pkg/ingester/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,205 +5,15 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/ingester/wal"
thepalbi marked this conversation as resolved.
Show resolved Hide resolved

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
)

// Test_Encoding_Series checks that a WALRecord carrying only series survives
// an encodeSeries / decodeWALRecord round trip unchanged.
func Test_Encoding_Series(t *testing.T) {
	// Named rec (not record) so the imported record package is not shadowed
	// for the rest of the function.
	rec := &WALRecord{
		entryIndexMap: make(map[uint64]int),
		UserID:        "123",
		Series: []record.RefSeries{
			{
				Ref: 456,
				Labels: labels.FromMap(map[string]string{
					"foo":  "bar",
					"bazz": "buzz",
				}),
			},
			{
				Ref: 789,
				Labels: labels.FromMap(map[string]string{
					"abc": "123",
					"def": "456",
				}),
			},
		},
	}

	buf := rec.encodeSeries(nil)

	decoded := recordPool.GetRecord()
	require.NoError(t, decodeWALRecord(buf, decoded))

	// Since we use a pool, there can be subtle differences between nil slices and len(0) slices.
	// Both are valid, so check the length and then normalise to nil before comparing.
	require.Equal(t, 0, len(decoded.RefEntries))
	decoded.RefEntries = nil
	require.Equal(t, rec, decoded)
}

// Test_Encoding_Entries round-trips entry records through encodeEntries and
// decodeWALRecord, once per entries record version listed in the table.
// Each case runs as its own subtest so a failure reports which version broke.
func Test_Encoding_Entries(t *testing.T) {
	for _, tc := range []struct {
		desc    string
		rec     *WALRecord
		version RecordType
	}{
		{
			desc: "v1",
			rec: &WALRecord{
				entryIndexMap: make(map[uint64]int),
				UserID:        "123",
				RefEntries: []RefEntries{
					{
						Ref: 456,
						Entries: []logproto.Entry{
							{
								Timestamp: time.Unix(1000, 0),
								Line:      "first",
							},
							{
								Timestamp: time.Unix(2000, 0),
								Line:      "second",
							},
						},
					},
					{
						Ref: 789,
						Entries: []logproto.Entry{
							{
								Timestamp: time.Unix(3000, 0),
								Line:      "third",
							},
							{
								Timestamp: time.Unix(4000, 0),
								Line:      "fourth",
							},
						},
					},
				},
			},
			version: WALRecordEntriesV1,
		},
		{
			desc: "v2",
			rec: &WALRecord{
				entryIndexMap: make(map[uint64]int),
				UserID:        "123",
				RefEntries: []RefEntries{
					{
						Ref:     456,
						Counter: 1, // v2 uses counter for WAL replay
						Entries: []logproto.Entry{
							{
								Timestamp: time.Unix(1000, 0),
								Line:      "first",
							},
							{
								Timestamp: time.Unix(2000, 0),
								Line:      "second",
							},
						},
					},
					{
						Ref:     789,
						Counter: 2, // v2 uses counter for WAL replay
						Entries: []logproto.Entry{
							{
								Timestamp: time.Unix(3000, 0),
								Line:      "third",
							},
							{
								Timestamp: time.Unix(4000, 0),
								Line:      "fourth",
							},
						},
					},
				},
			},
			version: WALRecordEntriesV2,
		},
	} {
		// The subtest runs synchronously (no t.Parallel), so using tc inside
		// the closure is safe on every Go version.
		t.Run(tc.desc, func(t *testing.T) {
			decoded := recordPool.GetRecord()
			buf := tc.rec.encodeEntries(tc.version, nil)
			require.NoError(t, decodeWALRecord(buf, decoded))
			require.Equal(t, tc.rec, decoded)
		})
	}
}

// Benchmark_EncodeEntries measures encodeEntries on a record holding two
// streams of 10000 entries each, encoded with CurrentEntriesRec into a buffer
// taken from recordPool.
func Benchmark_EncodeEntries(b *testing.B) {
	const numEntries = 10000

	// The fixture size is known up front, so allocate the slice once.
	entries := make([]logproto.Entry, 0, numEntries)
	for i := int64(0); i < numEntries; i++ {
		entries = append(entries, logproto.Entry{
			Timestamp: time.Unix(0, i),
			Line:      fmt.Sprintf("long line with a lot of data like a log %d", i),
		})
	}

	// Named rec (not record) so the imported record package is not shadowed.
	rec := &WALRecord{
		entryIndexMap: make(map[uint64]int),
		UserID:        "123",
		RefEntries: []RefEntries{
			{
				Ref:     456,
				Entries: entries,
			},
			{
				Ref:     789,
				Entries: entries,
			},
		},
	}

	// Acquire the pooled buffer before resetting the timer so that only the
	// encode loop is measured.
	buf := recordPool.GetBytes()[:0]
	defer recordPool.PutBytes(buf)

	b.ReportAllocs()
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		rec.encodeEntries(CurrentEntriesRec, buf)
	}
}

// Benchmark_DecodeWAL measures decodeWALRecord on a pre-encoded record holding
// two streams of 10000 entries each. The same pooled record is used as the
// decode target on every iteration.
func Benchmark_DecodeWAL(b *testing.B) {
	const numEntries = 10000

	// The fixture size is known up front, so allocate the slice once.
	entries := make([]logproto.Entry, 0, numEntries)
	for i := int64(0); i < numEntries; i++ {
		entries = append(entries, logproto.Entry{
			Timestamp: time.Unix(0, i),
			Line:      fmt.Sprintf("long line with a lot of data like a log %d", i),
		})
	}

	// Named source (not record) so the imported record package is not shadowed.
	source := &WALRecord{
		entryIndexMap: make(map[uint64]int),
		UserID:        "123",
		RefEntries: []RefEntries{
			{
				Ref:     456,
				Entries: entries,
			},
			{
				Ref:     789,
				Entries: entries,
			},
		},
	}

	// Encoding happens once, outside the timed region.
	buf := source.encodeEntries(CurrentEntriesRec, nil)
	decoded := recordPool.GetRecord()

	b.ReportAllocs()
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		require.NoError(b, decodeWALRecord(buf, decoded))
	}
}

func fillChunk(t testing.TB, c chunkenc.Chunk) {
t.Helper()
var i int64
Expand Down Expand Up @@ -338,7 +148,7 @@ func Test_EncodingCheckpoint(t *testing.T) {
},
}

b, err := encodeWithTypeHeader(s, CheckpointRecord, nil)
b, err := encodeWithTypeHeader(s, wal.CheckpointRecord, nil)
require.Nil(t, err)

out := &Series{}
Expand Down
8 changes: 5 additions & 3 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/ingester/wal"
thepalbi marked this conversation as resolved.
Show resolved Hide resolved

gokitlog "github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
Expand Down Expand Up @@ -65,9 +67,9 @@ func TestChunkFlushingShutdown(t *testing.T) {

// fullWAL is an empty stub type used by the flush tests. Its Log method always
// returns an *os.PathError wrapping syscall.ENOSPC, so it behaves like a WAL
// whose disk is full.
type fullWAL struct{}

func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Start() {}
func (fullWAL) Stop() error { return nil }
// Log ignores the record and always fails with an *os.PathError wrapping syscall.ENOSPC.
func (fullWAL) Log(_ *wal.Record) error { return &os.PathError{Err: syscall.ENOSPC} }
// Start does nothing.
func (fullWAL) Start() {}
// Stop does nothing and always returns nil.
func (fullWAL) Stop() error { return nil }

func Benchmark_FlushLoop(b *testing.B) {
var (
Expand Down
6 changes: 4 additions & 2 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"syscall"
"time"

"github.com/grafana/loki/pkg/ingester/wal"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -228,7 +230,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
return appendErr
}

func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord) (*stream, error) {
func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Record) (*stream, error) {
// record is only nil when replaying WAL. We don't want to drop data when replaying a WAL after
// reducing the stream limits, for instance.
var err error
Expand Down Expand Up @@ -315,7 +317,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *str
// getOrCreateStream returns the stream or creates it.
// It's safe to use this function if returned stream is not consistency sensitive to streamsMap(e.g. ingesterRecoverer),
// otherwise use streamsMap.LoadOrStoreNew with locking stream's chunkMtx inside.
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *WALRecord) (*stream, error) {
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *wal.Record) (*stream, error) {
s, _, err := i.streams.LoadOrStoreNew(pushReqStream.Labels, func() (*stream, error) {
return i.createStream(pushReqStream, record)
}, nil)
Expand Down
9 changes: 5 additions & 4 deletions pkg/ingester/recovery.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingester

import (
"github.com/grafana/loki/pkg/ingester/wal"
io "io"
"runtime"
"sync"
Expand Down Expand Up @@ -52,7 +53,7 @@ type Recoverer interface {
NumWorkers() int
Series(series *Series) error
SetStream(userID string, series record.RefSeries) error
Push(userID string, entries RefEntries) error
Push(userID string, entries wal.RefEntries) error
Done() <-chan struct{}
}

Expand Down Expand Up @@ -152,7 +153,7 @@ func (r *ingesterRecoverer) SetStream(userID string, series record.RefSeries) er
return nil
}

func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error {
func (r *ingesterRecoverer) Push(userID string, entries wal.RefEntries) error {
return r.ing.replayController.WithBackPressure(func() error {
out, ok := r.users.Load(userID)
if !ok {
Expand Down Expand Up @@ -232,7 +233,7 @@ func (r *ingesterRecoverer) Done() <-chan struct{} {
func RecoverWAL(reader WALReader, recoverer Recoverer) error {
dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput) error {
rec := recordPool.GetRecord()
if err := decodeWALRecord(b, rec); err != nil {
if err := wal.DecodeRecord(b, rec); err != nil {
return err
}

Expand Down Expand Up @@ -267,7 +268,7 @@ func RecoverWAL(reader WALReader, recoverer Recoverer) error {
if !ok {
return
}
entries, ok := next.data.(RefEntries)
entries, ok := next.data.(wal.RefEntries)
var err error
if !ok {
err = errors.Errorf("unexpected type (%T) when recovering WAL, expecting (%T)", next.data, entries)
Expand Down
Loading