Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration #18168

Merged
merged 42 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
00cee32
OTElExporter now uses an EndpointProvider to discover the endpoint
Achooo Jul 18, 2023
77f09b8
OTELSink uses a ConfigProvider to obtain filters and labels configura…
Achooo Jul 18, 2023
ff4d85f
improve tests for otel_sink
Achooo Jul 18, 2023
0264d5b
Regex logic is moved into client for a method on the TelemetryConfig …
Achooo Jul 18, 2023
f12219a
Create a telemetry_config_provider and update deps to use it
Achooo Jul 18, 2023
8b4bf1d
Fix conversion
Achooo Jul 18, 2023
5b4c997
fix import newline
Achooo Jul 18, 2023
7550818
Add logger to hcp client and move telemetry_config out of the client.…
Achooo Jul 25, 2023
3f1c53d
Add a telemetry_config.go to refactor client.go
Achooo Jul 25, 2023
8dce916
Update deps
Achooo Jul 25, 2023
2313673
update hcp deps test
Achooo Jul 25, 2023
196a4ef
Modify telemetry_config_providers
Achooo Jul 25, 2023
3b031a2
Check for nil filters
Achooo Jul 25, 2023
cb869b1
PR review updates
Achooo Jul 26, 2023
38d6bca
Fix comments and move around pieces
Achooo Jul 26, 2023
1feea25
Fix comments
Achooo Jul 26, 2023
0d25ba4
Remove context from client struct
Achooo Jul 26, 2023
a58baa7
Moved ctx out of sink struct and fixed filters, added a test
Achooo Jul 26, 2023
f3be3ec
Remove named imports, use errors.New if not fformatting
Achooo Jul 26, 2023
a353527
Remove HCP dependencies in telemetry package
Achooo Jul 26, 2023
de55bf9
Add success metric and move lock only to grab the t.cfgHahs
Achooo Jul 26, 2023
eacf20e
Update hash
Achooo Jul 27, 2023
4ed2b74
fix nits
Achooo Jul 27, 2023
e3839b3
Create an equals method and add tests
Achooo Jul 27, 2023
b867e30
Improve telemetry_config_provider.go tests
Achooo Jul 27, 2023
d4b788a
Add race test
Achooo Jul 27, 2023
932633e
Add missing godoc
Achooo Jul 27, 2023
696966f
Remove mock for MetricsClient
Achooo Jul 27, 2023
1d58bf6
Avoid goroutine test panics
Achooo Jul 28, 2023
fea50d4
trying to kick CI lint issues by upgrading mod
Achooo Jul 28, 2023
bfebfb1
imprve test code and add hasher for testing
Achooo Jul 28, 2023
54eb748
Use structure logging for filters, fix error constants, and default t…
Achooo Jul 31, 2023
9677781
removed hashin and modify logic to simplify
Achooo Jul 31, 2023
c23798c
Improve race test and fix PR feedback by removing hash equals and avo…
Achooo Jul 31, 2023
235acac
Ran make go-mod-tidy
Achooo Jul 31, 2023
1ef0795
Use errtypes in the test
Achooo Jul 31, 2023
2e10100
Add changelog
Achooo Jul 31, 2023
952129c
add safety check for exporter endpoint
Achooo Jul 31, 2023
b46d5b9
remove require.Contains by using error types, fix structure logging, …
Achooo Aug 1, 2023
7ea6280
Fixed race test to have changing config values
Achooo Aug 1, 2023
d3522ee
Send success metric before modifying config
Achooo Aug 1, 2023
aaec9a6
Avoid the defer and move the success metric under
Achooo Aug 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
removed hashin and modify logic to simplify
  • Loading branch information
Achooo committed Jul 31, 2023
commit 96777811a1230792070ddf933d7574238751e4ae
125 changes: 41 additions & 84 deletions agent/hcp/telemetry_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/hashstructure/v2"

"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/telemetry"
Expand All @@ -27,19 +26,6 @@ var (
var _ telemetry.ConfigProvider = &hcpProviderImpl{}
var _ telemetry.EndpointProvider = &hcpProviderImpl{}

// hasher can be implemented to compute the hash of a dynamicConfig.
type hasher interface {
hash(cfg *dynamicConfig) (uint64, error)
}

// hasherImpl uses the hashstructure library to compute dynamicConfig hash.
type hasherImpl struct{}

// hash returns a uint64 hash value for a dynamicConfig for equality comparisons.
func (h *hasherImpl) hash(cfg *dynamicConfig) (uint64, error) {
return hashstructure.Hash(*cfg, hashstructure.FormatV2, nil)
}

// hcpProviderImpl holds telemetry configuration and settings for continuous fetch of new config from HCP.
// it updates configuration, if changes are detected.
type hcpProviderImpl struct {
Expand All @@ -52,9 +38,6 @@ type hcpProviderImpl struct {
rw sync.RWMutex
// hcpClient is an authenticated client used to make HTTP requests to HCP.
hcpClient client.Client
// ticker is a reference to the time ticker that can be reset when refreshInterval changes.
ticker *time.Ticker
hasher hasher
}

// dynamicConfig is a set of configurable settings for metrics collection, processing and export.
Expand All @@ -70,6 +53,7 @@ type dynamicConfig struct {
// NewHCPProvider initializes and starts a HCP Telemetry provider with provided params.
func NewHCPProvider(ctx context.Context, hcpClient client.Client, telemetryCfg *client.TelemetryConfig) (*hcpProviderImpl, error) {
refreshInterval := telemetryCfg.RefreshConfig.RefreshInterval
// refreshInterval must be greater than 0, otherwise time.Ticker panics.
if refreshInterval <= 0 {
return nil, fmt.Errorf("invalid refresh interval: %d", refreshInterval)
}
Expand All @@ -81,52 +65,52 @@ func NewHCPProvider(ctx context.Context, hcpClient client.Client, telemetryCfg *
RefreshInterval: refreshInterval,
}

t := new(hcpClient, cfg)

go t.run(ctx, t.ticker.C)

return t, nil
}

func new(hcpClient client.Client, cfg *dynamicConfig) *hcpProviderImpl {
return &hcpProviderImpl{
t := &hcpProviderImpl{
cfg: cfg,
hcpClient: hcpClient,
hasher: &hasherImpl{},
ticker: time.NewTicker(cfg.RefreshInterval),
}

go t.run(ctx, refreshInterval)

return t, nil
}

// run continously checks for updates to the telemetry configuration by making a request to HCP.
// Modification of config only occurs if changes are detected to decrease write locks that block read locks.
func (t *hcpProviderImpl) run(ctx context.Context, tick <-chan time.Time) {
defer t.ticker.Stop()
func (h *hcpProviderImpl) run(ctx context.Context, refreshInterval time.Duration) {
ticker := time.NewTicker(refreshInterval)
defer ticker.Stop()
for {
select {
case <-tick:
if newCfg, hasChanged := t.checkUpdate(ctx); hasChanged {
t.modifyTelemetryConfig(newCfg)
t.ticker.Reset(newCfg.RefreshInterval)
case <-ticker.C:
if newCfg := h.getUpdate(ctx); newCfg != nil {
ticker.Reset(newCfg.RefreshInterval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be worth logging the refresh interval but you decide if it's too noisy.

}
case <-ctx.Done():
return
}
}
}

// checkUpdate makes a HTTP request to HCP to return a new metrics configuration and true, if config changed.
// checkUpdate does not update the metricsConfig field to prevent acquiring the write lock unnecessarily.
func (t *hcpProviderImpl) checkUpdate(ctx context.Context) (*dynamicConfig, bool) {
// getUpdate makes a HTTP request to HCP to return a new metrics configuration
// and updates the hcpProviderImpl.
func (h *hcpProviderImpl) getUpdate(ctx context.Context) *dynamicConfig {
logger := hclog.FromContext(ctx).Named("telemetry_config_provider")

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

telemetryCfg, err := t.hcpClient.FetchTelemetryConfig(ctx)
telemetryCfg, err := h.hcpClient.FetchTelemetryConfig(ctx)
if err != nil {
logger.Error("failed to fetch telemetry config from HCP", "error", err)
metrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil, false
return nil
}

// RefreshInterval of 0 or less will cause time.Reset() panic.
if telemetryCfg.RefreshConfig.RefreshInterval <= 0 {
logger.Error("invalid refresh interval")
metrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil
}

newDynamicConfig := &dynamicConfig{
Expand All @@ -136,64 +120,37 @@ func (t *hcpProviderImpl) checkUpdate(ctx context.Context) (*dynamicConfig, bool
RefreshInterval: telemetryCfg.RefreshConfig.RefreshInterval,
}

t.rw.RLock()
defer t.rw.RUnlock()
// Acquire write lock to update new configuration.
h.rw.Lock()
defer h.rw.Unlock()

equal, err := t.equals(newDynamicConfig)
if err != nil {
logger.Error("failed to calculate hash for new config", "error", err)
metrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil, false
}
h.cfg = newDynamicConfig

metrics.IncrCounter(internalMetricRefreshSuccess, 1)

return newDynamicConfig, !equal
}

// modifynewTelemetryConfig acquires a write lock to modify it with a given newTelemetryConfig object.
func (t *hcpProviderImpl) modifyTelemetryConfig(newCfg *dynamicConfig) {
t.rw.Lock()
defer t.rw.Unlock()

t.cfg = newCfg
return newDynamicConfig
}

// GetEndpoint acquires a read lock to return endpoint configuration for consumers.
func (t *hcpProviderImpl) GetEndpoint() *url.URL {
t.rw.RLock()
defer t.rw.RUnlock()
func (h *hcpProviderImpl) GetEndpoint() *url.URL {
h.rw.RLock()
defer h.rw.RUnlock()

return t.cfg.Endpoint
return h.cfg.Endpoint
}

// GetFilters acquires a read lock to return filters configuration for consumers.
func (t *hcpProviderImpl) GetFilters() *regexp.Regexp {
t.rw.RLock()
defer t.rw.RUnlock()
func (h *hcpProviderImpl) GetFilters() *regexp.Regexp {
h.rw.RLock()
defer h.rw.RUnlock()

return t.cfg.Filters
return h.cfg.Filters
}

// GetLabels acquires a read lock to return labels configuration for consumers.
func (t *hcpProviderImpl) GetLabels() map[string]string {
t.rw.RLock()
defer t.rw.RUnlock()

return t.cfg.Labels
}

// equals returns true if the new dynamicConfig is equal to the current config.
func (t *hcpProviderImpl) equals(newCfg *dynamicConfig) (bool, error) {
currHash, err := t.hasher.hash(t.cfg)
if err != nil {
return false, err
}

newHash, err := t.hasher.hash(newCfg)
if err != nil {
return false, err
}
func (h *hcpProviderImpl) GetLabels() map[string]string {
h.rw.RLock()
defer h.rw.RUnlock()

return currHash == newHash, err
return h.cfg.Labels
}
Loading