Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promtail WAL support: Implement writer side #8267

Merged
merged 43 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a9bd4d5
relicensed WAL main definitions
thepalbi Jan 27, 2023
33472a7
relicensed ingester wal code
thepalbi Jan 27, 2023
f7af55d
remove unnecessary commit noise
thepalbi Jan 27, 2023
2dae764
imports order
thepalbi Jan 27, 2023
5f32685
one more fix
thepalbi Jan 27, 2023
bcd862d
part 1 implemented
thepalbi Jan 19, 2023
32675e9
WIP
thepalbi Jan 24, 2023
c9a5c47
lint passing
thepalbi Jan 24, 2023
1e1b1cb
remove unnecessary changes
thepalbi Jan 24, 2023
c2b89ea
post rebase changes
thepalbi Jan 24, 2023
5827391
fix licensing issues
thepalbi Jan 26, 2023
e2db382
PR fixes wip. Tests not working
thepalbi Jan 26, 2023
7009644
added tests for working version
thepalbi Jan 26, 2023
34d682a
rename wal.WALRecord to wal.Record in promtail
thepalbi Jan 26, 2023
cc0af32
added utils comments
thepalbi Jan 26, 2023
601d2c0
fix import order
thepalbi Jan 26, 2023
43ef3be
restorin main changes under pkg/ingester
thepalbi Jan 26, 2023
558863c
post rebase fixes
thepalbi Jan 26, 2023
a8533b9
moving wal writer out of client manager
thepalbi Jan 27, 2023
03f9c5f
re-using code instead of copying
thepalbi Jan 27, 2023
b10192c
post rebase-fixes
thepalbi Jan 27, 2023
4420525
implemented simple old segments cleaner
thepalbi Jan 27, 2023
1bdfa40
post-rebase fixes
thepalbi Jan 30, 2023
5af6e20
moved clients fanout out of writer
thepalbi Jan 30, 2023
9b9cc79
use config name in repeated client error
thepalbi Jan 30, 2023
0ee9c6c
graceful shutdown of writer
thepalbi Jan 30, 2023
e56249a
tested writer clean up
thepalbi Jan 30, 2023
ac99782
clear up naming
thepalbi Jan 30, 2023
e2bdad0
added reclaimed space metric
thepalbi Jan 30, 2023
590c0f7
post rebase fixes
thepalbi Jan 31, 2023
ba679c5
fix linter
thepalbi Jan 31, 2023
b94a3cf
removing unused stuff
thepalbi Jan 31, 2023
7773395
concurrent fanout entry handler
thepalbi Jan 31, 2023
4d986f3
likely deprecation notice
thepalbi Jan 31, 2023
5128fc4
Merge branch 'main' into pablo/wal-part-1
thepalbi Jan 31, 2023
63a73da
fanout with timeout
thepalbi Feb 1, 2023
3a420d3
cleaning up hard stop on fanouter
thepalbi Feb 1, 2023
29903c5
naming on const
thepalbi Feb 1, 2023
42171ed
Merge branch 'main' into pablo/wal-part-1
thepalbi Feb 1, 2023
b8bb6d4
refactor promtail.go wal writer into one if branch
thepalbi Feb 3, 2023
1ac07a0
Merge branch 'main' into pablo/wal-part-1
thepalbi Feb 3, 2023
cb5dc2b
Merge branch 'main' into pablo/wal-part-1
thepalbi Feb 6, 2023
bb1d2c0
Merge branch 'main' into pablo/wal-part-1
cstyan Feb 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 2 additions & 28 deletions clients/pkg/promtail/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package client
import (
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
Expand All @@ -24,7 +22,6 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/api"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
lokiflag "github.com/grafana/loki/pkg/util/flagext"
)

Expand All @@ -38,11 +35,6 @@ var logEntries = []api.Entry{
{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line0123456789"}},
}

type receivedReq struct {
tenantID string
pushReq logproto.PushRequest
}

func TestClient_Handle(t *testing.T) {
tests := map[string]struct {
clientBatchSize int
Expand Down Expand Up @@ -500,7 +492,7 @@ func TestClient_Handle(t *testing.T) {
receivedReqsChan := make(chan receivedReq, 10)

// Start a local HTTP server
server := httptest.NewServer(createServerHandler(receivedReqsChan, testData.serverResponseStatus))
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
server := newTestRemoteWriteServer(receivedReqsChan, testData.serverResponseStatus)
require.NotNil(t, server)
defer server.Close()

Expand Down Expand Up @@ -642,7 +634,7 @@ func TestClient_StopNow(t *testing.T) {
receivedReqsChan := make(chan receivedReq, 10)

// Start a local HTTP server
server := httptest.NewServer(createServerHandler(receivedReqsChan, c.serverResponseStatus))
server := newTestRemoteWriteServer(receivedReqsChan, c.serverResponseStatus)
require.NotNil(t, server)
defer server.Close()

Expand Down Expand Up @@ -710,24 +702,6 @@ func TestClient_StopNow(t *testing.T) {
}
}

func createServerHandler(receivedReqsChan chan receivedReq, status int) http.HandlerFunc {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// Parse the request
var pushReq logproto.PushRequest
if err := util.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, util.RawSnappy); err != nil {
rw.WriteHeader(500)
return
}

receivedReqsChan <- receivedReq{
tenantID: req.Header.Get("X-Scope-OrgID"),
pushReq: pushReq,
}

rw.WriteHeader(status)
})
}

type RoundTripperFunc func(*http.Request) (*http.Response, error)

func (r RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
Expand Down
108 changes: 108 additions & 0 deletions clients/pkg/promtail/client/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package client

import (
"fmt"
"strings"
"sync"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/wal"
)

// Manager manages remote write client instantiation, and connects the related components to orchestrate the flow of api.Entry
// from the scrape targets, to the remote write clients themselves.
//
// Right now it just supports instantiating the WAL writer side of the future-to-be WAL enabled client. In follow-up
// work, tracked in https://github.com/grafana/loki/issues/8197, this Manager will be responsible for instantiating all client
// types: Logger, Multi and WAL.
type Manager struct {
clients []Client

entries chan api.Entry
once sync.Once

wg sync.WaitGroup
}

// NewManager creates a new Manager
func NewManager(metrics *Metrics, logger log.Logger, maxStreams, maxLineSize int, maxLineSizeTruncate bool, reg prometheus.Registerer, walCfg wal.Config, clientCfgs ...Config) (*Manager, error) {
// TODO: refactor this to instantiate all clients types
var fake struct{}

if len(clientCfgs) == 0 {
return nil, fmt.Errorf("at least one client config should be provided")
}
clientsCheck := make(map[string]struct{})
clients := make([]Client, 0, len(clientCfgs))
for _, cfg := range clientCfgs {
client, err := New(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger)
if err != nil {
return nil, err
}

// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
if _, ok := clientsCheck[client.Name()]; ok {
return nil, fmt.Errorf("duplicate client configs are not allowed, found duplicate for name: %s", cfg.Name)
}

clientsCheck[client.Name()] = fake
clients = append(clients, client)
}

manager := &Manager{
clients: clients,
entries: make(chan api.Entry),
}
manager.start()
return manager, nil
}

func (m *Manager) start() {
m.wg.Add(1)
go func() {
defer m.wg.Done()
// keep reading received entries
for e := range m.entries {
// then fanout to every remote write client
for _, c := range m.clients {
c.Chan() <- e
}
}
}()
}

func (m *Manager) StopNow() {
for _, c := range m.clients {
c.StopNow()
}
}

func (m *Manager) Name() string {
var sb strings.Builder
// name contains wal since manager is used as client only when WAL enabled for now
sb.WriteString("wal:")
for i, c := range m.clients {
sb.WriteString(c.Name())
if i != len(m.clients)-1 {
sb.WriteString(",")
}
}
return sb.String()
}

func (m *Manager) Chan() chan<- api.Entry {
return m.entries
}

func (m *Manager) Stop() {
// first stop the receiving channel
m.once.Do(func() { close(m.entries) })
m.wg.Wait()
// close clients
for _, c := range m.clients {
c.Stop()
}
}
108 changes: 108 additions & 0 deletions clients/pkg/promtail/client/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package client

import (
"net/http"
"net/url"
"os"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/wal"
"github.com/grafana/loki/pkg/logproto"
)

func TestManager_ErrorCreatingWhenNoClientConfigsProvided(t *testing.T) {
walDir := t.TempDir()
_, err := NewManager(nil, log.NewLogfmtLogger(os.Stdout), 0, 0, false, prometheus.NewRegistry(), wal.Config{
Dir: walDir,
Enabled: true,
})
require.Error(t, err)
}

type closer interface {
Close()
}

type closerFunc func()

func (c closerFunc) Close() {
c()
}

func newServerAncClientConfig(t *testing.T) (Config, chan receivedReq, closer) {
receivedReqsChan := make(chan receivedReq, 10)

// Start a local HTTP server
server := newTestRemoteWriteServer(receivedReqsChan, http.StatusOK)
require.NotNil(t, server)

testClientURL, _ := url.Parse(server.URL)
testClientConfig := Config{
Name: "test-client",
URL: flagext.URLValue{URL: testClientURL},
Timeout: time.Second * 2,
BatchSize: 1,
BackoffConfig: backoff.Config{
MaxRetries: 0,
},
}
return testClientConfig, receivedReqsChan, closerFunc(func() {
server.Close()
close(receivedReqsChan)
})
}

func TestManager_EntriesAreWrittenToClients(t *testing.T) {
walDir := t.TempDir()
reg := prometheus.NewRegistry()
testClientConfig, rwReceivedReqs, closeServer := newServerAncClientConfig(t)
clientMetrics := NewMetrics(reg)
manager, err := NewManager(clientMetrics, log.NewLogfmtLogger(os.Stdout), 0, 0, false, reg, wal.Config{
Dir: walDir,
Enabled: true,
}, testClientConfig)
require.NoError(t, err)
require.Equal(t, "wal:test-client", manager.Name())

var testLabels = model.LabelSet{
"wal_enabled": "true",
}
var lines = []string{
"Lorem ipsum dolor sit amet, consectetur adipiscing elit.",
"In eu nisl ac massa ultricies rutrum.",
"Sed eget felis at ipsum auctor congue.",
}
for _, line := range lines {
manager.Chan() <- api.Entry{
Labels: testLabels,
Entry: logproto.Entry{
Timestamp: time.Now(),
Line: line,
},
}
}

// stop client to flush WAL, stop rw server
manager.Stop()
closeServer.Close()

// assert over rw client received entries
rwSeenEntriesCount := 0
for req := range rwReceivedReqs {
rwSeenEntriesCount++
require.Len(t, req.pushReq.Streams, 1, "expected 1 stream requests to be received")
require.Len(t, req.pushReq.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request")
require.Equal(t, `{wal_enabled="true"}`, req.pushReq.Streams[0].Labels)
require.Contains(t, lines, req.pushReq.Streams[0].Entries[0].Line)
}
require.Equal(t, 3, rwSeenEntriesCount)
}
40 changes: 40 additions & 0 deletions clients/pkg/promtail/client/test_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package client

import (
"math"
"net/http"
"net/http/httptest"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
)

type receivedReq struct {
tenantID string
pushReq logproto.PushRequest
}

// newTestMoreWriteServer creates a new httpserver.Server that can handle remote write request. When a request is handled,
// the received entries are written to receivedChan, and status is responded.
func newTestRemoteWriteServer(receivedChan chan receivedReq, status int) *httptest.Server {
server := httptest.NewServer(createServerHandler(receivedChan, status))
return server
}

func createServerHandler(receivedReqsChan chan receivedReq, receivedOKStatus int) http.HandlerFunc {
return func(rw http.ResponseWriter, req *http.Request) {
// Parse the request
var pushReq logproto.PushRequest
if err := util.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, util.RawSnappy); err != nil {
rw.WriteHeader(500)
return
}

receivedReqsChan <- receivedReq{
tenantID: req.Header.Get("X-Scope-OrgID"),
pushReq: pushReq,
}

rw.WriteHeader(receivedOKStatus)
}
}
7 changes: 3 additions & 4 deletions clients/pkg/promtail/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
dskit_flagext "github.com/grafana/dskit/flagext"

"github.com/grafana/loki/pkg/tracing"

"gopkg.in/yaml.v2"

"github.com/grafana/loki/clients/pkg/promtail/client"
Expand All @@ -18,7 +15,8 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/server"
"github.com/grafana/loki/clients/pkg/promtail/targets/file"

"github.com/grafana/loki/clients/pkg/promtail/wal"
"github.com/grafana/loki/pkg/tracing"
"github.com/grafana/loki/pkg/util/flagext"
)

Expand All @@ -39,6 +37,7 @@ type Config struct {
LimitsConfig limit.Config `yaml:"limits_config,omitempty"`
Options Options `yaml:"options,omitempty"`
Tracing tracing.Config `yaml:"tracing"`
WAL wal.Config `yaml:"wal"`
}

// RegisterFlags with prefix registers flags where every name is prefixed by
Expand Down
Loading