*: Change the engine behind ContentPathReloader to be completely independent of any filesystem concept. (#6503)

* Add fallback config reload for symlinks

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Improve pooling config reload

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Make tests more reliable

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Improve tests again

Assert on runutil.Repeat output

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Pass the debounce/reload time to each test

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Add comments and ensure interfaces are implemented

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Make the limit configuration reload time customizable

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* goimports file

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Fix lint warning

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Improve log for polling engine

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Fix tests

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Make linter happy

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Extract symlink identification to a separate function

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Update changelog

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Fix links to go-grpc-middleware after v2 merge into main

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Add period to make linter happy.

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Update changelog entry

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Replace the fsnotify-based engine in `PathContentReloader` with the polling one

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Rerun CI

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Rerun CI

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Remove check for empty filePath

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Unexpose pollingEngine.Start

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Fix pollingEngine doc-comment

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Log file path when config is reloaded

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

---------

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>
douglascamata authored Jul 10, 2023
1 parent ca308b0 commit b46b835
Showing 8 changed files with 97 additions and 100 deletions.
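At its core, the new engine drops fsnotify entirely: it re-reads the watched file on a timer and fires the reload callback only when a SHA-256 checksum of the contents changes, which also yields one initial reload (the updated tests below account for it). A minimal standalone sketch of that technique — illustrative only, with made-up names rather than the Thanos API; the real implementation is in `pkg/extkingpin/path_content_reloader.go` further down:

```go
package main

import (
	"context"
	"crypto/sha256"
	"fmt"
	"os"
	"time"
)

// pollFile re-reads path on every tick and calls onChange when the SHA-256
// checksum of the contents differs from the previous read. Because prev starts
// as the zero value, the first successful read also triggers onChange — the
// "initial reload" the updated tests below assert on.
func pollFile(ctx context.Context, path string, interval time.Duration, onChange func()) {
	var prev [sha256.Size]byte
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			data, err := os.ReadFile(path)
			if err != nil {
				continue // file may be mid-rotation (e.g. a ConfigMap update); retry next tick
			}
			if sum := sha256.Sum256(data); sum != prev {
				prev = sum
				onChange()
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	pollFile(ctx, "example-config.yaml", time.Second, func() {
		fmt.Println("config changed, reloading")
	})
}
```

Because the engine only compares file contents, it no longer cares whether the path is a regular file, a symlink, or a Kubernetes ConfigMap/Secret mount that is swapped atomically behind a symlink.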
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -27,7 +27,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6467](https://github.com/thanos-io/thanos/pull/6467) Mixin (Receive): add alert for tenant reaching head series limit.

### Fixed
- [#6496](https://github.com/thanos-io/thanos/pull/6496): *: Remove unnecessary configuration reload from `ContentPathReloader` and improve its tests.
- [#6503](https://github.com/thanos-io/thanos/pull/6503) *: Change the engine behind `ContentPathReloader` to be completely independent of any filesystem concept. This effectively fixes this configuration reload when used with Kubernetes ConfigMaps, Secrets, or other volume mounts.
- [#6496](https://github.com/thanos-io/thanos/pull/6496) *: Remove unnecessary configuration reload from `ContentPathReloader` and improve its tests.
- [#6456](https://github.com/thanos-io/thanos/pull/6456) Store: fix crash when computing set matches from regex pattern
- [#6427](https://github.com/thanos-io/thanos/pull/6427) Receive: increasing log level for failed uploads to error
- [#6172](https://github.com/thanos-io/thanos/pull/6172) query-frontend: return JSON formatted errors for invalid PromQL expression in the split by interval middleware.
9 changes: 6 additions & 3 deletions cmd/thanos/receive.go
@@ -226,7 +226,7 @@ func runReceive(
return errors.Wrap(err, "parse limit configuration")
}
}
limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))
limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"), conf.limitsConfigReloadTimer)
if err != nil {
return errors.Wrap(err, "creating limiter")
}
@@ -822,8 +822,9 @@ type receiveConfig struct {
reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent

writeLimitsConfig *extflag.PathOrContent
storeRateLimits store.SeriesSelectLimits
writeLimitsConfig *extflag.PathOrContent
storeRateLimits store.SeriesSelectLimits
limitsConfigReloadTimer time.Duration
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -953,6 +954,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
cmd.Flag("receive.limits-config-reload-timer", "Minimum amount of time to pass for the limit configuration to be reloaded. Helps to avoid excessive reloads.").
Default("1s").Hidden().DurationVar(&rc.limitsConfigReloadTimer)
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
4 changes: 2 additions & 2 deletions docs/proposals-done/202005-query-logging.md
@@ -131,8 +131,8 @@ For HTTP, all these nice middlewares are not present, so we need to code up our

**Policy of Request Logging** : We are going to use the following middleware decider for achieving the above objectives -

* [`logging.WithDecider`](https://github.com/grpc-ecosystem/go-grpc-middleware/blob/v2/interceptors/logging/options.go#L51) - This `option` would help us in deciding the logic whether a given request should be logged or not. This should help in logging specific / all queries. It is all pre request interception.
* [`logging.WithLevels`](https://github.com/grpc-ecosystem/go-grpc-middleware/blob/v2/interceptors/logging/options.go#L58) - This `option` would help us in fixating the level of query that we might want to log. So based on a query hitting a certain criteria, we can switch the levels of it.
* [`logging.WithDecider`](https://github.com/grpc-ecosystem/go-grpc-middleware/blob/v2.0.0-rc.5/interceptors/logging/options.go#L51) - This `option` would help us in deciding the logic whether a given request should be logged or not. This should help in logging specific / all queries. It is all pre request interception.
* [`logging.WithLevels`](https://github.com/grpc-ecosystem/go-grpc-middleware/blob/v2.0.0-rc.5/interceptors/logging/options.go#L58) - This `option` would help us in fixating the level of query that we might want to log. So based on a query hitting a certain criteria, we can switch the levels of it.
* For using the request-id, we can store the request-id in the metadata of the `context`, and while logging, we can use it.
* Same equivalent for the HTTP can be implemented for mirroring the logic of *gRPC middlewares*.
* So we would generally have request logging and options:
95 changes: 40 additions & 55 deletions pkg/extkingpin/path_content_reloader.go
@@ -5,13 +5,11 @@ package extkingpin

import (
"context"
"fmt"
"crypto/sha256"
"os"
"path"
"path/filepath"
"time"

"github.com/fsnotify/fsnotify"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
@@ -22,74 +20,61 @@ type fileContent interface {
Path() string
}

// PathContentReloader starts a file watcher that monitors the file indicated by fileContent.Path() and runs
// reloadFunc whenever a change is detected.
// A debounce timer can be configured via opts to handle situations where many "write" events are received together or
// a "create" event is followed up by a "write" event, for example. Files will be effectively reloaded at the latest
// after 2 times the debounce timer. By default the debouncer timer is 1 second.
// To ensure renames and deletes are properly handled, the file watcher is put at the file's parent folder. See
// https://github.com/fsnotify/fsnotify/issues/214 for more details.
// PathContentReloader runs the reloadFunc when it detects that the contents of fileContent have changed.
func PathContentReloader(ctx context.Context, fileContent fileContent, logger log.Logger, reloadFunc func(), debounceTime time.Duration) error {
filePath, err := filepath.Abs(fileContent.Path())
if err != nil {
return errors.Wrap(err, "getting absolute file path")
}

watcher, err := fsnotify.NewWatcher()
if filePath == "" {
level.Debug(logger).Log("msg", "no path detected for config reload")
engine := &pollingEngine{
filePath: filePath,
logger: logger,
debounce: debounceTime,
reloadFunc: reloadFunc,
}
if err != nil {
return errors.Wrap(err, "creating file watcher")
return engine.start(ctx)
}

// pollingEngine keeps rereading the contents at filePath and when its checksum changes it runs the reloadFunc.
type pollingEngine struct {
filePath string
logger log.Logger
debounce time.Duration
reloadFunc func()
previousChecksum [sha256.Size]byte
}

func (p *pollingEngine) start(ctx context.Context) error {
configReader := func() {
// check if file still exists
if _, err := os.Stat(p.filePath); os.IsNotExist(err) {
level.Error(p.logger).Log("msg", "file does not exist", "error", err)
return
}
file, err := os.ReadFile(p.filePath)
if err != nil {
level.Error(p.logger).Log("msg", "error opening file", "error", err)
return
}
checksum := sha256.Sum256(file)
if checksum == p.previousChecksum {
return
}
p.reloadFunc()
p.previousChecksum = checksum
level.Debug(p.logger).Log("msg", "configuration reloaded", "path", p.filePath)
}
go func() {
var reloadTimer *time.Timer
if debounceTime != 0 {
reloadTimer = time.AfterFunc(debounceTime, func() {
reloadFunc()
level.Debug(logger).Log("msg", "configuration reloaded after debouncing")
})
reloadTimer.Stop()
}
defer watcher.Close()
for {
select {
case <-ctx.Done():
if reloadTimer != nil {
reloadTimer.Stop()
}
return
case event := <-watcher.Events:
// fsnotify sometimes sends a bunch of events without name or operation.
// It's unclear what they are and why they are sent - filter them out.
if event.Name == "" {
break
}
// We are watching the file's parent folder (more details on this is done can be found below), but are
// only interested in changed to the target file. Discard every other file as quickly as possible.
if event.Name != filePath {
break
}
// We only react to files being written or created.
// On chmod or remove we have nothing to do.
// On rename we have the old file name (not useful). A create event for the new file will come later.
if event.Op&fsnotify.Write == 0 && event.Op&fsnotify.Create == 0 {
break
}
level.Debug(logger).Log("msg", fmt.Sprintf("change detected for %s", filePath), "eventName", event.Name, "eventOp", event.Op)
if reloadTimer != nil {
reloadTimer.Reset(debounceTime)
}
case err := <-watcher.Errors:
level.Error(logger).Log("msg", "watcher error", "error", err)
case <-time.After(p.debounce):
configReader()
}
}
}()
// We watch the file's parent folder and not the file itself to better handle DELETE and RENAME events. Check
// https://github.com/fsnotify/fsnotify/issues/214 for more details.
if err := watcher.Add(path.Dir(filePath)); err != nil {
return errors.Wrapf(err, "adding path %s to file watcher", filePath)
}
return nil
}
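
For reference, a hedged sketch of how the reloader above can be wired up, modeled on the updated tests and on the call in `pkg/receive/limiter.go`; the config path, interval, and log messages are illustrative:

```go
package main

import (
	"context"
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/thanos-io/thanos/pkg/extkingpin"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	logger := log.NewLogfmtLogger(os.Stdout)

	// Wrap an on-disk config file in the fileContent abstraction the reloader
	// expects. The path is illustrative.
	pathContent, err := extkingpin.NewStaticPathContent("/etc/thanos/limits.yaml")
	if err != nil {
		level.Error(logger).Log("msg", "reading config path", "err", err)
		os.Exit(1)
	}

	// Poll roughly every second; the callback runs whenever the file's checksum changes.
	// PathContentReloader starts a background goroutine and returns immediately.
	if err := extkingpin.PathContentReloader(ctx, pathContent, logger, func() {
		level.Info(logger).Log("msg", "configuration changed, reloading")
	}, time.Second); err != nil {
		level.Error(logger).Log("msg", "starting config reloader", "err", err)
		os.Exit(1)
	}

	<-ctx.Done()
}
```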

72 changes: 39 additions & 33 deletions pkg/extkingpin/path_content_reloader_test.go
@@ -11,73 +11,67 @@ import (
"testing"
"time"

"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/runutil"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
)

func TestPathContentReloader(t *testing.T) {
t.Parallel()
type args struct {
runSteps func(t *testing.T, testFile string, pathContent *staticPathContent)
runSteps func(t *testing.T, testFile string, pathContent *staticPathContent, reloadTime time.Duration)
}
tests := []struct {
name string
args args
wantReloads int
}{
{
name: "Many operations, only rewrite triggers one reload",
name: "Many operations, only rewrite triggers one reload + plus the initial reload",
args: args{
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent, reloadTime time.Duration) {
testutil.Ok(t, os.Chmod(testFile, 0777))
testutil.Ok(t, os.Remove(testFile))
testutil.Ok(t, pathContent.Rewrite([]byte("test modified")))
},
},
wantReloads: 1,
},
{
name: "Many operations, only rename triggers one reload",
args: args{
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
testutil.Ok(t, os.Chmod(testFile, 0777))
testutil.Ok(t, os.Rename(testFile, testFile+".tmp"))
testutil.Ok(t, os.Rename(testFile+".tmp", testFile))
},
},
wantReloads: 1,
wantReloads: 2,
},
{
name: "Many operations, two rewrites trigger two reloads",
name: "Many operations, two rewrites trigger two reloads + the initial reload",
args: args{
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent, reloadTime time.Duration) {
testutil.Ok(t, os.Chmod(testFile, 0777))
testutil.Ok(t, os.Remove(testFile))
testutil.Ok(t, pathContent.Rewrite([]byte("test modified")))
time.Sleep(2 * time.Second)
time.Sleep(2 * reloadTime)
testutil.Ok(t, pathContent.Rewrite([]byte("test modified again")))
},
},
wantReloads: 2,
wantReloads: 3,
},
{
name: "Chmod doesn't trigger reload",
name: "Chmod doesn't trigger reload, we have only the initial reload",
args: args{
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent, reloadTime time.Duration) {
testutil.Ok(t, os.Chmod(testFile, 0777))
testutil.Ok(t, os.Chmod(testFile, 0666))
testutil.Ok(t, os.Chmod(testFile, 0777))
},
},
wantReloads: 0,
wantReloads: 1,
},
{
name: "Remove doesn't trigger reload",
name: "Remove doesn't trigger reload, we have only the initial reload",
args: args{
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent, reloadTime time.Duration) {
testutil.Ok(t, os.Remove(testFile))
},
},
wantReloads: 0,
wantReloads: 1,
},
}
for _, tt := range tests {
@@ -86,28 +80,40 @@ func TestPathContentReloader(t *testing.T) {
t.Parallel()
testFile := path.Join(t.TempDir(), "test")
testutil.Ok(t, os.WriteFile(testFile, []byte("test"), 0666))

pathContent, err := NewStaticPathContent(testFile)
testutil.Ok(t, err)

wg := &sync.WaitGroup{}
wg.Add(tt.wantReloads)

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
reloadCount := 0
configReloadTime := 500 * time.Millisecond
err = PathContentReloader(ctx, pathContent, log.NewLogfmtLogger(os.Stdout), func() {
reloadCount++
wg.Done()
}, 100*time.Millisecond)
}, configReloadTime)
testutil.Ok(t, err)
// wait for the initial reload
testutil.NotOk(t, runutil.Repeat(configReloadTime, ctx.Done(), func() error {
if reloadCount != 1 {
return nil
}
return errors.New("reload count matched")
}))

tt.args.runSteps(t, testFile, pathContent)
if tt.wantReloads == 0 {
// Give things a little time to sync. The fs watcher events are heavily async and could be delayed.
time.Sleep(1 * time.Second)
}
tt.args.runSteps(t, testFile, pathContent, configReloadTime)
wg.Wait()
testutil.Equals(t, tt.wantReloads, reloadCount)

// wait for final reload
testutil.NotOk(t, runutil.Repeat(2*configReloadTime, ctx.Done(), func() error {
if reloadCount != tt.wantReloads {
return nil
}
return errors.New("reload count matched")
}))
})
}
}
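
The assertions above rely on a slightly counterintuitive idiom: `runutil.Repeat` keeps invoking its callback every interval until the callback returns an error or the stop channel closes, so the test returns an error once the reload count matches and then uses `testutil.NotOk` to assert that this early exit — rather than a context timeout — is what happened. A small sketch of that wait-for-condition pattern, assuming `runutil.Repeat` behaves as just described (the helper and test names are made up):

```go
package example

import (
	"context"
	"sync/atomic"
	"testing"
	"time"

	"github.com/efficientgo/core/testutil"
	"github.com/pkg/errors"

	"github.com/thanos-io/thanos/pkg/runutil"
)

// waitFor polls cond every interval until it holds (returning a non-nil error
// stops runutil.Repeat early) or until ctx is done (Repeat then returns nil).
func waitFor(ctx context.Context, interval time.Duration, cond func() bool) error {
	return runutil.Repeat(interval, ctx.Done(), func() error {
		if cond() {
			return errors.New("condition met")
		}
		return nil
	})
}

func TestWaitForIdiom(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var reloads atomic.Int32
	go func() {
		time.Sleep(200 * time.Millisecond)
		reloads.Store(1)
	}()

	// NotOk asserts that Repeat stopped because the condition was met,
	// not because the context timed out.
	testutil.NotOk(t, waitFor(ctx, 50*time.Millisecond, func() bool { return reloads.Load() == 1 }))
}
```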
4 changes: 2 additions & 2 deletions pkg/receive/handler_test.go
@@ -183,7 +183,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
}

ag := addrGen{}
limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger())
limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger(), 1*time.Second)
for i := range appendables {
h := NewHandler(nil, &Options{
TenantHeader: tenancy.DefaultTenantHeader,
@@ -741,7 +741,7 @@ func TestReceiveWriteRequestLimits(t *testing.T) {
testutil.Ok(t, os.WriteFile(tmpLimitsPath, tenantConfig, 0666))
limitConfig, _ := extkingpin.NewStaticPathContent(tmpLimitsPath)
handler.Limiter, _ = NewLimiter(
limitConfig, nil, RouterIngestor, log.NewNopLogger(),
limitConfig, nil, RouterIngestor, log.NewNopLogger(), 1*time.Second,
)

wreq := &prompb.WriteRequest{
6 changes: 4 additions & 2 deletions pkg/receive/limiter.go
@@ -33,6 +33,7 @@ type Limiter struct {
configReloadCounter prometheus.Counter
configReloadFailedCounter prometheus.Counter
receiverMode ReceiverMode
configReloadTimer time.Duration
}

// headSeriesLimiter encompasses active/head series limiting logic.
@@ -55,13 +56,14 @@ type fileContent interface {

// NewLimiter creates a new *Limiter given a configuration and prometheus
// registerer.
func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger) (*Limiter, error) {
func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger, configReloadTimer time.Duration) (*Limiter, error) {
limiter := &Limiter{
writeGate: gate.NewNoop(),
requestLimiter: &noopRequestLimiter{},
HeadSeriesLimiter: NewNopSeriesLimit(),
logger: logger,
receiverMode: r,
configReloadTimer: configReloadTimer,
}

if reg != nil {
Expand Down Expand Up @@ -116,7 +118,7 @@ func (l *Limiter) StartConfigReloader(ctx context.Context) error {
if reloadCounter := l.configReloadCounter; reloadCounter != nil {
reloadCounter.Inc()
}
}, 1*time.Second)
}, l.configReloadTimer)
}

func (l *Limiter) CanReload() bool {
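
Putting the receive-side pieces together, here is a hedged sketch of how the new reload interval flows from `NewLimiter` into `StartConfigReloader`. Only the signatures shown in this diff are taken as given; the path, the 5s value, and the error handling are illustrative:

```go
package main

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/extkingpin"
	"github.com/thanos-io/thanos/pkg/receive"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	logger := log.NewNopLogger()

	// Illustrative path; in Thanos this comes from the receive.limits-config
	// path-or-content flag registered in cmd/thanos/receive.go above.
	limitsConfig, err := extkingpin.NewStaticPathContent("/etc/thanos/limits.yaml")
	if err != nil {
		panic(err)
	}

	// The trailing duration is the knob added by this commit: the minimum
	// interval between limit-configuration reloads (receive.limits-config-reload-timer).
	limiter, err := receive.NewLimiter(limitsConfig, prometheus.NewRegistry(), receive.RouterIngestor, logger, 5*time.Second)
	if err != nil {
		panic(err)
	}

	// Starts the polling reloader in the background; the limits file is
	// re-applied whenever its checksum changes, checked once per interval.
	if err := limiter.StartConfigReloader(ctx); err != nil {
		panic(err)
	}

	<-ctx.Done()
}
```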
4 changes: 2 additions & 2 deletions pkg/receive/limiter_test.go
@@ -32,7 +32,7 @@ func TestLimiter_StartConfigReloader(t *testing.T) {
t.Fatalf("could not load test content at %s: %s", invalidLimitsPath, err)
}

limiter, err := NewLimiter(goodLimits, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout))
limiter, err := NewLimiter(goodLimits, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout), 1*time.Second)
testutil.Ok(t, err)

ctx, cancel := context.WithCancel(context.Background())
@@ -88,7 +88,7 @@ func TestLimiter_CanReload(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
configFile := tt.args.configFilePath
limiter, err := NewLimiter(configFile, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout))
limiter, err := NewLimiter(configFile, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout), 1*time.Second)
testutil.Ok(t, err)
if tt.wantReload {
testutil.Assert(t, limiter.CanReload())
