Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions docs/generated/metrics/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15554,6 +15554,54 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NONE
- name: mma.store.cpu.capacity
exported_name: mma_store_cpu_capacity
description: Virtual CPU capacity as calculated by MMA based on load and system CPU utilization
y_axis_label: Nanoseconds/Sec
type: GAUGE
unit: NANOSECONDS
aggregation: AVG
derivative: NONE
- name: mma.store.cpu.load
exported_name: mma_store_cpu_load
description: CPU load generated by ranges on this store
y_axis_label: Nanoseconds/Sec
type: GAUGE
unit: NANOSECONDS
aggregation: AVG
derivative: NONE
- name: mma.store.cpu.utilization
exported_name: mma_store_cpu_utilization
description: Ratio of CPU load to capacity expressed as a percentage
y_axis_label: CPU Utilization
type: GAUGE
unit: PERCENT
aggregation: AVG
derivative: NONE
- name: mma.store.disk.capacity
exported_name: mma_store_disk_capacity
description: Virtual disk capacity as calculated by MMA based on load and system disk capacity
y_axis_label: Bytes
type: GAUGE
unit: BYTES
aggregation: AVG
derivative: NONE
- name: mma.store.disk.logical
exported_name: mma_store_disk_logical
description: Number of logical bytes occupied by ranges on this store
y_axis_label: Bytes
type: GAUGE
unit: BYTES
aggregation: AVG
derivative: NONE
- name: mma.store.disk.utilization
exported_name: mma_store_disk_utilization
description: Ratio of disk usage to capacity expressed as a percentage
y_axis_label: Disk Utilization
type: GAUGE
unit: PERCENT
aggregation: AVG
derivative: NONE
- name: node-id
exported_name: node_id
description: node ID with labels for advertised RPC and HTTP addresses
Expand Down
12 changes: 11 additions & 1 deletion pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func NewAllocatorState(ts timeutil.TimeSource, rand *rand.Rand) *allocatorState

type metricsEtc struct {
counters *rangeOperationMetrics
loadAndCapacity *loadAndCapacityMetrics
passMetricsAndLogger *rebalancingPassMetricsAndLogger
metricsRegistered bool
}
Expand Down Expand Up @@ -162,13 +163,15 @@ func (a *allocatorState) ensureMetricsForLocalStoreLocked(
if !ok {
m = &metricsEtc{
counters: makeRangeOperationMetrics(),
loadAndCapacity: makeLoadAndCapacityMetrics(),
passMetricsAndLogger: makeRebalancingPassMetricsAndLogger(localStoreID),
}
a.metricsMap[localStoreID] = m
}
if !m.metricsRegistered {
if registry != nil {
registry.AddMetricStruct(*m.counters)
registry.AddMetricStruct(*m.loadAndCapacity)
registry.AddMetricStruct(m.passMetricsAndLogger.m)
m.metricsRegistered = true
} else if a.shouldLogUnregisteredMetrics() {
Expand Down Expand Up @@ -201,7 +204,14 @@ func (a *allocatorState) SetStore(store StoreAttributesAndLocality) {
func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg) {
a.mu.Lock()
defer a.mu.Unlock()
a.cs.processStoreLoadMsg(ctx, msg)

// Pass the metrics struct for this store if they exist in the map.
m, ok := a.metricsMap[msg.StoreID]
if ok {
a.cs.processStoreLoadMsg(ctx, msg, m.loadAndCapacity)
} else {
a.cs.processStoreLoadMsg(ctx, msg, nil)
}
}

// SetDiskUtilThresholds implements the Allocator interface.
Expand Down
22 changes: 21 additions & 1 deletion pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,9 @@ func newClusterState(ts timeutil.TimeSource, interner *stringInterner) *clusterS
return cs
}

func (cs *clusterState) processStoreLoadMsg(ctx context.Context, storeMsg *StoreLoadMsg) {
func (cs *clusterState) processStoreLoadMsg(
ctx context.Context, storeMsg *StoreLoadMsg, metrics *loadAndCapacityMetrics,
) {
now := cs.ts.Now()
cs.gcPendingChanges(ctx, now)

Expand Down Expand Up @@ -1370,6 +1372,24 @@ func (cs *clusterState) processStoreLoadMsg(ctx context.Context, storeMsg *Store
ss.adjusted.secondaryLoad = storeMsg.SecondaryLoad
ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease = 0, 0

// Report the load and capacity that was just applied if we have a valid
// metrics handle.
if metrics != nil {
cpuLoad := int64(storeMsg.Load[CPURate])
cpuCapacity := int64(storeMsg.Capacity[CPURate])
cpuUtil := float64(cpuLoad) * 100 / float64(cpuCapacity)
diskLoad := int64(storeMsg.Load[ByteSize])
diskCapacity := int64(storeMsg.Capacity[ByteSize])
diskUtil := float64(diskLoad) * 100 / float64(diskCapacity)

metrics.CPULoad.Update(cpuLoad)
metrics.CPUCapacity.Update(cpuCapacity)
metrics.CPUUtilization.Update(cpuUtil)
metrics.DiskLoad.Update(diskLoad)
metrics.DiskCapacity.Update(diskCapacity)
metrics.DiskUtilization.Update(diskUtil)
}

// Find any load pending changes for ranges which involve this store, that
// can now be removed from the loadPendingChanges. We don't need to undo the
// corresponding delta adjustment as the reported load already contains the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func TestClusterState(t *testing.T) {
// Consider making it relative to ts.
for line := range strings.Lines(d.Input) {
msg := parseStoreLoadMsg(t, line)
cs.processStoreLoadMsg(context.Background(), &msg)
cs.processStoreLoadMsg(context.Background(), &msg, nil)
}
return ""

Expand Down
66 changes: 66 additions & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/mma_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,72 @@ var (
// away when everything is migrated to MMA and SMA is removed.
)

// loadAndCapacityMetrics contains per-store metrics related to the virtual
// load and capacity calculated at each store that are used by MMA to perform
// allocation. The allocator on each node calculates the load and capacity of
// every store, but we only report metrics for the local stores.
type loadAndCapacityMetrics struct {
// The metrics below populate the CPU load and capacity as seen by MMA.
CPULoad *metric.Gauge
CPUCapacity *metric.Gauge
CPUUtilization *metric.GaugeFloat64

// The metrics below populate the disk load and capacity as seen by MMA.
DiskLoad *metric.Gauge
DiskCapacity *metric.Gauge
DiskUtilization *metric.GaugeFloat64
}

var (
metaCPULoad = metric.Metadata{
Name: "mma.store.cpu.load",
Help: "CPU load generated by ranges on this store",
Measurement: "Nanoseconds/Sec",
Unit: metric.Unit_NANOSECONDS,
}
metaCPUCapacity = metric.Metadata{
Name: "mma.store.cpu.capacity",
Help: "Virtual CPU capacity as calculated by MMA based on load and system CPU utilization",
Measurement: "Nanoseconds/Sec",
Unit: metric.Unit_NANOSECONDS,
}
metaCPUUtilization = metric.Metadata{
Name: "mma.store.cpu.utilization",
Help: "Ratio of CPU load to capacity expressed as a percentage",
Measurement: "CPU Utilization",
Unit: metric.Unit_PERCENT,
}
metaDiskLoad = metric.Metadata{
Name: "mma.store.disk.logical",
Help: "Number of logical bytes occupied by ranges on this store",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaDiskCapacity = metric.Metadata{
Name: "mma.store.disk.capacity",
Help: "Virtual disk capacity as calculated by MMA based on load and system disk capacity",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaDiskUtilization = metric.Metadata{
Name: "mma.store.disk.utilization",
Help: "Ratio of disk usage to capacity expressed as a percentage",
Measurement: "Disk Utilization",
Unit: metric.Unit_PERCENT,
}
)

func makeLoadAndCapacityMetrics() *loadAndCapacityMetrics {
return &loadAndCapacityMetrics{
CPULoad: metric.NewGauge(metaCPULoad),
CPUCapacity: metric.NewGauge(metaCPUCapacity),
CPUUtilization: metric.NewGaugeFloat64(metaCPUUtilization),
DiskLoad: metric.NewGauge(metaDiskLoad),
DiskCapacity: metric.NewGauge(metaDiskCapacity),
DiskUtilization: metric.NewGaugeFloat64(metaDiskUtilization),
}
}

// overloadKind represents the various time-based overload states for a store.
type overloadKind uint8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1778,6 +1778,12 @@ mma_overloaded_store_short_dur_failure: mma.overloaded_store.short_dur.failure
mma_overloaded_store_short_dur_success: mma.overloaded_store.short_dur.success
mma_span_config_normalization_error: mma.span_config.normalization.error
mma_span_config_normalization_soft_error: mma.span_config.normalization.soft_error
mma_store_cpu_capacity: mma.store.cpu.capacity
mma_store_cpu_load: mma.store.cpu.load
mma_store_cpu_utilization: mma.store.cpu.utilization
mma_store_disk_capacity: mma.store.disk.capacity
mma_store_disk_logical: mma.store.disk.logical
mma_store_disk_utilization: mma.store.disk.utilization
node_id: node_id
obs_tablemetadata_update_job_duration: obs.tablemetadata.update_job.duration
obs_tablemetadata_update_job_duration_bucket: obs.tablemetadata.update_job.duration.bucket
Expand Down