Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promtail WAL support: Implement writer side #8267

Merged
merged 43 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a9bd4d5
relicensed WAL main definitions
thepalbi Jan 27, 2023
33472a7
relicensed ingester wal code
thepalbi Jan 27, 2023
f7af55d
remove unnecessary commit noise
thepalbi Jan 27, 2023
2dae764
imports order
thepalbi Jan 27, 2023
5f32685
one more fix
thepalbi Jan 27, 2023
bcd862d
part 1 implemented
thepalbi Jan 19, 2023
32675e9
WIP
thepalbi Jan 24, 2023
c9a5c47
lint passing
thepalbi Jan 24, 2023
1e1b1cb
remove unnecessary changes
thepalbi Jan 24, 2023
c2b89ea
post rebase changes
thepalbi Jan 24, 2023
5827391
fix licensing issues
thepalbi Jan 26, 2023
e2db382
PR fixes wip. Tests not working
thepalbi Jan 26, 2023
7009644
added tests for working version
thepalbi Jan 26, 2023
34d682a
rename wal.WALRecord to wal.Record in promtail
thepalbi Jan 26, 2023
cc0af32
added utils comments
thepalbi Jan 26, 2023
601d2c0
fix import order
thepalbi Jan 26, 2023
43ef3be
restorin main changes under pkg/ingester
thepalbi Jan 26, 2023
558863c
post rebase fixes
thepalbi Jan 26, 2023
a8533b9
moving wal writer out of client manager
thepalbi Jan 27, 2023
03f9c5f
re-using code instead of copying
thepalbi Jan 27, 2023
b10192c
post rebase-fixes
thepalbi Jan 27, 2023
4420525
implemented simple old segments cleaner
thepalbi Jan 27, 2023
1bdfa40
post-rebase fixes
thepalbi Jan 30, 2023
5af6e20
moved clients fanout out of writer
thepalbi Jan 30, 2023
9b9cc79
use config name in repeated client error
thepalbi Jan 30, 2023
0ee9c6c
graceful shutdown of writer
thepalbi Jan 30, 2023
e56249a
tested writer clean up
thepalbi Jan 30, 2023
ac99782
clear up naming
thepalbi Jan 30, 2023
e2bdad0
added reclaimed space metric
thepalbi Jan 30, 2023
590c0f7
post rebase fixes
thepalbi Jan 31, 2023
ba679c5
fix linter
thepalbi Jan 31, 2023
b94a3cf
removing unused stuff
thepalbi Jan 31, 2023
7773395
concurrent fanout entry handler
thepalbi Jan 31, 2023
4d986f3
likely deprecation notice
thepalbi Jan 31, 2023
5128fc4
Merge branch 'main' into pablo/wal-part-1
thepalbi Jan 31, 2023
63a73da
fanout with timeout
thepalbi Feb 1, 2023
3a420d3
cleaning up hard stop on fanouter
thepalbi Feb 1, 2023
29903c5
naming on const
thepalbi Feb 1, 2023
42171ed
Merge branch 'main' into pablo/wal-part-1
thepalbi Feb 1, 2023
b8bb6d4
refactor promtail.go wal writer into one if branch
thepalbi Feb 3, 2023
1ac07a0
Merge branch 'main' into pablo/wal-part-1
thepalbi Feb 3, 2023
cb5dc2b
Merge branch 'main' into pablo/wal-part-1
thepalbi Feb 6, 2023
bb1d2c0
Merge branch 'main' into pablo/wal-part-1
cstyan Feb 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/pkg/promtail/promtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (p *Promtail) reloadConfig(cfg *config.Config) error {
entryHandlers = append(entryHandlers, p.walWriter)
}

p.entriesFanout = utils.NewEntryHandlerFanouter(entryHandlers...)
p.entriesFanout = utils.NewFanoutEntryHandler(entryHandlers...)

tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.entriesFanout, cfg.ScrapeConfig, &cfg.TargetConfig)
if err != nil {
Expand Down
39 changes: 26 additions & 13 deletions clients/pkg/promtail/utils/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,51 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/api"
)

// EntryHandlerFanouter implements api.EntryHandler, fanning out received entries to one or multiple channels.
type EntryHandlerFanouter struct {
// FanoutEntryHandler implements api.EntryHandler, fanning out received entries to one or multiple channels.
type FanoutEntryHandler struct {
entries chan api.Entry
handlers []api.EntryHandler

once sync.Once
wg sync.WaitGroup
}

func NewEntryHandlerFanouter(handlers ...api.EntryHandler) *EntryHandlerFanouter {
multiplex := &EntryHandlerFanouter{
func NewFanoutEntryHandler(handlers ...api.EntryHandler) *FanoutEntryHandler {
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
eh := &FanoutEntryHandler{
entries: make(chan api.Entry),
handlers: handlers,
}
multiplex.wg.Add(1)
eh.start()
return eh
}

func (eh *FanoutEntryHandler) start() {
eh.wg.Add(1)
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
go func() {
defer multiplex.wg.Done()
for e := range multiplex.entries {
for _, handler := range multiplex.handlers {
handler.Chan() <- e
defer eh.wg.Done()
for e := range eh.entries {
// To prevent a single channel from blocking all others, we run each channel send in a separate go routine.
// This cause each entry to be sent in |eh.handlers| routines concurrently. When all finish, we know the entry
// has been fanned out properly, and we can read the next received entry.
var entryWG sync.WaitGroup
entryWG.Add(len(eh.handlers))
for _, handler := range eh.handlers {
go func(eh api.EntryHandler) {
defer entryWG.Done()
eh.Chan() <- e
}(handler)
}
entryWG.Wait()
}
}()
return multiplex
}

func (eh *EntryHandlerFanouter) Chan() chan<- api.Entry {
func (eh *FanoutEntryHandler) Chan() chan<- api.Entry {
return eh.entries
}

// Stop only stops the channel EntryHandlerFanouter exposes, not the ones it fans out to.
func (eh *EntryHandlerFanouter) Stop() {
// Stop only stops the channel FanoutEntryHandler exposes, not the ones it fans out to.
func (eh *FanoutEntryHandler) Stop() {
eh.once.Do(func() {
close(eh.entries)
})
Expand Down
33 changes: 18 additions & 15 deletions clients/pkg/promtail/utils/entries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ import (
"github.com/grafana/loki/pkg/logproto"
)

func TestEntryHandlerFanouter(t *testing.T) {
func TestFanoutEntryHandler(t *testing.T) {
eh1 := newSavingEntryHandler()
eh2 := newSavingEntryHandler()
fanouter := NewEntryHandlerFanouter(eh1, eh2)
fanout := NewFanoutEntryHandler(eh1, eh2)

defer func() {
fanout.Stop()
eh2.Stop()
eh1.Stop()
}()

var expectedLines = []string{
"some line",
Expand All @@ -24,9 +30,9 @@ func TestEntryHandlerFanouter(t *testing.T) {
}

for _, line := range expectedLines {
fanouter.Chan() <- api.Entry{
fanout.Chan() <- api.Entry{
Labels: model.LabelSet{
"test": "fanouter",
"test": "fanout",
},
Entry: logproto.Entry{
Timestamp: time.Now(),
Expand All @@ -35,12 +41,9 @@ func TestEntryHandlerFanouter(t *testing.T) {
}
}

fanouter.Stop()
eh2.Stop()
eh1.Stop()

require.Len(t, eh1.Received, len(expectedLines))
require.Len(t, eh2.Received, len(expectedLines))
require.Eventually(t, func() bool {
return len(eh1.Received) == len(expectedLines) && len(eh2.Received) == len(expectedLines)
}, time.Second*10, time.Second, "expected entries to be received by fanned out channels")
}

type savingEntryHandler struct {
Expand All @@ -64,11 +67,11 @@ func newSavingEntryHandler() *savingEntryHandler {
return eh
}

func (x *savingEntryHandler) Chan() chan<- api.Entry {
return x.entries
func (eh *savingEntryHandler) Chan() chan<- api.Entry {
return eh.entries
}

func (x *savingEntryHandler) Stop() {
close(x.entries)
x.wg.Wait()
func (eh *savingEntryHandler) Stop() {
close(eh.entries)
eh.wg.Wait()
}