[DCA] advanced dispatching + rebalancing (#4068)
ahmed-mez authored Sep 27, 2019
1 parent e1f2a00 commit 6d8b6db
Showing 18 changed files with 2,064 additions and 36 deletions.
24 changes: 22 additions & 2 deletions cmd/cluster-agent/api/v1/clusterchecks.go
@@ -9,13 +9,15 @@ package v1

import (
"encoding/json"
"net"
"net/http"

"github.com/gorilla/mux"

"github.com/DataDog/datadog-agent/pkg/clusteragent"
"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks"
cctypes "github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// Install registers v1 API endpoints
@@ -48,7 +50,14 @@ func postCheckStatus(sc clusteragent.ServerContext) func(w http.ResponseWriter,
return
}

response, err := sc.ClusterCheckHandler.PostStatus(nodeName, status)
clientIP, err := parseClientIP(r.RemoteAddr)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
incrementRequestMetric("postCheckStatus", http.StatusInternalServerError)
return
}

response, err := sc.ClusterCheckHandler.PostStatus(nodeName, clientIP, status)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
incrementRequestMetric("postCheckStatus", http.StatusInternalServerError)
@@ -146,6 +155,17 @@ func shouldHandle(w http.ResponseWriter, r *http.Request, h *clusterchecks.Handl

// clusterChecksDisabledHandler returns a 404 response when cluster-checks are disabled
func clusterChecksDisabledHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Cluster-checks are not enabled"))
}

// parseClientIP retrieves the http client IP from the remoteAddr attribute
func parseClientIP(remoteAddr string) (string, error) {
clientIP, _, err := net.SplitHostPort(remoteAddr)
if err != nil {
log.Debugf("Error while parsing CLC worker address %s: %v", remoteAddr, err)
return "", err
}

return clientIP, nil
}
56 changes: 56 additions & 0 deletions cmd/cluster-agent/api/v1/clusterchecks_test.go
@@ -0,0 +1,56 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-2019 Datadog, Inc.

// +build clusterchecks

package v1

import "testing"

func TestParseClientIP(t *testing.T) {
tests := []struct {
name string
args string
expected string
}{
{
name: "valid ipv4",
args: "127.0.0.1:1337",
expected: "127.0.0.1",
},
{
name: "ipv4 no port",
args: "127.0.0.1:",
expected: "127.0.0.1",
},
{
name: "ipv6",
args: "[2001:db8:1f70::999:de8:7648:6e8]:1337",
expected: "2001:db8:1f70::999:de8:7648:6e8",
},
{
name: "valid ipv6 localhost",
args: "[::1]:1337",
expected: "::1",
},
{
name: "ipv6 no port",
args: "[::1]:",
expected: "::1",
},
{
name: "localhost",
args: "localhost:1337",
expected: "localhost",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got, _ := parseClientIP(tt.args); got != tt.expected {
t.Errorf("parseClientIP() == %v, expected %v", got, tt.expected)
}
})
}
}
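
Note: because of the +build clusterchecks tag above, this file only compiles when that build tag is set, so running the test requires something like go test -tags clusterchecks ./cmd/cluster-agent/api/v1/ -run TestParseClientIP (package path taken from the diff header).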
4 changes: 2 additions & 2 deletions pkg/clusteragent/clusterchecks/api.go
@@ -61,8 +61,8 @@ func (h *Handler) GetConfigs(nodeName string) (types.ConfigResponse, error) {
}

// PostStatus handles status reports from the node agents
func (h *Handler) PostStatus(nodeName string, status types.NodeStatus) (types.StatusResponse, error) {
upToDate, err := h.dispatcher.processNodeStatus(nodeName, status)
func (h *Handler) PostStatus(nodeName, clientIP string, status types.NodeStatus) (types.StatusResponse, error) {
upToDate, err := h.dispatcher.processNodeStatus(nodeName, clientIP, status)
response := types.StatusResponse{
IsUpToDate: upToDate,
}
21 changes: 20 additions & 1 deletion pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -10,6 +10,7 @@ package clusterchecks
import (
"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
"github.com/DataDog/datadog-agent/pkg/collector/check"
)

// getAllConfigs returns all configurations known to the store, for reporting
@@ -46,6 +47,9 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
// Register config
digest := config.Digest()
d.store.digestToConfig[digest] = config
for _, instance := range config.Instances {
d.store.idToDigest[check.BuildID(config.Name, instance, config.InitConfig)] = digest
}

// No target node specified: store in danglingConfigs
if targetNodeName == "" {
@@ -55,7 +59,7 @@
}

currentNode, foundCurrent := d.store.getNodeStore(d.store.digestToNode[digest])
targetNode := d.store.getOrCreateNodeStore(targetNodeName)
targetNode := d.store.getOrCreateNodeStore(targetNodeName, "")

// Dispatch to target node
targetNode.Lock()
@@ -83,6 +87,12 @@ func (d *dispatcher) removeConfig(digest string) {
delete(d.store.digestToConfig, digest)
delete(d.store.danglingConfigs, digest)

for k, v := range d.store.idToDigest {
if v == digest {
delete(d.store.idToDigest, k)
}
}

// Remove from node configs if assigned
if found {
node.Lock()
@@ -149,3 +159,12 @@ func (d *dispatcher) patchConfiguration(in integration.Config) (integration.Conf

return out, nil
}

// getConfigAndDigest returns config and digest of a check by checkID
func (d *dispatcher) getConfigAndDigest(checkID string) (integration.Config, string) {
d.store.RLock()
defer d.store.RUnlock()

digest := d.store.idToDigest[check.ID(checkID)]
return d.store.digestToConfig[digest], digest
}
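
For context on the new idToDigest index: addConfig writes one entry per instance (via check.BuildID), removeConfig sweeps the map for every check ID still pointing at the removed digest, and getConfigAndDigest resolves an ID in two lookups. Below is a minimal, self-contained sketch of that two-index bookkeeping; it uses toy string types instead of the agent's integration.Config and check.ID, so it is illustrative only.

package main

import "fmt"

// Toy stand-ins for the store's two indexes: a check ID resolves to a
// config digest, and the digest resolves to the config itself.
type toyStore struct {
	idToDigest     map[string]string
	digestToConfig map[string]string
}

func (s *toyStore) add(digest, config string, checkIDs ...string) {
	s.digestToConfig[digest] = config
	for _, id := range checkIDs {
		s.idToDigest[id] = digest
	}
}

// remove mirrors removeConfig above: drop the config, then sweep the
// reverse index for every check ID still pointing at the digest.
func (s *toyStore) remove(digest string) {
	delete(s.digestToConfig, digest)
	for id, d := range s.idToDigest {
		if d == digest {
			delete(s.idToDigest, id)
		}
	}
}

func main() {
	s := &toyStore{idToDigest: map[string]string{}, digestToConfig: map[string]string{}}
	s.add("digest-1", "http_check config", "http_check:aaa", "http_check:bbb")
	fmt.Println(s.idToDigest["http_check:aaa"]) // digest-1
	s.remove("digest-1")
	fmt.Println(len(s.idToDigest), len(s.digestToConfig)) // 0 0
}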
39 changes: 39 additions & 0 deletions pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -15,15 +15,22 @@ import (
"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util/clusteragent"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/clustername"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const firstRunnerStatsMinutes = 2 // collect runner stats after the first 2 minutes
const secondRunnerStatsMinutes = 5 // collect runner stats after the first 7 minutes
const finalRunnerStatsMinutes = 10 // collect runner stats endlessly every 10 minutes

// dispatcher holds the management logic for cluster-checks
type dispatcher struct {
store *clusterStore
nodeExpirationSeconds int64
extraTags []string
clcRunnersClient clusteragent.CLCRunnerClientInterface
advancedDispatching bool
}

func newDispatcher() *dispatcher {
@@ -39,6 +46,17 @@ func newDispatcher() *dispatcher {
d.extraTags = append(d.extraTags, fmt.Sprintf("%s:%s", clusterTagName, clusterTagValue))
}

d.advancedDispatching = config.Datadog.GetBool("cluster_checks.advanced_dispatching_enabled")
if !d.advancedDispatching {
return d
}

var err error
d.clcRunnersClient, err = clusteragent.GetCLCRunnerClient()
if err != nil {
log.Warnf("Cannot create CLC runners client, advanced dispatching will be disabled: %v", err)
d.advancedDispatching = false
}
return d
}

@@ -146,6 +164,10 @@ func (d *dispatcher) run(ctx context.Context) {
cleanupTicker := time.NewTicker(time.Duration(d.nodeExpirationSeconds/2) * time.Second)
defer cleanupTicker.Stop()

runnerStatsMinutes := firstRunnerStatsMinutes
runnerStatsTicker := time.NewTicker(time.Duration(runnerStatsMinutes) * time.Minute)
defer runnerStatsTicker.Stop()

for {
select {
case <-ctx.Done():
Expand All @@ -161,6 +183,23 @@ func (d *dispatcher) run(ctx context.Context) {
danglingConfs := d.retrieveAndClearDangling()
d.reschedule(danglingConfs)
}
case <-runnerStatsTicker.C:
// Collect stats with a staged backoff: 2, then 5, then every 10 minutes
if runnerStatsMinutes == firstRunnerStatsMinutes {
runnerStatsMinutes = secondRunnerStatsMinutes
runnerStatsTicker = time.NewTicker(time.Duration(runnerStatsMinutes) * time.Minute)
} else if runnerStatsMinutes == secondRunnerStatsMinutes {
runnerStatsMinutes = finalRunnerStatsMinutes
runnerStatsTicker = time.NewTicker(time.Duration(runnerStatsMinutes) * time.Minute)
}

// Update runner stats and rebalance if needed
if d.advancedDispatching {
// Collect CLC runners stats and update cache
d.updateRunnersStats()
// Rebalance checks distribution
d.rebalance()
}
}
}
}
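
The runner-stats ticker in run() is replaced in place so collection steps from 2 to 5 to 10 minutes. A runnable sketch of that staged-ticker pattern follows, with milliseconds standing in for the dispatcher's minutes so it terminates quickly; the explicit Stop before each swap is this sketch's own choice, not taken from the diff.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Staged intervals: fire after 2 units, then 5, then every 10.
	// Milliseconds stand in for the dispatcher's minutes.
	delays := []time.Duration{20 * time.Millisecond, 50 * time.Millisecond, 100 * time.Millisecond}
	stage := 0
	ticker := time.NewTicker(delays[stage])
	// The closure reads ticker when main returns, so the currently
	// active ticker is the one stopped.
	defer func() { ticker.Stop() }()

	for i := 0; i < 4; i++ {
		<-ticker.C
		fmt.Printf("collect stats (tick %d, stage %d)\n", i, stage)
		if stage < len(delays)-1 {
			ticker.Stop() // release the faster ticker before swapping it out
			stage++
			ticker = time.NewTicker(delays[stage])
		}
	}
}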
53 changes: 48 additions & 5 deletions pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -15,6 +15,8 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const defaultBusynessValue int = -1

// getNodeConfigs returns configurations dispatched to a given node
func (d *dispatcher) getNodeConfigs(nodeName string) ([]integration.Config, int64, error) {
d.store.RLock()
@@ -32,14 +34,14 @@

// processNodeStatus keeps the node's status in the store, and returns true
// if the last configuration change matches the one sent by the node agent.
func (d *dispatcher) processNodeStatus(nodeName string, status types.NodeStatus) (bool, error) {
func (d *dispatcher) processNodeStatus(nodeName, clientIP string, status types.NodeStatus) (bool, error) {
var warmingUp bool

d.store.Lock()
if !d.store.active {
warmingUp = true
}
node := d.store.getOrCreateNodeStore(nodeName)
node := d.store.getOrCreateNodeStore(nodeName, clientIP)
d.store.Unlock()

node.Lock()
@@ -69,6 +71,7 @@ func (d *dispatcher) processNodeStatus(nodeName string, status types.NodeStatus)
func (d *dispatcher) getLeastBusyNode() string {
var leastBusyNode string
minCheckCount := int(-1)
minBusyness := int(-1)

d.store.RLock()
defer d.store.RUnlock()
@@ -77,9 +80,20 @@
if name == "" {
continue
}
if minCheckCount == -1 || len(store.digestToConfig) < minCheckCount {
leastBusyNode = name
minCheckCount = len(store.digestToConfig)
if d.advancedDispatching && store.busyness > defaultBusynessValue {
// dispatch based on CLC runner stats, but only when
// advancedDispatching is enabled and busyness values
// have started being collected
if minBusyness == -1 || store.busyness < minBusyness {
leastBusyNode = name
minBusyness = store.busyness
}
} else {
// count-based round robin dispatching
if minCheckCount == -1 || len(store.digestToConfig) < minCheckCount {
leastBusyNode = name
minCheckCount = len(store.digestToConfig)
}
}
}
return leastBusyNode
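
A self-contained sketch of the same selection rule follows: per node, busyness wins once runner stats exist, otherwise the check count does. The nodeInfo type is a toy stand-in for the agent's nodeStore.

package main

import "fmt"

// Toy stand-in for a node's state; the agent's nodeStore is richer.
type nodeInfo struct {
	checkCount int
	busyness   int // -1 until runner stats have been collected
}

// leastBusy mirrors getLeastBusyNode: prefer the lowest reported
// busyness when advanced dispatching has produced stats for a node,
// and fall back to the lowest check count otherwise.
func leastBusy(nodes map[string]nodeInfo, advanced bool) string {
	best := ""
	minCount, minBusy := -1, -1
	for name, n := range nodes {
		if advanced && n.busyness > -1 {
			if minBusy == -1 || n.busyness < minBusy {
				best, minBusy = name, n.busyness
			}
		} else if minCount == -1 || n.checkCount < minCount {
			best, minCount = name, n.checkCount
		}
	}
	return best
}

func main() {
	nodes := map[string]nodeInfo{
		"node-a": {checkCount: 3, busyness: 40},
		"node-b": {checkCount: 9, busyness: 12},
	}
	fmt.Println(leastBusy(nodes, true))  // node-b: lowest busyness wins
	fmt.Println(leastBusy(nodes, false)) // node-a: lowest check count wins
}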
@@ -122,3 +136,32 @@ func (d *dispatcher) expireNodes() {
log.Warn("No nodes reporting, cluster checks will not run")
}
}

// updateRunnersStats collects stats from the registered
// Cluster Level Check runners and updates the stats cache
func (d *dispatcher) updateRunnersStats() {
if d.clcRunnersClient == nil {
log.Debug("Cluster Level Check runner client was not correctly initialised")
return
}

d.store.Lock()
defer d.store.Unlock()
for name, node := range d.store.nodes {
node.RLock()
ip := node.clientIP
node.RUnlock()

stats, err := d.clcRunnersClient.GetRunnerStats(ip)
if err != nil {
log.Debugf("Cannot get CLC Runner stats with IP %s on node %s: %v", node.clientIP, name, err)
continue
}
node.Lock()
node.clcRunnerStats = stats
log.Tracef("Updated CLC Runner stats on node: %s, node IP: %s, stats: %v", name, node.clientIP, stats)
node.busyness = calculateBusyness(stats)
log.Debugf("Updated busyness on node: %s, node IP: %s, busyness value: %d", name, node.clientIP, node.busyness)
node.Unlock()
}
}
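
calculateBusyness is added elsewhere in this PR and its body is not shown in these hunks. Purely as an illustration of the idea, collapsing a runner's per-check stats into a single comparable score, a toy version might look like the following; the field names and the additive weighting are invented, not the agent's real types.

package main

import "fmt"

// Invented stand-in for a runner's per-check stats; the real types
// shipped with this PR are not shown in these hunks.
type checkStats struct {
	AverageExecutionTimeMs int
	MetricSamples          int
}

// toyBusyness collapses per-check stats into one score; higher means
// busier. The weighting is arbitrary and illustrative only.
func toyBusyness(stats map[string]checkStats) int {
	busyness := 0
	for _, s := range stats {
		busyness += s.AverageExecutionTimeMs + s.MetricSamples
	}
	return busyness
}

func main() {
	stats := map[string]checkStats{
		"http_check:inst1": {AverageExecutionTimeMs: 120, MetricSamples: 15},
		"disk:inst1":       {AverageExecutionTimeMs: 30, MetricSamples: 8},
	}
	fmt.Println(toyBusyness(stats)) // 173
}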