GROUNDWORK-3440: add clusters services
VladislavSenkevich committed Oct 29, 2024
1 parent c1df245 commit 7f9a63d
Showing 8 changed files with 281 additions and 123 deletions.
43 changes: 0 additions & 43 deletions connectors/databricks/client/client.go
@@ -1,9 +1,7 @@
package client

import (
"encoding/json"
"fmt"
"time"

"github.com/go-resty/resty/v2"
)
@@ -17,44 +15,3 @@ func New(url, token string) *DatabricksClient {
cl: resty.New().SetBaseURL(url).SetHeader("Authorization", fmt.Sprintf("Bearer %s", token)),
}
}

-func (d *DatabricksClient) GetJobsLatency(from, to time.Time) (map[int64][]Run, error) {
-var (
-result = make(map[int64][]Run)
-
-hasMore = true
-nextPageToken = ""
-)
-
-for hasMore {
-resp, err := d.cl.R().Get(
-fmt.Sprintf(
-"%s?next_page_token=%s&start_time_from=%d&start_time_to=%d",
-defaultJobsRunsListPath,
-nextPageToken,
-from.UnixMilli(),
-to.UnixMilli(),
-),
-)
-if err != nil {
-return nil, err
-}
-if resp.StatusCode() != 200 {
-return nil, fmt.Errorf("bad status code: %d", resp.StatusCode())
-}
-
-var runsResp RunsResponse
-if err = json.Unmarshal(resp.Body(), &runsResp); err != nil {
-return nil, err
-}
-
-for _, run := range runsResp.Runs {
-result[run.JobID] = append(result[run.JobID], run)
-}
-
-hasMore = runsResp.HasMore
-nextPageToken = runsResp.NextPageToken
-}
-
-return result, nil
-}
5 changes: 3 additions & 2 deletions connectors/databricks/client/const.go
@@ -1,6 +1,7 @@
package client

const (
-defaultJobsRunsListPath = "/api/2.1/jobs/runs/list"
-defaultJobRunGetPath = "/api/2.1/jobs/runs/get"
+defaultPathJobsRunsList = "/api/2.1/jobs/runs/list"
+defaultPathJobsRunsGet = "/api/2.1/jobs/runs/get"
+defaultPathClustersList = "/api/2.1/clusters/list"
)
44 changes: 44 additions & 0 deletions connectors/databricks/client/get_clusters.go
@@ -0,0 +1,44 @@
package client

import (
"encoding/json"
"fmt"
)

func (d *DatabricksClient) GetClusters() ([]Cluster, error) {
var (
result = make([]Cluster, 0)
hasMore = true
nextPageToken = ""
)

for hasMore {
resp, err := d.cl.R().Get(
fmt.Sprintf(
"%s?next_page_token=%s",
defaultPathClustersList,
nextPageToken,
),
)
if err != nil {
return nil, err
}
if resp.StatusCode() != 200 {
return nil, fmt.Errorf("bad status code: %d", resp.StatusCode())
}

var clustersResp ClustersResponse
if err = json.Unmarshal(resp.Body(), &clustersResp); err != nil {
return nil, err
}

if clustersResp.Clusters != nil {
result = append(result, clustersResp.Clusters...)
}

hasMore = clustersResp.HasMore
nextPageToken = clustersResp.NextPageToken
}

return result, nil
}
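A minimal usage sketch of the new endpoint (editorial addition, not part of the commit); the workspace URL and token below are placeholders:

package main

import (
	"fmt"

	"github.com/gwos/tcg/connectors/databricks/client"
)

func main() {
	// Placeholder workspace URL and personal access token.
	cl := client.New("https://example.cloud.databricks.com", "dapi-placeholder")

	clusters, err := cl.GetClusters() // pagination is followed internally via next_page_token
	if err != nil {
		fmt.Println("list clusters:", err)
		return
	}
	for _, c := range clusters {
		fmt.Printf("%s: %s\n", c.Name, c.State)
	}
}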
48 changes: 48 additions & 0 deletions connectors/databricks/client/get_jobs_latency.go
@@ -0,0 +1,48 @@
package client

import (
"encoding/json"
"fmt"
"time"
)

func (d *DatabricksClient) GetJobsLatency(from, to time.Time) (map[int64][]Run, error) {
var (
result = make(map[int64][]Run)

hasMore = true
nextPageToken = ""
)

for hasMore {
resp, err := d.cl.R().Get(
fmt.Sprintf(
"%s?next_page_token=%s&start_time_from=%d&start_time_to=%d",
defaultPathJobsRunsList,
nextPageToken,
from.UnixMilli(),
to.UnixMilli(),
),
)
if err != nil {
return nil, err
}
if resp.StatusCode() != 200 {
return nil, fmt.Errorf("bad status code: %d", resp.StatusCode())
}

var runsResp RunsResponse
if err = json.Unmarshal(resp.Body(), &runsResp); err != nil {
return nil, err
}

for _, run := range runsResp.Runs {
result[run.JobID] = append(result[run.JobID], run)
}

hasMore = runsResp.HasMore
nextPageToken = runsResp.NextPageToken
}

return result, nil
}
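GetJobsLatency works the same way, windowed by run start time. A hedged fragment, assuming it runs inside a function with cl being the *client.DatabricksClient from the sketch above:

// Editorial sketch: runs started in the last hour, grouped by job ID.
to := time.Now()
from := to.Add(-1 * time.Hour)

runsByJob, err := cl.GetJobsLatency(from, to)
if err != nil {
	fmt.Println("list runs:", err)
	return
}
for jobID, runs := range runsByJob {
	fmt.Printf("job %d: %d runs in window\n", jobID, len(runs))
}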
22 changes: 20 additions & 2 deletions connectors/databricks/client/models.go
@@ -2,8 +2,8 @@ package client

type RunsResponse struct {
Runs []Run `json:"runs"`
-NextPageToken string `json:"nextPageToken,omitempty"`
-HasMore bool `json:"has_more"`
+NextPageToken string `json:"next_page_token,omitempty"`
+HasMore bool `json:"has_more,omitempty"`
}

type Run struct {
@@ -22,3 +22,21 @@ type Run struct {
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
}

+type ClustersResponse struct {
+Clusters []Cluster `json:"clusters"`
+NextPageToken string `json:"next_page_token,omitempty"`
+HasMore bool `json:"has_more,omitempty"`
+}
+
+type Cluster struct {
+Name string `json:"cluster_name,omitempty"`
+State string `json:"state,omitempty"` // PENDING | RUNNING | RESTARTING | RESIZING | TERMINATING | TERMINATED | ERROR | UNKNOWN
+StateMessage string `json:"state_message,omitempty"`
+DefaultTags map[string]string `json:"default_tags,omitempty"` // the API returns default_tags as a tag map, not a string
+TerminationReason struct {
+Code string `json:"code"`
+Type string `json:"type"`
+Parameters map[string]interface{} `json:"parameters,omitempty"`
+} `json:"termination_reason,omitempty"`
+}
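For reference, the tags above imply a clusters/list payload shaped like the sample below; the values are invented for illustration, not captured from a live workspace:

// Editorial sketch, inside package client: an invented payload
// unmarshaled into ClustersResponse to show the field mapping.
payload := []byte(`{
	"clusters": [
		{"cluster_name": "etl-prod", "state": "RUNNING"},
		{"cluster_name": "adhoc", "state": "TERMINATED",
		 "termination_reason": {"code": "INACTIVITY", "type": "SUCCESS"}}
	],
	"has_more": false
}`)
var resp ClustersResponse
if err := json.Unmarshal(payload, &resp); err != nil {
	panic(err) // illustration only
}
fmt.Println(resp.Clusters[1].TerminationReason.Code) // INACTIVITY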
89 changes: 13 additions & 76 deletions connectors/databricks/databricks.go
Expand Up @@ -2,10 +2,9 @@ package databricks

import (
"context"
"fmt"
"slices"
"time"

"github.com/gwos/tcg/connectors/databricks/utils"
"github.com/rs/zerolog/log"

"github.com/gwos/tcg/connectors"
@@ -35,7 +34,7 @@ func collectMetrics() {
monitoredResources := make([]transit.MonitoredResource, 0)
monitoredResourcesRef := make([]transit.ResourceRef, 0)

-jobsResource, err := getJobsResources(databricksClient, from, to)
+jobsResource, err := utils.GetJobsResource(databricksClient, from, to)
if err != nil {
log.Error().Err(err).
Str("databricks_url", extConfig.DatabricksURL).
@@ -44,10 +43,20 @@
return
}

-monitoredResources = append(monitoredResources, *jobsResource)
+clusterResource, err := utils.GetClustersResource(databricksClient)
+if err != nil {
+log.Error().Err(err).
+Str("databricks_url", extConfig.DatabricksURL).
+Str("databricks_access_token", extConfig.DatabricksAccessToken).
+Msg("failed to get clusters resource")
+return
+}
+
+monitoredResources = append(monitoredResources, *jobsResource, *clusterResource)
monitoredResourcesRef = append(
monitoredResourcesRef,
connectors.CreateResourceRef(jobsResource.Name, "", transit.ResourceTypeHost),
+connectors.CreateResourceRef(clusterResource.Name, "", transit.ResourceTypeHost),
)

lastRunTimeTo = to
@@ -64,75 +73,3 @@
log.Error().Err(err).Msg("failed to send metrics")
}
}

-func getJobsResources(databricksClient *client.DatabricksClient, from time.Time, to time.Time) (*transit.MonitoredResource, error) {
-var (
-hostName = "jobs"
-currentActiveJobsRuns = make(map[int64]int64)
-)
-
-jobsRuns, err := databricksClient.GetJobsLatency(from, to)
-if err != nil {
-return nil, fmt.Errorf("failed to get jobs latencies: %w", err)
-}
-
-var services []transit.MonitoredService
-for _, runs := range jobsRuns {
-if len(runs) > 0 {
-for _, run := range runs {
-if !slices.Contains([]string{"QUEUED", "PENDING", "RUNNING", "TERMINATING"}, run.Status.State) {
-service0, err := connectors.BuildServiceForMetric(hostName, connectors.MetricBuilder{
-Name: "latency_seconds",
-CustomName: "latency_seconds",
-Value: 0,
-Graphed: true,
-StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime).Add(-2 * time.Minute)},
-EndTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime).Add(-time.Minute)},
-})
-if err != nil {
-log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric")
-continue
-}
-service0.Name = run.RunName
-
-metricBuilder := connectors.MetricBuilder{
-Name: "latency_seconds",
-CustomName: "latency_seconds",
-Value: run.RunDuration / 1000,
-Graphed: true,
-StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime)},
-EndTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.EndTime)},
-}
-service1, err := connectors.BuildServiceForMetric(hostName, metricBuilder)
-if err != nil {
-log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric")
-continue
-}
-service1.Name = run.RunName
-
-service2, err := connectors.BuildServiceForMetric(hostName, connectors.MetricBuilder{
-Name: "latency_seconds",
-CustomName: "latency_seconds",
-Value: 0,
-Graphed: true,
-StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.EndTime).Add(time.Minute)},
-EndTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.EndTime).Add(2 * time.Minute)},
-})
-if err != nil {
-log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric")
-continue
-}
-service2.Name = run.RunName
-
-services = append(services, *service0, *service1, *service2)
-} else {
-currentActiveJobsRuns[run.JobID] = run.RunID
-}
-}
-}
-}
-
-activeJobsRuns = currentActiveJobsRuns
-
-return connectors.CreateResource(hostName, services)
-}
57 changes: 57 additions & 0 deletions connectors/databricks/utils/clusters.go
@@ -0,0 +1,57 @@
package utils

import (
"fmt"

"github.com/rs/zerolog/log"

"github.com/gwos/tcg/connectors"
"github.com/gwos/tcg/connectors/databricks/client"
"github.com/gwos/tcg/sdk/transit"
)

const (
defaultResourceNameClusters = "Clusters"
)

func GetClustersResource(databricksClient *client.DatabricksClient) (*transit.MonitoredResource, error) {
clusters, err := databricksClient.GetClusters()
if err != nil {
return nil, fmt.Errorf("failed to get jobs latencies: %w", err)
}

services := make([]transit.MonitoredService, 0, len(clusters))
for _, cluster := range clusters {
service, err := connectors.CreateService(cluster.Name, defaultResourceNameClusters)
if err != nil {
log.Error().Err(err).Str("cluster_name", cluster.Name).Msg("failed to create service")
continue
}
switch cluster.State {
case "PENDING", "RESTARTING", "RESIZING":
service.Status = transit.ServicePending
case "TERMINATING", "TERMINATED", "ERROR":
service.Status = transit.ServiceUnscheduledCritical
case "UNKNOWN":
service.Status = transit.ServiceUnknown
default:
service.Status = transit.ServiceOk
}

if service.Status != transit.ServiceOk {
if cluster.StateMessage != "" {
service.LastPluginOutput = cluster.StateMessage
} else {
service.LastPluginOutput = fmt.Sprintf(
"Termination reason: %s, %s",
cluster.TerminationReason.Code,
cluster.TerminationReason.Type,
)
}
}

services = append(services, *service)
}

return connectors.CreateResource(defaultResourceNameClusters, services)
}
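The page reports 8 changed files but renders only seven. Given the new call site utils.GetJobsResource(databricksClient, from, to) added in databricks.go and the matching body deleted there, the unrendered file is presumably connectors/databricks/utils/jobs.go. A condensed editorial sketch of what it likely contains (only the call signature is confirmed by the diff; the body is reconstructed from the deleted getJobsResources, with the zero-valued bracketing samples and activeJobsRuns bookkeeping omitted for brevity):

package utils

import (
	"fmt"
	"slices"
	"time"

	"github.com/rs/zerolog/log"

	"github.com/gwos/tcg/connectors"
	"github.com/gwos/tcg/connectors/databricks/client"
	"github.com/gwos/tcg/sdk/transit"
)

// GetJobsResource builds one "jobs" host whose services are finished job
// runs, each carrying a latency_seconds metric (run duration in seconds).
func GetJobsResource(databricksClient *client.DatabricksClient, from, to time.Time) (*transit.MonitoredResource, error) {
	const hostName = "jobs"

	jobsRuns, err := databricksClient.GetJobsLatency(from, to)
	if err != nil {
		return nil, fmt.Errorf("failed to get jobs latencies: %w", err)
	}

	var services []transit.MonitoredService
	for _, runs := range jobsRuns {
		for _, run := range runs {
			// Runs still in flight are skipped, as in the deleted body.
			if slices.Contains([]string{"QUEUED", "PENDING", "RUNNING", "TERMINATING"}, run.Status.State) {
				continue
			}
			service, err := connectors.BuildServiceForMetric(hostName, connectors.MetricBuilder{
				Name:           "latency_seconds",
				CustomName:     "latency_seconds",
				Value:          run.RunDuration / 1000,
				Graphed:        true,
				StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime)},
				EndTimestamp:   &transit.Timestamp{Time: time.UnixMilli(run.EndTime)},
			})
			if err != nil {
				log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric")
				continue
			}
			service.Name = run.RunName
			services = append(services, *service)
		}
	}

	return connectors.CreateResource(hostName, services)
}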