Skip to content

Commit

Permalink
Fix Go Functions metrics (#6944)
Browse files Browse the repository at this point in the history
* Fix go functions metrics

Signed-off-by: xiaolong.ran <rxl@apache.org>

* fix a little

Signed-off-by: xiaolong.ran <rxl@apache.org>

* fix ci error

Signed-off-by: xiaolong.ran <rxl@apache.org>
  • Loading branch information
wolfstudy authored May 15, 2020
1 parent b9e7c0c commit fdeaa30
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 247 deletions.
51 changes: 14 additions & 37 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (
"github.com/golang/protobuf/ptypes/empty"

"github.com/apache/pulsar-client-go/pulsar"

log "github.com/apache/pulsar/pulsar-function-go/logutil"
pb "github.com/apache/pulsar/pulsar-function-go/pb"
io_prometheus_client "github.com/prometheus/client_model/go"
prometheus_client "github.com/prometheus/client_model/go"
)

type goInstance struct {
Expand Down Expand Up @@ -162,11 +163,9 @@ CLOSE:
return err
}

gi.stats.processTimeEnd()
gi.processResult(msgInput, output)

gi.stats.processTimeEnd() // Should this be called here or before processResult(..)?
gi.stats.incrTotalProcessedSuccessfully()

case <-idleTimer.C:
close(channel)
break CLOSE
Expand Down Expand Up @@ -470,15 +469,6 @@ func (gi *goInstance) getMetrics() *pb.MetricsData {
metricsData.ProcessedSuccessfullyTotal_1Min = int64(totalProcessedSuccessfully1min)
metricsData.SystemExceptionsTotal_1Min = int64(totalSysExceptions1min)
metricsData.UserExceptionsTotal_1Min = int64(totalUserExceptions1min)
//metrics_data.AvgProcessLatency_1Min = avg_process_latency_ms_1min

// get any user metrics
// Not sure yet where these are stored.
/*
user_metrics := self.contextimpl.get_metrics()
for metric_name, value in user_metrics.items():
metrics_data.userMetrics[metric_name] = value
*/

return &metricsData
}
Expand All @@ -496,30 +486,28 @@ func (gi *goInstance) resetMetrics() *empty.Empty {

// This method is used to get the required metrics for Prometheus.
// Note that this doesn't distinguish between parallel function instances!
func (gi *goInstance) getMatchingMetricFunc() func(lbl *io_prometheus_client.LabelPair) bool {
matchMetricFunc := func(lbl *io_prometheus_client.LabelPair) bool {
func (gi *goInstance) getMatchingMetricFunc() func(lbl *prometheus_client.LabelPair) bool {
matchMetricFunc := func(lbl *prometheus_client.LabelPair) bool {
return *lbl.Name == "fqfn" && *lbl.Value == gi.context.GetTenantAndNamespaceAndName()
}
return matchMetricFunc
}

// e.g. metricName = "pulsar_function_process_latency_ms"
func (gi *goInstance) getMatchingMetricFromRegistry(metricName string) io_prometheus_client.Metric {
func (gi *goInstance) getMatchingMetricFromRegistry(metricName string) prometheus_client.Metric {
metricFamilies, err := reg.Gather()
if err != nil {
log.Error("Something went wrong when calling reg.Gather() in getMatchingMetricFromRegistry(..) for " + metricName)
log.Errorf("Something went wrong when calling reg.Gather(), the metricName is: %s", metricName)
}
matchFamilyFunc := func(vect *io_prometheus_client.MetricFamily) bool {
matchFamilyFunc := func(vect *prometheus_client.MetricFamily) bool {
return *vect.Name == metricName
}
fiteredMetricFamilies := filter(metricFamilies, matchFamilyFunc)
if len(fiteredMetricFamilies) > 1 {
filteredMetricFamilies := filter(metricFamilies, matchFamilyFunc)
if len(filteredMetricFamilies) > 1 {
// handle this.
log.Error("Too many metric families for metricName = " + metricName)
// Should we panic here instead of report an error since it reflects a code problem, not a user problem?
log.Errorf("Too many metric families for metricName: %s " + metricName)
}
metricFunc := gi.getMatchingMetricFunc()
matchingMetric := getFirstMatch(fiteredMetricFamilies[0].Metric, metricFunc)
matchingMetric := getFirstMatch(filteredMetricFamilies[0].Metric, metricFunc)
return *matchingMetric
}

Expand All @@ -529,12 +517,14 @@ func (gi *goInstance) getTotalReceived() float32 {
val := metric.GetGauge().Value
return float32(*val)
}

func (gi *goInstance) getTotalProcessedSuccessfully() float32 {
metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalSuccessfullyProcessed)
// "pulsar_function_" + "processed_successfully_total", NewGaugeVec.
val := metric.GetGauge().Value
return float32(*val)
}

func (gi *goInstance) getTotalSysExceptions() float32 {
metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalSystemExceptions)
// "pulsar_function_"+ "system_exceptions_total", NewGaugeVec.
Expand Down Expand Up @@ -588,19 +578,6 @@ func (gi *goInstance) getTotalUserExceptions1min() float32 {
return float32(*val)
}

/*
func (gi *goInstance) get_avg_process_latency_1min() float32 {
metric := gi.getMatchingMetricFromRegistry(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
// "pulsar_function_" + "process_latency_ms_1min", SummaryVec
count := metric.GetSummary().SampleCount
sum := metric.GetSummary().SampleSum
if *count <= 0.0 {
return 0.0
} else {
return float32(*sum) / float32(*count)
}
}*/

func (gi *goInstance) getTotalReceived1min() float32 {
metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalReceived1min)
// "pulsar_function_" + "received_total_1min", GaugeVec
Expand Down
177 changes: 84 additions & 93 deletions pulsar-function-go/pf/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
//"strings"
//"github.com/prometheus/common/expfmt"
//"time"

prometheus_client "github.com/prometheus/client_model/go"
)

var metricsLabelNames = []string{"tenant", "namespace", "name", "instance_id", "cluster", "fqfn"}
var exceptionLabelNames = []string{"error", "ts"}
var exceptionMetricsLabelNames = append(metricsLabelNames, exceptionLabelNames...)
var (
metricsLabelNames = []string{"tenant", "namespace", "name", "instance_id", "cluster", "fqfn"}
exceptionLabelNames = []string{"error", "ts"}
exceptionMetricsLabelNames = append(metricsLabelNames, exceptionLabelNames...)
)

const (
PulsarFunctionMetricsPrefix = "pulsar_function_"
Expand All @@ -52,73 +52,74 @@ const (
)

// Declare Prometheus
var statTotalProcessedSuccessfully = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalSuccessfullyProcessed,
Help: "Total number of messages processed successfully."},
metricsLabelNames)
var statTotalSysExceptions = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalSystemExceptions,
Help: "Total number of system exceptions."},
metricsLabelNames)
var statTotalUserExceptions = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalUserExceptions,
Help: "Total number of user exceptions."},
metricsLabelNames)

var statProcessLatencyMs = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: PulsarFunctionMetricsPrefix + ProcessLatencyMs,
Help: "Process latency in milliseconds."}, metricsLabelNames)

var statLastInvocation = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + LastInvocation,
Help: "The timestamp of the last invocation of the function."}, metricsLabelNames)

var statTotalReceived = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalReceived,
Help: "Total number of messages received from source."}, metricsLabelNames)

// 1min windowed metrics
var statTotalProcessedSuccessfully1min = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalSuccessfullyProcessed1min,
Help: "Total number of messages processed successfully in the last 1 minute."}, metricsLabelNames)
var statTotalSysExceptions1min = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalSystemExceptions1min,
Help: "Total number of system exceptions in the last 1 minute."},
metricsLabelNames)
var statTotalUserExceptions1min = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalUserExceptions1min,
Help: "Total number of user exceptions in the last 1 minute."},
metricsLabelNames)

var statProcessLatencyMs1min = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: PulsarFunctionMetricsPrefix + ProcessLatencyMs1min,
Help: "Process latency in milliseconds in the last 1 minute."}, metricsLabelNames)

var statTotalReceived1min = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalReceived1min,
Help: "Total number of messages received from source in the last 1 minute."}, metricsLabelNames)

// exceptions
var userExceptions = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + "user_exception",
Help: "Exception from user code."}, exceptionMetricsLabelNames)

var systemExceptions = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + "system_exception",
Help: "Exception from system code."}, exceptionMetricsLabelNames)
var (
statTotalProcessedSuccessfully = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalSuccessfullyProcessed,
Help: "Total number of messages processed successfully."},
metricsLabelNames)
statTotalSysExceptions = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalSystemExceptions,
Help: "Total number of system exceptions."},
metricsLabelNames)
statTotalUserExceptions = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalUserExceptions,
Help: "Total number of user exceptions."},
metricsLabelNames)

statProcessLatencyMs = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: PulsarFunctionMetricsPrefix + ProcessLatencyMs,
Help: "Process latency in milliseconds."}, metricsLabelNames)

statLastInvocation = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + LastInvocation,
Help: "The timestamp of the last invocation of the function."}, metricsLabelNames)

statTotalReceived = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalReceived,
Help: "Total number of messages received from source."}, metricsLabelNames)

// 1min windowed metrics
statTotalProcessedSuccessfully1min = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalSuccessfullyProcessed1min,
Help: "Total number of messages processed successfully in the last 1 minute."}, metricsLabelNames)
statTotalSysExceptions1min = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalSystemExceptions1min,
Help: "Total number of system exceptions in the last 1 minute."},
metricsLabelNames)
statTotalUserExceptions1min = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalUserExceptions1min,
Help: "Total number of user exceptions in the last 1 minute."},
metricsLabelNames)

statProcessLatencyMs1min = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: PulsarFunctionMetricsPrefix + ProcessLatencyMs1min,
Help: "Process latency in milliseconds in the last 1 minute."}, metricsLabelNames)

statTotalReceived1min = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + TotalReceived1min,
Help: "Total number of messages received from source in the last 1 minute."}, metricsLabelNames)

userExceptions = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + "user_exception",
Help: "Exception from user code."}, exceptionMetricsLabelNames)

systemExceptions = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: PulsarFunctionMetricsPrefix + "system_exception",
Help: "Exception from system code."}, exceptionMetricsLabelNames)
)

var reg *prometheus.Registry

Expand Down Expand Up @@ -156,12 +157,11 @@ type StatWithLabelValues struct {
statTotalProcessedSuccessfully1min prometheus.Gauge
statTotalSysExceptions1min prometheus.Gauge
statTotalUserExceptions1min prometheus.Gauge
//_stat_process_latency_ms_1min prometheus.Observer
statTotalReceived1min prometheus.Gauge
latestUserException []LatestException
latestSysException []LatestException
processStartTime int64
metricsLabels []string
statTotalReceived1min prometheus.Gauge
latestUserException []LatestException
latestSysException []LatestException
processStartTime int64
metricsLabels []string
}

func NewStatWithLabelValues(metricsLabels ...string) StatWithLabelValues {
Expand All @@ -188,7 +188,6 @@ func NewStatWithLabelValues(metricsLabels ...string) StatWithLabelValues {
statTotalProcessedSuccessfully1min,
statTotalSysExceptions1min,
statTotalUserExceptions1min,
//_stat_process_latency_ms_1min,
statTotalReceived1min,
[]LatestException{},
[]LatestException{},
Expand All @@ -199,8 +198,8 @@ func NewStatWithLabelValues(metricsLabels ...string) StatWithLabelValues {
}

func filter(
ss []*io_prometheus_client.MetricFamily,
test func(*io_prometheus_client.MetricFamily) bool) (ret []*io_prometheus_client.MetricFamily) {
ss []*prometheus_client.MetricFamily,
test func(*prometheus_client.MetricFamily) bool) (ret []*prometheus_client.MetricFamily) {
for _, s := range ss {
if test(s) {
ret = append(ret, s)
Expand All @@ -210,8 +209,8 @@ func filter(
}

func getFirstMatch(
metrics []*io_prometheus_client.Metric,
test func(*io_prometheus_client.LabelPair) bool) *io_prometheus_client.Metric {
metrics []*prometheus_client.Metric,
test func(*prometheus_client.LabelPair) bool) *prometheus_client.Metric {
for _, met := range metrics {
for _, lbl := range met.Label {
if test(lbl) {
Expand All @@ -237,7 +236,6 @@ func (stat *StatWithLabelValues) processTimeEnd() {
now := time.Now()
duration := now.UnixNano() - stat.processStartTime
stat.statProcessLatencyMs.Observe(float64(duration))
//stat._stat_process_latency_ms_1min.Observe(float64(duration))
}
}

Expand Down Expand Up @@ -305,12 +303,5 @@ func (stat *StatWithLabelValues) reset() {
stat.statTotalProcessedSuccessfully1min.Set(0.0)
stat.statTotalUserExceptions1min.Set(0.0)
stat.statTotalSysExceptions1min.Set(0.0)
//stat._stat_process_latency_ms_1min._sum.set(0.0)
//stat._stat_process_latency_ms_1min._count.set(0.0)
stat.statTotalReceived1min.Set(0.0)
}

/*
// start time for windowed metrics
util.FixedTimer(60, reset, name="windowed-metrics-timer").start()
*/
Loading

0 comments on commit fdeaa30

Please sign in to comment.