Add admin page for HA tracker information. #1546

Merged (11 commits) on Aug 14, 2019
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
## master / unreleased

* [FEATURE] Add option to use jump hashing to load balance requests to memcached #1554
* [FEATURE] Add status page for HA tracker to distributors #1546

## 0.1.0 / 2019-08-07

1 change: 1 addition & 0 deletions pkg/cortex/modules.go
@@ -170,6 +170,7 @@ func (t *Cortex) initDistributor(cfg *Config) (err error) {

t.server.HTTP.HandleFunc("/all_user_stats", t.distributor.AllUserStatsHandler)
t.server.HTTP.Handle("/api/prom/push", t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.distributor.PushHandler)))
t.server.HTTP.Handle("/ha-tracker", t.distributor.Replicas)
return
}
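
Once this route is registered, the status page can be fetched from the distributor's HTTP server. A minimal sketch, assuming a distributor serving its admin endpoints on localhost:9009 (the address is an assumption, not part of this PR):

```go
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	// Assumes a distributor serving its admin endpoints on localhost:9009.
	resp, err := http.Get("http://localhost:9009/ha-tracker")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.Status)
	fmt.Println(string(body)) // prints the HA tracker status HTML page
}
```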

27 changes: 10 additions & 17 deletions pkg/distributor/distributor.go
@@ -108,7 +108,7 @@ type Distributor struct {
billingClient *billing.Client

// For handling HA replicas.
replicas *haTracker
Replicas *haTracker

// Per-user rate limiters.
ingestLimitersMtx sync.RWMutex
@@ -123,7 +123,6 @@ type Config struct {
BillingConfig billing.Config `yaml:"billing,omitempty"`
PoolConfig ingester_client.PoolConfig `yaml:"pool,omitempty"`

EnableHATracker bool `yaml:"enable_ha_tracker,omitempty"`
HATrackerConfig HATrackerConfig `yaml:"ha_tracker,omitempty"`

RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
@@ -143,7 +142,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.HATrackerConfig.RegisterFlags(f)

f.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
f.BoolVar(&cfg.EnableHATracker, "distributor.ha-tracker.enable", false, "Enable the distributor's HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels).")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
@@ -170,6 +168,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
replicationFactor.Set(float64(ring.ReplicationFactor()))
cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout

replicas, err := newClusterTracker(cfg.HATrackerConfig)
if err != nil {
return nil, err
}

d := &Distributor{
cfg: cfg,
ring: ring,
@@ -178,16 +181,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
limits: limits,
ingestLimiters: map[string]*rate.Limiter{},
quit: make(chan struct{}),
Replicas: replicas,
}

if cfg.EnableHATracker {
replicas, err := newClusterTracker(cfg.HATrackerConfig)
if err != nil {
return nil, err
}
d.replicas = replicas
}

go d.loop()

return d, nil
@@ -218,9 +213,7 @@ func (d *Distributor) loop() {
func (d *Distributor) Stop() {
close(d.quit)
d.ingesterPool.Stop()
if d.cfg.EnableHATracker {
d.replicas.stop()
}
d.Replicas.stop()
}

func (d *Distributor) tokenForLabels(userID string, labels []client.LabelAdapter) (uint32, error) {
@@ -279,7 +272,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica

// At this point we know we have both HA labels, so we should look up
// the cluster/instance here to see if we want to accept this sample.
err := d.replicas.checkReplica(ctx, userID, cluster, replica)
err := d.Replicas.checkReplica(ctx, userID, cluster, replica)
// checkReplica should only have returned an error if there was a real error talking to Consul, or if the replica labels don't match.
if err != nil { // Don't accept the sample.
return false, err
@@ -304,7 +297,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
// Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
incomingSamples.WithLabelValues(userID).Add(float64(numSamples))

if d.cfg.EnableHATracker && d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
if err != nil {
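
The Push path above relies on findHALabels, whose implementation is not shown in this diff. A simplified, self-contained illustration of that step, assuming per-tenant label names such as cluster and __replica__ (the types and helper below are hypothetical stand-ins, not Cortex code):

```go
package main

import "fmt"

// label is a minimal stand-in for a timeseries label pair.
type label struct{ Name, Value string }

// haLabels pulls the cluster and replica values out of a label set,
// given the per-tenant replica and cluster label names.
func haLabels(replicaLabel, clusterLabel string, labels []label) (cluster, replica string) {
	for _, l := range labels {
		switch l.Name {
		case clusterLabel:
			cluster = l.Value
		case replicaLabel:
			replica = l.Value
		}
	}
	return cluster, replica
}

func main() {
	labels := []label{
		{"__name__", "up"},
		{"cluster", "cluster0"},
		{"__replica__", "instance0"},
	}
	c, r := haLabels("__replica__", "cluster", labels)
	fmt.Println(c, r) // cluster0 instance0
}
```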
32 changes: 23 additions & 9 deletions pkg/distributor/distributor_test.go
@@ -113,6 +113,7 @@ func TestDistributorPushHAInstances(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "user")

for i, tc := range []struct {
enableTracker bool
acceptedReplica string
testReplica string
cluster string
@@ -121,6 +122,7 @@
expectedCode int32
}{
{
enableTracker: true,
acceptedReplica: "instance0",
testReplica: "instance0",
cluster: "cluster0",
@@ -129,32 +131,44 @@
},
// The 202 indicates that we didn't accept this sample.
{
enableTracker: true,
acceptedReplica: "instance2",
testReplica: "instance0",
cluster: "cluster0",
samples: 5,
expectedCode: 202,
},
// If the HA tracker is disabled we should still accept samples that have both labels.
{
enableTracker: false,
acceptedReplica: "instance0",
testReplica: "instance0",
cluster: "cluster0",
samples: 5,
expectedResponse: success,
},
} {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
d := prepare(t, 1, 1, 0, shardByAllLabels)
d.cfg.EnableHATracker = true
d.limits.Defaults.AcceptHASamples = true
codec := codec.Proto{Factory: ProtoReplicaDescFactory}
mock := kv.PrefixClient(consul.NewInMemoryClient(codec), "prefix")

r, err := newClusterTracker(HATrackerConfig{
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 100 * time.Millisecond,
FailoverTimeout: time.Second,
})
assert.NoError(t, err)
d.replicas = r
if tc.enableTracker {
r, err := newClusterTracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 100 * time.Millisecond,
FailoverTimeout: time.Second,
})
assert.NoError(t, err)
d.Replicas = r
}

userID, err := user.ExtractOrgID(ctx)
assert.NoError(t, err)
err = d.replicas.checkReplica(ctx, userID, tc.cluster, tc.acceptedReplica)
err = d.Replicas.checkReplica(ctx, userID, tc.cluster, tc.acceptedReplica)
assert.NoError(t, err)

request := makeWriteRequestHA(tc.samples, tc.testReplica, tc.cluster)
37 changes: 25 additions & 12 deletions pkg/distributor/ha_tracker.go
@@ -80,21 +80,26 @@ type haTracker struct {
// HATrackerConfig contains the configuration required to
// create an HA Tracker.
type HATrackerConfig struct {
EnableHATracker bool `yaml:"enable_ha_tracker,omitempty"`
// We should only update the timestamp if the difference
// between the stored timestamp and the time we received a sample at
// is more than this duration.
UpdateTimeout time.Duration
UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"`
// We should only failover to accepting samples from a replica
// other than the replica written in the KVStore if the difference
// between the stored timestamp and the time we received a sample is
// more than this duration
FailoverTimeout time.Duration
FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`

KVStore kv.Config
}

// RegisterFlags adds the flags required to config this to the given FlagSet
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.EnableHATracker,
"distributor.ha-tracker.enable",
false,
"Enable the distributors HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels).")
f.DurationVar(&cfg.UpdateTimeout,
"distributor.ha-tracker.update-timeout",
15*time.Second,
@@ -112,11 +117,6 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
codec := codec.Proto{Factory: ProtoReplicaDescFactory}

client, err := kv.NewClient(cfg.KVStore, codec)
if err != nil {
return nil, err
}

if cfg.FailoverTimeout <= cfg.UpdateTimeout {
return nil, fmt.Errorf("HA Tracker failover timeout must be greater than update timeout, %d is <= %d", cfg.FailoverTimeout, cfg.UpdateTimeout)
}
@@ -126,10 +126,17 @@ func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
cfg: cfg,
done: make(chan struct{}),
elected: map[string]ReplicaDesc{},
client: client,
cancel: cancel,
}
go t.loop(ctx)

if cfg.EnableHATracker {
client, err := kv.NewClient(cfg.KVStore, codec)
if err != nil {
return nil, err
}
t.client = client
go t.loop(ctx)
}
return &t, nil
}

@@ -155,8 +162,10 @@ func (c *haTracker) loop(ctx context.Context) {

// Stop calls the tracker's cancel function, which will end the loop for WatchPrefix.
func (c *haTracker) stop() {
c.cancel()
<-c.done
if c.cfg.EnableHATracker {
c.cancel()
<-c.done
}
}

// CheckReplica checks the cluster and replica against the backing KVStore and local cache in the
@@ -167,6 +176,10 @@ func (c *haTracker) stop() {
// accepting samples from another replica for the cluster, so that there isn't a bunch of errors returned
// to customers' clients.
func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica string) error {
// If HA tracking isn't enabled then accept the sample
if !c.cfg.EnableHATracker {
return nil
}
key := fmt.Sprintf("%s/%s", userID, cluster)
now := mtime.Now()
c.electedLock.RLock()
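
For intuition about the checkReplica contract described above, a simplified sketch of the accept/failover timing decision. The types below are standalone stand-ins, not the PR's haTracker, which also persists elections to the KV store:

```go
package main

import (
	"fmt"
	"time"
)

// electedReplica is a trimmed-down stand-in for the tracker's per-cluster state.
type electedReplica struct {
	name       string
	receivedAt time.Time
}

// acceptSample mirrors the decision sketched in the checkReplica comment:
// samples from the elected replica are accepted (and refresh the election);
// samples from other replicas are rejected unless the elected replica has
// been silent for longer than failoverTimeout, in which case we fail over.
func acceptSample(e *electedReplica, replica string, now time.Time, failoverTimeout time.Duration) bool {
	if replica == e.name {
		e.receivedAt = now
		return true
	}
	if now.Sub(e.receivedAt) > failoverTimeout {
		// Fail over: the new replica becomes the elected one.
		e.name = replica
		e.receivedAt = now
		return true
	}
	return false // would surface to the client as a 202
}

func main() {
	now := time.Now()
	e := &electedReplica{name: "instance0", receivedAt: now}
	fmt.Println(acceptSample(e, "instance0", now, 30*time.Second))                  // true
	fmt.Println(acceptSample(e, "instance1", now, 30*time.Second))                  // false: instance0 still fresh
	fmt.Println(acceptSample(e, "instance1", now.Add(time.Minute), 30*time.Second)) // true: failover
}
```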
99 changes: 99 additions & 0 deletions pkg/distributor/ha_tracker_http.go
@@ -0,0 +1,99 @@
package distributor

import (
"html/template"
"net/http"
"sort"
"strings"
"time"

"github.com/prometheus/prometheus/pkg/timestamp"
)

const trackerTpl = `
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Cortex HA Tracker Status</title>
</head>
<body>
<h1>Cortex HA Tracker Status</h1>
<p>Current time: {{ .Now }}</p>
<table width="100%" border="1">
<thead>
<tr>
<th>User ID</th>
<th>Cluster</th>
<th>Replica</th>
<th>Elected Time</th>
<th>Time Until Update</th>
<th>Time Until Failover</th>
</tr>
</thead>
<tbody>
{{ range .Elected }}
<tr>
<td>{{ .UserID }}</td>
<td>{{ .Cluster }}</td>
<td>{{ .Replica }}</td>
<td>{{ .ElectedAt }}</td>
<td>{{ .UpdateTime }}</td>
<td>{{ .FailoverTime }}</td>
</tr>
{{ end }}
</tbody>
</table>
</body>
</html>`

var trackerTmpl *template.Template

func init() {
trackerTmpl = template.Must(template.New("ha-tracker").Parse(trackerTpl))
}

func (h *haTracker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.electedLock.RLock()
type replica struct {
UserID, Cluster, Replica string
ElectedAt time.Time
UpdateTime, FailoverTime time.Duration
}

electedReplicas := []replica{}
for key, desc := range h.elected {
chunks := strings.SplitN(key, "/", 2)

electedReplicas = append(electedReplicas, replica{
UserID: chunks[0],
Cluster: chunks[1],
Replica: desc.Replica,
ElectedAt: timestamp.Time(desc.ReceivedAt),
UpdateTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.UpdateTimeout)),
FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.FailoverTimeout)),
})
}
h.electedLock.RUnlock()

sort.Slice(electedReplicas, func(i, j int) bool {
first := electedReplicas[i]
second := electedReplicas[j]

if first.UserID != second.UserID {
return first.UserID < second.UserID
}
return first.Cluster < second.Cluster
})

if err := trackerTmpl.Execute(w, struct {
Elected []replica
Now time.Time
}{
Elected: electedReplicas,
Now: time.Now(),
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
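
A hypothetical test, not included in this PR, showing how the new handler could be exercised with httptest; it builds a tracker the same way TestDistributorPushHAInstances does, and the import paths for the kv helpers are assumed from that test file's usage:

```go
package distributor

import (
	"context"
	"net/http/httptest"
	"testing"
	"time"

	"github.com/cortexproject/cortex/pkg/ring/kv"
	"github.com/cortexproject/cortex/pkg/ring/kv/codec"
	"github.com/cortexproject/cortex/pkg/ring/kv/consul"
	"github.com/stretchr/testify/assert"
)

func TestHATrackerStatusPage(t *testing.T) {
	// Same in-memory KV setup as the existing distributor test.
	c := codec.Proto{Factory: ProtoReplicaDescFactory}
	mock := kv.PrefixClient(consul.NewInMemoryClient(c), "prefix")

	tracker, err := newClusterTracker(HATrackerConfig{
		EnableHATracker: true,
		KVStore:         kv.Config{Mock: mock},
		UpdateTimeout:   100 * time.Millisecond,
		FailoverTimeout: time.Second,
	})
	assert.NoError(t, err)

	// Elect a replica; the elected map is filled in asynchronously by the
	// watch loop, so we only assert on the page rendering, not its rows.
	assert.NoError(t, tracker.checkReplica(context.Background(), "user", "cluster0", "instance0"))

	rec := httptest.NewRecorder()
	tracker.ServeHTTP(rec, httptest.NewRequest("GET", "/ha-tracker", nil))
	assert.Equal(t, 200, rec.Code)
	assert.Contains(t, rec.Body.String(), "Cortex HA Tracker Status")
}
```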