Skip to content

Commit

Permalink
Send metrics from metrics service to BigQuery
Browse files Browse the repository at this point in the history
Summary:
This adds support for the metrics service on the cloud side to listen
to messages over nats and send metrics to BigQuery.
To keep the table schema sane and this easy to use, the extra prometheus metrics
labels are just converted into a JSON encoded string and packed into a single
column.

Test Plan:
Published messages to nats directly with the expected wire encoded
protos. Ensured that the bigquery tables got created and populated.

Reviewers: michelle, zasgar, nserrino, #third_party_approvers

Reviewed By: michelle, nserrino, #third_party_approvers

JIRA Issues: PP-3115

Signed-off-by: Vihang Mehta <vihang@pixielabs.ai>

Differential Revision: https://phab.corp.pixielabs.ai/D10858

GitOrigin-RevId: d61ee2e
  • Loading branch information
vihangm authored and copybaranaut committed Apr 18, 2022
1 parent 2b7a029 commit 3c8df5b
Show file tree
Hide file tree
Showing 13 changed files with 218 additions and 9 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ require (
github.com/ory/kratos-client-go v0.5.4-alpha.1
github.com/phayes/freeport v0.0.0-20171002181615-b8543db493a5
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.26.0
github.com/prometheus/prometheus v2.5.0+incompatible
github.com/rivo/tview v0.0.0-20200404204604-ca37f83cb2e7
github.com/rivo/uniseg v0.1.0
github.com/sahilm/fuzzy v0.1.0
Expand Down Expand Up @@ -175,6 +177,7 @@ require (
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/gorilla/securecookie v1.1.1 // indirect
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/go-hclog v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v1.1.5 // indirect
Expand Down Expand Up @@ -232,7 +235,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/russross/blackfriday v1.5.2 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaW
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
Expand Down Expand Up @@ -1075,6 +1076,8 @@ github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/prometheus v2.5.0+incompatible h1:7QPitgO2kOFG8ecuRn9O/4L9+10He72rVRJvMXrE9Hg=
github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/rivo/tview v0.0.0-20200404204604-ca37f83cb2e7 h1:Jfm2O5tRzzHt5LeM9F4AuwcNGxCH7erPl8GeVOzJKd0=
github.com/rivo/tview v0.0.0-20200404204604-ca37f83cb2e7/go.mod h1:6lkG1x+13OShEf0EaOCaTQYyB7d5nSbb181KtjlS+84=
github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY=
Expand Down
9 changes: 9 additions & 0 deletions go_deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1712,6 +1712,7 @@ def pl_go_dependencies():

go_repository(
name = "com_github_grpc_ecosystem_grpc_gateway",
build_naming_convention = "go_default_library",
importpath = "github.com/grpc-ecosystem/grpc-gateway",
sum = "h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=",
version = "v1.16.0",
Expand Down Expand Up @@ -2989,6 +2990,14 @@ def pl_go_dependencies():
version = "v0.7.3",
)

go_repository(
name = "com_github_prometheus_prometheus",
build_file_proto_mode = "disable",
importpath = "github.com/prometheus/prometheus",
sum = "h1:7QPitgO2kOFG8ecuRn9O/4L9+10He72rVRJvMXrE9Hg=",
version = "v2.5.0+incompatible",
)

go_repository(
name = "com_github_puerkitobio_goquery",
importpath = "github.com/PuerkitoBio/goquery",
Expand Down
1 change: 1 addition & 0 deletions k8s/cloud/base/bq_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ metadata:
name: pl-bq-config
data:
PL_BQ_PROJECT: ""
PL_BQ_DATASET: ""
PL_BQ_SA_KEY_PATH: ""
9 changes: 9 additions & 0 deletions k8s/cloud/dev/bq_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
apiVersion: v1
kind: ConfigMap
metadata:
name: pl-bq-config
data:
PL_BQ_PROJECT: "pl-pixies"
PL_BQ_DATASET: "pixie_viziers_dev"
PL_BQ_SA_KEY_PATH: "/creds/bq.client.default.credentials_file"
2 changes: 2 additions & 0 deletions k8s/cloud/dev/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ resources:
- ../base
- ../overlays/exposed_services_ilb
patchesStrategicMerge:
# bq_config is useful for testing, but we don't want dev clusters to typically send data to bq.
# - bq_config.yaml
- db_config.yaml
- ory_service_config.yaml
- proxy_envoy.yaml
Expand Down
1 change: 1 addition & 0 deletions k8s/cloud/prod/bq_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ metadata:
name: pl-bq-config
data:
PL_BQ_PROJECT: "pixie-prod"
PL_BQ_DATASET: "pixie_viziers_prod"
PL_BQ_SA_KEY_PATH: "/creds/bq.client.default.credentials_file"
1 change: 1 addition & 0 deletions k8s/cloud/staging/bq_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ metadata:
name: pl-bq-config
data:
PL_BQ_PROJECT: "pixie-prod"
PL_BQ_DATASET: "pixie_viziers_staging"
PL_BQ_SA_KEY_PATH: "/creds/bq.client.default.credentials_file"
1 change: 1 addition & 0 deletions k8s/cloud/testing/bq_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ metadata:
name: pl-bq-config
data:
PL_BQ_PROJECT: "pl-pixies"
PL_BQ_DATASET: "pixie_viziers_testing"
PL_BQ_SA_KEY_PATH: "/creds/bq.client.default.credentials_file"
1 change: 1 addition & 0 deletions src/cloud/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ go_library(
"@com_github_spf13_pflag//:pflag",
"@com_github_spf13_viper//:viper",
"@com_google_cloud_go_bigquery//:bigquery",
"@org_golang_google_api//googleapi",
"@org_golang_google_api//option",
],
)
7 changes: 7 additions & 0 deletions src/cloud/metrics/controllers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ go_library(
importpath = "px.dev/pixie/src/cloud/metrics/controllers",
visibility = ["//src/cloud:__subpackages__"],
deps = [
"//src/cloud/shared/vzshard",
"//src/shared/cvmsgspb:cvmsgs_pl_go_proto",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_nats_io_nats_go//:nats_go",
"@com_github_prometheus_common//model",
"@com_github_prometheus_prometheus//prompb",
"@com_github_sirupsen_logrus//:logrus",
"@com_google_cloud_go_bigquery//:bigquery",
],
)
162 changes: 156 additions & 6 deletions src/cloud/metrics/controllers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,169 @@
package controllers

import (
"context"
"encoding/json"
"fmt"
"math"
"sync"
"time"

"cloud.google.com/go/bigquery"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/nats-io/nats.go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
log "github.com/sirupsen/logrus"

"px.dev/pixie/src/cloud/shared/vzshard"
"px.dev/pixie/src/shared/cvmsgspb"
)

// Server defines an gRPC server type.
// The topic on which to listen to metrics sent by viziers.
const vzMetricsTopic = "VZMetrics"

// The table where these metrics are written.
const bqMetricsTable = "vizier_metrics"

// Row represents a bq row.
type Row struct {
Metric string `bigquery:"metric"`
// Labels is a JSON encoded representation of the various labels.
Labels string `bigquery:"labels"`
Value float64 `bigquery:"value"`
Timestamp time.Time `bigquery:"timestamp"`
}

// Server defines an metrics server type.
type Server struct {
nc *nats.Conn
bqClient *bigquery.Client
nc *nats.Conn
bqDataset *bigquery.Dataset

done chan struct{}
once sync.Once
}

// NewServer creates GRPC handlers.
func NewServer(nc *nats.Conn, bqClient *bigquery.Client) *Server {
// NewServer creates a server.
func NewServer(nc *nats.Conn, bqDataset *bigquery.Dataset) *Server {
return &Server{
nc, bqClient,
nc: nc,
bqDataset: bqDataset,

done: make(chan struct{}),
}
}

// Start sets up the listeners starts handling messages.
func (s *Server) Start() {
table, err := s.createOrGetBQTable()
if err != nil {
log.WithError(err).Fatal("Failed to get table from BigQuery")
}

for _, shard := range vzshard.GenerateShardRange() {
s.startShardedHandler(shard, table)
}
}

func (s *Server) createOrGetBQTable() (*bigquery.Table, error) {
table := s.bqDataset.Table(bqMetricsTable)

// Check if the table already exists, if so, just return.
_, err := table.Metadata(context.Background())
if err == nil {
return table, nil
}

// Table needs to be created.
schema, err := bigquery.InferSchema(Row{})
if err != nil {
return nil, err
}
err = table.Create(context.Background(), &bigquery.TableMetadata{
Schema: schema,
})
if err != nil {
return nil, err
}
return table, nil
}

func (s *Server) startShardedHandler(shard string, table *bigquery.Table) {
if s.nc == nil {
return
}
natsCh := make(chan *nats.Msg, 8192)
sub, err := s.nc.ChanSubscribe(fmt.Sprintf("v2c.%s.*.%s", shard, vzMetricsTopic), natsCh)
if err != nil {
log.WithError(err).Fatal("Failed to subscribe to NATS channel")
}

go func() {
for {
select {
case <-s.done:
sub.Unsubscribe()
return
case msg := <-natsCh:
pb := &cvmsgspb.V2CMessage{}
err := proto.Unmarshal(msg.Data, pb)
if err != nil {
log.WithError(err).Error("Could not unmarshal message")
continue
}
anyMsg := pb.Msg
ts := &prompb.TimeSeries{}
err = types.UnmarshalAny(anyMsg, ts)
if err != nil {
log.WithError(err).Error("Could not nested message")
continue
}
bqWrite(table, ts)
}
}
}()
}

func bqWrite(table *bigquery.Table, timeseries *prompb.TimeSeries) {
inserter := table.Inserter()
inserter.SkipInvalidRows = true

var metricName string
labels := make(map[string]string)
for _, l := range timeseries.Labels {
if l.Name == model.MetricNameLabel {
metricName = l.Value
continue
}
labels[l.Name] = l.Value
}
labelsJSON, _ := json.Marshal(labels)

var batch []Row
for _, s := range timeseries.Samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
continue
}

batch = append(batch, Row{
Metric: metricName,
Labels: string(labelsJSON),
Value: v,
Timestamp: time.Unix(s.Timestamp, 0),
})
}

err := inserter.Put(context.Background(), batch)
if err != nil {
log.WithError(err).Warn("bigquery insertion failed")
}
}

// Stop performs any necessary cleanup before shutdown.
func (s *Server) Stop() {
s.once.Do(func() {
close(s.done)
})
}
26 changes: 24 additions & 2 deletions src/cloud/metrics/metrics_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"

"px.dev/pixie/src/cloud/metrics/controllers"
Expand All @@ -40,6 +41,8 @@ import (
func init() {
pflag.String("bq_project", "", "The BigQuery project to write metrics to.")
pflag.String("bq_sa_key_path", "", "The service account for the BigQuery instance that should be used.")

pflag.String("bq_dataset", "vizier_metrics", "The BigQuery dataset to write metrics to.")
}

func main() {
Expand All @@ -60,13 +63,32 @@ func main() {
var client *bigquery.Client
var err error

if viper.GetString("bq_sa_key_path") != "" {
if viper.GetString("bq_sa_key_path") != "" && viper.GetString("bq_project") != "" {
client, err = bigquery.NewClient(context.Background(), viper.GetString("bq_project"), option.WithCredentialsFile(viper.GetString("bq_sa_key_path")))
if err != nil {
log.WithError(err).Fatal("Could not start up BigQuery client for metrics server")
}
defer client.Close()
_ = controllers.NewServer(nc, client)

dsName := viper.GetString("bq_dataset")
if dsName == "" {
log.WithError(err).Fatal("Missing a BigQuery dataset name.")
}

dataset := client.Dataset(dsName)
err = dataset.Create(context.Background(), nil)
apiError, ok := err.(*googleapi.Error)
if !ok {
log.WithError(err).Fatal("Problem with BigQuery dataset")
}
// StatusConflict indicates that this dataset already exists.
// If so, we can carry along. Else we hit something else unexpected.
if apiError.Code != http.StatusConflict {
log.WithError(err).Fatal("Problem with BigQuery dataset")
}
mc := controllers.NewServer(nc, dataset)
mc.Start()
defer mc.Stop()
} else {
log.Info("No BigQuery instance configured, no metrics will be sent")
}
Expand Down

0 comments on commit 3c8df5b

Please sign in to comment.