Skip to content

Commit

Permalink
Re-license and export pkg/ingester WAL code to be used in Promtail's WAL (grafana#8315)
Browse files Browse the repository at this point in the history
  • Loading branch information
thepalbi authored Jan 30, 2023
1 parent 66f7407 commit fda14ca
Showing 14 changed files with 530 additions and 297 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -45,6 +45,8 @@

##### Changes

* [8315](https://github.com/grafana/loki/pull/8315) **thepalbi** Relicense and export `pkg/ingester` WAL code to be used in Promtail's WAL.

#### Promtail

##### Enhancements
1 change: 1 addition & 0 deletions LICENSING.md
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ The following folders and their subfolders are licensed under Apache-2.0:

```
clients/
pkg/ingester/wal
pkg/logproto/
pkg/loghttp/
pkg/logqlmodel/
9 changes: 5 additions & 4 deletions pkg/ingester/checkpoint.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import (
prompool "github.com/prometheus/prometheus/util/pool"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/logproto"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/pool"
@@ -124,15 +125,15 @@ func decodeCheckpointRecord(rec []byte, s *Series) error {
cpy := make([]byte, len(rec))
copy(cpy, rec)

switch RecordType(cpy[0]) {
case CheckpointRecord:
switch wal.RecordType(cpy[0]) {
case wal.CheckpointRecord:
return proto.Unmarshal(cpy[1:], s)
default:
return errors.Errorf("unexpected record type: %d", rec[0])
}
}

func encodeWithTypeHeader(m *Series, typ RecordType, buf []byte) ([]byte, error) {
func encodeWithTypeHeader(m *Series, typ wal.RecordType, buf []byte) ([]byte, error) {
size := m.Size()
if cap(buf) < size+1 {
buf = make([]byte, size+1)
@@ -370,7 +371,7 @@ func (w *WALCheckpointWriter) Write(s *Series) error {
size := s.Size() + 1 // +1 for header
buf := recordBufferPool.Get(size).([]byte)[:size]

b, err := encodeWithTypeHeader(s, CheckpointRecord, buf)
b, err := encodeWithTypeHeader(s, wal.CheckpointRecord, buf)
if err != nil {
return err
}
195 changes: 2 additions & 193 deletions pkg/ingester/encoding_test.go
Original file line number Diff line number Diff line change
@@ -6,204 +6,13 @@ import (
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/logproto"
)

// Test_Encoding_Series checks that a WALRecord carrying only series comes back
// equal after being encoded with encodeSeries and decoded with decodeWALRecord.
func Test_Encoding_Series(t *testing.T) {
	// Named rec (not record) so the imported tsdb/record package is not
	// shadowed for the remainder of the function.
	rec := &WALRecord{
		entryIndexMap: make(map[uint64]int),
		UserID:        "123",
		Series: []record.RefSeries{
			{
				Ref: 456,
				Labels: labels.FromMap(map[string]string{
					"foo":  "bar",
					"bazz": "buzz",
				}),
			},
			{
				Ref: 789,
				Labels: labels.FromMap(map[string]string{
					"abc": "123",
					"def": "456",
				}),
			},
		},
	}

	buf := rec.encodeSeries(nil)

	decoded := recordPool.GetRecord()

	err := decodeWALRecord(buf, decoded)
	require.NoError(t, err)

	// Since we use a pool, there can be subtle differences between nil slices and len(0) slices.
	// Both are valid, so only check that the slice is empty, then normalise it
	// to nil so the structural comparison below is not affected.
	require.Empty(t, decoded.RefEntries)
	decoded.RefEntries = nil
	require.Equal(t, rec, decoded)
}

// Test_Encoding_Entries checks that a WALRecord carrying entries comes back
// equal after an encodeEntries / decodeWALRecord round trip, once for each
// entries record version listed in the table.
func Test_Encoding_Entries(t *testing.T) {
	for _, tc := range []struct {
		desc    string
		rec     *WALRecord
		version RecordType
	}{
		{
			desc: "v1",
			rec: &WALRecord{
				entryIndexMap: make(map[uint64]int),
				UserID:        "123",
				RefEntries: []RefEntries{
					{
						Ref: 456,
						Entries: []logproto.Entry{
							{
								Timestamp: time.Unix(1000, 0),
								Line:      "first",
							},
							{
								Timestamp: time.Unix(2000, 0),
								Line:      "second",
							},
						},
					},
					{
						Ref: 789,
						Entries: []logproto.Entry{
							{
								Timestamp: time.Unix(3000, 0),
								Line:      "third",
							},
							{
								Timestamp: time.Unix(4000, 0),
								Line:      "fourth",
							},
						},
					},
				},
			},
			version: WALRecordEntriesV1,
		},
		{
			desc: "v2",
			rec: &WALRecord{
				entryIndexMap: make(map[uint64]int),
				UserID:        "123",
				RefEntries: []RefEntries{
					{
						Ref:     456,
						Counter: 1, // v2 uses counter for WAL replay
						Entries: []logproto.Entry{
							{
								Timestamp: time.Unix(1000, 0),
								Line:      "first",
							},
							{
								Timestamp: time.Unix(2000, 0),
								Line:      "second",
							},
						},
					},
					{
						Ref:     789,
						Counter: 2, // v2 uses counter for WAL replay
						Entries: []logproto.Entry{
							{
								Timestamp: time.Unix(3000, 0),
								Line:      "third",
							},
							{
								Timestamp: time.Unix(4000, 0),
								Line:      "fourth",
							},
						},
					},
				},
			},
			version: WALRecordEntriesV2,
		},
	} {
		// Run every case as a named subtest so a failure reports which
		// version broke; the subtest is not parallel, so tc is safe to capture.
		t.Run(tc.desc, func(t *testing.T) {
			decoded := recordPool.GetRecord()
			buf := tc.rec.encodeEntries(tc.version, nil)
			err := decodeWALRecord(buf, decoded)
			require.NoError(t, err)
			require.Equal(t, tc.rec, decoded)
		})
	}
}

// Benchmark_EncodeEntries times encodeEntries (CurrentEntriesRec) on a record
// with two streams that share the same 10000 generated entries. The encoder is
// handed a zero-length buffer obtained from recordPool on every iteration.
func Benchmark_EncodeEntries(b *testing.B) {
	// Build the fixture: 10000 entries, one nanosecond apart.
	var lines []logproto.Entry
	for ts := int64(0); ts < 10000; ts++ {
		entry := logproto.Entry{
			Timestamp: time.Unix(0, ts),
			Line:      fmt.Sprintf("long line with a lot of data like a log %d", ts),
		}
		lines = append(lines, entry)
	}

	streams := []RefEntries{
		{Ref: 456, Entries: lines},
		{Ref: 789, Entries: lines},
	}
	input := &WALRecord{
		entryIndexMap: make(map[uint64]int),
		UserID:        "123",
		RefEntries:    streams,
	}

	b.ReportAllocs()
	b.ResetTimer()

	scratch := recordPool.GetBytes()[:0]
	defer recordPool.PutBytes(scratch)

	for iter := 0; iter < b.N; iter++ {
		input.encodeEntries(CurrentEntriesRec, scratch)
	}
}

// Benchmark_DecodeWAL times decodeWALRecord on the encoded form of a record
// with two streams that share the same 10000 generated entries. The payload is
// encoded once up front and decoded into the same pooled record each iteration.
func Benchmark_DecodeWAL(b *testing.B) {
	// Build the fixture: 10000 entries, one nanosecond apart.
	var lines []logproto.Entry
	for ts := int64(0); ts < 10000; ts++ {
		entry := logproto.Entry{
			Timestamp: time.Unix(0, ts),
			Line:      fmt.Sprintf("long line with a lot of data like a log %d", ts),
		}
		lines = append(lines, entry)
	}

	streams := []RefEntries{
		{Ref: 456, Entries: lines},
		{Ref: 789, Entries: lines},
	}
	input := &WALRecord{
		entryIndexMap: make(map[uint64]int),
		UserID:        "123",
		RefEntries:    streams,
	}

	encoded := input.encodeEntries(CurrentEntriesRec, nil)
	target := recordPool.GetRecord()

	b.ReportAllocs()
	b.ResetTimer()

	for iter := 0; iter < b.N; iter++ {
		err := decodeWALRecord(encoded, target)
		require.NoError(b, err)
	}
}

func fillChunk(t testing.TB, c chunkenc.Chunk) {
t.Helper()
var i int64
@@ -338,7 +147,7 @@ func Test_EncodingCheckpoint(t *testing.T) {
},
}

b, err := encodeWithTypeHeader(s, CheckpointRecord, nil)
b, err := encodeWithTypeHeader(s, wal.CheckpointRecord, nil)
require.Nil(t, err)

out := &Series{}
7 changes: 4 additions & 3 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ import (

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
@@ -65,9 +66,9 @@ func TestChunkFlushingShutdown(t *testing.T) {

// fullWAL is a WAL stub for tests: Log always fails with an *os.PathError
// wrapping syscall.ENOSPC, Start does nothing and Stop returns nil.
//
// NOTE(review): Log/Start/Stop appear twice below — this looks like the
// pre-change (*WALRecord) and post-change (*wal.Record) lines of a diff hunk
// shown together; confirm only one set exists in the real file.
type fullWAL struct{}

func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Start() {}
func (fullWAL) Stop() error { return nil }
func (fullWAL) Log(_ *wal.Record) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Start() {}
func (fullWAL) Stop() error { return nil }

func Benchmark_FlushLoop(b *testing.B) {
var (
5 changes: 3 additions & 2 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/ingester/index"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
@@ -228,7 +229,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
return appendErr
}

func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord) (*stream, error) {
func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Record) (*stream, error) {
// record is only nil when replaying WAL. We don't want to drop data when replaying a WAL after
// reducing the stream limits, for instance.
var err error
@@ -315,7 +316,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *str
// getOrCreateStream returns the stream or creates it.
// It's safe to use this function if returned stream is not consistency sensitive to streamsMap(e.g. ingesterRecoverer),
// otherwise use streamsMap.LoadOrStoreNew with locking stream's chunkMtx inside.
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *WALRecord) (*stream, error) {
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *wal.Record) (*stream, error) {
s, _, err := i.streams.LoadOrStoreNew(pushReqStream.Labels, func() (*stream, error) {
return i.createStream(pushReqStream, record)
}, nil)
11 changes: 6 additions & 5 deletions pkg/ingester/recovery.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ingester

import (
io "io"
"io"
"runtime"
"sync"

@@ -12,6 +12,7 @@ import (
"github.com/prometheus/prometheus/tsdb/wlog"
"golang.org/x/net/context"

"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/logproto"
util_log "github.com/grafana/loki/pkg/util/log"
)
@@ -52,7 +53,7 @@ type Recoverer interface {
NumWorkers() int
Series(series *Series) error
SetStream(userID string, series record.RefSeries) error
Push(userID string, entries RefEntries) error
Push(userID string, entries wal.RefEntries) error
Done() <-chan struct{}
}

@@ -152,7 +153,7 @@ func (r *ingesterRecoverer) SetStream(userID string, series record.RefSeries) er
return nil
}

func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error {
func (r *ingesterRecoverer) Push(userID string, entries wal.RefEntries) error {
return r.ing.replayController.WithBackPressure(func() error {
out, ok := r.users.Load(userID)
if !ok {
@@ -232,7 +233,7 @@ func (r *ingesterRecoverer) Done() <-chan struct{} {
func RecoverWAL(reader WALReader, recoverer Recoverer) error {
dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput) error {
rec := recordPool.GetRecord()
if err := decodeWALRecord(b, rec); err != nil {
if err := wal.DecodeRecord(b, rec); err != nil {
return err
}

@@ -267,7 +268,7 @@ func RecoverWAL(reader WALReader, recoverer Recoverer) error {
if !ok {
return
}
entries, ok := next.data.(RefEntries)
entries, ok := next.data.(wal.RefEntries)
var err error
if !ok {
err = errors.Errorf("unexpected type (%T) when recovering WAL, expecting (%T)", next.data, entries)
Loading

0 comments on commit fda14ca

Please sign in to comment.