12 changes: 9 additions & 3 deletions cmd/main.go
@@ -44,10 +44,16 @@ func main() {
 		srv.AddCluster(ctx, cluster.Name, cluster.PDAddrs)
 	}
 
-	// Start metrics monitor with dynamic PD address from active cluster
+	// Start metrics monitor for all clusters
 	metrics := services.NewMonitor(
-		srv.GetActivePDAddr,
-		srv.GetActiveClusterName,
+		func() []services.ClusterInfo {
+			clusters := srv.GetAllClusters()
+			result := make([]services.ClusterInfo, len(clusters))
+			for i, c := range clusters {
+				result[i] = services.ClusterInfo{Name: c.Name, PDAddr: c.PDAddr}
+			}
+			return result
+		},
 		5*time.Second,
		cache,
 	)
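The closure passed to NewMonitor adapts the server package's cluster list into the services package's own ClusterInfo type, so pkg/services needs no import of pkg/server. A minimal sketch of exercising the reworked constructor in isolation, say from a test; the fixed cluster list stands in for srv.GetAllClusters, and the ctx and cache parameters are assumed to be built elsewhere (the utils.Cache constructor is not part of this diff):

// Sketch: wiring the monitor to a fixed cluster list instead of a live
// *server.Server. The ClusterInfo literals stand in for srv.GetAllClusters().
func startTestMonitor(ctx context.Context, cache *utils.Cache) {
	monitor := services.NewMonitor(
		func() []services.ClusterInfo {
			return []services.ClusterInfo{
				{Name: "staging", PDAddr: "pd-staging:2379"},
				{Name: "prod", PDAddr: "http://pd-prod:2379"},
			}
		},
		5*time.Second,
		cache,
	)
	monitor.Start(ctx)
}

Because the callback is re-evaluated on every poll, clusters added through srv.AddCluster after startup are picked up automatically on the next tick.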
23 changes: 23 additions & 0 deletions pkg/server/server.go
@@ -126,6 +126,29 @@ func (s *Server) GetActivePDAddr() string {
 	return ""
 }
 
+// ClusterInfo holds basic cluster information for metrics polling
+type ClusterInfo struct {
+	Name   string
+	PDAddr string
+}
+
+// GetAllClusters returns info for all clusters (for metrics polling)
+func (s *Server) GetAllClusters() []ClusterInfo {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	clusters := make([]ClusterInfo, 0, len(s.clusters))
+	for _, conn := range s.clusters {
+		if len(conn.PDAddrs) > 0 {
+			clusters = append(clusters, ClusterInfo{
+				Name:   conn.Name,
+				PDAddr: conn.PDAddrs[0],
+			})
+		}
+	}
+	return clusters
+}
+
 // Close closes all cluster connections
 func (s *Server) Close() {
 	s.mu.Lock()
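GetAllClusters snapshots the registered clusters under a read lock, so the monitor's polling goroutine can call it safely while clusters are added or removed. Note that only the first PD address of each cluster is exposed to the metrics path; the remaining PDAddrs entries do not act as fallbacks here. The Server fields the method touches are outside this diff; a hypothetical sketch of their shape, only to make the new method readable in context:

// Hypothetical: the real definitions live elsewhere in pkg/server and
// may differ. This is only the shape GetAllClusters appears to rely on.
type clusterConn struct {
	Name    string
	PDAddrs []string // GetAllClusters reports only PDAddrs[0]
}

type Server struct {
	mu       sync.RWMutex
	clusters map[string]*clusterConn // guarded by mu
}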
83 changes: 43 additions & 40 deletions pkg/services/metrics.go
@@ -13,31 +13,34 @@ import (
 	"github.com/GetStream/tikv-ui/pkg/utils"
 )
 
+// ClusterInfo holds basic cluster information for metrics polling
+type ClusterInfo struct {
+	Name   string
+	PDAddr string
+}
+
 type Monitor struct {
-	getPDAddr      func() string
-	getClusterName func() string
-	interval       time.Duration
-	client         *http.Client
-	cache          *utils.Cache
+	getClusters func() []ClusterInfo
+	interval    time.Duration
+	client      *http.Client
+	cache       *utils.Cache
 }
 
-func NewMonitor(getPDAddr func() string, getClusterName func() string, interval time.Duration, cache *utils.Cache) *Monitor {
+func NewMonitor(getClusters func() []ClusterInfo, interval time.Duration, cache *utils.Cache) *Monitor {
 	return &Monitor{
-		getPDAddr:      getPDAddr,
-		getClusterName: getClusterName,
-		interval:       interval,
-		cache:          cache,
+		getClusters: getClusters,
+		interval:    interval,
+		cache:       cache,
 		client: &http.Client{
-			Timeout: 10 * time.Second,
+			Timeout: 60 * time.Second,
 		},
 	}
 }
 
 func (m *Monitor) Start(ctx context.Context) {
 	ticker := time.NewTicker(m.interval)
 
-	m.pollStores(ctx)
-	m.pollTiKVMetrics(ctx)
+	m.pollAllClusters(ctx)
 
 	go func() {
 		defer ticker.Stop()
@@ -47,28 +50,34 @@ func (m *Monitor) Start(ctx context.Context) {
 			case <-ctx.Done():
 				return
 			case <-ticker.C:
-				go m.pollStores(ctx)
-				go m.pollTiKVMetrics(ctx)
+				go m.pollAllClusters(ctx)
 			}
 		}
 	}()
 }
 
-func (m *Monitor) pollStores(ctx context.Context) {
-	pdAddr := m.getPDAddr()
-	if pdAddr == "" {
-		log.Printf("pd metrics: no active PD address")
+func (m *Monitor) pollAllClusters(ctx context.Context) {
+	clusters := m.getClusters()
+	if len(clusters) == 0 {
+		log.Printf("metrics: no clusters available")
 		return
 	}
 
-	clusterName := m.getClusterName()
-	if clusterName == "" {
-		log.Printf("pd metrics: no active cluster name")
+	for _, cluster := range clusters {
+		m.pollStores(ctx, cluster)
+		m.pollTiKVMetrics(ctx, cluster.Name)
+	}
+}
+
+func (m *Monitor) pollStores(ctx context.Context, cluster ClusterInfo) {
+	if cluster.PDAddr == "" {
+		log.Printf("pd metrics [%s]: no PD address", cluster.Name)
 		return
 	}
-	storesURL := pdAddr + "/pd/api/v1/stores"
-	if !strings.HasPrefix(pdAddr, "http") {
-		storesURL = "http://" + pdAddr + "/pd/api/v1/stores"
+
+	storesURL := cluster.PDAddr + "/pd/api/v1/stores"
+	if !strings.HasPrefix(cluster.PDAddr, "http") {
+		storesURL = "http://" + cluster.PDAddr + "/pd/api/v1/stores"
 	}
 	req, err := http.NewRequestWithContext(
 		ctx,
@@ -77,52 +86,46 @@ func (m *Monitor) pollStores(ctx context.Context) {
 		nil,
 	)
 	if err != nil {
-		log.Printf("pd metrics: request error: %v", err)
+		log.Printf("pd metrics [%s]: request error: %v", cluster.Name, err)
 		return
 	}
 
 	resp, err := m.client.Do(req)
 	if err != nil {
-		log.Printf("pd metrics: http error: %v", err)
+		log.Printf("pd metrics [%s]: http error: %v", cluster.Name, err)
 		return
 	}
 	defer resp.Body.Close()
 
 	if resp.StatusCode != http.StatusOK {
-		log.Printf("pd metrics: bad status %d", resp.StatusCode)
+		log.Printf("pd metrics [%s]: bad status %d", cluster.Name, resp.StatusCode)
 		return
 	}
 
 	var data types.PDStoresResponse
 	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
-		log.Printf("pd metrics: decode error: %v", err)
+		log.Printf("pd metrics [%s]: decode error: %v", cluster.Name, err)
 		return
 	}
 
 	sort.Slice(data.Stores, func(i, j int) bool {
 		return data.Stores[i].Store.ID < data.Stores[j].Store.ID
 	})
 
-	m.cache.Set("metrics:"+clusterName, "pd", data)
+	m.cache.Set("metrics:"+cluster.Name, "pd", data)
 }
 
-func (m *Monitor) pollTiKVMetrics(ctx context.Context) {
-	clusterName := m.getClusterName()
-	if clusterName == "" {
-		log.Printf("tikv metrics: no active cluster name")
-		return
-	}
-
+func (m *Monitor) pollTiKVMetrics(ctx context.Context, clusterName string) {
 	// Get stores from cache
 	cached, ok := m.cache.Get("metrics:"+clusterName, "pd")
 	if !ok {
-		log.Printf("tikv metrics: no PD stores in cache for cluster %s", clusterName)
+		log.Printf("tikv metrics [%s]: no PD stores in cache", clusterName)
 		return
 	}
 
 	pdData, ok := cached.(types.PDStoresResponse)
 	if !ok {
-		log.Printf("tikv metrics: invalid PD cache data")
+		log.Printf("tikv metrics [%s]: invalid PD cache data", clusterName)
 		return
 	}
 
@@ -147,7 +150,7 @@ func (m *Monitor) pollTiKVMetrics(ctx context.Context) {
 		}
 		newMetrics, err := m.fetchNodeMetrics(ctx, metricsURL, statusAddr)
 		if err != nil {
-			log.Printf("tikv metrics: node %s error: %v", statusAddr, err)
+			log.Printf("tikv metrics [%s]: node %s error: %v", clusterName, statusAddr, err)
 			continue
 		}
 
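Per-cluster cache keying is what keeps the two pollers decoupled: pollStores writes each cluster's PD snapshot under "metrics:"+cluster.Name with field "pd", and pollTiKVMetrics reads that same key moments later in the same pass (or falls back to the previous snapshot when a PD fetch fails). Within one pollAllClusters pass the clusters are polled sequentially, while each tick fires a fresh pass in its own goroutine, so a slow cluster (the client timeout is now 60 s) delays later clusters in its pass but does not block the next tick. A short sketch of a consumer of the cached data, assuming utils.Cache exposes the same Get/Set pair used above:

// Sketch: reading one cluster's polled PD data back out of the cache.
// The key layout mirrors pollStores; the cache API is assumed from usage.
func storeCount(cache *utils.Cache, clusterName string) (int, bool) {
	cached, ok := cache.Get("metrics:"+clusterName, "pd")
	if !ok {
		return 0, false // not polled yet, or the poll failed
	}
	pd, ok := cached.(types.PDStoresResponse)
	if !ok {
		return 0, false // unexpected type in cache
	}
	return len(pd.Stores), true
}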