Skip to content

Commit

Permalink
backend: add prometheus metrics collection to KFP server. (kubeflow#4354
Browse files Browse the repository at this point in the history
)

* enable pagination when expanding experiment in both the home page and the archive page

* Revert "enable pagination when expanding experiment in both the home page and the archive page"

This reverts commit 5b67273.

* prometheus configs; basic metrics in pipeline server to collect
prometheus metrics

* make version consistent
s Please enter the commit message for your changes. Lines starting

* check if we gc workflows

* add prom deps

* upload counts

* remove non-code changes

* more metrics

* upload server metrics guarded by flag

* todo for a flag

* fix test

* fix tests

* fix tests
  • Loading branch information
jingzhang36 authored and Jeffwan committed Dec 9, 2020
1 parent c0f854d commit 4c6a390
Show file tree
Hide file tree
Showing 15 changed files with 536 additions and 100 deletions.
1 change: 1 addition & 0 deletions backend/src/apiserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"@com_github_jinzhu_gorm//dialects/sqlite:go_default_library",
"@com_github_minio_minio_go//:go_default_library",
"@com_github_spf13_viper//:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promhttp:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//reflection:go_default_library",
],
Expand Down
17 changes: 12 additions & 5 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ import (
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
rpcPortFlag = flag.String("rpcPortFlag", ":8887", "RPC Port")
httpPortFlag = flag.String("httpPortFlag", ":8888", "Http Proxy Port")
configPath = flag.String("config", "", "Path to JSON file containing config")
sampleConfigPath = flag.String("sampleconfig", "", "Path to samples")

collectMetricsFlag = flag.Bool("collectMetricsFlag", true, "Whether to collect Prometheus metrics in API server.")
)

type RegisterHttpHandlerFromEndpoint func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error
Expand Down Expand Up @@ -87,10 +91,10 @@ func startRpcServer(resourceManager *resource.ResourceManager) {
glog.Fatalf("Failed to start RPC server: %v", err)
}
s := grpc.NewServer(grpc.UnaryInterceptor(apiServerInterceptor), grpc.MaxRecvMsgSize(math.MaxInt32))
api.RegisterPipelineServiceServer(s, server.NewPipelineServer(resourceManager))
api.RegisterExperimentServiceServer(s, server.NewExperimentServer(resourceManager))
api.RegisterRunServiceServer(s, server.NewRunServer(resourceManager))
api.RegisterJobServiceServer(s, server.NewJobServer(resourceManager))
api.RegisterPipelineServiceServer(s, server.NewPipelineServer(resourceManager, &server.PipelineServerOptions{CollectMetrics: *collectMetricsFlag}))
api.RegisterExperimentServiceServer(s, server.NewExperimentServer(resourceManager, &server.ExperimentServerOptions{CollectMetrics: *collectMetricsFlag}))
api.RegisterRunServiceServer(s, server.NewRunServer(resourceManager, &server.RunServerOptions{CollectMetrics: *collectMetricsFlag}))
api.RegisterJobServiceServer(s, server.NewJobServer(resourceManager, &server.JobServerOptions{CollectMetrics: *collectMetricsFlag}))
api.RegisterReportServiceServer(s, server.NewReportServer(resourceManager))
api.RegisterVisualizationServiceServer(
s,
Expand Down Expand Up @@ -132,7 +136,7 @@ func startHttpProxy(resourceManager *resource.ResourceManager) {
// multipart upload is only supported in HTTP. In long term, we should have gRPC endpoints that
// accept pipeline url for importing.
// https://github.com/grpc-ecosystem/grpc-gateway/issues/410
pipelineUploadServer := server.NewPipelineUploadServer(resourceManager)
pipelineUploadServer := server.NewPipelineUploadServer(resourceManager, &server.PipelineUploadServerOptions{CollectMetrics: *collectMetricsFlag})
topMux.HandleFunc("/apis/v1beta1/pipelines/upload", pipelineUploadServer.UploadPipeline)
topMux.HandleFunc("/apis/v1beta1/pipelines/upload_version", pipelineUploadServer.UploadPipelineVersion)
topMux.HandleFunc("/apis/v1beta1/healthz", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -141,6 +145,9 @@ func startHttpProxy(resourceManager *resource.ResourceManager) {

topMux.Handle("/apis/", mux)

// Register a handler for Prometheus to poll.
topMux.Handle("/metrics", promhttp.Handler())

http.ListenAndServe(*httpPortFlag, topMux)
glog.Info("Http Proxy started")
}
Expand Down
2 changes: 2 additions & 0 deletions backend/src/apiserver/resource/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ go_library(
"@com_github_cenkalti_backoff//:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@io_k8s_apimachinery//pkg/api/errors:go_default_library",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
"@io_k8s_apimachinery//pkg/types:go_default_library",
Expand Down
13 changes: 13 additions & 0 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/types"
Expand All @@ -45,6 +47,15 @@ const (
DefaultBucketNameEnvVar = "BUCKET_NAME"
)

// Metric variables. Please prefix the metric names with resource_manager_.
var (
// Count the removed workflows due to garbage collection.
workflowGCCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "resource_manager_workflow_gc",
Help: "The number of gabarage-collected workflows",
})
)

type ClientManagerInterface interface {
ExperimentStore() storage.ExperimentStoreInterface
PipelineStore() storage.PipelineStoreInterface
Expand Down Expand Up @@ -700,6 +711,8 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error
if err != nil {
return util.NewInternalServerError(err, "Failed to delete the completed workflow for run %s", runId)
}
// TODO(jingzhang36): find a proper way to pass collectMetricsFlag here.
workflowGCCounter.Inc()
}

if jobId == "" {
Expand Down
3 changes: 3 additions & 0 deletions backend/src/apiserver/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ go_library(
"@com_github_golang_protobuf//jsonpb:go_default_library_gen",
"@com_github_pkg_errors//:go_default_library",
"@com_github_robfig_cron//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promhttp:go_default_library",
"@com_github_spf13_viper//:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
"@io_bazel_rules_go//proto/wkt:empty_go_proto",
Expand Down
84 changes: 82 additions & 2 deletions backend/src/apiserver/server/experiment_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,66 @@ import (
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Metric variables. Please prefix the metric names with experiment_server_.
var (
// Used to calculate the request rate.
createExperimentRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "experiment_server_create_requests",
Help: "The total number of CreateExperiment requests",
})

getExperimentRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "experiment_server_get_requests",
Help: "The total number of GetExperiment requests",
})

listExperimentRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "experiment_server_list_requests",
Help: "The total number of ListExperiments requests",
})

deleteExperimentRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "experiment_server_delete_requests",
Help: "The total number of DeleteExperiment requests",
})

archiveExperimentRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "experiment_server_archive_requests",
Help: "The total number of ArchiveExperiment requests",
})

unarchiveExperimentRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "experiment_server_unarchive_requests",
Help: "The total number of UnarchiveExperiment requests",
})

// TODO(jingzhang36): error count and success count.

experimentCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "experiment_server_run_count",
Help: "The current number of experiments in Kubeflow Pipelines instance",
})
)

type ExperimentServerOptions struct {
CollectMetrics bool
}

type ExperimentServer struct {
resourceManager *resource.ResourceManager
options *ExperimentServerOptions
}

func (s *ExperimentServer) CreateExperiment(ctx context.Context, request *api.CreateExperimentRequest) (
*api.Experiment, error) {
if s.options.CollectMetrics {
createExperimentRequests.Inc()
}

err := ValidateCreateExperimentRequest(request)
if err != nil {
return nil, util.Wrap(err, "Validate experiment request failed.")
Expand All @@ -32,11 +84,19 @@ func (s *ExperimentServer) CreateExperiment(ctx context.Context, request *api.Cr
if err != nil {
return nil, util.Wrap(err, "Create experiment failed.")
}

if s.options.CollectMetrics {
experimentCount.Inc()
}
return ToApiExperiment(newExperiment), nil
}

func (s *ExperimentServer) GetExperiment(ctx context.Context, request *api.GetExperimentRequest) (
*api.Experiment, error) {
if s.options.CollectMetrics {
getExperimentRequests.Inc()
}

if !common.IsMultiUserSharedReadMode() {
err := s.canAccessExperiment(ctx, request.Id)
if err != nil {
Expand All @@ -53,6 +113,10 @@ func (s *ExperimentServer) GetExperiment(ctx context.Context, request *api.GetEx

func (s *ExperimentServer) ListExperiment(ctx context.Context, request *api.ListExperimentsRequest) (
*api.ListExperimentsResponse, error) {
if s.options.CollectMetrics {
listExperimentRequests.Inc()
}

opts, err := validatedListOptions(&model.Experiment{}, request.PageToken, int(request.PageSize), request.SortBy, request.Filter)

if err != nil {
Expand Down Expand Up @@ -99,6 +163,10 @@ func (s *ExperimentServer) ListExperiment(ctx context.Context, request *api.List
}

func (s *ExperimentServer) DeleteExperiment(ctx context.Context, request *api.DeleteExperimentRequest) (*empty.Empty, error) {
if s.options.CollectMetrics {
deleteExperimentRequests.Inc()
}

err := s.canAccessExperiment(ctx, request.Id)
if err != nil {
return nil, util.Wrap(err, "Failed to authorize the request.")
Expand All @@ -108,6 +176,10 @@ func (s *ExperimentServer) DeleteExperiment(ctx context.Context, request *api.De
if err != nil {
return nil, err
}

if s.options.CollectMetrics {
experimentCount.Dec()
}
return &empty.Empty{}, nil
}

Expand Down Expand Up @@ -156,6 +228,10 @@ func (s *ExperimentServer) canAccessExperiment(ctx context.Context, experimentID
}

func (s *ExperimentServer) ArchiveExperiment(ctx context.Context, request *api.ArchiveExperimentRequest) (*empty.Empty, error) {
if s.options.CollectMetrics {
archiveExperimentRequests.Inc()
}

err := s.canAccessExperiment(ctx, request.Id)
if err != nil {
return nil, util.Wrap(err, "Failed to authorize the requests.")
Expand All @@ -168,6 +244,10 @@ func (s *ExperimentServer) ArchiveExperiment(ctx context.Context, request *api.A
}

func (s *ExperimentServer) UnarchiveExperiment(ctx context.Context, request *api.UnarchiveExperimentRequest) (*empty.Empty, error) {
if s.options.CollectMetrics {
unarchiveExperimentRequests.Inc()
}

err := s.canAccessExperiment(ctx, request.Id)
if err != nil {
return nil, util.Wrap(err, "Failed to authorize the requests.")
Expand All @@ -179,6 +259,6 @@ func (s *ExperimentServer) UnarchiveExperiment(ctx context.Context, request *api
return &empty.Empty{}, nil
}

func NewExperimentServer(resourceManager *resource.ResourceManager) *ExperimentServer {
return &ExperimentServer{resourceManager: resourceManager}
func NewExperimentServer(resourceManager *resource.ResourceManager, options *ExperimentServerOptions) *ExperimentServer {
return &ExperimentServer{resourceManager: resourceManager, options: options}
}
Loading

0 comments on commit 4c6a390

Please sign in to comment.