From 7f9a63dc3e67b87a4c09eaf964488202a9798048 Mon Sep 17 00:00:00 2001
From: VladislavSenkevich
Date: Tue, 29 Oct 2024 16:22:56 +0300
Subject: [PATCH] GROUNDWORK-3440: add clusters services

---
 connectors/databricks/client/client.go       | 43 ---------
 connectors/databricks/client/const.go        |  5 +-
 connectors/databricks/client/get_clusters.go | 44 +++++++++
 .../databricks/client/get_jobs_latency.go    | 48 ++++++++++
 connectors/databricks/client/models.go       | 22 ++++-
 connectors/databricks/databricks.go          | 89 +++----------
 connectors/databricks/utils/clusters.go      | 57 +++++++++++
 connectors/databricks/utils/jobs.go          | 96 +++++++++++++++++++
 8 files changed, 281 insertions(+), 123 deletions(-)
 create mode 100644 connectors/databricks/client/get_clusters.go
 create mode 100644 connectors/databricks/client/get_jobs_latency.go
 create mode 100644 connectors/databricks/utils/clusters.go
 create mode 100644 connectors/databricks/utils/jobs.go

diff --git a/connectors/databricks/client/client.go b/connectors/databricks/client/client.go
index 5d7b7f3a..a5db21a3 100644
--- a/connectors/databricks/client/client.go
+++ b/connectors/databricks/client/client.go
@@ -1,9 +1,7 @@
 package client
 
 import (
-	"encoding/json"
 	"fmt"
-	"time"
 
 	"github.com/go-resty/resty/v2"
 )
@@ -17,44 +15,3 @@ func New(url, token string) *DatabricksClient {
 		cl: resty.New().SetBaseURL(url).SetHeader("Authorization", fmt.Sprintf("Bearer %s", token)),
 	}
 }
-
-func (d *DatabricksClient) GetJobsLatency(from, to time.Time) (map[int64][]Run, error) {
-	var (
-		result = make(map[int64][]Run)
-
-		hasMore       = true
-		nextPageToken = ""
-	)
-
-	for hasMore {
-		resp, err := d.cl.R().Get(
-			fmt.Sprintf(
-				"%s?next_page_token=%s&start_time_from=%d&start_time_to=%d",
-				defaultJobsRunsListPath,
-				nextPageToken,
-				from.UnixMilli(),
-				to.UnixMilli(),
-			),
-		)
-		if err != nil {
-			return nil, err
-		}
-		if resp.StatusCode() != 200 {
-			return nil, fmt.Errorf("bad status code: %d", resp.StatusCode())
-		}
-
-		var runsResp RunsResponse
-		if err = json.Unmarshal(resp.Body(), &runsResp); err != nil {
-			return nil, err
-		}
-
-		for _, run := range runsResp.Runs {
-			result[run.JobID] = append(result[run.JobID], run)
-		}
-
-		hasMore = runsResp.HasMore
-		nextPageToken = runsResp.NextPageToken
-	}
-
-	return result, nil
-}
diff --git a/connectors/databricks/client/const.go b/connectors/databricks/client/const.go
index f5f89623..09897b5d 100644
--- a/connectors/databricks/client/const.go
+++ b/connectors/databricks/client/const.go
@@ -1,6 +1,7 @@
 package client
 
 const (
-	defaultJobsRunsListPath = "/api/2.1/jobs/runs/list"
-	defaultJobRunGetPath    = "/api/2.1/jobs/runs/get"
+	defaultPathJobsRunsList = "/api/2.1/jobs/runs/list"
+	defaultPathJobsRunsGet  = "/api/2.1/jobs/runs/get"
+	defaultPathClustersList = "/api/2.1/clusters/list"
 )
diff --git a/connectors/databricks/client/get_clusters.go b/connectors/databricks/client/get_clusters.go
new file mode 100644
index 00000000..94661ad1
--- /dev/null
+++ b/connectors/databricks/client/get_clusters.go
@@ -0,0 +1,44 @@
+package client
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+func (d *DatabricksClient) GetClusters() ([]Cluster, error) {
+	var (
+		result        = make([]Cluster, 0)
+		hasMore       = true
+		nextPageToken = ""
+	)
+
+	for hasMore {
+		resp, err := d.cl.R().Get(
+			fmt.Sprintf(
+				"%s?next_page_token=%s",
+				defaultPathClustersList,
+				nextPageToken,
+			),
+		)
+		if err != nil {
+			return nil, err
+		}
+		if resp.StatusCode() != 200 {
+			return nil, fmt.Errorf("bad status code: %d", resp.StatusCode())
+		}
+
+		var clustersResp ClustersResponse
+		if err = json.Unmarshal(resp.Body(), &clustersResp); err != nil {
+			return nil, err
+		}
+
+		if clustersResp.Clusters != nil {
+			result = append(result, clustersResp.Clusters...)
+		}
+
+		hasMore = clustersResp.HasMore
+		nextPageToken = clustersResp.NextPageToken
+	}
+
+	return result, nil
+}
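
Not part of the patch: a minimal test sketch for the pagination loop above. It assumes only what this diff already defines (client.New, GetClusters, ClustersResponse, Cluster) plus the standard library httptest package; the cluster names and page-token values are made up for illustration.

package client_test

import (
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/gwos/tcg/connectors/databricks/client"
)

// TestGetClustersPagination serves two pages from a local fake API and checks
// that GetClusters follows has_more/next_page_token and aggregates both pages.
func TestGetClustersPagination(t *testing.T) {
	pages := map[string]client.ClustersResponse{
		"": {
			Clusters:      []client.Cluster{{Name: "etl-prod", State: "RUNNING"}},
			NextPageToken: "page-2",
			HasMore:       true,
		},
		"page-2": {
			Clusters: []client.Cluster{{Name: "adhoc", State: "TERMINATED"}},
		},
	}

	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/api/2.1/clusters/list" {
			http.NotFound(w, r)
			return
		}
		// Return the page keyed by the token the client sent (empty on the first call).
		_ = json.NewEncoder(w).Encode(pages[r.URL.Query().Get("next_page_token")])
	}))
	defer srv.Close()

	clusters, err := client.New(srv.URL, "dummy-token").GetClusters()
	if err != nil {
		t.Fatalf("GetClusters: %v", err)
	}
	if len(clusters) != 2 {
		t.Fatalf("expected 2 clusters across pages, got %d", len(clusters))
	}
}
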
diff --git a/connectors/databricks/client/get_jobs_latency.go b/connectors/databricks/client/get_jobs_latency.go
new file mode 100644
index 00000000..628e9796
--- /dev/null
+++ b/connectors/databricks/client/get_jobs_latency.go
@@ -0,0 +1,48 @@
+package client
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+)
+
+func (d *DatabricksClient) GetJobsLatency(from, to time.Time) (map[int64][]Run, error) {
+	var (
+		result = make(map[int64][]Run)
+
+		hasMore       = true
+		nextPageToken = ""
+	)
+
+	for hasMore {
+		resp, err := d.cl.R().Get(
+			fmt.Sprintf(
+				"%s?next_page_token=%s&start_time_from=%d&start_time_to=%d",
+				defaultPathJobsRunsList,
+				nextPageToken,
+				from.UnixMilli(),
+				to.UnixMilli(),
+			),
+		)
+		if err != nil {
+			return nil, err
+		}
+		if resp.StatusCode() != 200 {
+			return nil, fmt.Errorf("bad status code: %d", resp.StatusCode())
+		}
+
+		var runsResp RunsResponse
+		if err = json.Unmarshal(resp.Body(), &runsResp); err != nil {
+			return nil, err
+		}
+
+		for _, run := range runsResp.Runs {
+			result[run.JobID] = append(result[run.JobID], run)
+		}
+
+		hasMore = runsResp.HasMore
+		nextPageToken = runsResp.NextPageToken
+	}
+
+	return result, nil
+}
diff --git a/connectors/databricks/client/models.go b/connectors/databricks/client/models.go
index 589709cf..0536e16d 100644
--- a/connectors/databricks/client/models.go
+++ b/connectors/databricks/client/models.go
@@ -2,8 +2,8 @@ package client
 
 type RunsResponse struct {
 	Runs          []Run  `json:"runs"`
-	NextPageToken string `json:"nextPageToken,omitempty"`
-	HasMore       bool   `json:"has_more"`
+	NextPageToken string `json:"next_page_token,omitempty"`
+	HasMore       bool   `json:"has_more,omitempty"`
 }
 
 type Run struct {
@@ -22,3 +22,21 @@ type Run struct {
 	StartTime int64 `json:"start_time,omitempty"`
 	EndTime   int64 `json:"end_time,omitempty"`
 }
+
+type ClustersResponse struct {
+	Clusters      []Cluster `json:"clusters"`
+	NextPageToken string    `json:"next_page_token,omitempty"`
+	HasMore       bool      `json:"has_more,omitempty"`
+}
+
+type Cluster struct {
+	Name              string            `json:"cluster_name,omitempty"`
+	State             string            `json:"state,omitempty"` // PENDING | RUNNING | RESTARTING | RESIZING | TERMINATING | TERMINATED | ERROR | UNKNOWN
+	StateMessage      string            `json:"state_message,omitempty"`
+	DefaultTags       map[string]string `json:"default_tags,omitempty"`
+	TerminationReason struct {
+		Code       string                 `json:"code"`
+		Type       string                 `json:"type"`
+		Parameters map[string]interface{} `json:"parameters,omitempty"`
+	} `json:"termination_reason,omitempty"`
+}
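
For reference, the new structs map a payload shaped roughly like the fragment below (hand-written from the struct tags above, not captured from a real workspace); a quick unmarshal shows how the nested termination_reason is reached.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/gwos/tcg/connectors/databricks/client"
)

func main() {
	// Illustrative payload fragment; field names follow the json tags in models.go.
	payload := []byte(`{
		"clusters": [{
			"cluster_name": "etl-prod",
			"state": "TERMINATED",
			"state_message": "Inactive cluster was terminated.",
			"termination_reason": {"code": "INACTIVITY", "type": "SUCCESS"}
		}],
		"has_more": false
	}`)

	var resp client.ClustersResponse
	if err := json.Unmarshal(payload, &resp); err != nil {
		panic(err)
	}
	c := resp.Clusters[0]
	fmt.Println(c.Name, c.State, c.TerminationReason.Code, c.TerminationReason.Type)
}
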
diff --git a/connectors/databricks/databricks.go b/connectors/databricks/databricks.go
index c8db0942..3008c174 100644
--- a/connectors/databricks/databricks.go
+++ b/connectors/databricks/databricks.go
@@ -2,10 +2,9 @@ package databricks
 
 import (
 	"context"
-	"fmt"
-	"slices"
 	"time"
 
+	"github.com/gwos/tcg/connectors/databricks/utils"
 	"github.com/rs/zerolog/log"
 
 	"github.com/gwos/tcg/connectors"
@@ -35,7 +34,7 @@ func collectMetrics() {
 	monitoredResources := make([]transit.MonitoredResource, 0)
 	monitoredResourcesRef := make([]transit.ResourceRef, 0)
 
-	jobsResource, err := getJobsResources(databricksClient, from, to)
+	jobsResource, err := utils.GetJobsResource(databricksClient, from, to)
 	if err != nil {
 		log.Error().Err(err).
 			Str("databricks_url", extConfig.DatabricksURL).
@@ -44,10 +43,20 @@ func collectMetrics() {
 		return
 	}
 
-	monitoredResources = append(monitoredResources, *jobsResource)
+	clusterResource, err := utils.GetClustersResource(databricksClient)
+	if err != nil {
+		log.Error().Err(err).
+			Str("databricks_url", extConfig.DatabricksURL).
+			Str("databricks_access_token", extConfig.DatabricksAccessToken).
+			Msg("failed to get clusters resource")
+		return
+	}
+
+	monitoredResources = append(monitoredResources, *jobsResource, *clusterResource)
 	monitoredResourcesRef = append(
 		monitoredResourcesRef,
 		connectors.CreateResourceRef(jobsResource.Name, "", transit.ResourceTypeHost),
+		connectors.CreateResourceRef(clusterResource.Name, "", transit.ResourceTypeHost),
 	)
 
 	lastRunTimeTo = to
@@ -64,75 +73,3 @@ func collectMetrics() {
 		log.Error().Err(err).Msg("failed to send metrics")
 	}
 }
-
-func getJobsResources(databricksClient *client.DatabricksClient, from time.Time, to time.Time) (*transit.MonitoredResource, error) {
-	var (
-		hostName              = "jobs"
-		currentActiveJobsRuns = make(map[int64]int64)
-	)
-
-	jobsRuns, err := databricksClient.GetJobsLatency(from, to)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get jobs latencies: %w", err)
-	}
-
-	var services []transit.MonitoredService
-	for _, runs := range jobsRuns {
-		if len(runs) > 0 {
-			for _, run := range runs {
-				if !slices.Contains([]string{"QUEUED", "PENDING", "RUNNING", "TERMINATING"}, run.Status.State) {
-					service0, err := connectors.BuildServiceForMetric(hostName, connectors.MetricBuilder{
-						Name:           "latency_seconds",
-						CustomName:     "latency_seconds",
-						Value:          0,
-						Graphed:        true,
-						StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime).Add(-2 * time.Minute)},
-						EndTimestamp:   &transit.Timestamp{Time: time.UnixMilli(run.StartTime).Add(-time.Minute)},
-					})
-					if err != nil {
-						log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric")
-						continue
-					}
-					service0.Name = run.RunName
-
-					metricBuilder := connectors.MetricBuilder{
-						Name:           "latency_seconds",
-						CustomName:     "latency_seconds",
-						Value:          run.RunDuration / 1000,
-						Graphed:        true,
-						StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime)},
-						EndTimestamp:   &transit.Timestamp{Time: time.UnixMilli(run.EndTime)},
-					}
-					service1, err := connectors.BuildServiceForMetric(hostName, metricBuilder)
-					if err != nil {
-						log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric")
-						continue
-					}
-					service1.Name = run.RunName
-
-					service2, err := connectors.BuildServiceForMetric(hostName, connectors.MetricBuilder{
-						Name:           "latency_seconds",
-						CustomName:     "latency_seconds",
-						Value:          0,
-						Graphed:        true,
-						StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.EndTime).Add(time.Minute)},
-						EndTimestamp:   &transit.Timestamp{Time: time.UnixMilli(run.EndTime).Add(2 * time.Minute)},
-					})
-					if err != nil {
-						log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric")
-						continue
-					}
-					service2.Name = run.RunName
-
-					services = append(services, *service0, *service1, *service2)
-				} else {
-					currentActiveJobsRuns[run.JobID] = run.RunID
-				}
-			}
-		}
-	}
-
-	activeJobsRuns = currentActiveJobsRuns
-
-	return connectors.CreateResource(hostName, services)
-}
"github.com/gwos/tcg/connectors" + "github.com/gwos/tcg/connectors/databricks/client" + "github.com/gwos/tcg/sdk/transit" +) + +const ( + defaultResourceNameClusters = "Clusters" +) + +func GetClustersResource(databricksClient *client.DatabricksClient) (*transit.MonitoredResource, error) { + clusters, err := databricksClient.GetClusters() + if err != nil { + return nil, fmt.Errorf("failed to get jobs latencies: %w", err) + } + + services := make([]transit.MonitoredService, 0, len(clusters)) + for _, cluster := range clusters { + service, err := connectors.CreateService(cluster.Name, defaultResourceNameClusters) + if err != nil { + log.Error().Err(err).Str("cluster_name", cluster.Name).Msg("failed to create service") + continue + } + switch cluster.State { + case "PENDING", "RESTARTING", "RESIZING": + service.Status = transit.ServicePending + case "TERMINATING", "TERMINATED", "ERROR": + service.Status = transit.ServiceUnscheduledCritical + case "UNKNOWN": + service.Status = transit.ServiceUnknown + default: + service.Status = transit.ServiceOk + } + + if service.Status != transit.ServiceOk { + if cluster.StateMessage != "" { + service.LastPluginOutput = cluster.StateMessage + } else { + service.LastPluginOutput = fmt.Sprintf( + "Termination reason: %s, %s", + cluster.TerminationReason.Code, + cluster.TerminationReason.Type, + ) + } + } + + services = append(services, *service) + } + + return connectors.CreateResource(defaultResourceNameClusters, services) +} diff --git a/connectors/databricks/utils/jobs.go b/connectors/databricks/utils/jobs.go new file mode 100644 index 00000000..478cf93a --- /dev/null +++ b/connectors/databricks/utils/jobs.go @@ -0,0 +1,96 @@ +package utils + +import ( + "fmt" + "slices" + "time" + + "github.com/rs/zerolog/log" + + "github.com/gwos/tcg/connectors" + "github.com/gwos/tcg/connectors/databricks/client" + "github.com/gwos/tcg/sdk/transit" +) + +const ( + defaultResourceNameJobs = "Jobs" +) + +func GetJobsResource(databricksClient *client.DatabricksClient, from time.Time, to time.Time) (*transit.MonitoredResource, error) { + jobsRuns, err := databricksClient.GetJobsLatency(from, to) + if err != nil { + return nil, fmt.Errorf("failed to get jobs latencies: %w", err) + } + + var services []transit.MonitoredService + for _, runs := range jobsRuns { + if len(runs) > 0 { + for _, run := range runs { + output := run.Status.TerminationDetails.Message + status := transit.ServicePending + if run.Status.State == "TERMINATED" { + if run.Status.TerminationDetails.Code != "SUCCESS" { + status = transit.ServiceUnscheduledCritical + } + } + + service0, err := connectors.BuildServiceForMetric(defaultResourceNameJobs, connectors.MetricBuilder{ + Name: "latency_seconds", + CustomName: "latency_seconds", + Value: 0, + Graphed: true, + StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime).Add(-2 * time.Minute)}, + EndTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime).Add(-time.Minute)}, + }) + if err != nil { + log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric") + continue + } + service0.Name = run.RunName + service0.Status = status + service0.LastPluginOutput = output + + metricBuilder := connectors.MetricBuilder{ + Name: "latency_seconds", + CustomName: "latency_seconds", + Value: run.RunDuration / 1000, + Graphed: true, + StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime)}, + EndTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.EndTime)}, + } + service1, err := 
diff --git a/connectors/databricks/utils/jobs.go b/connectors/databricks/utils/jobs.go
new file mode 100644
index 00000000..478cf93a
--- /dev/null
+++ b/connectors/databricks/utils/jobs.go
@@ -0,0 +1,96 @@
+package utils
+
+import (
+	"fmt"
+	"slices"
+	"time"
+
+	"github.com/rs/zerolog/log"
+
+	"github.com/gwos/tcg/connectors"
+	"github.com/gwos/tcg/connectors/databricks/client"
+	"github.com/gwos/tcg/sdk/transit"
+)
+
+const (
+	defaultResourceNameJobs = "Jobs"
+)
+
+func GetJobsResource(databricksClient *client.DatabricksClient, from time.Time, to time.Time) (*transit.MonitoredResource, error) {
+	jobsRuns, err := databricksClient.GetJobsLatency(from, to)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get jobs latencies: %w", err)
+	}
+
+	var services []transit.MonitoredService
+	for _, runs := range jobsRuns {
+		if len(runs) > 0 {
+			for _, run := range runs {
+				output := run.Status.TerminationDetails.Message
+				status := transit.ServicePending
+				if run.Status.State == "TERMINATED" {
+					if run.Status.TerminationDetails.Code != "SUCCESS" {
+						status = transit.ServiceUnscheduledCritical
+					}
+				}
+
+				service0, err := connectors.BuildServiceForMetric(defaultResourceNameJobs, connectors.MetricBuilder{
+					Name:           "latency_seconds",
+					CustomName:     "latency_seconds",
+					Value:          0,
+					Graphed:        true,
+					StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime).Add(-2 * time.Minute)},
+					EndTimestamp:   &transit.Timestamp{Time: time.UnixMilli(run.StartTime).Add(-time.Minute)},
+				})
+				if err != nil {
+					log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric")
+					continue
+				}
+				service0.Name = run.RunName
+				service0.Status = status
+				service0.LastPluginOutput = output
+
+				metricBuilder := connectors.MetricBuilder{
+					Name:           "latency_seconds",
+					CustomName:     "latency_seconds",
+					Value:          run.RunDuration / 1000,
+					Graphed:        true,
+					StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.StartTime)},
+					EndTimestamp:   &transit.Timestamp{Time: time.UnixMilli(run.EndTime)},
+				}
+				service1, err := connectors.BuildServiceForMetric(defaultResourceNameJobs, metricBuilder)
+				if err != nil {
+					log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric")
+					continue
+				}
+				service1.Name = run.RunName
+				service1.Status = status
+				service1.LastPluginOutput = output
+
+				services = append(services, *service0, *service1)
+
+				if !slices.Contains([]string{"QUEUED", "PENDING", "RUNNING", "TERMINATING"}, run.Status.State) {
+					service2, err := connectors.BuildServiceForMetric(defaultResourceNameJobs, connectors.MetricBuilder{
+						Name:           "latency_seconds",
+						CustomName:     "latency_seconds",
+						Value:          0,
+						Graphed:        true,
+						StartTimestamp: &transit.Timestamp{Time: time.UnixMilli(run.EndTime).Add(time.Minute)},
+						EndTimestamp:   &transit.Timestamp{Time: time.UnixMilli(run.EndTime).Add(2 * time.Minute)},
+					})
+					if err != nil {
+						log.Error().Err(err).Str("job_name", run.RunName).Msg("failed to build service for metric")
+						continue
+					}
+					service2.Name = run.RunName
+					service2.Status = status
+					service2.LastPluginOutput = output
+
+					services = append(services, *service2)
+				}
+			}
+		}
+	}
+
+	return connectors.CreateResource(defaultResourceNameJobs, services)
+}
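
Finally, an illustration of the metric layout GetJobsResource produces per run: a zero-value "bracket" one to two minutes before the start, the real latency sample over the run window, and (once the run has finished) another zero-value bracket after the end. The standalone sketch below only prints those three windows for a made-up run; the timestamps are arbitrary.

package main

import (
	"fmt"
	"time"
)

func main() {
	// A made-up finished run: started at noon UTC, ran for 90 seconds.
	start := time.Date(2024, 10, 29, 12, 0, 0, 0, time.UTC)
	end := start.Add(90 * time.Second)
	latencySeconds := end.Sub(start).Milliseconds() / 1000 // mirrors run.RunDuration / 1000

	fmt.Println("bracket before:", start.Add(-2*time.Minute), "->", start.Add(-time.Minute), "value 0")
	fmt.Println("latency sample:", start, "->", end, "value", latencySeconds)
	fmt.Println("bracket after: ", end.Add(time.Minute), "->", end.Add(2*time.Minute), "value 0")
}
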